From ddae3e9d5f73ab9d95aa866d9894190d0f2200de Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Thu, 19 Feb 2026 11:25:47 +0800 Subject: [PATCH] fix(agent): avoid duplicate final send when message tool already replied --- nanobot/agent/loop.py | 120 +++++++++++++++++++-------------- nanobot/agent/tools/message.py | 43 +++++++----- 2 files changed, 97 insertions(+), 66 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 3016d92..926fad9 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -1,30 +1,36 @@ """Agent loop: the core processing engine.""" -import asyncio -from contextlib import AsyncExitStack -import json -import json_repair -from pathlib import Path -import re -from typing import Any, Awaitable, Callable +from __future__ import annotations +import asyncio +import json +import re +from contextlib import AsyncExitStack +from pathlib import Path +from typing import TYPE_CHECKING, Awaitable, Callable + +import json_repair from loguru import logger +from nanobot.agent.context import ContextBuilder +from nanobot.agent.memory import MemoryStore +from nanobot.agent.subagent import SubagentManager +from nanobot.agent.tools.cron import CronTool +from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool +from nanobot.agent.tools.message import MessageTool +from nanobot.agent.tools.registry import ToolRegistry +from nanobot.agent.tools.shell import ExecTool +from nanobot.agent.tools.spawn import SpawnTool +from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.providers.base import LLMProvider -from nanobot.agent.context import ContextBuilder -from nanobot.agent.tools.registry import ToolRegistry -from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool -from nanobot.agent.tools.shell import ExecTool -from nanobot.agent.tools.web import WebSearchTool, WebFetchTool -from nanobot.agent.tools.message import MessageTool -from nanobot.agent.tools.spawn import SpawnTool -from nanobot.agent.tools.cron import CronTool -from nanobot.agent.memory import MemoryStore -from nanobot.agent.subagent import SubagentManager from nanobot.session.manager import Session, SessionManager +if TYPE_CHECKING: + from nanobot.config.schema import ExecToolConfig + from nanobot.cron.service import CronService + class AgentLoop: """ @@ -49,14 +55,13 @@ class AgentLoop: max_tokens: int = 4096, memory_window: int = 50, brave_api_key: str | None = None, - exec_config: "ExecToolConfig | None" = None, - cron_service: "CronService | None" = None, + exec_config: ExecToolConfig | None = None, + cron_service: CronService | None = None, restrict_to_workspace: bool = False, session_manager: SessionManager | None = None, mcp_servers: dict | None = None, ): from nanobot.config.schema import ExecToolConfig - from nanobot.cron.service import CronService self.bus = bus self.provider = provider self.workspace = workspace @@ -84,14 +89,14 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False self._consolidating: set[str] = set() # Session keys with consolidation in progress self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -100,30 +105,30 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool self.tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, )) - + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or not self._mcp_servers: @@ -270,7 +275,7 @@ class AgentLoop: )) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -284,7 +289,7 @@ class AgentLoop: """Stop the agent loop.""" self._running = False logger.info("Agent loop stopping") - + async def _process_message( self, msg: InboundMessage, @@ -293,25 +298,25 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": @@ -332,7 +337,7 @@ class AgentLoop: if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") - + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) @@ -345,6 +350,10 @@ class AgentLoop: asyncio.create_task(_consolidate_and_unlock()) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool): + message_tool.start_turn() + initial_messages = self.context.build_messages( history=session.get_history(max_messages=self.memory_window), current_message=msg.content, @@ -365,31 +374,44 @@ class AgentLoop: if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) session.add_message("assistant", final_content, tools_used=tools_used if tools_used else None) self.sessions.save(session) - + + suppress_final_reply = False + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool): + sent_targets = set(message_tool.get_turn_sends()) + suppress_final_reply = (msg.channel, msg.chat_id) in sent_targets + + if suppress_final_reply: + logger.info( + "Skipping final auto-reply because message tool already sent to " + f"{msg.channel}:{msg.chat_id} in this turn" + ) + return None + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -399,7 +421,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -413,17 +435,17 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( channel=origin_channel, chat_id=origin_chat_id, content=final_content ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> None: """Consolidate old messages into MEMORY.md + HISTORY.md. @@ -533,14 +555,14 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ @@ -551,6 +573,6 @@ Respond with ONLY valid JSON, no markdown fences.""" chat_id=chat_id, content=content ) - + response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/nanobot/agent/tools/message.py b/nanobot/agent/tools/message.py index 10947c4..593bc44 100644 --- a/nanobot/agent/tools/message.py +++ b/nanobot/agent/tools/message.py @@ -1,6 +1,6 @@ """Message tool for sending messages to users.""" -from typing import Any, Callable, Awaitable +from typing import Any, Awaitable, Callable from nanobot.agent.tools.base import Tool from nanobot.bus.events import OutboundMessage @@ -8,37 +8,45 @@ from nanobot.bus.events import OutboundMessage class MessageTool(Tool): """Tool to send messages to users on chat channels.""" - + def __init__( - self, + self, send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, default_channel: str = "", default_chat_id: str = "", - default_message_id: str | None = None + default_message_id: str | None = None, ): self._send_callback = send_callback self._default_channel = default_channel self._default_chat_id = default_chat_id self._default_message_id = default_message_id - + self._turn_sends: list[tuple[str, str]] = [] + def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Set the current message context.""" self._default_channel = channel self._default_chat_id = chat_id self._default_message_id = message_id - def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None: """Set the callback for sending messages.""" self._send_callback = callback - + + def start_turn(self) -> None: + """Reset per-turn send tracking.""" + self._turn_sends.clear() + + def get_turn_sends(self) -> list[tuple[str, str]]: + """Get (channel, chat_id) targets sent in the current turn.""" + return list(self._turn_sends) + @property def name(self) -> str: return "message" - + @property def description(self) -> str: return "Send a message to the user. Use this when you want to communicate something." - + @property def parameters(self) -> dict[str, Any]: return { @@ -64,11 +72,11 @@ class MessageTool(Tool): }, "required": ["content"] } - + async def execute( - self, - content: str, - channel: str | None = None, + self, + content: str, + channel: str | None = None, chat_id: str | None = None, message_id: str | None = None, media: list[str] | None = None, @@ -77,13 +85,13 @@ class MessageTool(Tool): channel = channel or self._default_channel chat_id = chat_id or self._default_chat_id message_id = message_id or self._default_message_id - + if not channel or not chat_id: return "Error: No target channel/chat specified" - + if not self._send_callback: return "Error: Message sending not configured" - + msg = OutboundMessage( channel=channel, chat_id=chat_id, @@ -93,9 +101,10 @@ class MessageTool(Tool): "message_id": message_id, } ) - + try: await self._send_callback(msg) + self._turn_sends.append((channel, chat_id)) media_info = f" with {len(media)} attachments" if media else "" return f"Message sent to {channel}:{chat_id}{media_info}" except Exception as e: