From 755e42412717c0c4372b79c4d53f0dcb050351f7 Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 12:38:43 +0100 Subject: [PATCH 01/17] fix(loop): serialize /new consolidation and track task refs --- nanobot/agent/loop.py | 251 ++++++++++++++++++++----------- tests/test_consolidate_offset.py | 246 ++++++++++++++++++++++++++++++ 2 files changed, 407 insertions(+), 90 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 3016d92..7806fb8 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -21,7 +21,6 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool from nanobot.agent.tools.message import MessageTool from nanobot.agent.tools.spawn import SpawnTool from nanobot.agent.tools.cron import CronTool -from nanobot.agent.memory import MemoryStore from nanobot.agent.subagent import SubagentManager from nanobot.session.manager import Session, SessionManager @@ -57,6 +56,7 @@ class AgentLoop: ): from nanobot.config.schema import ExecToolConfig from nanobot.cron.service import CronService + self.bus = bus self.provider = provider self.workspace = workspace @@ -84,14 +84,16 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False self._consolidating: set[str] = set() # Session keys with consolidation in progress + self._consolidation_tasks: set[asyncio.Task] = set() # Keep strong refs for in-flight tasks + self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -100,36 +102,39 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool - self.tools.register(ExecTool( - working_dir=str(self.workspace), - timeout=self.exec_config.timeout, - restrict_to_workspace=self.restrict_to_workspace, - )) - + self.tools.register( + ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.restrict_to_workspace, + ) + ) + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or not self._mcp_servers: return self._mcp_connected = True from nanobot.agent.tools.mcp import connect_mcp_servers + self._mcp_stack = AsyncExitStack() await self._mcp_stack.__aenter__() await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) @@ -158,11 +163,13 @@ class AgentLoop: @staticmethod def _tool_hint(tool_calls: list) -> str: """Format tool calls as concise hint, e.g. 'web_search("query")'.""" + def _fmt(tc): val = next(iter(tc.arguments.values()), None) if tc.arguments else None if not isinstance(val, str): return tc.name return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' + return ", ".join(_fmt(tc) for tc in tool_calls) async def _run_agent_loop( @@ -210,13 +217,15 @@ class AgentLoop: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments, ensure_ascii=False) - } + "arguments": json.dumps(tc.arguments, ensure_ascii=False), + }, } for tc in response.tool_calls ] messages = self.context.add_assistant_message( - messages, response.content, tool_call_dicts, + messages, + response.content, + tool_call_dicts, reasoning_content=response.reasoning_content, ) @@ -234,9 +243,13 @@ class AgentLoop: # Give them one retry; don't forward the text to avoid duplicates. if not tools_used and not text_only_retried and final_content: text_only_retried = True - logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) + logger.debug( + "Interim text response (no tools used yet), retrying: {}", + final_content[:80], + ) messages = self.context.add_assistant_message( - messages, response.content, + messages, + response.content, reasoning_content=response.reasoning_content, ) final_content = None @@ -253,24 +266,23 @@ class AgentLoop: while self._running: try: - msg = await asyncio.wait_for( - self.bus.consume_inbound(), - timeout=1.0 - ) + msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) try: response = await self._process_message(msg) if response: await self.bus.publish_outbound(response) except Exception as e: logger.error("Error processing message: {}", e) - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" - )) + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"Sorry, I encountered an error: {str(e)}", + ) + ) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -284,7 +296,15 @@ class AgentLoop: """Stop the agent loop.""" self._running = False logger.info("Agent loop stopping") - + + def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: + """Return a per-session lock for memory consolidation writers.""" + lock = self._consolidation_locks.get(session_key) + if lock is None: + lock = asyncio.Lock() + self._consolidation_locks[session_key] = lock + return lock + async def _process_message( self, msg: InboundMessage, @@ -293,56 +313,75 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": - # Capture messages before clearing (avoid race condition with background task) messages_to_archive = session.messages.copy() + lock = self._get_consolidation_lock(session.key) + + try: + async with lock: + temp_session = Session(key=session.key) + temp_session.messages = messages_to_archive + await self._consolidate_memory(temp_session, archive_all=True) + except Exception as e: + logger.error("/new archival failed for {}: {}", session.key, e) + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="Could not start a new session because memory archival failed. Please try again.", + ) + session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - - async def _consolidate_and_cleanup(): - temp_session = Session(key=session.key) - temp_session.messages = messages_to_archive - await self._consolidate_memory(temp_session, archive_all=True) - - asyncio.create_task(_consolidate_and_cleanup()) - return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.") + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="New session started. Memory consolidation in progress.", + ) if cmd == "/help": - return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") - + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands", + ) + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) + lock = self._get_consolidation_lock(session.key) async def _consolidate_and_unlock(): try: - await self._consolidate_memory(session) + async with lock: + await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) + task = asyncio.current_task() + if task is not None: + self._consolidation_tasks.discard(task) - asyncio.create_task(_consolidate_and_unlock()) + task = asyncio.create_task(_consolidate_and_unlock()) + self._consolidation_tasks.add(task) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) initial_messages = self.context.build_messages( @@ -354,42 +393,49 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=msg.metadata or {}, - )) + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=content, + metadata=msg.metadata or {}, + ) + ) final_content, tools_used = await self._run_agent_loop( - initial_messages, on_progress=on_progress or _bus_progress, + initial_messages, + on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) - session.add_message("assistant", final_content, - tools_used=tools_used if tools_used else None) + session.add_message( + "assistant", final_content, tools_used=tools_used if tools_used else None + ) self.sessions.save(session) - + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, - metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) + metadata=msg.metadata + or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -399,7 +445,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -413,17 +459,15 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( - channel=origin_channel, - chat_id=origin_chat_id, - content=final_content + channel=origin_channel, chat_id=origin_chat_id, content=final_content ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> None: """Consolidate old messages into MEMORY.md + HISTORY.md. @@ -431,34 +475,54 @@ class AgentLoop: archive_all: If True, clear all messages and reset session (for /new command). If False, only write to files without modifying session. """ - memory = MemoryStore(self.workspace) + memory = self.context.memory if archive_all: old_messages = session.messages keep_count = 0 - logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) + logger.info( + "Memory consolidation (archive_all): {} total messages archived", + len(session.messages), + ) else: keep_count = self.memory_window // 2 if len(session.messages) <= keep_count: - logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) + logger.debug( + "Session {}: No consolidation needed (messages={}, keep={})", + session.key, + len(session.messages), + keep_count, + ) return messages_to_process = len(session.messages) - session.last_consolidated if messages_to_process <= 0: - logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) + logger.debug( + "Session {}: No new messages to consolidate (last_consolidated={}, total={})", + session.key, + session.last_consolidated, + len(session.messages), + ) return - old_messages = session.messages[session.last_consolidated:-keep_count] + old_messages = session.messages[session.last_consolidated : -keep_count] if not old_messages: return - logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) + logger.info( + "Memory consolidation started: {} total, {} new to consolidate, {} keep", + len(session.messages), + len(old_messages), + keep_count, + ) lines = [] for m in old_messages: if not m.get("content"): continue tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") + lines.append( + f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}" + ) conversation = "\n".join(lines) current_memory = memory.read_long_term() @@ -487,7 +551,10 @@ Respond with ONLY valid JSON, no markdown fences.""" try: response = await self.provider.chat( messages=[ - {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, + { + "role": "system", + "content": "You are a memory consolidation agent. Respond only with valid JSON.", + }, {"role": "user", "content": prompt}, ], model=self.model, @@ -500,7 +567,10 @@ Respond with ONLY valid JSON, no markdown fences.""" text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() result = json_repair.loads(text) if not isinstance(result, dict): - logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200]) + logger.warning( + "Memory consolidation: unexpected response type, skipping. Response: {}", + text[:200], + ) return if entry := result.get("history_entry"): @@ -519,7 +589,11 @@ Respond with ONLY valid JSON, no markdown fences.""" session.last_consolidated = 0 else: session.last_consolidated = len(session.messages) - keep_count - logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) + logger.info( + "Memory consolidation done: {} messages, last_consolidated={}", + len(session.messages), + session.last_consolidated, + ) except Exception as e: logger.error("Memory consolidation failed: {}", e) @@ -533,24 +607,21 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ await self._connect_mcp() - msg = InboundMessage( - channel=channel, - sender_id="user", - chat_id=chat_id, - content=content + msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) + + response = await self._process_message( + msg, session_key=session_key, on_progress=on_progress ) - - response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index e204733..6162fa0 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -1,5 +1,8 @@ """Test session management with cache-friendly message handling.""" +import asyncio +from unittest.mock import AsyncMock, MagicMock + import pytest from pathlib import Path from nanobot.session.manager import Session, SessionManager @@ -475,3 +478,246 @@ class TestEmptyAndBoundarySessions: expected_count = 60 - KEEP_COUNT - 10 assert len(old_messages) == expected_count assert_messages_content(old_messages, 10, 34) + + +class TestConsolidationDeduplicationGuard: + """Test that consolidation tasks are deduplicated and serialized.""" + + @pytest.mark.asyncio + async def test_consolidation_guard_prevents_duplicate_tasks(self, tmp_path: Path) -> None: + """Concurrent messages above memory_window spawn only one consolidation task.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + consolidation_calls = 0 + + async def _fake_consolidate(_session, archive_all: bool = False) -> None: + nonlocal consolidation_calls + consolidation_calls += 1 + await asyncio.sleep(0.05) + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await loop._process_message(msg) + await asyncio.sleep(0.1) + + assert consolidation_calls == 1, ( + f"Expected exactly 1 consolidation, got {consolidation_calls}" + ) + + @pytest.mark.asyncio + async def test_new_command_guard_prevents_concurrent_consolidation( + self, tmp_path: Path + ) -> None: + """/new command does not run consolidation concurrently with in-flight consolidation.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + consolidation_calls = 0 + active = 0 + max_active = 0 + + async def _fake_consolidate(_session, archive_all: bool = False) -> None: + nonlocal consolidation_calls, active, max_active + consolidation_calls += 1 + active += 1 + max_active = max(max_active, active) + await asyncio.sleep(0.05) + active -= 1 + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + await loop._process_message(new_msg) + await asyncio.sleep(0.1) + + assert consolidation_calls == 2, ( + f"Expected normal + /new consolidations, got {consolidation_calls}" + ) + assert max_active == 1, ( + f"Expected serialized consolidation, observed concurrency={max_active}" + ) + + @pytest.mark.asyncio + async def test_consolidation_tasks_are_referenced(self, tmp_path: Path) -> None: + """create_task results are tracked in _consolidation_tasks while in flight.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + + async def _slow_consolidate(_session, archive_all: bool = False) -> None: + started.set() + await asyncio.sleep(0.1) + + loop._consolidate_memory = _slow_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + + await started.wait() + assert len(loop._consolidation_tasks) == 1, "Task must be referenced while in-flight" + + await asyncio.sleep(0.15) + assert len(loop._consolidation_tasks) == 0, ( + "Task reference must be removed after completion" + ) + + @pytest.mark.asyncio + async def test_new_waits_for_inflight_consolidation_and_preserves_messages( + self, tmp_path: Path + ) -> None: + """/new waits for in-flight consolidation and archives before clear.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + release = asyncio.Event() + archived_count = 0 + + async def _fake_consolidate(sess, archive_all: bool = False) -> None: + nonlocal archived_count + if archive_all: + archived_count = len(sess.messages) + return + started.set() + await release.wait() + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await started.wait() + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + pending_new = asyncio.create_task(loop._process_message(new_msg)) + + await asyncio.sleep(0.02) + assert not pending_new.done(), "/new should wait while consolidation is in-flight" + + release.set() + response = await pending_new + assert response is not None + assert "new session started" in response.content.lower() + assert archived_count > 0, "Expected /new archival to process a non-empty snapshot" + + session_after = loop.sessions.get_or_create("cli:test") + assert session_after.messages == [], "Session should be cleared after successful archival" + + @pytest.mark.asyncio + async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None: + """/new keeps session data if archive step fails.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(5): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + before_count = len(session.messages) + + async def _failing_consolidate(_session, archive_all: bool = False) -> None: + if archive_all: + raise RuntimeError("forced archive failure") + + loop._consolidate_memory = _failing_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(new_msg) + + assert response is not None + assert "failed" in response.content.lower() + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == before_count, ( + "Session must remain intact when /new archival fails" + ) From 5f9eca466484e52ce535d4f20f4d0b87581da5db Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 12:46:11 +0100 Subject: [PATCH 02/17] style(loop): remove formatting-only changes from upstream PR 881 --- nanobot/agent/loop.py | 220 ++++++++++++++++-------------------------- 1 file changed, 85 insertions(+), 135 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 7806fb8..481b72e 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -56,7 +56,6 @@ class AgentLoop: ): from nanobot.config.schema import ExecToolConfig from nanobot.cron.service import CronService - self.bus = bus self.provider = provider self.workspace = workspace @@ -84,16 +83,16 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False self._consolidating: set[str] = set() # Session keys with consolidation in progress - self._consolidation_tasks: set[asyncio.Task] = set() # Keep strong refs for in-flight tasks + self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -102,39 +101,36 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool - self.tools.register( - ExecTool( - working_dir=str(self.workspace), - timeout=self.exec_config.timeout, - restrict_to_workspace=self.restrict_to_workspace, - ) - ) - + self.tools.register(ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.restrict_to_workspace, + )) + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or not self._mcp_servers: return self._mcp_connected = True from nanobot.agent.tools.mcp import connect_mcp_servers - self._mcp_stack = AsyncExitStack() await self._mcp_stack.__aenter__() await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) @@ -163,13 +159,11 @@ class AgentLoop: @staticmethod def _tool_hint(tool_calls: list) -> str: """Format tool calls as concise hint, e.g. 'web_search("query")'.""" - def _fmt(tc): val = next(iter(tc.arguments.values()), None) if tc.arguments else None if not isinstance(val, str): return tc.name return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' - return ", ".join(_fmt(tc) for tc in tool_calls) async def _run_agent_loop( @@ -217,15 +211,13 @@ class AgentLoop: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments, ensure_ascii=False), - }, + "arguments": json.dumps(tc.arguments, ensure_ascii=False) + } } for tc in response.tool_calls ] messages = self.context.add_assistant_message( - messages, - response.content, - tool_call_dicts, + messages, response.content, tool_call_dicts, reasoning_content=response.reasoning_content, ) @@ -243,13 +235,9 @@ class AgentLoop: # Give them one retry; don't forward the text to avoid duplicates. if not tools_used and not text_only_retried and final_content: text_only_retried = True - logger.debug( - "Interim text response (no tools used yet), retrying: {}", - final_content[:80], - ) + logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) messages = self.context.add_assistant_message( - messages, - response.content, + messages, response.content, reasoning_content=response.reasoning_content, ) final_content = None @@ -266,23 +254,24 @@ class AgentLoop: while self._running: try: - msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) + msg = await asyncio.wait_for( + self.bus.consume_inbound(), + timeout=1.0 + ) try: response = await self._process_message(msg) if response: await self.bus.publish_outbound(response) except Exception as e: logger.error("Error processing message: {}", e) - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}", - ) - ) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"Sorry, I encountered an error: {str(e)}" + )) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -298,13 +287,12 @@ class AgentLoop: logger.info("Agent loop stopping") def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: - """Return a per-session lock for memory consolidation writers.""" lock = self._consolidation_locks.get(session_key) if lock is None: lock = asyncio.Lock() self._consolidation_locks[session_key] = lock return lock - + async def _process_message( self, msg: InboundMessage, @@ -313,25 +301,25 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": @@ -348,24 +336,18 @@ class AgentLoop: return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content="Could not start a new session because memory archival failed. Please try again.", + content="Could not start a new session because memory archival failed. Please try again." ) session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.", - ) + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, + content="New session started. Memory consolidation in progress.") if cmd == "/help": - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands", - ) - + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, + content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) lock = self._get_consolidation_lock(session.key) @@ -376,12 +358,12 @@ class AgentLoop: await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) - task = asyncio.current_task() - if task is not None: - self._consolidation_tasks.discard(task) + _task = asyncio.current_task() + if _task is not None: + self._consolidation_tasks.discard(_task) - task = asyncio.create_task(_consolidate_and_unlock()) - self._consolidation_tasks.add(task) + _task = asyncio.create_task(_consolidate_and_unlock()) + self._consolidation_tasks.add(_task) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) initial_messages = self.context.build_messages( @@ -393,49 +375,42 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=content, - metadata=msg.metadata or {}, - ) - ) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + metadata=msg.metadata or {}, + )) final_content, tools_used = await self._run_agent_loop( - initial_messages, - on_progress=on_progress or _bus_progress, + initial_messages, on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) - session.add_message( - "assistant", final_content, tools_used=tools_used if tools_used else None - ) + session.add_message("assistant", final_content, + tools_used=tools_used if tools_used else None) self.sessions.save(session) - + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, - metadata=msg.metadata - or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) + metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -445,7 +420,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -459,15 +434,17 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( - channel=origin_channel, chat_id=origin_chat_id, content=final_content + channel=origin_channel, + chat_id=origin_chat_id, + content=final_content ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> None: """Consolidate old messages into MEMORY.md + HISTORY.md. @@ -480,49 +457,29 @@ class AgentLoop: if archive_all: old_messages = session.messages keep_count = 0 - logger.info( - "Memory consolidation (archive_all): {} total messages archived", - len(session.messages), - ) + logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) else: keep_count = self.memory_window // 2 if len(session.messages) <= keep_count: - logger.debug( - "Session {}: No consolidation needed (messages={}, keep={})", - session.key, - len(session.messages), - keep_count, - ) + logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) return messages_to_process = len(session.messages) - session.last_consolidated if messages_to_process <= 0: - logger.debug( - "Session {}: No new messages to consolidate (last_consolidated={}, total={})", - session.key, - session.last_consolidated, - len(session.messages), - ) + logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) return - old_messages = session.messages[session.last_consolidated : -keep_count] + old_messages = session.messages[session.last_consolidated:-keep_count] if not old_messages: return - logger.info( - "Memory consolidation started: {} total, {} new to consolidate, {} keep", - len(session.messages), - len(old_messages), - keep_count, - ) + logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) lines = [] for m in old_messages: if not m.get("content"): continue tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append( - f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}" - ) + lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") conversation = "\n".join(lines) current_memory = memory.read_long_term() @@ -551,10 +508,7 @@ Respond with ONLY valid JSON, no markdown fences.""" try: response = await self.provider.chat( messages=[ - { - "role": "system", - "content": "You are a memory consolidation agent. Respond only with valid JSON.", - }, + {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, {"role": "user", "content": prompt}, ], model=self.model, @@ -567,10 +521,7 @@ Respond with ONLY valid JSON, no markdown fences.""" text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() result = json_repair.loads(text) if not isinstance(result, dict): - logger.warning( - "Memory consolidation: unexpected response type, skipping. Response: {}", - text[:200], - ) + logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200]) return if entry := result.get("history_entry"): @@ -589,11 +540,7 @@ Respond with ONLY valid JSON, no markdown fences.""" session.last_consolidated = 0 else: session.last_consolidated = len(session.messages) - keep_count - logger.info( - "Memory consolidation done: {} messages, last_consolidated={}", - len(session.messages), - session.last_consolidated, - ) + logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) except Exception as e: logger.error("Memory consolidation failed: {}", e) @@ -607,21 +554,24 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ await self._connect_mcp() - msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) - - response = await self._process_message( - msg, session_key=session_key, on_progress=on_progress + msg = InboundMessage( + channel=channel, + sender_id="user", + chat_id=chat_id, + content=content ) + + response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" From 9ada8e68547bae6daeb8e6e43b7c1babdb519724 Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 12:48:54 +0100 Subject: [PATCH 03/17] fix(loop): require successful archival before /new clear --- nanobot/agent/loop.py | 230 +++++++++++++++++++------------ tests/test_consolidate_offset.py | 12 +- 2 files changed, 151 insertions(+), 91 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 481b72e..4ff01ea 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -56,6 +56,7 @@ class AgentLoop: ): from nanobot.config.schema import ExecToolConfig from nanobot.cron.service import CronService + self.bus = bus self.provider = provider self.workspace = workspace @@ -83,7 +84,7 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None @@ -92,7 +93,7 @@ class AgentLoop: self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -101,36 +102,39 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool - self.tools.register(ExecTool( - working_dir=str(self.workspace), - timeout=self.exec_config.timeout, - restrict_to_workspace=self.restrict_to_workspace, - )) - + self.tools.register( + ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.restrict_to_workspace, + ) + ) + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or not self._mcp_servers: return self._mcp_connected = True from nanobot.agent.tools.mcp import connect_mcp_servers + self._mcp_stack = AsyncExitStack() await self._mcp_stack.__aenter__() await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) @@ -159,11 +163,13 @@ class AgentLoop: @staticmethod def _tool_hint(tool_calls: list) -> str: """Format tool calls as concise hint, e.g. 'web_search("query")'.""" + def _fmt(tc): val = next(iter(tc.arguments.values()), None) if tc.arguments else None if not isinstance(val, str): return tc.name return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' + return ", ".join(_fmt(tc) for tc in tool_calls) async def _run_agent_loop( @@ -211,13 +217,15 @@ class AgentLoop: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments, ensure_ascii=False) - } + "arguments": json.dumps(tc.arguments, ensure_ascii=False), + }, } for tc in response.tool_calls ] messages = self.context.add_assistant_message( - messages, response.content, tool_call_dicts, + messages, + response.content, + tool_call_dicts, reasoning_content=response.reasoning_content, ) @@ -235,9 +243,13 @@ class AgentLoop: # Give them one retry; don't forward the text to avoid duplicates. if not tools_used and not text_only_retried and final_content: text_only_retried = True - logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) + logger.debug( + "Interim text response (no tools used yet), retrying: {}", + final_content[:80], + ) messages = self.context.add_assistant_message( - messages, response.content, + messages, + response.content, reasoning_content=response.reasoning_content, ) final_content = None @@ -254,24 +266,23 @@ class AgentLoop: while self._running: try: - msg = await asyncio.wait_for( - self.bus.consume_inbound(), - timeout=1.0 - ) + msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) try: response = await self._process_message(msg) if response: await self.bus.publish_outbound(response) except Exception as e: logger.error("Error processing message: {}", e) - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" - )) + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"Sorry, I encountered an error: {str(e)}", + ) + ) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -292,7 +303,7 @@ class AgentLoop: lock = asyncio.Lock() self._consolidation_locks[session_key] = lock return lock - + async def _process_message( self, msg: InboundMessage, @@ -301,25 +312,25 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": @@ -330,24 +341,37 @@ class AgentLoop: async with lock: temp_session = Session(key=session.key) temp_session.messages = messages_to_archive - await self._consolidate_memory(temp_session, archive_all=True) + archived = await self._consolidate_memory(temp_session, archive_all=True) except Exception as e: logger.error("/new archival failed for {}: {}", session.key, e) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content="Could not start a new session because memory archival failed. Please try again." + content="Could not start a new session because memory archival failed. Please try again.", + ) + + if messages_to_archive and not archived: + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="Could not start a new session because memory archival failed. Please try again.", ) session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.") + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="New session started. Memory consolidation in progress.", + ) if cmd == "/help": - return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") - + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands", + ) + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) lock = self._get_consolidation_lock(session.key) @@ -375,42 +399,49 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=msg.metadata or {}, - )) + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=content, + metadata=msg.metadata or {}, + ) + ) final_content, tools_used = await self._run_agent_loop( - initial_messages, on_progress=on_progress or _bus_progress, + initial_messages, + on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) - session.add_message("assistant", final_content, - tools_used=tools_used if tools_used else None) + session.add_message( + "assistant", final_content, tools_used=tools_used if tools_used else None + ) self.sessions.save(session) - + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, - metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) + metadata=msg.metadata + or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -420,7 +451,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -434,18 +465,16 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( - channel=origin_channel, - chat_id=origin_chat_id, - content=final_content + channel=origin_channel, chat_id=origin_chat_id, content=final_content ) - - async def _consolidate_memory(self, session, archive_all: bool = False) -> None: + + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: """Consolidate old messages into MEMORY.md + HISTORY.md. Args: @@ -457,29 +486,49 @@ class AgentLoop: if archive_all: old_messages = session.messages keep_count = 0 - logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) + logger.info( + "Memory consolidation (archive_all): {} total messages archived", + len(session.messages), + ) else: keep_count = self.memory_window // 2 if len(session.messages) <= keep_count: - logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) - return + logger.debug( + "Session {}: No consolidation needed (messages={}, keep={})", + session.key, + len(session.messages), + keep_count, + ) + return True messages_to_process = len(session.messages) - session.last_consolidated if messages_to_process <= 0: - logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) - return + logger.debug( + "Session {}: No new messages to consolidate (last_consolidated={}, total={})", + session.key, + session.last_consolidated, + len(session.messages), + ) + return True - old_messages = session.messages[session.last_consolidated:-keep_count] + old_messages = session.messages[session.last_consolidated : -keep_count] if not old_messages: - return - logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) + return True + logger.info( + "Memory consolidation started: {} total, {} new to consolidate, {} keep", + len(session.messages), + len(old_messages), + keep_count, + ) lines = [] for m in old_messages: if not m.get("content"): continue tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") + lines.append( + f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}" + ) conversation = "\n".join(lines) current_memory = memory.read_long_term() @@ -508,7 +557,10 @@ Respond with ONLY valid JSON, no markdown fences.""" try: response = await self.provider.chat( messages=[ - {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, + { + "role": "system", + "content": "You are a memory consolidation agent. Respond only with valid JSON.", + }, {"role": "user", "content": prompt}, ], model=self.model, @@ -516,13 +568,16 @@ Respond with ONLY valid JSON, no markdown fences.""" text = (response.content or "").strip() if not text: logger.warning("Memory consolidation: LLM returned empty response, skipping") - return + return False if text.startswith("```"): text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() result = json_repair.loads(text) if not isinstance(result, dict): - logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200]) - return + logger.warning( + "Memory consolidation: unexpected response type, skipping. Response: {}", + text[:200], + ) + return False if entry := result.get("history_entry"): # Defensive: ensure entry is a string (LLM may return dict) @@ -540,9 +595,15 @@ Respond with ONLY valid JSON, no markdown fences.""" session.last_consolidated = 0 else: session.last_consolidated = len(session.messages) - keep_count - logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) + logger.info( + "Memory consolidation done: {} messages, last_consolidated={}", + len(session.messages), + session.last_consolidated, + ) + return True except Exception as e: logger.error("Memory consolidation failed: {}", e) + return False async def process_direct( self, @@ -554,24 +615,21 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ await self._connect_mcp() - msg = InboundMessage( - channel=channel, - sender_id="user", - chat_id=chat_id, - content=content + msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) + + response = await self._process_message( + msg, session_key=session_key, on_progress=on_progress ) - - response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index 6162fa0..6be808d 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -652,13 +652,14 @@ class TestConsolidationDeduplicationGuard: release = asyncio.Event() archived_count = 0 - async def _fake_consolidate(sess, archive_all: bool = False) -> None: + async def _fake_consolidate(sess, archive_all: bool = False) -> bool: nonlocal archived_count if archive_all: archived_count = len(sess.messages) - return + return True started.set() await release.wait() + return True loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] @@ -683,7 +684,7 @@ class TestConsolidationDeduplicationGuard: @pytest.mark.asyncio async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None: - """/new keeps session data if archive step fails.""" + """/new must keep session data if archive step reports failure.""" from nanobot.agent.loop import AgentLoop from nanobot.bus.events import InboundMessage from nanobot.bus.queue import MessageBus @@ -706,9 +707,10 @@ class TestConsolidationDeduplicationGuard: loop.sessions.save(session) before_count = len(session.messages) - async def _failing_consolidate(_session, archive_all: bool = False) -> None: + async def _failing_consolidate(sess, archive_all: bool = False) -> bool: if archive_all: - raise RuntimeError("forced archive failure") + return False + return True loop._consolidate_memory = _failing_consolidate # type: ignore[method-assign] From c1b5e8c8d29a2225a1f82a36a93da5a0b61d702f Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 13:29:18 +0100 Subject: [PATCH 04/17] fix(loop): lock /new snapshot and prune stale consolidation locks --- nanobot/agent/loop.py | 14 ++++- tests/test_consolidate_offset.py | 103 +++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 4ff01ea..b0bace5 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -304,6 +304,14 @@ class AgentLoop: self._consolidation_locks[session_key] = lock return lock + def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: + """Drop unused per-session lock entries to avoid unbounded growth.""" + waiters = getattr(lock, "_waiters", None) + has_waiters = bool(waiters) + if lock.locked() or has_waiters: + return + self._consolidation_locks.pop(session_key, None) + async def _process_message( self, msg: InboundMessage, @@ -334,11 +342,11 @@ class AgentLoop: # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": - messages_to_archive = session.messages.copy() lock = self._get_consolidation_lock(session.key) - + messages_to_archive: list[dict[str, Any]] = [] try: async with lock: + messages_to_archive = session.messages[session.last_consolidated :].copy() temp_session = Session(key=session.key) temp_session.messages = messages_to_archive archived = await self._consolidate_memory(temp_session, archive_all=True) @@ -360,6 +368,7 @@ class AgentLoop: session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) + self._prune_consolidation_lock(session.key, lock) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, @@ -382,6 +391,7 @@ class AgentLoop: await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) _task = asyncio.current_task() if _task is not None: self._consolidation_tasks.discard(_task) diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index 6be808d..323519e 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -723,3 +723,106 @@ class TestConsolidationDeduplicationGuard: assert len(session_after.messages) == before_count, ( "Session must remain intact when /new archival fails" ) + + @pytest.mark.asyncio + async def test_new_archives_only_unconsolidated_messages_after_inflight_task( + self, tmp_path: Path + ) -> None: + """/new should archive only messages not yet consolidated by prior task.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + release = asyncio.Event() + archived_count = -1 + + async def _fake_consolidate(sess, archive_all: bool = False) -> bool: + nonlocal archived_count + if archive_all: + archived_count = len(sess.messages) + return True + + started.set() + await release.wait() + sess.last_consolidated = len(sess.messages) - 3 + return True + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await started.wait() + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + pending_new = asyncio.create_task(loop._process_message(new_msg)) + await asyncio.sleep(0.02) + assert not pending_new.done() + + release.set() + response = await pending_new + + assert response is not None + assert "new session started" in response.content.lower() + assert archived_count == 3, ( + f"Expected only unconsolidated tail to archive, got {archived_count}" + ) + + @pytest.mark.asyncio + async def test_new_cleans_up_consolidation_lock_for_invalidated_session( + self, tmp_path: Path + ) -> None: + """/new should remove lock entry for fully invalidated session key.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(3): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + # Ensure lock exists before /new. + _ = loop._get_consolidation_lock(session.key) + assert session.key in loop._consolidation_locks + + async def _ok_consolidate(sess, archive_all: bool = False) -> bool: + return True + + loop._consolidate_memory = _ok_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(new_msg) + + assert response is not None + assert "new session started" in response.content.lower() + assert session.key not in loop._consolidation_locks From df022febaf4782270588431c0ba7c6b84f4b29c9 Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 13:31:15 +0100 Subject: [PATCH 05/17] refactor(loop): drop redundant Any typing in /new snapshot --- nanobot/agent/loop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index b0bace5..42ab351 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -6,7 +6,7 @@ import json import json_repair from pathlib import Path import re -from typing import Any, Awaitable, Callable +from typing import Awaitable, Callable from loguru import logger @@ -343,7 +343,7 @@ class AgentLoop: cmd = msg.content.strip().lower() if cmd == "/new": lock = self._get_consolidation_lock(session.key) - messages_to_archive: list[dict[str, Any]] = [] + messages_to_archive = [] try: async with lock: messages_to_archive = session.messages[session.last_consolidated :].copy() From 426ef71ce7a87d0c104bbf34e63e2399166bf83e Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 13:57:39 +0100 Subject: [PATCH 06/17] style(loop): drop formatting-only churn against upstream main --- nanobot/agent/loop.py | 209 ++++++++++++++++-------------------------- 1 file changed, 80 insertions(+), 129 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 42ab351..06ab1f7 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -6,7 +6,7 @@ import json import json_repair from pathlib import Path import re -from typing import Awaitable, Callable +from typing import Any, Awaitable, Callable from loguru import logger @@ -56,7 +56,6 @@ class AgentLoop: ): from nanobot.config.schema import ExecToolConfig from nanobot.cron.service import CronService - self.bus = bus self.provider = provider self.workspace = workspace @@ -84,7 +83,7 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None @@ -93,7 +92,7 @@ class AgentLoop: self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -102,39 +101,36 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool - self.tools.register( - ExecTool( - working_dir=str(self.workspace), - timeout=self.exec_config.timeout, - restrict_to_workspace=self.restrict_to_workspace, - ) - ) - + self.tools.register(ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.restrict_to_workspace, + )) + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or not self._mcp_servers: return self._mcp_connected = True from nanobot.agent.tools.mcp import connect_mcp_servers - self._mcp_stack = AsyncExitStack() await self._mcp_stack.__aenter__() await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) @@ -163,13 +159,11 @@ class AgentLoop: @staticmethod def _tool_hint(tool_calls: list) -> str: """Format tool calls as concise hint, e.g. 'web_search("query")'.""" - def _fmt(tc): val = next(iter(tc.arguments.values()), None) if tc.arguments else None if not isinstance(val, str): return tc.name return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' - return ", ".join(_fmt(tc) for tc in tool_calls) async def _run_agent_loop( @@ -217,15 +211,13 @@ class AgentLoop: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments, ensure_ascii=False), - }, + "arguments": json.dumps(tc.arguments, ensure_ascii=False) + } } for tc in response.tool_calls ] messages = self.context.add_assistant_message( - messages, - response.content, - tool_call_dicts, + messages, response.content, tool_call_dicts, reasoning_content=response.reasoning_content, ) @@ -243,13 +235,9 @@ class AgentLoop: # Give them one retry; don't forward the text to avoid duplicates. if not tools_used and not text_only_retried and final_content: text_only_retried = True - logger.debug( - "Interim text response (no tools used yet), retrying: {}", - final_content[:80], - ) + logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) messages = self.context.add_assistant_message( - messages, - response.content, + messages, response.content, reasoning_content=response.reasoning_content, ) final_content = None @@ -266,23 +254,24 @@ class AgentLoop: while self._running: try: - msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) + msg = await asyncio.wait_for( + self.bus.consume_inbound(), + timeout=1.0 + ) try: response = await self._process_message(msg) if response: await self.bus.publish_outbound(response) except Exception as e: logger.error("Error processing message: {}", e) - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}", - ) - ) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"Sorry, I encountered an error: {str(e)}" + )) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -311,7 +300,7 @@ class AgentLoop: if lock.locked() or has_waiters: return self._consolidation_locks.pop(session_key, None) - + async def _process_message( self, msg: InboundMessage, @@ -320,30 +309,30 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": lock = self._get_consolidation_lock(session.key) - messages_to_archive = [] + messages_to_archive: list[dict[str, Any]] = [] try: async with lock: messages_to_archive = session.messages[session.last_consolidated :].copy() @@ -369,18 +358,12 @@ class AgentLoop: self.sessions.save(session) self.sessions.invalidate(session.key) self._prune_consolidation_lock(session.key, lock) - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.", - ) + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, + content="New session started. Memory consolidation in progress.") if cmd == "/help": - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands", - ) - + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, + content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) lock = self._get_consolidation_lock(session.key) @@ -409,49 +392,42 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=content, - metadata=msg.metadata or {}, - ) - ) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + metadata=msg.metadata or {}, + )) final_content, tools_used = await self._run_agent_loop( - initial_messages, - on_progress=on_progress or _bus_progress, + initial_messages, on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) - session.add_message( - "assistant", final_content, tools_used=tools_used if tools_used else None - ) + session.add_message("assistant", final_content, + tools_used=tools_used if tools_used else None) self.sessions.save(session) - + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, - metadata=msg.metadata - or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) + metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -461,7 +437,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -475,15 +451,17 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( - channel=origin_channel, chat_id=origin_chat_id, content=final_content + channel=origin_channel, + chat_id=origin_chat_id, + content=final_content ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: """Consolidate old messages into MEMORY.md + HISTORY.md. @@ -496,49 +474,29 @@ class AgentLoop: if archive_all: old_messages = session.messages keep_count = 0 - logger.info( - "Memory consolidation (archive_all): {} total messages archived", - len(session.messages), - ) + logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) else: keep_count = self.memory_window // 2 if len(session.messages) <= keep_count: - logger.debug( - "Session {}: No consolidation needed (messages={}, keep={})", - session.key, - len(session.messages), - keep_count, - ) + logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) return True messages_to_process = len(session.messages) - session.last_consolidated if messages_to_process <= 0: - logger.debug( - "Session {}: No new messages to consolidate (last_consolidated={}, total={})", - session.key, - session.last_consolidated, - len(session.messages), - ) + logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) return True - old_messages = session.messages[session.last_consolidated : -keep_count] + old_messages = session.messages[session.last_consolidated:-keep_count] if not old_messages: return True - logger.info( - "Memory consolidation started: {} total, {} new to consolidate, {} keep", - len(session.messages), - len(old_messages), - keep_count, - ) + logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) lines = [] for m in old_messages: if not m.get("content"): continue tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append( - f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}" - ) + lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") conversation = "\n".join(lines) current_memory = memory.read_long_term() @@ -567,10 +525,7 @@ Respond with ONLY valid JSON, no markdown fences.""" try: response = await self.provider.chat( messages=[ - { - "role": "system", - "content": "You are a memory consolidation agent. Respond only with valid JSON.", - }, + {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, {"role": "user", "content": prompt}, ], model=self.model, @@ -583,10 +538,7 @@ Respond with ONLY valid JSON, no markdown fences.""" text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() result = json_repair.loads(text) if not isinstance(result, dict): - logger.warning( - "Memory consolidation: unexpected response type, skipping. Response: {}", - text[:200], - ) + logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200]) return False if entry := result.get("history_entry"): @@ -605,11 +557,7 @@ Respond with ONLY valid JSON, no markdown fences.""" session.last_consolidated = 0 else: session.last_consolidated = len(session.messages) - keep_count - logger.info( - "Memory consolidation done: {} messages, last_consolidated={}", - len(session.messages), - session.last_consolidated, - ) + logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) return True except Exception as e: logger.error("Memory consolidation failed: {}", e) @@ -625,21 +573,24 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ await self._connect_mcp() - msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) - - response = await self._process_message( - msg, session_key=session_key, on_progress=on_progress + msg = InboundMessage( + channel=channel, + sender_id="user", + chat_id=chat_id, + content=content ) + + response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" From 5c9cb3a208a0a9b3e543c510753269073579adaa Mon Sep 17 00:00:00 2001 From: andienguyen-ecoligo Date: Sat, 21 Feb 2026 12:34:14 -0500 Subject: [PATCH 07/17] fix(security): prevent path traversal bypass via startswith check `startswith` string comparison allows bypassing directory restrictions. For example, `/home/user/workspace_evil` passes the check against `/home/user/workspace` because the string starts with the allowed path. Replace with `Path.relative_to()` which correctly validates that the resolved path is actually inside the allowed directory tree. Fixes #888 --- nanobot/agent/tools/filesystem.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 9c169e4..b87da11 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -13,8 +13,11 @@ def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | if not p.is_absolute() and workspace: p = workspace / p resolved = p.resolve() - if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): - raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") + if allowed_dir: + try: + resolved.relative_to(allowed_dir.resolve()) + except ValueError: + raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") return resolved From ef96619039c98ccb45c61481cf1b5203aa5864ac Mon Sep 17 00:00:00 2001 From: andienguyen-ecoligo Date: Sat, 21 Feb 2026 12:34:50 -0500 Subject: [PATCH 08/17] fix(slack): add exception handling to socket listener _handle_message() in _on_socket_request() had no try/except. If it throws (bus full, permission error, etc.), the exception propagates up and crashes the Socket Mode event loop, causing missed messages. Other channels like Telegram already have explicit error handlers. Fixes #895 --- nanobot/channels/slack.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 4fc1f41..d1c5895 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -179,18 +179,21 @@ class SlackChannel(BaseChannel): except Exception as e: logger.debug("Slack reactions_add failed: {}", e) - await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=text, - metadata={ - "slack": { - "event": event, - "thread_ts": thread_ts, - "channel_type": channel_type, - } - }, - ) + try: + await self._handle_message( + sender_id=sender_id, + chat_id=chat_id, + content=text, + metadata={ + "slack": { + "event": event, + "thread_ts": thread_ts, + "channel_type": channel_type, + } + }, + ) + except Exception as e: + logger.error("Error handling Slack message from {}: {}", sender_id, e) def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: if channel_type == "im": From 54a0f3d038e2a7f18315e1768a351010401cf6c2 Mon Sep 17 00:00:00 2001 From: andienguyen-ecoligo Date: Sat, 21 Feb 2026 12:35:21 -0500 Subject: [PATCH 09/17] fix(session): handle errors in legacy session migration shutil.move() in _load() can fail due to permissions, disk full, or concurrent access. Without error handling, the exception propagates up and prevents the session from loading entirely. Wrap in try/except so migration failures are logged as warnings and the session falls back to loading from the legacy path on next attempt. Fixes #863 --- nanobot/session/manager.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 18e23b2..19d4439 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -108,9 +108,12 @@ class SessionManager: if not path.exists(): legacy_path = self._get_legacy_session_path(key) if legacy_path.exists(): - import shutil - shutil.move(str(legacy_path), str(path)) - logger.info("Migrated session {} from legacy path", key) + try: + import shutil + shutil.move(str(legacy_path), str(path)) + logger.info("Migrated session {} from legacy path", key) + except Exception as e: + logger.warning("Failed to migrate session {}: {}", key, e) if not path.exists(): return None From ba66c6475025e8123e88732f5cec71e5b56b0b14 Mon Sep 17 00:00:00 2001 From: andienguyen-ecoligo Date: Sat, 21 Feb 2026 12:36:04 -0500 Subject: [PATCH 10/17] fix(email): evict oldest half of dedup set instead of clearing entirely When _processed_uids exceeds 100k entries, the entire set was cleared with .clear(), allowing all previously seen emails to be re-processed. Now evicts the oldest 50% of entries, keeping recent UIDs to prevent duplicate processing while still bounding memory usage. Fixes #890 --- nanobot/channels/email.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 1b6f46b..c90a14d 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -304,7 +304,9 @@ class EmailChannel(BaseChannel): self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: - self._processed_uids.clear() + # Evict oldest half instead of clearing entirely + to_keep = list(self._processed_uids)[len(self._processed_uids) // 2:] + self._processed_uids = set(to_keep) if mark_seen: client.store(imap_id, "+FLAGS", "\\Seen") From 8c55b40b9f383c5e11217cbd6234483a63f597aa Mon Sep 17 00:00:00 2001 From: andienguyen-ecoligo Date: Sat, 21 Feb 2026 12:38:24 -0500 Subject: [PATCH 11/17] fix(qq): make start() long-running per base channel contract QQ channel's start() created a background task and returned immediately, violating the base Channel contract which specifies start() should be "a long-running async task". This caused the gateway to exit prematurely when QQ was the only enabled channel. Now directly awaits _run_bot() to stay alive like other channels. Fixes #894 --- nanobot/channels/qq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 16cbfb8..a940a75 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -71,8 +71,8 @@ class QQChannel(BaseChannel): BotClass = _make_bot_class(self) self._client = BotClass() - self._bot_task = asyncio.create_task(self._run_bot()) logger.info("QQ bot started (C2C private message)") + await self._run_bot() async def _run_bot(self) -> None: """Run the bot connection with auto-reconnect.""" From b323087631561d4312affd17f57aa0643210a730 Mon Sep 17 00:00:00 2001 From: Yingwen Luo-LUOYW Date: Sun, 22 Feb 2026 12:42:33 +0800 Subject: [PATCH 12/17] feat(cli): add DingTalk, QQ, and Email to channels status output --- nanobot/cli/commands.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 6155463..f1f9b30 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -668,6 +668,33 @@ def channels_status(): slack_config ) + # DingTalk + dt = config.channels.dingtalk + dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]" + table.add_row( + "DingTalk", + "✓" if dt.enabled else "✗", + dt_config + ) + + # QQ + qq = config.channels.qq + qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]" + table.add_row( + "QQ", + "✓" if qq.enabled else "✗", + qq_config + ) + + # Email + em = config.channels.email + em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]" + table.add_row( + "Email", + "✓" if em.enabled else "✗", + em_config + ) + console.print(table) From b93b77a485d3a0c1632a5c3d590bc0b2a37c43bd Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sun, 22 Feb 2026 15:38:19 +0000 Subject: [PATCH 13/17] fix(slack): use logger.exception to capture full traceback --- nanobot/channels/slack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index d1c5895..b0f9bbb 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -192,8 +192,8 @@ class SlackChannel(BaseChannel): } }, ) - except Exception as e: - logger.error("Error handling Slack message from {}: {}", sender_id, e) + except Exception: + logger.exception("Error handling Slack message from {}", sender_id) def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: if channel_type == "im": From 71de1899e6dcf9e5d01396b6a90e6a63486bc6de Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sun, 22 Feb 2026 15:40:17 +0000 Subject: [PATCH 14/17] fix(session): use logger.exception and move import to top --- nanobot/session/manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 19d4439..5f23dc2 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -1,6 +1,7 @@ """Session management for conversation history.""" import json +import shutil from pathlib import Path from dataclasses import dataclass, field from datetime import datetime @@ -109,11 +110,10 @@ class SessionManager: legacy_path = self._get_legacy_session_path(key) if legacy_path.exists(): try: - import shutil shutil.move(str(legacy_path), str(path)) logger.info("Migrated session {} from legacy path", key) - except Exception as e: - logger.warning("Failed to migrate session {}: {}", key, e) + except Exception: + logger.exception("Failed to migrate session {}", key) if not path.exists(): return None From 4e8c8cc2274f1c5f505574e8fa7d5a5df790aff6 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sun, 22 Feb 2026 15:48:49 +0000 Subject: [PATCH 15/17] fix(email): fix misleading comment and simplify uid eviction --- nanobot/channels/email.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index c90a14d..5dc05fb 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -304,9 +304,8 @@ class EmailChannel(BaseChannel): self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: - # Evict oldest half instead of clearing entirely - to_keep = list(self._processed_uids)[len(self._processed_uids) // 2:] - self._processed_uids = set(to_keep) + # Evict a random half to cap memory; mark_seen is the primary dedup + self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:]) if mark_seen: client.store(imap_id, "+FLAGS", "\\Seen") From b13d7f853e8173162df02af0dc1b8e25b674753b Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sun, 22 Feb 2026 17:17:35 +0000 Subject: [PATCH 16/17] fix(agent): make tool hint a fallback when no content in on_progress --- nanobot/agent/loop.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 5762fa9..b05ba90 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -196,7 +196,8 @@ class AgentLoop: clean = self._strip_think(response.content) if clean: await on_progress(clean) - await on_progress(self._tool_hint(response.tool_calls)) + else: + await on_progress(self._tool_hint(response.tool_calls)) tool_call_dicts = [ { From b53c3d39edece22484568bb1896cbe29bfb3c1ae Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sun, 22 Feb 2026 17:35:53 +0000 Subject: [PATCH 17/17] fix(qq): remove dead _bot_task field and fix stop() to close client --- nanobot/channels/qq.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index a940a75..5352a30 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -55,7 +55,6 @@ class QQChannel(BaseChannel): self.config: QQConfig = config self._client: "botpy.Client | None" = None self._processed_ids: deque = deque(maxlen=1000) - self._bot_task: asyncio.Task | None = None async def start(self) -> None: """Start the QQ bot.""" @@ -88,11 +87,10 @@ class QQChannel(BaseChannel): async def stop(self) -> None: """Stop the QQ bot.""" self._running = False - if self._bot_task: - self._bot_task.cancel() + if self._client: try: - await self._bot_task - except asyncio.CancelledError: + await self._client.close() + except Exception: pass logger.info("QQ bot stopped") @@ -130,5 +128,5 @@ class QQChannel(BaseChannel): content=content, metadata={"message_id": data.id}, ) - except Exception as e: - logger.error("Error handling QQ message: {}", e) + except Exception: + logger.exception("Error handling QQ message")