perf: background post-response memory consolidation for faster replies
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -21,3 +21,5 @@ poetry.lock
|
|||||||
.pytest_cache/
|
.pytest_cache/
|
||||||
botpy.log
|
botpy.log
|
||||||
nano.*.save
|
nano.*.save
|
||||||
|
.DS_Store
|
||||||
|
uv.lock
|
||||||
|
|||||||
@@ -100,7 +100,7 @@ class AgentLoop:
|
|||||||
self._mcp_connected = False
|
self._mcp_connected = False
|
||||||
self._mcp_connecting = False
|
self._mcp_connecting = False
|
||||||
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
|
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
|
||||||
self._pending_archives: list[asyncio.Task] = []
|
self._background_tasks: list[asyncio.Task] = []
|
||||||
self._processing_lock = asyncio.Lock()
|
self._processing_lock = asyncio.Lock()
|
||||||
self.memory_consolidator = MemoryConsolidator(
|
self.memory_consolidator = MemoryConsolidator(
|
||||||
workspace=workspace,
|
workspace=workspace,
|
||||||
@@ -332,9 +332,9 @@ class AgentLoop:
|
|||||||
|
|
||||||
async def close_mcp(self) -> None:
|
async def close_mcp(self) -> None:
|
||||||
"""Drain pending background archives, then close MCP connections."""
|
"""Drain pending background tasks, then close MCP connections."""
|
||||||
if self._pending_archives:
|
if self._background_tasks:
|
||||||
await asyncio.gather(*self._pending_archives, return_exceptions=True)
|
await asyncio.gather(*self._background_tasks, return_exceptions=True)
|
||||||
self._pending_archives.clear()
|
self._background_tasks.clear()
|
||||||
if self._mcp_stack:
|
if self._mcp_stack:
|
||||||
try:
|
try:
|
||||||
await self._mcp_stack.aclose()
|
await self._mcp_stack.aclose()
|
||||||
@@ -342,6 +342,12 @@ class AgentLoop:
|
|||||||
pass # MCP SDK cancel scope cleanup is noisy but harmless
|
pass # MCP SDK cancel scope cleanup is noisy but harmless
|
||||||
self._mcp_stack = None
|
self._mcp_stack = None
|
||||||
|
|
||||||
|
def _schedule_background(self, coro) -> None:
|
||||||
|
"""Schedule a coroutine as a tracked background task (drained on shutdown)."""
|
||||||
|
task = asyncio.create_task(coro)
|
||||||
|
self._background_tasks.append(task)
|
||||||
|
task.add_done_callback(self._background_tasks.remove)
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
"""Stop the agent loop."""
|
"""Stop the agent loop."""
|
||||||
self._running = False
|
self._running = False
|
||||||
@@ -371,7 +377,7 @@ class AgentLoop:
|
|||||||
final_content, _, all_msgs = await self._run_agent_loop(messages)
|
final_content, _, all_msgs = await self._run_agent_loop(messages)
|
||||||
self._save_turn(session, all_msgs, 1 + len(history))
|
self._save_turn(session, all_msgs, 1 + len(history))
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
|
self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session))
|
||||||
return OutboundMessage(channel=channel, chat_id=chat_id,
|
return OutboundMessage(channel=channel, chat_id=chat_id,
|
||||||
content=final_content or "Background task completed.")
|
content=final_content or "Background task completed.")
|
||||||
|
|
||||||
@@ -390,11 +396,7 @@ class AgentLoop:
|
|||||||
self.sessions.invalidate(session.key)
|
self.sessions.invalidate(session.key)
|
||||||
|
|
||||||
if snapshot:
|
if snapshot:
|
||||||
task = asyncio.create_task(
|
self._schedule_background(self.memory_consolidator.archive_messages(snapshot))
|
||||||
self.memory_consolidator.archive_messages(snapshot)
|
|
||||||
)
|
|
||||||
self._pending_archives.append(task)
|
|
||||||
task.add_done_callback(self._pending_archives.remove)
|
|
||||||
|
|
||||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||||
content="New session started.")
|
content="New session started.")
|
||||||
@@ -441,7 +443,7 @@ class AgentLoop:
|
|||||||
|
|
||||||
self._save_turn(session, all_msgs, 1 + len(history))
|
self._save_turn(session, all_msgs, 1 + len(history))
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
|
self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session))
|
||||||
|
|
||||||
if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
|
if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -591,8 +591,8 @@ class TestNewCommandArchival:
|
|||||||
assert loop.sessions.get_or_create("cli:test").messages == []
|
assert loop.sessions.get_or_create("cli:test").messages == []
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_close_mcp_drains_pending_archives(self, tmp_path: Path) -> None:
|
async def test_close_mcp_drains_background_tasks(self, tmp_path: Path) -> None:
|
||||||
"""close_mcp waits for background archive tasks to complete."""
|
"""close_mcp waits for background tasks to complete."""
|
||||||
from nanobot.bus.events import InboundMessage
|
from nanobot.bus.events import InboundMessage
|
||||||
|
|
||||||
loop = self._make_loop(tmp_path)
|
loop = self._make_loop(tmp_path)
|
||||||
|
|||||||
Reference in New Issue
Block a user