Merge branch 'main' into pr-653

This commit is contained in:
Re-bin
2026-02-20 08:01:08 +00:00
41 changed files with 1687 additions and 482 deletions

View File

@@ -105,7 +105,7 @@ IMPORTANT: When responding to direct questions or conversations, reply directly
Only use the 'message' tool when you need to send a message to a specific chat channel (like WhatsApp).
For normal conversation, just respond with text - do not call the message tool.
Always be helpful, accurate, and concise. When using tools, think step by step: what you know, what you need, and why you chose this tool.
Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language).
When remembering something important, write to {workspace_path}/memory/MEMORY.md
To recall past events, grep {workspace_path}/memory/HISTORY.md"""
@@ -225,14 +225,18 @@ To recall past events, grep {workspace_path}/memory/HISTORY.md"""
Returns:
Updated message list.
"""
msg: dict[str, Any] = {"role": "assistant", "content": content or ""}
msg: dict[str, Any] = {"role": "assistant"}
# Omit empty content — some backends reject empty text blocks
if content:
msg["content"] = content
if tool_calls:
msg["tool_calls"] = tool_calls
# Thinking models reject history without this
# Include reasoning content when provided (required by some thinking models)
if reasoning_content:
msg["reasoning_content"] = reasoning_content
messages.append(msg)
return messages

View File

@@ -1,9 +1,12 @@
"""Agent loop: the core processing engine."""
import asyncio
from contextlib import AsyncExitStack
import json
import json_repair
from pathlib import Path
from typing import Any
import re
from typing import Any, Awaitable, Callable
from loguru import logger
@@ -50,6 +53,7 @@ class AgentLoop:
cron_service: "CronService | None" = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
mcp_servers: dict | None = None,
):
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
@@ -82,6 +86,9 @@ class AgentLoop:
)
self._running = False
self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False
self._register_default_tools()
def _register_default_tools(self) -> None:
@@ -116,6 +123,16 @@ class AgentLoop:
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or not self._mcp_servers:
return
self._mcp_connected = True
from nanobot.agent.tools.mcp import connect_mcp_servers
self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
def _set_tool_context(self, channel: str, chat_id: str) -> None:
"""Update context for all tools that need routing info."""
if message_tool := self.tools.get("message"):
@@ -130,12 +147,34 @@ class AgentLoop:
if isinstance(cron_tool, CronTool):
cron_tool.set_context(channel, chat_id)
async def _run_agent_loop(self, initial_messages: list[dict]) -> tuple[str | None, list[str]]:
@staticmethod
def _strip_think(text: str | None) -> str | None:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc):
val = next(iter(tc.arguments.values()), None) if tc.arguments else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
async def _run_agent_loop(
self,
initial_messages: list[dict],
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> tuple[str | None, list[str]]:
"""
Run the agent iteration loop.
Args:
initial_messages: Starting messages for the LLM conversation.
on_progress: Optional callback to push intermediate content to the user.
Returns:
Tuple of (final_content, list_of_tools_used).
@@ -157,13 +196,17 @@ class AgentLoop:
)
if response.has_tool_calls:
if on_progress:
clean = self._strip_think(response.content)
await on_progress(clean or self._tool_hint(response.tool_calls))
tool_call_dicts = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments)
"arguments": json.dumps(tc.arguments, ensure_ascii=False)
}
}
for tc in response.tool_calls
@@ -176,14 +219,13 @@ class AgentLoop:
for tool_call in response.tool_calls:
tools_used.append(tool_call.name)
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
logger.info("Tool call: {}({})", tool_call.name, args_str[:200])
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
else:
final_content = response.content
final_content = self._strip_think(response.content)
break
return final_content, tools_used
@@ -191,6 +233,7 @@ class AgentLoop:
async def run(self) -> None:
"""Run the agent loop, processing messages from the bus."""
self._running = True
await self._connect_mcp()
logger.info("Agent loop started")
while self._running:
@@ -204,7 +247,7 @@ class AgentLoop:
if response:
await self.bus.publish_outbound(response)
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
@@ -213,18 +256,33 @@ class AgentLoop:
except asyncio.TimeoutError:
continue
async def close_mcp(self) -> None:
"""Close MCP connections."""
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass # MCP SDK cancel scope cleanup is noisy but harmless
self._mcp_stack = None
def stop(self) -> None:
"""Stop the agent loop."""
self._running = False
logger.info("Agent loop stopping")
async def _process_message(self, msg: InboundMessage, session_key: str | None = None) -> OutboundMessage | None:
async def _process_message(
self,
msg: InboundMessage,
session_key: str | None = None,
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None:
"""
Process a single inbound message.
Args:
msg: The inbound message to process.
session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns:
The response message, or None if no response needed.
@@ -234,7 +292,7 @@ class AgentLoop:
return await self._process_system_message(msg)
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}")
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
key = session_key or msg.session_key
session = self.sessions.get_or_create(key)
@@ -271,13 +329,22 @@ class AgentLoop:
channel=msg.channel,
chat_id=msg.chat_id,
)
final_content, tools_used = await self._run_agent_loop(initial_messages)
async def _bus_progress(content: str) -> None:
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
metadata=msg.metadata or {},
))
final_content, tools_used = await self._run_agent_loop(
initial_messages, on_progress=on_progress or _bus_progress,
)
if final_content is None:
final_content = "I've completed processing but have no response to give."
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}")
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
session.add_message("user", msg.content)
session.add_message("assistant", final_content,
@@ -298,7 +365,7 @@ class AgentLoop:
The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination.
"""
logger.info(f"Processing system message from {msg.sender_id}")
logger.info("Processing system message from {}", msg.sender_id)
# Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id:
@@ -346,22 +413,22 @@ class AgentLoop:
if archive_all:
old_messages = session.messages
keep_count = 0
logger.info(f"Memory consolidation (archive_all): {len(session.messages)} total messages archived")
logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
else:
keep_count = self.memory_window // 2
if len(session.messages) <= keep_count:
logger.debug(f"Session {session.key}: No consolidation needed (messages={len(session.messages)}, keep={keep_count})")
logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
return
messages_to_process = len(session.messages) - session.last_consolidated
if messages_to_process <= 0:
logger.debug(f"Session {session.key}: No new messages to consolidate (last_consolidated={session.last_consolidated}, total={len(session.messages)})")
logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages))
return
old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages:
return
logger.info(f"Memory consolidation started: {len(session.messages)} total, {len(old_messages)} new to consolidate, {keep_count} keep")
logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count)
lines = []
for m in old_messages:
@@ -395,9 +462,15 @@ Respond with ONLY valid JSON, no markdown fences."""
model=self.model,
)
text = (response.content or "").strip()
if not text:
logger.warning("Memory consolidation: LLM returned empty response, skipping")
return
if text.startswith("```"):
text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
result = json.loads(text)
result = json_repair.loads(text)
if not isinstance(result, dict):
logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200])
return
if entry := result.get("history_entry"):
memory.append_history(entry)
@@ -409,9 +482,9 @@ Respond with ONLY valid JSON, no markdown fences."""
session.last_consolidated = 0
else:
session.last_consolidated = len(session.messages) - keep_count
logger.info(f"Memory consolidation done: {len(session.messages)} messages, last_consolidated={session.last_consolidated}")
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
except Exception as e:
logger.error(f"Memory consolidation failed: {e}")
logger.error("Memory consolidation failed: {}", e)
async def process_direct(
self,
@@ -419,6 +492,7 @@ Respond with ONLY valid JSON, no markdown fences."""
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
"""
Process a message directly (for CLI or cron usage).
@@ -428,10 +502,12 @@ Respond with ONLY valid JSON, no markdown fences."""
session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output.
Returns:
The agent's response.
"""
await self._connect_mcp()
msg = InboundMessage(
channel=channel,
sender_id="user",
@@ -439,5 +515,5 @@ Respond with ONLY valid JSON, no markdown fences."""
content=content
)
response = await self._process_message(msg, session_key=session_key)
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else ""

View File

@@ -167,10 +167,10 @@ class SkillsLoader:
return content
def _parse_nanobot_metadata(self, raw: str) -> dict:
"""Parse nanobot metadata JSON from frontmatter."""
"""Parse skill metadata JSON from frontmatter (supports nanobot and openclaw keys)."""
try:
data = json.loads(raw)
return data.get("nanobot", {}) if isinstance(data, dict) else {}
return data.get("nanobot", data.get("openclaw", {})) if isinstance(data, dict) else {}
except (json.JSONDecodeError, TypeError):
return {}

View File

@@ -86,7 +86,7 @@ class SubagentManager:
# Cleanup when done
bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None))
logger.info(f"Spawned subagent [{task_id}]: {display_label}")
logger.info("Spawned subagent [{}]: {}", task_id, display_label)
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
async def _run_subagent(
@@ -97,7 +97,7 @@ class SubagentManager:
origin: dict[str, str],
) -> None:
"""Execute the subagent task and announce the result."""
logger.info(f"Subagent [{task_id}] starting task: {label}")
logger.info("Subagent [{}] starting task: {}", task_id, label)
try:
# Build subagent tools (no message tool, no spawn tool)
@@ -146,7 +146,7 @@ class SubagentManager:
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments),
"arguments": json.dumps(tc.arguments, ensure_ascii=False),
},
}
for tc in response.tool_calls
@@ -159,8 +159,8 @@ class SubagentManager:
# Execute tools
for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments)
logger.debug(f"Subagent [{task_id}] executing: {tool_call.name} with arguments: {args_str}")
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.debug("Subagent [{}] executing: {} with arguments: {}", task_id, tool_call.name, args_str)
result = await tools.execute(tool_call.name, tool_call.arguments)
messages.append({
"role": "tool",
@@ -175,12 +175,12 @@ class SubagentManager:
if final_result is None:
final_result = "Task completed but no final response was generated."
logger.info(f"Subagent [{task_id}] completed successfully")
logger.info("Subagent [{}] completed successfully", task_id)
await self._announce_result(task_id, label, task, final_result, origin, "ok")
except Exception as e:
error_msg = f"Error: {str(e)}"
logger.error(f"Subagent [{task_id}] failed: {e}")
logger.error("Subagent [{}] failed: {}", task_id, e)
await self._announce_result(task_id, label, task, error_msg, origin, "error")
async def _announce_result(
@@ -213,7 +213,7 @@ Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not men
)
await self.bus.publish_inbound(msg)
logger.debug(f"Subagent [{task_id}] announced result to {origin['channel']}:{origin['chat_id']}")
logger.debug("Subagent [{}] announced result to {}:{}", task_id, origin['channel'], origin['chat_id'])
def _build_subagent_prompt(self, task: str) -> str:
"""Build a focused system prompt for the subagent."""

View File

@@ -50,6 +50,10 @@ class CronTool(Tool):
"type": "string",
"description": "Cron expression like '0 9 * * *' (for scheduled tasks)"
},
"tz": {
"type": "string",
"description": "IANA timezone for cron expressions (e.g. 'America/Vancouver')"
},
"at": {
"type": "string",
"description": "ISO datetime for one-time execution (e.g. '2026-02-12T10:30:00')"
@@ -68,30 +72,46 @@ class CronTool(Tool):
message: str = "",
every_seconds: int | None = None,
cron_expr: str | None = None,
tz: str | None = None,
at: str | None = None,
job_id: str | None = None,
**kwargs: Any
) -> str:
if action == "add":
return self._add_job(message, every_seconds, cron_expr, at)
return self._add_job(message, every_seconds, cron_expr, tz, at)
elif action == "list":
return self._list_jobs()
elif action == "remove":
return self._remove_job(job_id)
return f"Unknown action: {action}"
def _add_job(self, message: str, every_seconds: int | None, cron_expr: str | None, at: str | None) -> str:
def _add_job(
self,
message: str,
every_seconds: int | None,
cron_expr: str | None,
tz: str | None,
at: str | None,
) -> str:
if not message:
return "Error: message is required for add"
if not self._channel or not self._chat_id:
return "Error: no session context (channel/chat_id)"
if tz and not cron_expr:
return "Error: tz can only be used with cron_expr"
if tz:
from zoneinfo import ZoneInfo
try:
ZoneInfo(tz)
except (KeyError, Exception):
return f"Error: unknown timezone '{tz}'"
# Build schedule
delete_after = False
if every_seconds:
schedule = CronSchedule(kind="every", every_ms=every_seconds * 1000)
elif cron_expr:
schedule = CronSchedule(kind="cron", expr=cron_expr)
schedule = CronSchedule(kind="cron", expr=cron_expr, tz=tz)
elif at:
from datetime import datetime
dt = datetime.fromisoformat(at)

View File

@@ -0,0 +1,80 @@
"""MCP client: connects to MCP servers and wraps their tools as native nanobot tools."""
from contextlib import AsyncExitStack
from typing import Any
from loguru import logger
from nanobot.agent.tools.base import Tool
from nanobot.agent.tools.registry import ToolRegistry
class MCPToolWrapper(Tool):
"""Wraps a single MCP server tool as a nanobot Tool."""
def __init__(self, session, server_name: str, tool_def):
self._session = session
self._original_name = tool_def.name
self._name = f"mcp_{server_name}_{tool_def.name}"
self._description = tool_def.description or tool_def.name
self._parameters = tool_def.inputSchema or {"type": "object", "properties": {}}
@property
def name(self) -> str:
return self._name
@property
def description(self) -> str:
return self._description
@property
def parameters(self) -> dict[str, Any]:
return self._parameters
async def execute(self, **kwargs: Any) -> str:
from mcp import types
result = await self._session.call_tool(self._original_name, arguments=kwargs)
parts = []
for block in result.content:
if isinstance(block, types.TextContent):
parts.append(block.text)
else:
parts.append(str(block))
return "\n".join(parts) or "(no output)"
async def connect_mcp_servers(
mcp_servers: dict, registry: ToolRegistry, stack: AsyncExitStack
) -> None:
"""Connect to configured MCP servers and register their tools."""
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
for name, cfg in mcp_servers.items():
try:
if cfg.command:
params = StdioServerParameters(
command=cfg.command, args=cfg.args, env=cfg.env or None
)
read, write = await stack.enter_async_context(stdio_client(params))
elif cfg.url:
from mcp.client.streamable_http import streamable_http_client
read, write, _ = await stack.enter_async_context(
streamable_http_client(cfg.url)
)
else:
logger.warning("MCP server '{}': no command or url configured, skipping", name)
continue
session = await stack.enter_async_context(ClientSession(read, write))
await session.initialize()
tools = await session.list_tools()
for tool_def in tools.tools:
wrapper = MCPToolWrapper(session, name, tool_def)
registry.register(wrapper)
logger.debug("MCP: registered tool '{}' from server '{}'", wrapper.name, name)
logger.info("MCP server '{}': connected, {} tools registered", name, len(tools.tools))
except Exception as e:
logger.error("MCP server '{}': failed to connect: {}", name, e)

View File

@@ -52,6 +52,11 @@ class MessageTool(Tool):
"chat_id": {
"type": "string",
"description": "Optional: target chat/user ID"
},
"media": {
"type": "array",
"items": {"type": "string"},
"description": "Optional: list of file paths to attach (images, audio, documents)"
}
},
"required": ["content"]
@@ -62,6 +67,7 @@ class MessageTool(Tool):
content: str,
channel: str | None = None,
chat_id: str | None = None,
media: list[str] | None = None,
**kwargs: Any
) -> str:
channel = channel or self._default_channel
@@ -76,11 +82,13 @@ class MessageTool(Tool):
msg = OutboundMessage(
channel=channel,
chat_id=chat_id,
content=content
content=content,
media=media or []
)
try:
await self._send_callback(msg)
return f"Message sent to {channel}:{chat_id}"
media_info = f" with {len(media)} attachments" if media else ""
return f"Message sent to {channel}:{chat_id}{media_info}"
except Exception as e:
return f"Error sending message: {str(e)}"

View File

@@ -26,7 +26,8 @@ class ExecTool(Tool):
r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr
r"\bdel\s+/[fq]\b", # del /f, del /q
r"\brmdir\s+/s\b", # rmdir /s
r"\b(format|mkfs|diskpart)\b", # disk operations
r"(?:^|[;&|]\s*)format\b", # format (as standalone command only)
r"\b(mkfs|diskpart)\b", # disk operations
r"\bdd\s+if=", # dd
r">\s*/dev/sd", # write to disk
r"\b(shutdown|reboot|poweroff)\b", # system power
@@ -81,6 +82,12 @@ class ExecTool(Tool):
)
except asyncio.TimeoutError:
process.kill()
# Wait for the process to fully terminate so pipes are
# drained and file descriptors are released.
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
return f"Error: Command timed out after {self.timeout} seconds"
output_parts = []

View File

@@ -116,7 +116,7 @@ class WebFetchTool(Tool):
# Validate URL before fetching
is_valid, error_msg = _validate_url(url)
if not is_valid:
return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url})
return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url}, ensure_ascii=False)
try:
async with httpx.AsyncClient(
@@ -131,7 +131,7 @@ class WebFetchTool(Tool):
# JSON
if "application/json" in ctype:
text, extractor = json.dumps(r.json(), indent=2), "json"
text, extractor = json.dumps(r.json(), indent=2, ensure_ascii=False), "json"
# HTML
elif "text/html" in ctype or r.text[:256].lower().startswith(("<!doctype", "<html")):
doc = Document(r.text)
@@ -146,9 +146,9 @@ class WebFetchTool(Tool):
text = text[:max_chars]
return json.dumps({"url": url, "finalUrl": str(r.url), "status": r.status_code,
"extractor": extractor, "truncated": truncated, "length": len(text), "text": text})
"extractor": extractor, "truncated": truncated, "length": len(text), "text": text}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "url": url})
return json.dumps({"error": str(e), "url": url}, ensure_ascii=False)
def _to_markdown(self, html: str) -> str:
"""Convert HTML to markdown."""