diff --git a/.gitignore b/.gitignore index ef52b9c..fce6e07 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,6 @@ __pycache__/ poetry.lock .pytest_cache/ botpy.log -nano.*.save \ No newline at end of file +nano.*.save +.DS_Store +uv.lock diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 26b39c2..34f5baa 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -100,7 +100,7 @@ class AgentLoop: self._mcp_connected = False self._mcp_connecting = False self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks - self._pending_archives: list[asyncio.Task] = [] + self._background_tasks: list[asyncio.Task] = [] self._processing_lock = asyncio.Lock() self.memory_consolidator = MemoryConsolidator( workspace=workspace, @@ -332,9 +332,9 @@ class AgentLoop: async def close_mcp(self) -> None: """Drain pending background archives, then close MCP connections.""" - if self._pending_archives: - await asyncio.gather(*self._pending_archives, return_exceptions=True) - self._pending_archives.clear() + if self._background_tasks: + await asyncio.gather(*self._background_tasks, return_exceptions=True) + self._background_tasks.clear() if self._mcp_stack: try: await self._mcp_stack.aclose() @@ -342,6 +342,12 @@ class AgentLoop: pass # MCP SDK cancel scope cleanup is noisy but harmless self._mcp_stack = None + def _schedule_background(self, coro) -> None: + """Schedule a coroutine as a tracked background task (drained on shutdown).""" + task = asyncio.create_task(coro) + self._background_tasks.append(task) + task.add_done_callback(self._background_tasks.remove) + def stop(self) -> None: """Stop the agent loop.""" self._running = False @@ -371,7 +377,7 @@ class AgentLoop: final_content, _, all_msgs = await self._run_agent_loop(messages) self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) - await self.memory_consolidator.maybe_consolidate_by_tokens(session) + self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session)) return OutboundMessage(channel=channel, chat_id=chat_id, content=final_content or "Background task completed.") @@ -390,11 +396,7 @@ class AgentLoop: self.sessions.invalidate(session.key) if snapshot: - task = asyncio.create_task( - self.memory_consolidator.archive_messages(snapshot) - ) - self._pending_archives.append(task) - task.add_done_callback(self._pending_archives.remove) + self._schedule_background(self.memory_consolidator.archive_messages(snapshot)) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="New session started.") @@ -441,7 +443,7 @@ class AgentLoop: self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) - await self.memory_consolidator.maybe_consolidate_by_tokens(session) + self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session)) if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: return None diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index b97dd87..21e1e78 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -591,8 +591,8 @@ class TestNewCommandArchival: assert loop.sessions.get_or_create("cli:test").messages == [] @pytest.mark.asyncio - async def test_close_mcp_drains_pending_archives(self, tmp_path: Path) -> None: - """close_mcp waits for background archive tasks to complete.""" + async def test_close_mcp_drains_background_tasks(self, tmp_path: Path) -> None: + """close_mcp waits for background tasks to complete.""" from nanobot.bus.events import InboundMessage loop = self._make_loop(tmp_path)