fix(agent): avoid duplicate final send when message tool already replied
This commit is contained in:
@@ -1,30 +1,36 @@
|
||||
"""Agent loop: the core processing engine."""
|
||||
|
||||
import asyncio
|
||||
from contextlib import AsyncExitStack
|
||||
import json
|
||||
import json_repair
|
||||
from pathlib import Path
|
||||
import re
|
||||
from typing import Any, Awaitable, Callable
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
from contextlib import AsyncExitStack
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable
|
||||
|
||||
import json_repair
|
||||
from loguru import logger
|
||||
|
||||
from nanobot.agent.context import ContextBuilder
|
||||
from nanobot.agent.memory import MemoryStore
|
||||
from nanobot.agent.subagent import SubagentManager
|
||||
from nanobot.agent.tools.cron import CronTool
|
||||
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
|
||||
from nanobot.agent.tools.message import MessageTool
|
||||
from nanobot.agent.tools.registry import ToolRegistry
|
||||
from nanobot.agent.tools.shell import ExecTool
|
||||
from nanobot.agent.tools.spawn import SpawnTool
|
||||
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
|
||||
from nanobot.bus.events import InboundMessage, OutboundMessage
|
||||
from nanobot.bus.queue import MessageBus
|
||||
from nanobot.providers.base import LLMProvider
|
||||
from nanobot.agent.context import ContextBuilder
|
||||
from nanobot.agent.tools.registry import ToolRegistry
|
||||
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool
|
||||
from nanobot.agent.tools.shell import ExecTool
|
||||
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
|
||||
from nanobot.agent.tools.message import MessageTool
|
||||
from nanobot.agent.tools.spawn import SpawnTool
|
||||
from nanobot.agent.tools.cron import CronTool
|
||||
from nanobot.agent.memory import MemoryStore
|
||||
from nanobot.agent.subagent import SubagentManager
|
||||
from nanobot.session.manager import Session, SessionManager
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nanobot.config.schema import ExecToolConfig
|
||||
from nanobot.cron.service import CronService
|
||||
|
||||
|
||||
class AgentLoop:
|
||||
"""
|
||||
@@ -49,14 +55,13 @@ class AgentLoop:
|
||||
max_tokens: int = 4096,
|
||||
memory_window: int = 50,
|
||||
brave_api_key: str | None = None,
|
||||
exec_config: "ExecToolConfig | None" = None,
|
||||
cron_service: "CronService | None" = None,
|
||||
exec_config: ExecToolConfig | None = None,
|
||||
cron_service: CronService | None = None,
|
||||
restrict_to_workspace: bool = False,
|
||||
session_manager: SessionManager | None = None,
|
||||
mcp_servers: dict | None = None,
|
||||
):
|
||||
from nanobot.config.schema import ExecToolConfig
|
||||
from nanobot.cron.service import CronService
|
||||
self.bus = bus
|
||||
self.provider = provider
|
||||
self.workspace = workspace
|
||||
@@ -84,14 +89,14 @@ class AgentLoop:
|
||||
exec_config=self.exec_config,
|
||||
restrict_to_workspace=restrict_to_workspace,
|
||||
)
|
||||
|
||||
|
||||
self._running = False
|
||||
self._mcp_servers = mcp_servers or {}
|
||||
self._mcp_stack: AsyncExitStack | None = None
|
||||
self._mcp_connected = False
|
||||
self._consolidating: set[str] = set() # Session keys with consolidation in progress
|
||||
self._register_default_tools()
|
||||
|
||||
|
||||
def _register_default_tools(self) -> None:
|
||||
"""Register the default set of tools."""
|
||||
# File tools (workspace for relative paths, restrict if configured)
|
||||
@@ -100,30 +105,30 @@ class AgentLoop:
|
||||
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||
|
||||
|
||||
# Shell tool
|
||||
self.tools.register(ExecTool(
|
||||
working_dir=str(self.workspace),
|
||||
timeout=self.exec_config.timeout,
|
||||
restrict_to_workspace=self.restrict_to_workspace,
|
||||
))
|
||||
|
||||
|
||||
# Web tools
|
||||
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
|
||||
self.tools.register(WebFetchTool())
|
||||
|
||||
|
||||
# Message tool
|
||||
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
|
||||
self.tools.register(message_tool)
|
||||
|
||||
|
||||
# Spawn tool (for subagents)
|
||||
spawn_tool = SpawnTool(manager=self.subagents)
|
||||
self.tools.register(spawn_tool)
|
||||
|
||||
|
||||
# Cron tool (for scheduling)
|
||||
if self.cron_service:
|
||||
self.tools.register(CronTool(self.cron_service))
|
||||
|
||||
|
||||
async def _connect_mcp(self) -> None:
|
||||
"""Connect to configured MCP servers (one-time, lazy)."""
|
||||
if self._mcp_connected or not self._mcp_servers:
|
||||
@@ -270,7 +275,7 @@ class AgentLoop:
|
||||
))
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
|
||||
async def close_mcp(self) -> None:
|
||||
"""Close MCP connections."""
|
||||
if self._mcp_stack:
|
||||
@@ -284,7 +289,7 @@ class AgentLoop:
|
||||
"""Stop the agent loop."""
|
||||
self._running = False
|
||||
logger.info("Agent loop stopping")
|
||||
|
||||
|
||||
async def _process_message(
|
||||
self,
|
||||
msg: InboundMessage,
|
||||
@@ -293,25 +298,25 @@ class AgentLoop:
|
||||
) -> OutboundMessage | None:
|
||||
"""
|
||||
Process a single inbound message.
|
||||
|
||||
|
||||
Args:
|
||||
msg: The inbound message to process.
|
||||
session_key: Override session key (used by process_direct).
|
||||
on_progress: Optional callback for intermediate output (defaults to bus publish).
|
||||
|
||||
|
||||
Returns:
|
||||
The response message, or None if no response needed.
|
||||
"""
|
||||
# System messages route back via chat_id ("channel:chat_id")
|
||||
if msg.channel == "system":
|
||||
return await self._process_system_message(msg)
|
||||
|
||||
|
||||
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
|
||||
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
|
||||
|
||||
|
||||
key = session_key or msg.session_key
|
||||
session = self.sessions.get_or_create(key)
|
||||
|
||||
|
||||
# Handle slash commands
|
||||
cmd = msg.content.strip().lower()
|
||||
if cmd == "/new":
|
||||
@@ -332,7 +337,7 @@ class AgentLoop:
|
||||
if cmd == "/help":
|
||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
|
||||
|
||||
|
||||
if len(session.messages) > self.memory_window and session.key not in self._consolidating:
|
||||
self._consolidating.add(session.key)
|
||||
|
||||
@@ -345,6 +350,10 @@ class AgentLoop:
|
||||
asyncio.create_task(_consolidate_and_unlock())
|
||||
|
||||
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
|
||||
if message_tool := self.tools.get("message"):
|
||||
if isinstance(message_tool, MessageTool):
|
||||
message_tool.start_turn()
|
||||
|
||||
initial_messages = self.context.build_messages(
|
||||
history=session.get_history(max_messages=self.memory_window),
|
||||
current_message=msg.content,
|
||||
@@ -365,31 +374,44 @@ class AgentLoop:
|
||||
|
||||
if final_content is None:
|
||||
final_content = "I've completed processing but have no response to give."
|
||||
|
||||
|
||||
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
|
||||
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
|
||||
|
||||
|
||||
session.add_message("user", msg.content)
|
||||
session.add_message("assistant", final_content,
|
||||
tools_used=tools_used if tools_used else None)
|
||||
self.sessions.save(session)
|
||||
|
||||
|
||||
suppress_final_reply = False
|
||||
if message_tool := self.tools.get("message"):
|
||||
if isinstance(message_tool, MessageTool):
|
||||
sent_targets = set(message_tool.get_turn_sends())
|
||||
suppress_final_reply = (msg.channel, msg.chat_id) in sent_targets
|
||||
|
||||
if suppress_final_reply:
|
||||
logger.info(
|
||||
"Skipping final auto-reply because message tool already sent to "
|
||||
f"{msg.channel}:{msg.chat_id} in this turn"
|
||||
)
|
||||
return None
|
||||
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content=final_content,
|
||||
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
|
||||
)
|
||||
|
||||
|
||||
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
|
||||
"""
|
||||
Process a system message (e.g., subagent announce).
|
||||
|
||||
|
||||
The chat_id field contains "original_channel:original_chat_id" to route
|
||||
the response back to the correct destination.
|
||||
"""
|
||||
logger.info("Processing system message from {}", msg.sender_id)
|
||||
|
||||
|
||||
# Parse origin from chat_id (format: "channel:chat_id")
|
||||
if ":" in msg.chat_id:
|
||||
parts = msg.chat_id.split(":", 1)
|
||||
@@ -399,7 +421,7 @@ class AgentLoop:
|
||||
# Fallback
|
||||
origin_channel = "cli"
|
||||
origin_chat_id = msg.chat_id
|
||||
|
||||
|
||||
session_key = f"{origin_channel}:{origin_chat_id}"
|
||||
session = self.sessions.get_or_create(session_key)
|
||||
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
|
||||
@@ -413,17 +435,17 @@ class AgentLoop:
|
||||
|
||||
if final_content is None:
|
||||
final_content = "Background task completed."
|
||||
|
||||
|
||||
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
|
||||
session.add_message("assistant", final_content)
|
||||
self.sessions.save(session)
|
||||
|
||||
|
||||
return OutboundMessage(
|
||||
channel=origin_channel,
|
||||
chat_id=origin_chat_id,
|
||||
content=final_content
|
||||
)
|
||||
|
||||
|
||||
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
|
||||
"""Consolidate old messages into MEMORY.md + HISTORY.md.
|
||||
|
||||
@@ -533,14 +555,14 @@ Respond with ONLY valid JSON, no markdown fences."""
|
||||
) -> str:
|
||||
"""
|
||||
Process a message directly (for CLI or cron usage).
|
||||
|
||||
|
||||
Args:
|
||||
content: The message content.
|
||||
session_key: Session identifier (overrides channel:chat_id for session lookup).
|
||||
channel: Source channel (for tool context routing).
|
||||
chat_id: Source chat ID (for tool context routing).
|
||||
on_progress: Optional callback for intermediate output.
|
||||
|
||||
|
||||
Returns:
|
||||
The agent's response.
|
||||
"""
|
||||
@@ -551,6 +573,6 @@ Respond with ONLY valid JSON, no markdown fences."""
|
||||
chat_id=chat_id,
|
||||
content=content
|
||||
)
|
||||
|
||||
|
||||
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
|
||||
return response.content if response else ""
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Message tool for sending messages to users."""
|
||||
|
||||
from typing import Any, Callable, Awaitable
|
||||
from typing import Any, Awaitable, Callable
|
||||
|
||||
from nanobot.agent.tools.base import Tool
|
||||
from nanobot.bus.events import OutboundMessage
|
||||
@@ -8,37 +8,45 @@ from nanobot.bus.events import OutboundMessage
|
||||
|
||||
class MessageTool(Tool):
|
||||
"""Tool to send messages to users on chat channels."""
|
||||
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
self,
|
||||
send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None,
|
||||
default_channel: str = "",
|
||||
default_chat_id: str = "",
|
||||
default_message_id: str | None = None
|
||||
default_message_id: str | None = None,
|
||||
):
|
||||
self._send_callback = send_callback
|
||||
self._default_channel = default_channel
|
||||
self._default_chat_id = default_chat_id
|
||||
self._default_message_id = default_message_id
|
||||
|
||||
self._turn_sends: list[tuple[str, str]] = []
|
||||
|
||||
def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
|
||||
"""Set the current message context."""
|
||||
self._default_channel = channel
|
||||
self._default_chat_id = chat_id
|
||||
self._default_message_id = message_id
|
||||
|
||||
def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None:
|
||||
"""Set the callback for sending messages."""
|
||||
self._send_callback = callback
|
||||
|
||||
|
||||
def start_turn(self) -> None:
|
||||
"""Reset per-turn send tracking."""
|
||||
self._turn_sends.clear()
|
||||
|
||||
def get_turn_sends(self) -> list[tuple[str, str]]:
|
||||
"""Get (channel, chat_id) targets sent in the current turn."""
|
||||
return list(self._turn_sends)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "message"
|
||||
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return "Send a message to the user. Use this when you want to communicate something."
|
||||
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
@@ -64,11 +72,11 @@ class MessageTool(Tool):
|
||||
},
|
||||
"required": ["content"]
|
||||
}
|
||||
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
content: str,
|
||||
channel: str | None = None,
|
||||
self,
|
||||
content: str,
|
||||
channel: str | None = None,
|
||||
chat_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
media: list[str] | None = None,
|
||||
@@ -77,13 +85,13 @@ class MessageTool(Tool):
|
||||
channel = channel or self._default_channel
|
||||
chat_id = chat_id or self._default_chat_id
|
||||
message_id = message_id or self._default_message_id
|
||||
|
||||
|
||||
if not channel or not chat_id:
|
||||
return "Error: No target channel/chat specified"
|
||||
|
||||
|
||||
if not self._send_callback:
|
||||
return "Error: Message sending not configured"
|
||||
|
||||
|
||||
msg = OutboundMessage(
|
||||
channel=channel,
|
||||
chat_id=chat_id,
|
||||
@@ -93,9 +101,10 @@ class MessageTool(Tool):
|
||||
"message_id": message_id,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
await self._send_callback(msg)
|
||||
self._turn_sends.append((channel, chat_id))
|
||||
media_info = f" with {len(media)} attachments" if media else ""
|
||||
return f"Message sent to {channel}:{chat_id}{media_info}"
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user