From 5f9eca466484e52ce535d4f20f4d0b87581da5db Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 20 Feb 2026 12:46:11 +0100 Subject: [PATCH] style(loop): remove formatting-only changes from upstream PR 881 --- nanobot/agent/loop.py | 220 ++++++++++++++++-------------------------- 1 file changed, 85 insertions(+), 135 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 7806fb8..481b72e 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -56,7 +56,6 @@ class AgentLoop: ): from nanobot.config.schema import ExecToolConfig from nanobot.cron.service import CronService - self.bus = bus self.provider = provider self.workspace = workspace @@ -84,16 +83,16 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False self._consolidating: set[str] = set() # Session keys with consolidation in progress - self._consolidation_tasks: set[asyncio.Task] = set() # Keep strong refs for in-flight tasks + self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -102,39 +101,36 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool - self.tools.register( - ExecTool( - working_dir=str(self.workspace), - timeout=self.exec_config.timeout, - restrict_to_workspace=self.restrict_to_workspace, - ) - ) - + self.tools.register(ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.restrict_to_workspace, + )) + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or not self._mcp_servers: return self._mcp_connected = True from nanobot.agent.tools.mcp import connect_mcp_servers - self._mcp_stack = AsyncExitStack() await self._mcp_stack.__aenter__() await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) @@ -163,13 +159,11 @@ class AgentLoop: @staticmethod def _tool_hint(tool_calls: list) -> str: """Format tool calls as concise hint, e.g. 'web_search("query")'.""" - def _fmt(tc): val = next(iter(tc.arguments.values()), None) if tc.arguments else None if not isinstance(val, str): return tc.name return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' - return ", ".join(_fmt(tc) for tc in tool_calls) async def _run_agent_loop( @@ -217,15 +211,13 @@ class AgentLoop: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments, ensure_ascii=False), - }, + "arguments": json.dumps(tc.arguments, ensure_ascii=False) + } } for tc in response.tool_calls ] messages = self.context.add_assistant_message( - messages, - response.content, - tool_call_dicts, + messages, response.content, tool_call_dicts, reasoning_content=response.reasoning_content, ) @@ -243,13 +235,9 @@ class AgentLoop: # Give them one retry; don't forward the text to avoid duplicates. if not tools_used and not text_only_retried and final_content: text_only_retried = True - logger.debug( - "Interim text response (no tools used yet), retrying: {}", - final_content[:80], - ) + logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) messages = self.context.add_assistant_message( - messages, - response.content, + messages, response.content, reasoning_content=response.reasoning_content, ) final_content = None @@ -266,23 +254,24 @@ class AgentLoop: while self._running: try: - msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) + msg = await asyncio.wait_for( + self.bus.consume_inbound(), + timeout=1.0 + ) try: response = await self._process_message(msg) if response: await self.bus.publish_outbound(response) except Exception as e: logger.error("Error processing message: {}", e) - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}", - ) - ) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"Sorry, I encountered an error: {str(e)}" + )) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -298,13 +287,12 @@ class AgentLoop: logger.info("Agent loop stopping") def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: - """Return a per-session lock for memory consolidation writers.""" lock = self._consolidation_locks.get(session_key) if lock is None: lock = asyncio.Lock() self._consolidation_locks[session_key] = lock return lock - + async def _process_message( self, msg: InboundMessage, @@ -313,25 +301,25 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": @@ -348,24 +336,18 @@ class AgentLoop: return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content="Could not start a new session because memory archival failed. Please try again.", + content="Could not start a new session because memory archival failed. Please try again." ) session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.", - ) + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, + content="New session started. Memory consolidation in progress.") if cmd == "/help": - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands", - ) - + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, + content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) lock = self._get_consolidation_lock(session.key) @@ -376,12 +358,12 @@ class AgentLoop: await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) - task = asyncio.current_task() - if task is not None: - self._consolidation_tasks.discard(task) + _task = asyncio.current_task() + if _task is not None: + self._consolidation_tasks.discard(_task) - task = asyncio.create_task(_consolidate_and_unlock()) - self._consolidation_tasks.add(task) + _task = asyncio.create_task(_consolidate_and_unlock()) + self._consolidation_tasks.add(_task) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) initial_messages = self.context.build_messages( @@ -393,49 +375,42 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=content, - metadata=msg.metadata or {}, - ) - ) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + metadata=msg.metadata or {}, + )) final_content, tools_used = await self._run_agent_loop( - initial_messages, - on_progress=on_progress or _bus_progress, + initial_messages, on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) - session.add_message( - "assistant", final_content, tools_used=tools_used if tools_used else None - ) + session.add_message("assistant", final_content, + tools_used=tools_used if tools_used else None) self.sessions.save(session) - + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, - metadata=msg.metadata - or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) + metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -445,7 +420,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -459,15 +434,17 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( - channel=origin_channel, chat_id=origin_chat_id, content=final_content + channel=origin_channel, + chat_id=origin_chat_id, + content=final_content ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> None: """Consolidate old messages into MEMORY.md + HISTORY.md. @@ -480,49 +457,29 @@ class AgentLoop: if archive_all: old_messages = session.messages keep_count = 0 - logger.info( - "Memory consolidation (archive_all): {} total messages archived", - len(session.messages), - ) + logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) else: keep_count = self.memory_window // 2 if len(session.messages) <= keep_count: - logger.debug( - "Session {}: No consolidation needed (messages={}, keep={})", - session.key, - len(session.messages), - keep_count, - ) + logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) return messages_to_process = len(session.messages) - session.last_consolidated if messages_to_process <= 0: - logger.debug( - "Session {}: No new messages to consolidate (last_consolidated={}, total={})", - session.key, - session.last_consolidated, - len(session.messages), - ) + logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) return - old_messages = session.messages[session.last_consolidated : -keep_count] + old_messages = session.messages[session.last_consolidated:-keep_count] if not old_messages: return - logger.info( - "Memory consolidation started: {} total, {} new to consolidate, {} keep", - len(session.messages), - len(old_messages), - keep_count, - ) + logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) lines = [] for m in old_messages: if not m.get("content"): continue tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append( - f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}" - ) + lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") conversation = "\n".join(lines) current_memory = memory.read_long_term() @@ -551,10 +508,7 @@ Respond with ONLY valid JSON, no markdown fences.""" try: response = await self.provider.chat( messages=[ - { - "role": "system", - "content": "You are a memory consolidation agent. Respond only with valid JSON.", - }, + {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, {"role": "user", "content": prompt}, ], model=self.model, @@ -567,10 +521,7 @@ Respond with ONLY valid JSON, no markdown fences.""" text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() result = json_repair.loads(text) if not isinstance(result, dict): - logger.warning( - "Memory consolidation: unexpected response type, skipping. Response: {}", - text[:200], - ) + logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200]) return if entry := result.get("history_entry"): @@ -589,11 +540,7 @@ Respond with ONLY valid JSON, no markdown fences.""" session.last_consolidated = 0 else: session.last_consolidated = len(session.messages) - keep_count - logger.info( - "Memory consolidation done: {} messages, last_consolidated={}", - len(session.messages), - session.last_consolidated, - ) + logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) except Exception as e: logger.error("Memory consolidation failed: {}", e) @@ -607,21 +554,24 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ await self._connect_mcp() - msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) - - response = await self._process_message( - msg, session_key=session_key, on_progress=on_progress + msg = InboundMessage( + channel=channel, + sender_id="user", + chat_id=chat_id, + content=content ) + + response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else ""