Merge PR #832: avoid duplicate reply when message tool already sent

This commit is contained in:
Re-bin
2026-02-20 15:56:13 +00:00
2 changed files with 85 additions and 66 deletions

View File

@@ -1,30 +1,36 @@
"""Agent loop: the core processing engine.""" """Agent loop: the core processing engine."""
import asyncio from __future__ import annotations
from contextlib import AsyncExitStack
import json
import json_repair
from pathlib import Path
import re
from typing import Any, Awaitable, Callable
import asyncio
import json
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Awaitable, Callable
import json_repair
from loguru import logger from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
from nanobot.agent.context import ContextBuilder
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.session.manager import Session, SessionManager from nanobot.session.manager import Session, SessionManager
if TYPE_CHECKING:
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
class AgentLoop: class AgentLoop:
""" """
@@ -49,14 +55,13 @@ class AgentLoop:
max_tokens: int = 4096, max_tokens: int = 4096,
memory_window: int = 50, memory_window: int = 50,
brave_api_key: str | None = None, brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None, exec_config: ExecToolConfig | None = None,
cron_service: "CronService | None" = None, cron_service: CronService | None = None,
restrict_to_workspace: bool = False, restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None, session_manager: SessionManager | None = None,
mcp_servers: dict | None = None, mcp_servers: dict | None = None,
): ):
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
self.bus = bus self.bus = bus
self.provider = provider self.provider = provider
self.workspace = workspace self.workspace = workspace
@@ -84,14 +89,14 @@ class AgentLoop:
exec_config=self.exec_config, exec_config=self.exec_config,
restrict_to_workspace=restrict_to_workspace, restrict_to_workspace=restrict_to_workspace,
) )
self._running = False self._running = False
self._mcp_servers = mcp_servers or {} self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False self._mcp_connected = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._register_default_tools() self._register_default_tools()
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
"""Register the default set of tools.""" """Register the default set of tools."""
# File tools (workspace for relative paths, restrict if configured) # File tools (workspace for relative paths, restrict if configured)
@@ -100,30 +105,30 @@ class AgentLoop:
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
# Shell tool # Shell tool
self.tools.register(ExecTool( self.tools.register(ExecTool(
working_dir=str(self.workspace), working_dir=str(self.workspace),
timeout=self.exec_config.timeout, timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace, restrict_to_workspace=self.restrict_to_workspace,
)) ))
# Web tools # Web tools
self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebSearchTool(api_key=self.brave_api_key))
self.tools.register(WebFetchTool()) self.tools.register(WebFetchTool())
# Message tool # Message tool
message_tool = MessageTool(send_callback=self.bus.publish_outbound) message_tool = MessageTool(send_callback=self.bus.publish_outbound)
self.tools.register(message_tool) self.tools.register(message_tool)
# Spawn tool (for subagents) # Spawn tool (for subagents)
spawn_tool = SpawnTool(manager=self.subagents) spawn_tool = SpawnTool(manager=self.subagents)
self.tools.register(spawn_tool) self.tools.register(spawn_tool)
# Cron tool (for scheduling) # Cron tool (for scheduling)
if self.cron_service: if self.cron_service:
self.tools.register(CronTool(self.cron_service)) self.tools.register(CronTool(self.cron_service))
async def _connect_mcp(self) -> None: async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy).""" """Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or not self._mcp_servers: if self._mcp_connected or not self._mcp_servers:
@@ -270,7 +275,7 @@ class AgentLoop:
)) ))
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
async def close_mcp(self) -> None: async def close_mcp(self) -> None:
"""Close MCP connections.""" """Close MCP connections."""
if self._mcp_stack: if self._mcp_stack:
@@ -284,7 +289,7 @@ class AgentLoop:
"""Stop the agent loop.""" """Stop the agent loop."""
self._running = False self._running = False
logger.info("Agent loop stopping") logger.info("Agent loop stopping")
async def _process_message( async def _process_message(
self, self,
msg: InboundMessage, msg: InboundMessage,
@@ -293,25 +298,25 @@ class AgentLoop:
) -> OutboundMessage | None: ) -> OutboundMessage | None:
""" """
Process a single inbound message. Process a single inbound message.
Args: Args:
msg: The inbound message to process. msg: The inbound message to process.
session_key: Override session key (used by process_direct). session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish). on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns: Returns:
The response message, or None if no response needed. The response message, or None if no response needed.
""" """
# System messages route back via chat_id ("channel:chat_id") # System messages route back via chat_id ("channel:chat_id")
if msg.channel == "system": if msg.channel == "system":
return await self._process_system_message(msg) return await self._process_system_message(msg)
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
key = session_key or msg.session_key key = session_key or msg.session_key
session = self.sessions.get_or_create(key) session = self.sessions.get_or_create(key)
# Handle slash commands # Handle slash commands
cmd = msg.content.strip().lower() cmd = msg.content.strip().lower()
if cmd == "/new": if cmd == "/new":
@@ -332,7 +337,7 @@ class AgentLoop:
if cmd == "/help": if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
if len(session.messages) > self.memory_window and session.key not in self._consolidating: if len(session.messages) > self.memory_window and session.key not in self._consolidating:
self._consolidating.add(session.key) self._consolidating.add(session.key)
@@ -345,6 +350,10 @@ class AgentLoop:
asyncio.create_task(_consolidate_and_unlock()) asyncio.create_task(_consolidate_and_unlock())
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool):
message_tool.start_turn()
initial_messages = self.context.build_messages( initial_messages = self.context.build_messages(
history=session.get_history(max_messages=self.memory_window), history=session.get_history(max_messages=self.memory_window),
current_message=msg.content, current_message=msg.content,
@@ -365,31 +374,35 @@ class AgentLoop:
if final_content is None: if final_content is None:
final_content = "I've completed processing but have no response to give." final_content = "I've completed processing but have no response to give."
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
session.add_message("user", msg.content) session.add_message("user", msg.content)
session.add_message("assistant", final_content, session.add_message("assistant", final_content,
tools_used=tools_used if tools_used else None) tools_used=tools_used if tools_used else None)
self.sessions.save(session) self.sessions.save(session)
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
return None
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=final_content, content=final_content,
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
) )
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
""" """
Process a system message (e.g., subagent announce). Process a system message (e.g., subagent announce).
The chat_id field contains "original_channel:original_chat_id" to route The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination. the response back to the correct destination.
""" """
logger.info("Processing system message from {}", msg.sender_id) logger.info("Processing system message from {}", msg.sender_id)
# Parse origin from chat_id (format: "channel:chat_id") # Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id: if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1) parts = msg.chat_id.split(":", 1)
@@ -399,7 +412,7 @@ class AgentLoop:
# Fallback # Fallback
origin_channel = "cli" origin_channel = "cli"
origin_chat_id = msg.chat_id origin_chat_id = msg.chat_id
session_key = f"{origin_channel}:{origin_chat_id}" session_key = f"{origin_channel}:{origin_chat_id}"
session = self.sessions.get_or_create(session_key) session = self.sessions.get_or_create(session_key)
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
@@ -413,17 +426,17 @@ class AgentLoop:
if final_content is None: if final_content is None:
final_content = "Background task completed." final_content = "Background task completed."
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
session.add_message("assistant", final_content) session.add_message("assistant", final_content)
self.sessions.save(session) self.sessions.save(session)
return OutboundMessage( return OutboundMessage(
channel=origin_channel, channel=origin_channel,
chat_id=origin_chat_id, chat_id=origin_chat_id,
content=final_content content=final_content
) )
async def _consolidate_memory(self, session, archive_all: bool = False) -> None: async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
"""Consolidate old messages into MEMORY.md + HISTORY.md. """Consolidate old messages into MEMORY.md + HISTORY.md.
@@ -533,14 +546,14 @@ Respond with ONLY valid JSON, no markdown fences."""
) -> str: ) -> str:
""" """
Process a message directly (for CLI or cron usage). Process a message directly (for CLI or cron usage).
Args: Args:
content: The message content. content: The message content.
session_key: Session identifier (overrides channel:chat_id for session lookup). session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing). channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing). chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output. on_progress: Optional callback for intermediate output.
Returns: Returns:
The agent's response. The agent's response.
""" """
@@ -551,6 +564,6 @@ Respond with ONLY valid JSON, no markdown fences."""
chat_id=chat_id, chat_id=chat_id,
content=content content=content
) )
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else "" return response.content if response else ""

