Merge branch 'main' into pr-887

This commit is contained in:
Re-bin
2026-02-21 07:12:58 +00:00
12 changed files with 540 additions and 146 deletions

View File

@@ -1,30 +1,36 @@
"""Agent loop: the core processing engine."""
import asyncio
from contextlib import AsyncExitStack
import json
import json_repair
from pathlib import Path
import re
from typing import Any, Awaitable, Callable
from __future__ import annotations
import asyncio
import json
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Awaitable, Callable
import json_repair
from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider
from nanobot.agent.context import ContextBuilder
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.session.manager import Session, SessionManager
if TYPE_CHECKING:
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
class AgentLoop:
"""
@@ -49,14 +55,13 @@ class AgentLoop:
max_tokens: int = 4096,
memory_window: int = 50,
brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None,
cron_service: "CronService | None" = None,
exec_config: ExecToolConfig | None = None,
cron_service: CronService | None = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
mcp_servers: dict | None = None,
):
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
self.bus = bus
self.provider = provider
self.workspace = workspace
@@ -84,14 +89,15 @@ class AgentLoop:
exec_config=self.exec_config,
restrict_to_workspace=restrict_to_workspace,
)
self._running = False
self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False
self._mcp_connecting = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._register_default_tools()
def _register_default_tools(self) -> None:
"""Register the default set of tools."""
# File tools (workspace for relative paths, restrict if configured)
@@ -100,39 +106,51 @@ class AgentLoop:
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
# Shell tool
self.tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace,
))
# Web tools
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
self.tools.register(WebFetchTool())
# Message tool
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
self.tools.register(message_tool)
# Spawn tool (for subagents)
spawn_tool = SpawnTool(manager=self.subagents)
self.tools.register(spawn_tool)
# Cron tool (for scheduling)
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or not self._mcp_servers:
if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
return
self._mcp_connected = True
self._mcp_connecting = True
from nanobot.agent.tools.mcp import connect_mcp_servers
self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
try:
self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
self._mcp_connected = True
except Exception as e:
logger.error("Failed to connect MCP servers (will retry next message): {}", e)
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except Exception:
pass
self._mcp_stack = None
finally:
self._mcp_connecting = False
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Update context for all tools that need routing info."""
@@ -238,10 +256,6 @@ class AgentLoop:
text_only_retried = True
interim_content = final_content
logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80])
messages = self.context.add_assistant_message(
messages, response.content,
reasoning_content=response.reasoning_content,
)
final_content = None
continue
# Fall back to interim content if retry produced nothing
@@ -266,8 +280,9 @@ class AgentLoop:
)
try:
response = await self._process_message(msg)
if response:
await self.bus.publish_outbound(response)
await self.bus.publish_outbound(response or OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="",
))
except Exception as e:
logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage(
@@ -277,7 +292,7 @@ class AgentLoop:
))
except asyncio.TimeoutError:
continue
async def close_mcp(self) -> None:
"""Close MCP connections."""
if self._mcp_stack:
@@ -291,7 +306,7 @@ class AgentLoop:
"""Stop the agent loop."""
self._running = False
logger.info("Agent loop stopping")
async def _process_message(
self,
msg: InboundMessage,
@@ -300,25 +315,25 @@ class AgentLoop:
) -> OutboundMessage | None:
"""
Process a single inbound message.
Args:
msg: The inbound message to process.
session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns:
The response message, or None if no response needed.
"""
# System messages route back via chat_id ("channel:chat_id")
if msg.channel == "system":
return await self._process_system_message(msg)
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
key = session_key or msg.session_key
session = self.sessions.get_or_create(key)
# Handle slash commands
cmd = msg.content.strip().lower()
if cmd == "/new":
@@ -339,7 +354,7 @@ class AgentLoop:
if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
if len(session.messages) > self.memory_window and session.key not in self._consolidating:
self._consolidating.add(session.key)
@@ -352,6 +367,10 @@ class AgentLoop:
asyncio.create_task(_consolidate_and_unlock())
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool):
message_tool.start_turn()
initial_messages = self.context.build_messages(
history=session.get_history(max_messages=self.memory_window),
current_message=msg.content,
@@ -361,9 +380,11 @@ class AgentLoop:
)
async def _bus_progress(content: str) -> None:
meta = dict(msg.metadata or {})
meta["_progress"] = True
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
metadata=msg.metadata or {},
metadata=meta,
))
final_content, tools_used = await self._run_agent_loop(
@@ -372,31 +393,35 @@ class AgentLoop:
if final_content is None:
final_content = "I've completed processing but have no response to give."
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
session.add_message("user", msg.content)
session.add_message("assistant", final_content,
tools_used=tools_used if tools_used else None)
self.sessions.save(session)
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
return None
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=final_content,
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
)
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
Process a system message (e.g., subagent announce).
The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination.
"""
logger.info("Processing system message from {}", msg.sender_id)
# Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
@@ -406,7 +431,7 @@ class AgentLoop:
# Fallback
origin_channel = "cli"
origin_chat_id = msg.chat_id
session_key = f"{origin_channel}:{origin_chat_id}"
session = self.sessions.get_or_create(session_key)
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
@@ -420,17 +445,17 @@ class AgentLoop:
if final_content is None:
final_content = "Background task completed."
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
session.add_message("assistant", final_content)
self.sessions.save(session)
return OutboundMessage(
channel=origin_channel,
chat_id=origin_chat_id,
content=final_content
)
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
"""Consolidate old messages into MEMORY.md + HISTORY.md.
@@ -540,14 +565,14 @@ Respond with ONLY valid JSON, no markdown fences."""
) -> str:
"""
Process a message directly (for CLI or cron usage).
Args:
content: The message content.
session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output.
Returns:
The agent's response.
"""
@@ -558,6 +583,6 @@ Respond with ONLY valid JSON, no markdown fences."""
chat_id=chat_id,
content=content
)
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else ""

View File

@@ -1,5 +1,6 @@
"""File system tools: read, write, edit."""
import difflib
from pathlib import Path
from typing import Any
@@ -150,7 +151,7 @@ class EditFileTool(Tool):
content = file_path.read_text(encoding="utf-8")
if old_text not in content:
return f"Error: old_text not found in file. Make sure it matches exactly."
return self._not_found_message(old_text, content, path)
# Count occurrences
count = content.count(old_text)
@@ -166,6 +167,28 @@ class EditFileTool(Tool):
except Exception as e:
return f"Error editing file: {str(e)}"
@staticmethod
def _not_found_message(old_text: str, content: str, path: str) -> str:
"""Build a helpful error when old_text is not found."""
lines = content.splitlines(keepends=True)
old_lines = old_text.splitlines(keepends=True)
window = len(old_lines)
best_ratio, best_start = 0.0, 0
for i in range(max(1, len(lines) - window + 1)):
ratio = difflib.SequenceMatcher(None, old_lines, lines[i : i + window]).ratio()
if ratio > best_ratio:
best_ratio, best_start = ratio, i
if best_ratio > 0.5:
diff = "\n".join(difflib.unified_diff(
old_lines, lines[best_start : best_start + window],
fromfile="old_text (provided)", tofile=f"{path} (actual, line {best_start + 1})",
lineterm="",
))
return f"Error: old_text not found in {path}.\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}"
return f"Error: old_text not found in {path}. No similar text found. Verify the file content."
class ListDirTool(Tool):
"""Tool to list directory contents."""

View File

@@ -1,6 +1,6 @@
"""Message tool for sending messages to users."""
from typing import Any, Callable, Awaitable
from typing import Any, Awaitable, Callable
from nanobot.agent.tools.base import Tool
from nanobot.bus.events import OutboundMessage
@@ -8,37 +8,42 @@ from nanobot.bus.events import OutboundMessage
class MessageTool(Tool):
"""Tool to send messages to users on chat channels."""
def __init__(
self,
self,
send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None,
default_channel: str = "",
default_chat_id: str = "",
default_message_id: str | None = None
default_message_id: str | None = None,
):
self._send_callback = send_callback
self._default_channel = default_channel
self._default_chat_id = default_chat_id
self._default_message_id = default_message_id
self._sent_in_turn: bool = False
def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Set the current message context."""
self._default_channel = channel
self._default_chat_id = chat_id
self._default_message_id = message_id
def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None:
"""Set the callback for sending messages."""
self._send_callback = callback
def start_turn(self) -> None:
"""Reset per-turn send tracking."""
self._sent_in_turn = False
@property
def name(self) -> str:
return "message"
@property
def description(self) -> str:
return "Send a message to the user. Use this when you want to communicate something."
@property
def parameters(self) -> dict[str, Any]:
return {
@@ -64,11 +69,11 @@ class MessageTool(Tool):
},
"required": ["content"]
}
async def execute(
self,
content: str,
channel: str | None = None,
self,
content: str,
channel: str | None = None,
chat_id: str | None = None,
message_id: str | None = None,
media: list[str] | None = None,
@@ -77,13 +82,13 @@ class MessageTool(Tool):
channel = channel or self._default_channel
chat_id = chat_id or self._default_chat_id
message_id = message_id or self._default_message_id
if not channel or not chat_id:
return "Error: No target channel/chat specified"
if not self._send_callback:
return "Error: Message sending not configured"
msg = OutboundMessage(
channel=channel,
chat_id=chat_id,
@@ -93,9 +98,10 @@ class MessageTool(Tool):
"message_id": message_id,
}
)
try:
await self._send_callback(msg)
self._sent_in_turn = True
media_info = f" with {len(media)} attachments" if media else ""
return f"Message sent to {channel}:{chat_id}{media_info}"
except Exception as e: