From b29275a1d2c66ebba3f955b2e9512d7dbfaac3de Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 16 Mar 2026 08:33:03 +0000 Subject: [PATCH] refactor(/new): background archival with guaranteed persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace fire-and-forget consolidation with archive_messages(), which retries until the raw-dump fallback triggers — making it effectively infallible. /new now clears the session immediately and archives in the background. Pending archive tasks are drained on shutdown via close_mcp() so no data is lost on process exit. --- nanobot/agent/loop.py | 31 +++++++++------------- nanobot/agent/memory.py | 14 +++++----- tests/test_consolidate_offset.py | 44 +++++++++++++++++++++++++++----- 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 9f69e5b..26b39c2 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -100,6 +100,7 @@ class AgentLoop: self._mcp_connected = False self._mcp_connecting = False self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks + self._pending_archives: list[asyncio.Task] = [] self._processing_lock = asyncio.Lock() self.memory_consolidator = MemoryConsolidator( workspace=workspace, @@ -330,7 +331,10 @@ class AgentLoop: )) async def close_mcp(self) -> None: - """Close MCP connections.""" + """Drain pending background archives, then close MCP connections.""" + if self._pending_archives: + await asyncio.gather(*self._pending_archives, return_exceptions=True) + self._pending_archives.clear() if self._mcp_stack: try: await self._mcp_stack.aclose() @@ -380,28 +384,17 @@ class AgentLoop: # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - # Capture messages before clearing for background archival - messages_to_archive = session.messages[session.last_consolidated:] - - # Immediately clear session and return + snapshot = session.messages[session.last_consolidated:] session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - # Schedule background archival (serialized with normal consolidation via lock) - if messages_to_archive: - - async def _archive_in_background(): - lock = self.memory_consolidator.get_lock(session.key) - async with lock: - try: - success = await self.memory_consolidator.consolidate_messages(messages_to_archive) - if not success: - logger.warning("/new background archival failed for {}", session.key) - except Exception: - logger.exception("/new background archival error for {}", session.key) - - asyncio.create_task(_archive_in_background()) + if snapshot: + task = asyncio.create_task( + self.memory_consolidator.archive_messages(snapshot) + ) + self._pending_archives.append(task) + task.add_done_callback(self._pending_archives.remove) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="New session started.") diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index f220f23..5fdfa7a 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -290,14 +290,14 @@ class MemoryConsolidator: self._get_tool_definitions(), ) - async def archive_unconsolidated(self, session: Session) -> bool: - """Archive the full unconsolidated tail for /new-style session rollover.""" - lock = self.get_lock(session.key) - async with lock: - snapshot = session.messages[session.last_consolidated:] - if not snapshot: + async def archive_messages(self, messages: list[dict[str, object]]) -> bool: + """Archive messages with guaranteed persistence (retries until raw-dump fallback).""" + if not messages: + return True + for _ in range(self.store._MAX_FAILURES_BEFORE_RAW_ARCHIVE): + if await self.consolidate_messages(messages): return True - return await self.consolidate_messages(snapshot) + return True async def maybe_consolidate_by_tokens(self, session: Session) -> None: """Loop: archive old messages until prompt fits within half the context window.""" diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index aafaeaf..b97dd87 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -506,7 +506,7 @@ class TestNewCommandArchival: @pytest.mark.asyncio async def test_new_clears_session_immediately_even_if_archive_fails(self, tmp_path: Path) -> None: - """/new clears session immediately, archive failure only logs warning.""" + """/new clears session immediately; archive_messages retries until raw dump.""" from nanobot.bus.events import InboundMessage loop = self._make_loop(tmp_path) @@ -516,7 +516,11 @@ class TestNewCommandArchival: session.add_message("assistant", f"resp{i}") loop.sessions.save(session) + call_count = 0 + async def _failing_consolidate(_messages) -> bool: + nonlocal call_count + call_count += 1 return False loop.memory_consolidator.consolidate_messages = _failing_consolidate # type: ignore[method-assign] @@ -524,14 +528,15 @@ class TestNewCommandArchival: new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") response = await loop._process_message(new_msg) - # /new returns immediately with success message assert response is not None assert "new session started" in response.content.lower() - # Session is cleared immediately, even though archive will fail in background session_after = loop.sessions.get_or_create("cli:test") assert len(session_after.messages) == 0 + await loop.close_mcp() + assert call_count == 3 # retried up to raw-archive threshold + @pytest.mark.asyncio async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None: from nanobot.bus.events import InboundMessage @@ -545,12 +550,10 @@ class TestNewCommandArchival: loop.sessions.save(session) archived_count = -1 - archive_done = asyncio.Event() async def _fake_consolidate(messages) -> bool: nonlocal archived_count archived_count = len(messages) - archive_done.set() return True loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign] @@ -561,8 +564,7 @@ class TestNewCommandArchival: assert response is not None assert "new session started" in response.content.lower() - # Wait for background archival to complete - await asyncio.wait_for(archive_done.wait(), timeout=1.0) + await loop.close_mcp() assert archived_count == 3 @pytest.mark.asyncio @@ -587,3 +589,31 @@ class TestNewCommandArchival: assert response is not None assert "new session started" in response.content.lower() assert loop.sessions.get_or_create("cli:test").messages == [] + + @pytest.mark.asyncio + async def test_close_mcp_drains_pending_archives(self, tmp_path: Path) -> None: + """close_mcp waits for background archive tasks to complete.""" + from nanobot.bus.events import InboundMessage + + loop = self._make_loop(tmp_path) + session = loop.sessions.get_or_create("cli:test") + for i in range(3): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + archived = asyncio.Event() + + async def _slow_consolidate(_messages) -> bool: + await asyncio.sleep(0.1) + archived.set() + return True + + loop.memory_consolidator.consolidate_messages = _slow_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + await loop._process_message(new_msg) + + assert not archived.is_set() + await loop.close_mcp() + assert archived.is_set()