View File

@@ -1,6 +1,6 @@
"""Message tool for sending messages to users.""" """Message tool for sending messages to users."""
from typing import Any, Callable, Awaitable from typing import Any, Awaitable, Callable
from nanobot.agent.tools.base import Tool from nanobot.agent.tools.base import Tool
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
@@ -8,37 +8,42 @@ from nanobot.bus.events import OutboundMessage
class MessageTool(Tool): class MessageTool(Tool):
"""Tool to send messages to users on chat channels.""" """Tool to send messages to users on chat channels."""
def __init__( def __init__(
self, self,
send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None,
default_channel: str = "", default_channel: str = "",
default_chat_id: str = "", default_chat_id: str = "",
default_message_id: str | None = None default_message_id: str | None = None,
): ):
self._send_callback = send_callback self._send_callback = send_callback
self._default_channel = default_channel self._default_channel = default_channel
self._default_chat_id = default_chat_id self._default_chat_id = default_chat_id
self._default_message_id = default_message_id self._default_message_id = default_message_id
self._sent_in_turn: bool = False
def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Set the current message context.""" """Set the current message context."""
self._default_channel = channel self._default_channel = channel
self._default_chat_id = chat_id self._default_chat_id = chat_id
self._default_message_id = message_id self._default_message_id = message_id
def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None: def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None:
"""Set the callback for sending messages.""" """Set the callback for sending messages."""
self._send_callback = callback self._send_callback = callback
def start_turn(self) -> None:
"""Reset per-turn send tracking."""
self._sent_in_turn = False
@property @property
def name(self) -> str: def name(self) -> str:
return "message" return "message"
@property @property
def description(self) -> str: def description(self) -> str:
return "Send a message to the user. Use this when you want to communicate something." return "Send a message to the user. Use this when you want to communicate something."
@property @property
def parameters(self) -> dict[str, Any]: def parameters(self) -> dict[str, Any]:
return { return {
@@ -64,11 +69,11 @@ class MessageTool(Tool):
}, },
"required": ["content"] "required": ["content"]
} }
async def execute( async def execute(
self, self,
content: str, content: str,
channel: str | None = None, channel: str | None = None,
chat_id: str | None = None, chat_id: str | None = None,
message_id: str | None = None, message_id: str | None = None,
media: list[str] | None = None, media: list[str] | None = None,
@@ -77,13 +82,13 @@ class MessageTool(Tool):
channel = channel or self._default_channel channel = channel or self._default_channel
chat_id = chat_id or self._default_chat_id chat_id = chat_id or self._default_chat_id
message_id = message_id or self._default_message_id message_id = message_id or self._default_message_id
if not channel or not chat_id: if not channel or not chat_id:
return "Error: No target channel/chat specified" return "Error: No target channel/chat specified"
if not self._send_callback: if not self._send_callback:
return "Error: Message sending not configured" return "Error: Message sending not configured"
msg = OutboundMessage( msg = OutboundMessage(
channel=channel, channel=channel,
chat_id=chat_id, chat_id=chat_id,
@@ -93,9 +98,10 @@ class MessageTool(Tool):
"message_id": message_id, "message_id": message_id,
} }
) )
try: try:
await self._send_callback(msg) await self._send_callback(msg)
self._sent_in_turn = True
media_info = f" with {len(media)} attachments" if media else "" media_info = f" with {len(media)} attachments" if media else ""
return f"Message sent to {channel}:{chat_id}{media_info}" return f"Message sent to {channel}:{chat_id}{media_info}"
except Exception as e: except Exception as e: