fix(loop): restore /new immediate return with safe background consolidation
PR #881 (commit 755e424) fixed the race condition between normal consolidation
and /new consolidation, but did so by making /new wait for consolidation to
complete before returning. This hurts user experience - /new should be instant.
This PR restores the original immediate-return behavior while keeping safety:
1. **Immediate return**: Session clears and user sees "New session started" right away
2. **Background archival**: Consolidation runs in background via asyncio.create_task
3. **Serialized consolidation**: Uses the same lock as normal consolidation via
`memory_consolidator.get_lock()` to prevent concurrent writes
If consolidation fails after session clear, archived messages may be lost.
This is acceptable because:
- User already sees the new session and can continue working
- Failure is logged for debugging
- The alternative (blocking /new on every call) hurts UX for all users
This commit is contained in:
@@ -380,24 +380,29 @@ class AgentLoop:
|
|||||||
# Slash commands
|
# Slash commands
|
||||||
cmd = msg.content.strip().lower()
|
cmd = msg.content.strip().lower()
|
||||||
if cmd == "/new":
|
if cmd == "/new":
|
||||||
try:
|
# Capture messages before clearing for background archival
|
||||||
if not await self.memory_consolidator.archive_unconsolidated(session):
|
messages_to_archive = session.messages[session.last_consolidated:]
|
||||||
return OutboundMessage(
|
|
||||||
channel=msg.channel,
|
|
||||||
chat_id=msg.chat_id,
|
|
||||||
content="Memory archival failed, session not cleared. Please try again.",
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("/new archival failed for {}", session.key)
|
|
||||||
return OutboundMessage(
|
|
||||||
channel=msg.channel,
|
|
||||||
chat_id=msg.chat_id,
|
|
||||||
content="Memory archival failed, session not cleared. Please try again.",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
# Immediately clear session and return
|
||||||
session.clear()
|
session.clear()
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
self.sessions.invalidate(session.key)
|
self.sessions.invalidate(session.key)
|
||||||
|
|
||||||
|
# Schedule background archival (serialized with normal consolidation via lock)
|
||||||
|
if messages_to_archive:
|
||||||
|
|
||||||
|
async def _archive_in_background():
|
||||||
|
lock = self.memory_consolidator.get_lock(session.key)
|
||||||
|
async with lock:
|
||||||
|
try:
|
||||||
|
success = await self.memory_consolidator.consolidate_messages(messages_to_archive)
|
||||||
|
if not success:
|
||||||
|
logger.warning("/new background archival failed for {}", session.key)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("/new background archival error for {}", session.key)
|
||||||
|
|
||||||
|
asyncio.create_task(_archive_in_background())
|
||||||
|
|
||||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||||
content="New session started.")
|
content="New session started.")
|
||||||
if cmd == "/help":
|
if cmd == "/help":
|
||||||
|
|||||||
@@ -505,7 +505,8 @@ class TestNewCommandArchival:
|
|||||||
return loop
|
return loop
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None:
|
async def test_new_clears_session_immediately_even_if_archive_fails(self, tmp_path: Path) -> None:
|
||||||
|
"""/new clears session immediately, archive failure only logs warning."""
|
||||||
from nanobot.bus.events import InboundMessage
|
from nanobot.bus.events import InboundMessage
|
||||||
|
|
||||||
loop = self._make_loop(tmp_path)
|
loop = self._make_loop(tmp_path)
|
||||||
@@ -514,7 +515,6 @@ class TestNewCommandArchival:
|
|||||||
session.add_message("user", f"msg{i}")
|
session.add_message("user", f"msg{i}")
|
||||||
session.add_message("assistant", f"resp{i}")
|
session.add_message("assistant", f"resp{i}")
|
||||||
loop.sessions.save(session)
|
loop.sessions.save(session)
|
||||||
before_count = len(session.messages)
|
|
||||||
|
|
||||||
async def _failing_consolidate(_messages) -> bool:
|
async def _failing_consolidate(_messages) -> bool:
|
||||||
return False
|
return False
|
||||||
@@ -524,9 +524,13 @@ class TestNewCommandArchival:
|
|||||||
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
|
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
|
||||||
response = await loop._process_message(new_msg)
|
response = await loop._process_message(new_msg)
|
||||||
|
|
||||||
|
# /new returns immediately with success message
|
||||||
assert response is not None
|
assert response is not None
|
||||||
assert "failed" in response.content.lower()
|
assert "new session started" in response.content.lower()
|
||||||
assert len(loop.sessions.get_or_create("cli:test").messages) == before_count
|
|
||||||
|
# Session is cleared immediately, even though archive will fail in background
|
||||||
|
session_after = loop.sessions.get_or_create("cli:test")
|
||||||
|
assert len(session_after.messages) == 0
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None:
|
async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None:
|
||||||
@@ -541,10 +545,12 @@ class TestNewCommandArchival:
|
|||||||
loop.sessions.save(session)
|
loop.sessions.save(session)
|
||||||
|
|
||||||
archived_count = -1
|
archived_count = -1
|
||||||
|
archive_done = asyncio.Event()
|
||||||
|
|
||||||
async def _fake_consolidate(messages) -> bool:
|
async def _fake_consolidate(messages) -> bool:
|
||||||
nonlocal archived_count
|
nonlocal archived_count
|
||||||
archived_count = len(messages)
|
archived_count = len(messages)
|
||||||
|
archive_done.set()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign]
|
loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign]
|
||||||
@@ -554,6 +560,9 @@ class TestNewCommandArchival:
|
|||||||
|
|
||||||
assert response is not None
|
assert response is not None
|
||||||
assert "new session started" in response.content.lower()
|
assert "new session started" in response.content.lower()
|
||||||
|
|
||||||
|
# Wait for background archival to complete
|
||||||
|
await asyncio.wait_for(archive_done.wait(), timeout=1.0)
|
||||||
assert archived_count == 3
|
assert archived_count == 3
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
Reference in New Issue
Block a user