diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index 5fe0ee0..e834f27 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -250,6 +250,8 @@ class AgentLoop:
         """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
         self._running = True
         await self._connect_mcp()
+        # Start background consolidation task
+        await self.memory_consolidator.start_background_task()
         logger.info("Agent loop started")
 
         while self._running:
@@ -327,10 +329,15 @@ class AgentLoop:
                 pass  # MCP SDK cancel scope cleanup is noisy but harmless
             self._mcp_stack = None
 
-    def stop(self) -> None:
-        """Stop the agent loop."""
+    async def stop(self) -> None:
+        """Stop the agent loop and background tasks.
+
+        This is a coroutine: callers must ``await`` it, otherwise the loop
+        keeps running and the consolidation task is never stopped.
+        """
         self._running = False
-        logger.info("Agent loop stopping")
+        await self.memory_consolidator.stop_background_task()
+        logger.info("Agent loop stopped")
 
     async def _process_message(
         self,
@@ -346,7 +353,8 @@ class AgentLoop:
             logger.info("Processing system message from {}", msg.sender_id)
             key = f"{channel}:{chat_id}"
             session = self.sessions.get_or_create(key)
-            await self.memory_consolidator.maybe_consolidate_by_tokens(session)
+            self.memory_consolidator.record_activity(key)
+            await self.memory_consolidator.maybe_consolidate_by_tokens_async(session)
             self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
             history = session.get_history(max_messages=0)
             messages = self.context.build_messages(
@@ -356,7 +364,6 @@ class AgentLoop:
             final_content, _, all_msgs = await self._run_agent_loop(messages)
             self._save_turn(session, all_msgs, 1 + len(history))
             self.sessions.save(session)
-            await self.memory_consolidator.maybe_consolidate_by_tokens(session)
             return OutboundMessage(channel=channel, chat_id=chat_id,
                                   content=final_content or "Background task completed.")
 
@@ -365,6 +372,7 @@ class AgentLoop:
 
         key = session_key or msg.session_key
         session = self.sessions.get_or_create(key)
+        self.memory_consolidator.record_activity(key)
 
         # Slash commands
         cmd = msg.content.strip().lower()
@@ -400,7 +408,6 @@ class AgentLoop:
             return OutboundMessage(
                 channel=msg.channel, chat_id=msg.chat_id, content="\n".join(lines),
             )
-        await self.memory_consolidator.maybe_consolidate_by_tokens(session)
 
         self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
         if message_tool := self.tools.get("message"):
@@ -432,7 +439,8 @@ class AgentLoop:
 
         self._save_turn(session, all_msgs, 1 + len(history))
         self.sessions.save(session)
-        await self.memory_consolidator.maybe_consolidate_by_tokens(session)
+        # Non-blocking: trim the session in the background so busy chats still get consolidated.
+        self.memory_consolidator.maybe_consolidate_by_tokens(session)
 
         if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
             return None
diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py
index 1301d47..9a4e0d7 100644
--- a/nanobot/agent/memory.py
+++ b/nanobot/agent/memory.py
@@ -149,9 +149,14 @@ class MemoryStore:
 
 
 class MemoryConsolidator:
-    """Owns consolidation policy, locking, and session offset updates."""
+    """Owns consolidation policy, locking, and session offset updates.
+
+    Consolidation runs asynchronously in the background when sessions are idle,
+    so it doesn't block user interactions.
+    """
 
     _MAX_CONSOLIDATION_ROUNDS = 5
+    _IDLE_CHECK_INTERVAL = 30  # seconds between idle checks
 
     def __init__(
         self,
@@ -171,11 +176,66 @@ class MemoryConsolidator:
         self._build_messages = build_messages
         self._get_tool_definitions = get_tool_definitions
         self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
+        self._background_task: asyncio.Task[None] | None = None
+        self._pending_tasks: set[asyncio.Task[None]] = set()  # strong refs to scheduled consolidations
+        self._stop_event = asyncio.Event()
+        self._session_last_activity: dict[str, float] = {}  # session_key -> last activity timestamp
 
     def get_lock(self, session_key: str) -> asyncio.Lock:
         """Return the shared consolidation lock for one session."""
         return self._locks.setdefault(session_key, asyncio.Lock())
 
+    def record_activity(self, session_key: str) -> None:
+        """Record that a session is active (for idle detection).
+
+        Must be called from code running inside the event loop; the timestamp
+        comes from the loop clock, the same clock the idle checker reads.
+        """
+        self._session_last_activity[session_key] = asyncio.get_running_loop().time()
+
+    async def start_background_task(self) -> None:
+        """Start the background task that checks for idle sessions and consolidates."""
+        if self._background_task is not None and not self._background_task.done():
+            return  # Already running
+        self._stop_event.clear()
+        self._background_task = asyncio.create_task(self._idle_consolidation_loop())
+
+    async def stop_background_task(self) -> None:
+        """Stop the idle checker and cancel consolidations that are still in flight."""
+        self._stop_event.set()
+        tasks = list(self._pending_tasks)
+        if self._background_task is not None and not self._background_task.done():
+            tasks.append(self._background_task)
+        self._background_task = None
+        for task in tasks:
+            task.cancel()
+        # return_exceptions=True collects each task's CancelledError/failure instead of raising it.
+        await asyncio.gather(*tasks, return_exceptions=True)
+
+    async def _idle_consolidation_loop(self) -> None:
+        """Background loop that checks for idle sessions and triggers consolidation."""
+        idle_after = self._IDLE_CHECK_INTERVAL * 2
+        while not self._stop_event.is_set():
+            try:
+                await asyncio.sleep(self._IDLE_CHECK_INTERVAL)
+                if self._stop_event.is_set():
+                    break
+
+                # NOTE(review): assumes the session manager exposes all(); confirm it exists.
+                now = asyncio.get_running_loop().time()
+                for session in list(self.sessions.all()):
+                    # A missing timestamp counts as idle, so dropping idle entries keeps the map bounded.
+                    last_active = self._session_last_activity.get(session.key)
+                    if last_active is not None and now - last_active <= idle_after:
+                        continue
+                    self._session_last_activity.pop(session.key, None)
+                    await self.maybe_consolidate_by_tokens_async(session)
+
+            except asyncio.CancelledError:
+                raise  # propagate cancellation instead of swallowing it
+            except Exception:
+                logger.exception("Error in background consolidation loop")
+
     async def consolidate_messages(self, messages: list[dict[str, object]]) -> bool:
         """Archive a selected message chunk into persistent memory."""
         return await self.store.consolidate(messages, self.provider, self.model)
@@ -228,8 +288,33 @@ class MemoryConsolidator:
             return True
         return await self.consolidate_messages(snapshot)
 
-    async def maybe_consolidate_by_tokens(self, session: Session) -> None:
-        """Loop: archive old messages until prompt fits within half the context window."""
+    def maybe_consolidate_by_tokens(self, session: Session) -> None:
+        """Schedule token-based consolidation to run asynchronously in background.
+
+        This method is synchronous and only schedules the work; it must be called
+        from within the running event loop. Await maybe_consolidate_by_tokens_async()
+        instead when the caller needs consolidation to have finished.
+        """
+        if not session.messages or self.context_window_tokens <= 0:
+            return
+        task = asyncio.create_task(self._schedule_consolidation(session))
+        # The event loop keeps only weak references to tasks; hold a strong one
+        # until completion so the task cannot be garbage-collected mid-run.
+        self._pending_tasks.add(task)
+        task.add_done_callback(self._pending_tasks.discard)
+
+    async def _schedule_consolidation(self, session: Session) -> None:
+        """Run one consolidation pass, logging failures so they are not lost in a detached task."""
+        try:
+            await self.maybe_consolidate_by_tokens_async(session)
+        except Exception:
+            logger.exception("Background consolidation failed for {}", session.key)
+
+    async def maybe_consolidate_by_tokens_async(self, session: Session) -> None:
+        """Async version: Loop and archive old messages until prompt fits within half the context window.
+
+        Awaited directly by callers that need the result and by the background tasks.
+        """
         if not session.messages or self.context_window_tokens <= 0:
             return
 
@@ -284,3 +369,11 @@ class MemoryConsolidator:
                 estimated, source = self.estimate_session_prompt_tokens(session)
                 if estimated <= 0:
                     return
+
+                logger.debug(
+                    "Token consolidation complete for {}: {}/{} via {}",
+                    session.key,
+                    estimated,
+                    self.context_window_tokens,
+                    source,
+                )