From 9820c87537e2fecbb37006907b1db1ab832f427f Mon Sep 17 00:00:00 2001 From: chengyongru Date: Fri, 13 Mar 2026 13:57:06 +0800 Subject: [PATCH] fix(loop): restore /new immediate return with safe background consolidation PR #881 (commit 755e424) fixed the race condition between normal consolidation and /new consolidation, but did so by making /new wait for consolidation to complete before returning. This hurts user experience - /new should be instant. This PR restores the original immediate-return behavior while keeping safety: 1. **Immediate return**: Session clears and user sees "New session started" right away 2. **Background archival**: Consolidation runs in background via asyncio.create_task 3. **Serialized consolidation**: Uses the same lock as normal consolidation via `memory_consolidator.get_lock()` to prevent concurrent writes If consolidation fails after session clear, archived messages may be lost. This is acceptable because: - User already sees the new session and can continue working - Failure is logged for debugging - The alternative (blocking /new on every call) hurts UX for all users --- nanobot/agent/loop.py | 33 ++++++++++++++++++-------------- tests/test_consolidate_offset.py | 17 ++++++++++++---- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 2c0d29a..9f69e5b 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -380,24 +380,29 @@ class AgentLoop: # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - try: - if not await self.memory_consolidator.archive_unconsolidated(session): - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="Memory archival failed, session not cleared. Please try again.", - ) - except Exception: - logger.exception("/new archival failed for {}", session.key) - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="Memory archival failed, session not cleared. Please try again.", - ) + # Capture messages before clearing for background archival + messages_to_archive = session.messages[session.last_consolidated:] + # Immediately clear session and return session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) + + # Schedule background archival (serialized with normal consolidation via lock) + if messages_to_archive: + + async def _archive_in_background(): + lock = self.memory_consolidator.get_lock(session.key) + async with lock: + try: + success = await self.memory_consolidator.consolidate_messages(messages_to_archive) + if not success: + logger.warning("/new background archival failed for {}", session.key) + except Exception: + logger.exception("/new background archival error for {}", session.key) + + asyncio.create_task(_archive_in_background()) + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="New session started.") if cmd == "/help": diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index 7d12338..aafaeaf 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -505,7 +505,8 @@ class TestNewCommandArchival: return loop @pytest.mark.asyncio - async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None: + async def test_new_clears_session_immediately_even_if_archive_fails(self, tmp_path: Path) -> None: + """/new clears session immediately, archive failure only logs warning.""" from nanobot.bus.events import InboundMessage loop = self._make_loop(tmp_path) @@ -514,7 +515,6 @@ class TestNewCommandArchival: session.add_message("user", f"msg{i}") session.add_message("assistant", f"resp{i}") loop.sessions.save(session) - before_count = len(session.messages) async def _failing_consolidate(_messages) -> bool: return False @@ -524,9 +524,13 @@ class TestNewCommandArchival: new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") response = await loop._process_message(new_msg) + # /new returns immediately with success message assert response is not None - assert "failed" in response.content.lower() - assert len(loop.sessions.get_or_create("cli:test").messages) == before_count + assert "new session started" in response.content.lower() + + # Session is cleared immediately, even though archive will fail in background + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 0 @pytest.mark.asyncio async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None: @@ -541,10 +545,12 @@ class TestNewCommandArchival: loop.sessions.save(session) archived_count = -1 + archive_done = asyncio.Event() async def _fake_consolidate(messages) -> bool: nonlocal archived_count archived_count = len(messages) + archive_done.set() return True loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign] @@ -554,6 +560,9 @@ class TestNewCommandArchival: assert response is not None assert "new session started" in response.content.lower() + + # Wait for background archival to complete + await asyncio.wait_for(archive_done.wait(), timeout=1.0) assert archived_count == 3 @pytest.mark.asyncio