refactor(/new): background archival with guaranteed persistence
Replace the fire-and-forget consolidation in /new with archive_messages(), which retries consolidation until the raw-dump fallback threshold is reached — making archival effectively infallible. /new now clears the session immediately and archives the captured messages in a background task. Pending archive tasks are tracked and drained on shutdown via close_mcp(), so no data is lost on process exit.
This commit is contained in:
@@ -100,6 +100,7 @@ class AgentLoop:
|
|||||||
self._mcp_connected = False
|
self._mcp_connected = False
|
||||||
self._mcp_connecting = False
|
self._mcp_connecting = False
|
||||||
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
|
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
|
||||||
|
self._pending_archives: list[asyncio.Task] = []
|
||||||
self._processing_lock = asyncio.Lock()
|
self._processing_lock = asyncio.Lock()
|
||||||
self.memory_consolidator = MemoryConsolidator(
|
self.memory_consolidator = MemoryConsolidator(
|
||||||
workspace=workspace,
|
workspace=workspace,
|
||||||
@@ -330,7 +331,10 @@ class AgentLoop:
|
|||||||
))
|
))
|
||||||
|
|
||||||
async def close_mcp(self) -> None:
|
async def close_mcp(self) -> None:
|
||||||
"""Close MCP connections."""
|
"""Drain pending background archives, then close MCP connections."""
|
||||||
|
if self._pending_archives:
|
||||||
|
await asyncio.gather(*self._pending_archives, return_exceptions=True)
|
||||||
|
self._pending_archives.clear()
|
||||||
if self._mcp_stack:
|
if self._mcp_stack:
|
||||||
try:
|
try:
|
||||||
await self._mcp_stack.aclose()
|
await self._mcp_stack.aclose()
|
||||||
@@ -380,28 +384,17 @@ class AgentLoop:
|
|||||||
# Slash commands
|
# Slash commands
|
||||||
cmd = msg.content.strip().lower()
|
cmd = msg.content.strip().lower()
|
||||||
if cmd == "/new":
|
if cmd == "/new":
|
||||||
# Capture messages before clearing for background archival
|
snapshot = session.messages[session.last_consolidated:]
|
||||||
messages_to_archive = session.messages[session.last_consolidated:]
|
|
||||||
|
|
||||||
# Immediately clear session and return
|
|
||||||
session.clear()
|
session.clear()
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
self.sessions.invalidate(session.key)
|
self.sessions.invalidate(session.key)
|
||||||
|
|
||||||
# Schedule background archival (serialized with normal consolidation via lock)
|
if snapshot:
|
||||||
if messages_to_archive:
|
task = asyncio.create_task(
|
||||||
|
self.memory_consolidator.archive_messages(snapshot)
|
||||||
async def _archive_in_background():
|
)
|
||||||
lock = self.memory_consolidator.get_lock(session.key)
|
self._pending_archives.append(task)
|
||||||
async with lock:
|
task.add_done_callback(self._pending_archives.remove)
|
||||||
try:
|
|
||||||
success = await self.memory_consolidator.consolidate_messages(messages_to_archive)
|
|
||||||
if not success:
|
|
||||||
logger.warning("/new background archival failed for {}", session.key)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("/new background archival error for {}", session.key)
|
|
||||||
|
|
||||||
asyncio.create_task(_archive_in_background())
|
|
||||||
|
|
||||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||||
content="New session started.")
|
content="New session started.")
|
||||||
|
|||||||
@@ -290,14 +290,14 @@ class MemoryConsolidator:
|
|||||||
self._get_tool_definitions(),
|
self._get_tool_definitions(),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def archive_unconsolidated(self, session: Session) -> bool:
|
async def archive_messages(self, messages: list[dict[str, object]]) -> bool:
|
||||||
"""Archive the full unconsolidated tail for /new-style session rollover."""
|
"""Archive messages with guaranteed persistence (retries until raw-dump fallback)."""
|
||||||
lock = self.get_lock(session.key)
|
if not messages:
|
||||||
async with lock:
|
return True
|
||||||
snapshot = session.messages[session.last_consolidated:]
|
for _ in range(self.store._MAX_FAILURES_BEFORE_RAW_ARCHIVE):
|
||||||
if not snapshot:
|
if await self.consolidate_messages(messages):
|
||||||
|
return True
|
||||||
return True
|
return True
|
||||||
return await self.consolidate_messages(snapshot)
|
|
||||||
|
|
||||||
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
|
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
|
||||||
"""Loop: archive old messages until prompt fits within half the context window."""
|
"""Loop: archive old messages until prompt fits within half the context window."""
|
||||||
|
|||||||
@@ -506,7 +506,7 @@ class TestNewCommandArchival:
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_new_clears_session_immediately_even_if_archive_fails(self, tmp_path: Path) -> None:
|
async def test_new_clears_session_immediately_even_if_archive_fails(self, tmp_path: Path) -> None:
|
||||||
"""/new clears session immediately, archive failure only logs warning."""
|
"""/new clears session immediately; archive_messages retries until raw dump."""
|
||||||
from nanobot.bus.events import InboundMessage
|
from nanobot.bus.events import InboundMessage
|
||||||
|
|
||||||
loop = self._make_loop(tmp_path)
|
loop = self._make_loop(tmp_path)
|
||||||
@@ -516,7 +516,11 @@ class TestNewCommandArchival:
|
|||||||
session.add_message("assistant", f"resp{i}")
|
session.add_message("assistant", f"resp{i}")
|
||||||
loop.sessions.save(session)
|
loop.sessions.save(session)
|
||||||
|
|
||||||
|
call_count = 0
|
||||||
|
|
||||||
async def _failing_consolidate(_messages) -> bool:
|
async def _failing_consolidate(_messages) -> bool:
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
return False
|
return False
|
||||||
|
|
||||||
loop.memory_consolidator.consolidate_messages = _failing_consolidate # type: ignore[method-assign]
|
loop.memory_consolidator.consolidate_messages = _failing_consolidate # type: ignore[method-assign]
|
||||||
@@ -524,14 +528,15 @@ class TestNewCommandArchival:
|
|||||||
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
|
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
|
||||||
response = await loop._process_message(new_msg)
|
response = await loop._process_message(new_msg)
|
||||||
|
|
||||||
# /new returns immediately with success message
|
|
||||||
assert response is not None
|
assert response is not None
|
||||||
assert "new session started" in response.content.lower()
|
assert "new session started" in response.content.lower()
|
||||||
|
|
||||||
# Session is cleared immediately, even though archive will fail in background
|
|
||||||
session_after = loop.sessions.get_or_create("cli:test")
|
session_after = loop.sessions.get_or_create("cli:test")
|
||||||
assert len(session_after.messages) == 0
|
assert len(session_after.messages) == 0
|
||||||
|
|
||||||
|
await loop.close_mcp()
|
||||||
|
assert call_count == 3 # retried up to raw-archive threshold
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None:
|
async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None:
|
||||||
from nanobot.bus.events import InboundMessage
|
from nanobot.bus.events import InboundMessage
|
||||||
@@ -545,12 +550,10 @@ class TestNewCommandArchival:
|
|||||||
loop.sessions.save(session)
|
loop.sessions.save(session)
|
||||||
|
|
||||||
archived_count = -1
|
archived_count = -1
|
||||||
archive_done = asyncio.Event()
|
|
||||||
|
|
||||||
async def _fake_consolidate(messages) -> bool:
|
async def _fake_consolidate(messages) -> bool:
|
||||||
nonlocal archived_count
|
nonlocal archived_count
|
||||||
archived_count = len(messages)
|
archived_count = len(messages)
|
||||||
archive_done.set()
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign]
|
loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign]
|
||||||
@@ -561,8 +564,7 @@ class TestNewCommandArchival:
|
|||||||
assert response is not None
|
assert response is not None
|
||||||
assert "new session started" in response.content.lower()
|
assert "new session started" in response.content.lower()
|
||||||
|
|
||||||
# Wait for background archival to complete
|
await loop.close_mcp()
|
||||||
await asyncio.wait_for(archive_done.wait(), timeout=1.0)
|
|
||||||
assert archived_count == 3
|
assert archived_count == 3
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -587,3 +589,31 @@ class TestNewCommandArchival:
|
|||||||
assert response is not None
|
assert response is not None
|
||||||
assert "new session started" in response.content.lower()
|
assert "new session started" in response.content.lower()
|
||||||
assert loop.sessions.get_or_create("cli:test").messages == []
|
assert loop.sessions.get_or_create("cli:test").messages == []
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_close_mcp_drains_pending_archives(self, tmp_path: Path) -> None:
|
||||||
|
"""close_mcp waits for background archive tasks to complete."""
|
||||||
|
from nanobot.bus.events import InboundMessage
|
||||||
|
|
||||||
|
loop = self._make_loop(tmp_path)
|
||||||
|
session = loop.sessions.get_or_create("cli:test")
|
||||||
|
for i in range(3):
|
||||||
|
session.add_message("user", f"msg{i}")
|
||||||
|
session.add_message("assistant", f"resp{i}")
|
||||||
|
loop.sessions.save(session)
|
||||||
|
|
||||||
|
archived = asyncio.Event()
|
||||||
|
|
||||||
|
async def _slow_consolidate(_messages) -> bool:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
archived.set()
|
||||||
|
return True
|
||||||
|
|
||||||
|
loop.memory_consolidator.consolidate_messages = _slow_consolidate # type: ignore[method-assign]
|
||||||
|
|
||||||
|
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
|
||||||
|
await loop._process_message(new_msg)
|
||||||
|
|
||||||
|
assert not archived.is_set()
|
||||||
|
await loop.close_mcp()
|
||||||
|
assert archived.is_set()
|
||||||
|
|||||||
Reference in New Issue
Block a user