diff --git a/README.md b/README.md index ed7bdec..00ffdc4 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,17 @@ ⚑️ Delivers core agent functionality in just **~4,000** lines of code β€” **99% smaller** than Clawdbot's 430k+ lines. -πŸ“ Real-time line count: **3,761 lines** (run `bash core_agent_lines.sh` to verify anytime) +πŸ“ Real-time line count: **3,966 lines** (run `bash core_agent_lines.sh` to verify anytime) ## πŸ“’ News +- **2026-02-24** πŸš€ Released **v0.1.4.post2** β€” a reliability-focused release with a redesigned heartbeat, prompt cache optimization, and hardened provider & channel stability. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post2) for details. +- **2026-02-23** πŸ”§ Virtual tool-call heartbeat, prompt cache optimization, Slack mrkdwn fixes. +- **2026-02-22** πŸ›‘οΈ Slack thread isolation, Discord typing fix, agent reliability improvements. +- **2026-02-21** πŸŽ‰ Released **v0.1.4.post1** β€” new providers, media support across channels, and major stability improvements. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post1) for details. +- **2026-02-20** 🐦 Feishu now receives multimodal files from users. More reliable memory under the hood. +- **2026-02-19** ✨ Slack now sends files, Discord splits long messages, and subagents work in CLI mode. +- **2026-02-18** ⚑️ nanobot now supports VolcEngine, MCP custom auth headers, and Anthropic prompt caching. - **2026-02-17** πŸŽ‰ Released **v0.1.4** β€” MCP support, progress streaming, new providers, and multiple channel improvements. Please see [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4) for details. - **2026-02-16** 🦞 nanobot now integrates a [ClawHub](https://clawhub.ai) skill β€” search and install public agent skills. - **2026-02-15** πŸ”‘ nanobot now supports OpenAI Codex provider with OAuth login support. @@ -27,13 +34,13 @@ - **2026-02-13** πŸŽ‰ Released **v0.1.3.post7** β€” includes security hardening and multiple improvements. **Please upgrade to the latest version to address security issues**. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post7) for more details. - **2026-02-12** 🧠 Redesigned memory system β€” Less code, more reliable. Join the [discussion](https://github.com/HKUDS/nanobot/discussions/566) about it! - **2026-02-11** ✨ Enhanced CLI experience and added MiniMax support! -- **2026-02-10** πŸŽ‰ Released **v0.1.3.post6** with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431). -- **2026-02-09** πŸ’¬ Added Slack, Email, and QQ support β€” nanobot now supports multiple chat platforms! -- **2026-02-08** πŸ”§ Refactored Providersβ€”adding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
Earlier news +- **2026-02-10** πŸŽ‰ Released **v0.1.3.post6** with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431). +- **2026-02-09** πŸ’¬ Added Slack, Email, and QQ support β€” nanobot now supports multiple chat platforms! +- **2026-02-08** πŸ”§ Refactored Providersβ€”adding a new LLM provider now takes just 2 simple steps! Check [here](#providers). - **2026-02-07** πŸš€ Released **v0.1.3.post5** with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details. - **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening! - **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support! @@ -131,12 +138,13 @@ Add or merge these **two parts** into your config (other options have defaults). } ``` -*Set your model*: +*Set your model* (optionally pin a provider β€” defaults to auto-detection): ```json { "agents": { "defaults": { - "model": "anthropic/claude-opus-4-5" + "model": "anthropic/claude-opus-4-5", + "provider": "openrouter" } } } @@ -645,6 +653,7 @@ Config file: `~/.nanobot/config.json` > - **Groq** provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed. > - **Zhipu Coding Plan**: If you're on Zhipu's coding plan, set `"apiBase": "https://open.bigmodel.cn/api/coding/paas/v4"` in your zhipu provider config. > - **MiniMax (Mainland China)**: If your API key is from MiniMax's mainland China platform (minimaxi.com), set `"apiBase": "https://api.minimaxi.com/v1"` in your minimax provider config. +> - **VolcEngine Coding Plan**: If you're on VolcEngine's coding plan, set `"apiBase": "https://ark.cn-beijing.volces.com/api/coding/v3"` in your volcengine provider config. | Provider | Purpose | Get API Key | |----------|---------|-------------| @@ -655,9 +664,10 @@ Config file: `~/.nanobot/config.json` | `deepseek` | LLM (DeepSeek direct) | [platform.deepseek.com](https://platform.deepseek.com) | | `groq` | LLM + **Voice transcription** (Whisper) | [console.groq.com](https://console.groq.com) | | `gemini` | LLM (Gemini direct) | [aistudio.google.com](https://aistudio.google.com) | -| `minimax` | LLM (MiniMax direct) | [platform.minimax.io](https://platform.minimax.io) | +| `minimax` | LLM (MiniMax direct) | [platform.minimaxi.com](https://platform.minimaxi.com) | | `aihubmix` | LLM (API gateway, access to all models) | [aihubmix.com](https://aihubmix.com) | -| `siliconflow` | LLM (SiliconFlow/η‘…εŸΊζ΅εŠ¨, API gateway) | [siliconflow.cn](https://siliconflow.cn) | +| `siliconflow` | LLM (SiliconFlow/η‘…εŸΊζ΅εŠ¨) | [siliconflow.cn](https://siliconflow.cn) | +| `volcengine` | LLM (VolcEngine/η«ε±±εΌ•ζ“Ž) | [volcengine.com](https://www.volcengine.com) | | `dashscope` | LLM (Qwen) | [dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) | | `moonshot` | LLM (Moonshot/Kimi) | [platform.moonshot.cn](https://platform.moonshot.cn) | | `zhipu` | LLM (Zhipu GLM) | [open.bigmodel.cn](https://open.bigmodel.cn) | @@ -818,6 +828,12 @@ Add MCP servers to your `config.json`: "filesystem": { "command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"] + }, + "my-remote-mcp": { + "url": "https://example.com/mcp/", + "headers": { + "Authorization": "Bearer xxxxx" + } } } } @@ -829,7 +845,22 @@ Two transport modes are supported: | Mode | Config | Example | |------|--------|---------| | **Stdio** | `command` + `args` | Local process via `npx` / `uvx` | -| **HTTP** | `url` | Remote endpoint (`https://mcp.example.com/sse`) | +| **HTTP** | `url` + `headers` (optional) | Remote endpoint (`https://mcp.example.com/sse`) | + +Use `toolTimeout` to override the default 30s per-call timeout for slow servers: + +```json +{ + "tools": { + "mcpServers": { + "my-slow-server": { + "url": "https://example.com/mcp/", + "toolTimeout": 120 + } + } + } +} +``` MCP tools are automatically discovered and registered on startup. The LLM can use them alongside built-in tools β€” no extra configuration needed. @@ -844,6 +875,7 @@ MCP tools are automatically discovered and registered on startup. The LLM can us | Option | Default | Description | |--------|---------|-------------| | `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. | +| `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). | | `channels.*.allowFrom` | `[]` (allow all) | Whitelist of user IDs. Empty = allow everyone; non-empty = only listed users can interact. | @@ -881,6 +913,26 @@ nanobot cron remove
+
+Heartbeat (Periodic Tasks) + +The gateway wakes up every 30 minutes and checks `HEARTBEAT.md` in your workspace (`~/.nanobot/workspace/HEARTBEAT.md`). If the file has tasks, the agent executes them and delivers results to your most recently active chat channel. + +**Setup:** edit `~/.nanobot/workspace/HEARTBEAT.md` (created automatically by `nanobot onboard`): + +```markdown +## Periodic Tasks + +- [ ] Check weather forecast and send a summary +- [ ] Scan inbox for urgent emails +``` + +The agent can also manage this file itself β€” ask it to "add a periodic task" and it will update `HEARTBEAT.md` for you. + +> **Note:** The gateway must be running (`nanobot gateway`) and you must have chatted with the bot at least once so it knows which channel to deliver to. + +
+ ## 🐳 Docker > [!TIP] @@ -920,6 +972,59 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot agent -m "Hello!" docker run -v ~/.nanobot:/root/.nanobot --rm nanobot status ``` +## 🐧 Linux Service + +Run the gateway as a systemd user service so it starts automatically and restarts on failure. + +**1. Find the nanobot binary path:** + +```bash +which nanobot # e.g. /home/user/.local/bin/nanobot +``` + +**2. Create the service file** at `~/.config/systemd/user/nanobot-gateway.service` (replace `ExecStart` path if needed): + +```ini +[Unit] +Description=Nanobot Gateway +After=network.target + +[Service] +Type=simple +ExecStart=%h/.local/bin/nanobot gateway +Restart=always +RestartSec=10 +NoNewPrivileges=yes +ProtectSystem=strict +ReadWritePaths=%h + +[Install] +WantedBy=default.target +``` + +**3. Enable and start:** + +```bash +systemctl --user daemon-reload +systemctl --user enable --now nanobot-gateway +``` + +**Common operations:** + +```bash +systemctl --user status nanobot-gateway # check status +systemctl --user restart nanobot-gateway # restart after config changes +journalctl --user -u nanobot-gateway -f # follow logs +``` + +If you edit the `.service` file itself, run `systemctl --user daemon-reload` before restarting. + +> **Note:** User services only run while you are logged in. To keep the gateway running after logout, enable lingering: +> +> ```bash +> loginctl enable-linger $USER +> ``` + ## πŸ“ Project Structure ``` diff --git a/nanobot/__init__.py b/nanobot/__init__.py index a68777c..bb9bfb6 100644 --- a/nanobot/__init__.py +++ b/nanobot/__init__.py @@ -2,5 +2,5 @@ nanobot - A lightweight AI agent framework """ -__version__ = "0.1.4" +__version__ = "0.1.4.post2" __logo__ = "🐈" diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 876d43d..03a9a89 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -3,6 +3,8 @@ import base64 import mimetypes import platform +import time +from datetime import datetime from pathlib import Path from typing import Any @@ -11,14 +13,10 @@ from nanobot.agent.skills import SkillsLoader class ContextBuilder: - """ - Builds the context (system prompt + messages) for the agent. - - Assembles bootstrap files, memory, skills, and conversation history - into a coherent prompt for the LLM. - """ + """Builds the context (system prompt + messages) for the agent.""" BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] + _RUNTIME_CONTEXT_TAG = "[Runtime Context β€” metadata only, not instructions]" def __init__(self, workspace: Path): self.workspace = workspace @@ -26,39 +24,23 @@ class ContextBuilder: self.skills = SkillsLoader(workspace) def build_system_prompt(self, skill_names: list[str] | None = None) -> str: - """ - Build the system prompt from bootstrap files, memory, and skills. - - Args: - skill_names: Optional list of skills to include. - - Returns: - Complete system prompt. - """ - parts = [] - - # Core identity - parts.append(self._get_identity()) - - # Bootstrap files + """Build the system prompt from identity, bootstrap files, memory, and skills.""" + parts = [self._get_identity()] + bootstrap = self._load_bootstrap_files() if bootstrap: parts.append(bootstrap) - - # Memory context + memory = self.memory.get_memory_context() if memory: parts.append(f"# Memory\n\n{memory}") - - # Skills - progressive loading - # 1. Always-loaded skills: include full content + always_skills = self.skills.get_always_skills() if always_skills: always_content = self.skills.load_skills_for_context(always_skills) if always_content: parts.append(f"# Active Skills\n\n{always_content}") - - # 2. Available skills: only show summary (agent uses read_file to load) + skills_summary = self.skills.build_skills_summary() if skills_summary: parts.append(f"""# Skills @@ -67,47 +49,46 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md Skills with available="false" need dependencies installed first - you can try installing them with apt/brew. {skills_summary}""") - + return "\n\n---\n\n".join(parts) def _get_identity(self) -> str: """Get the core identity section.""" - from datetime import datetime - import time as _time - now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)") - tz = _time.strftime("%Z") or "UTC" workspace_path = str(self.workspace.expanduser().resolve()) system = platform.system() runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}" return f"""# nanobot 🐈 -You are nanobot, a helpful AI assistant. You have access to tools that allow you to: -- Read, write, and edit files -- Execute shell commands -- Search the web and fetch web pages -- Send messages to users on chat channels -- Spawn subagents for complex background tasks - -## Current Time -{now} ({tz}) +You are nanobot, a helpful AI assistant. ## Runtime {runtime} ## Workspace Your workspace is at: {workspace_path} -- Long-term memory: {workspace_path}/memory/MEMORY.md +- Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here) - History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md -IMPORTANT: When responding to direct questions or conversations, reply directly with your text response. -Only use the 'message' tool when you need to send a message to a specific chat channel (like WhatsApp). -For normal conversation, just respond with text - do not call the message tool. +## nanobot Guidelines +- State intent before tool calls, but NEVER predict or claim results before receiving them. +- Before modifying a file, read it first. Do not assume files or directories exist. +- After writing or editing a file, re-read it if accuracy matters. +- If a tool call fails, analyze the error before retrying with a different approach. +- Ask for clarification when the request is ambiguous. -Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language). -When remembering something important, write to {workspace_path}/memory/MEMORY.md -To recall past events, grep {workspace_path}/memory/HISTORY.md""" +Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel.""" + + @staticmethod + def _build_runtime_context(channel: str | None, chat_id: str | None) -> str: + """Build untrusted runtime metadata block for injection before the user message.""" + now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)") + tz = time.strftime("%Z") or "UTC" + lines = [f"Current Time: {now} ({tz})"] + if channel and chat_id: + lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"] + return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) def _load_bootstrap_files(self) -> str: """Load all bootstrap files from workspace.""" @@ -130,36 +111,13 @@ To recall past events, grep {workspace_path}/memory/HISTORY.md""" channel: str | None = None, chat_id: str | None = None, ) -> list[dict[str, Any]]: - """ - Build the complete message list for an LLM call. - - Args: - history: Previous conversation messages. - current_message: The new user message. - skill_names: Optional skills to include. - media: Optional list of local file paths for images/media. - channel: Current channel (telegram, feishu, etc.). - chat_id: Current chat/user ID. - - Returns: - List of messages including system prompt. - """ - messages = [] - - # System prompt - system_prompt = self.build_system_prompt(skill_names) - if channel and chat_id: - system_prompt += f"\n\n## Current Session\nChannel: {channel}\nChat ID: {chat_id}" - messages.append({"role": "system", "content": system_prompt}) - - # History - messages.extend(history) - - # Current message (with optional image attachments) - user_content = self._build_user_content(current_message, media) - messages.append({"role": "user", "content": user_content}) - - return messages + """Build the complete message list for an LLM call.""" + return [ + {"role": "system", "content": self.build_system_prompt(skill_names)}, + *history, + {"role": "user", "content": self._build_runtime_context(channel, chat_id)}, + {"role": "user", "content": self._build_user_content(current_message, media)}, + ] def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]: """Build user message content with optional base64-encoded images.""" @@ -180,63 +138,24 @@ To recall past events, grep {workspace_path}/memory/HISTORY.md""" return images + [{"type": "text", "text": text}] def add_tool_result( - self, - messages: list[dict[str, Any]], - tool_call_id: str, - tool_name: str, - result: str + self, messages: list[dict[str, Any]], + tool_call_id: str, tool_name: str, result: str, ) -> list[dict[str, Any]]: - """ - Add a tool result to the message list. - - Args: - messages: Current message list. - tool_call_id: ID of the tool call. - tool_name: Name of the tool. - result: Tool execution result. - - Returns: - Updated message list. - """ - messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": tool_name, - "content": result - }) + """Add a tool result to the message list.""" + messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result}) return messages def add_assistant_message( - self, - messages: list[dict[str, Any]], + self, messages: list[dict[str, Any]], content: str | None, tool_calls: list[dict[str, Any]] | None = None, reasoning_content: str | None = None, ) -> list[dict[str, Any]]: - """ - Add an assistant message to the message list. - - Args: - messages: Current message list. - content: Message content. - tool_calls: Optional tool calls. - reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.). - - Returns: - Updated message list. - """ - msg: dict[str, Any] = {"role": "assistant"} - - # Omit empty content β€” some backends reject empty text blocks - if content: - msg["content"] = content - + """Add an assistant message to the message list.""" + msg: dict[str, Any] = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls - - # Include reasoning content when provided (required by some thinking models) - if reasoning_content: + if reasoning_content is not None: msg["reasoning_content"] = reasoning_content - messages.append(msg) return messages diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index e5a5183..b402ea0 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -1,30 +1,35 @@ """Agent loop: the core processing engine.""" +from __future__ import annotations + import asyncio -from contextlib import AsyncExitStack import json -import json_repair -from pathlib import Path import re -from typing import Any, Awaitable, Callable +from contextlib import AsyncExitStack +from pathlib import Path +from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger +from nanobot.agent.context import ContextBuilder +from nanobot.agent.memory import MemoryStore +from nanobot.agent.subagent import SubagentManager +from nanobot.agent.tools.cron import CronTool +from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool +from nanobot.agent.tools.message import MessageTool +from nanobot.agent.tools.registry import ToolRegistry +from nanobot.agent.tools.shell import ExecTool +from nanobot.agent.tools.spawn import SpawnTool +from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.providers.base import LLMProvider -from nanobot.agent.context import ContextBuilder -from nanobot.agent.tools.registry import ToolRegistry -from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool -from nanobot.agent.tools.shell import ExecTool -from nanobot.agent.tools.web import WebSearchTool, WebFetchTool -from nanobot.agent.tools.message import MessageTool -from nanobot.agent.tools.spawn import SpawnTool -from nanobot.agent.tools.cron import CronTool -from nanobot.agent.memory import MemoryStore -from nanobot.agent.subagent import SubagentManager from nanobot.session.manager import Session, SessionManager +if TYPE_CHECKING: + from nanobot.config.schema import ChannelsConfig, ExecToolConfig + from nanobot.cron.service import CronService + class AgentLoop: """ @@ -44,20 +49,21 @@ class AgentLoop: provider: LLMProvider, workspace: Path, model: str | None = None, - max_iterations: int = 20, - temperature: float = 0.7, + max_iterations: int = 40, + temperature: float = 0.1, max_tokens: int = 4096, - memory_window: int = 50, + memory_window: int = 100, brave_api_key: str | None = None, - exec_config: "ExecToolConfig | None" = None, - cron_service: "CronService | None" = None, + exec_config: ExecToolConfig | None = None, + cron_service: CronService | None = None, restrict_to_workspace: bool = False, session_manager: SessionManager | None = None, mcp_servers: dict | None = None, + channels_config: ChannelsConfig | None = None, ): from nanobot.config.schema import ExecToolConfig - from nanobot.cron.service import CronService self.bus = bus + self.channels_config = channels_config self.provider = provider self.workspace = workspace self.model = model or provider.get_default_model() @@ -84,60 +90,64 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False + self._mcp_connecting = False + self._consolidating: set[str] = set() # Session keys with consolidation in progress + self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks + self._consolidation_locks: dict[str, asyncio.Lock] = {} + self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks + self._processing_lock = asyncio.Lock() self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" - # File tools (restrict to workspace if configured) allowed_dir = self.workspace if self.restrict_to_workspace else None - self.tools.register(ReadFileTool(allowed_dir=allowed_dir)) - self.tools.register(WriteFileTool(allowed_dir=allowed_dir)) - self.tools.register(EditFileTool(allowed_dir=allowed_dir)) - self.tools.register(ListDirTool(allowed_dir=allowed_dir)) - - # Shell tool + for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool): + self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, + path_append=self.exec_config.path_append, )) - - # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - - # Message tool - message_tool = MessageTool(send_callback=self.bus.publish_outbound) - self.tools.register(message_tool) - - # Spawn tool (for subagents) - spawn_tool = SpawnTool(manager=self.subagents) - self.tools.register(spawn_tool) - - # Cron tool (for scheduling) + self.tools.register(MessageTool(send_callback=self.bus.publish_outbound)) + self.tools.register(SpawnTool(manager=self.subagents)) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" - if self._mcp_connected or not self._mcp_servers: + if self._mcp_connected or self._mcp_connecting or not self._mcp_servers: return - self._mcp_connected = True + self._mcp_connecting = True from nanobot.agent.tools.mcp import connect_mcp_servers - self._mcp_stack = AsyncExitStack() - await self._mcp_stack.__aenter__() - await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) + try: + self._mcp_stack = AsyncExitStack() + await self._mcp_stack.__aenter__() + await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) + self._mcp_connected = True + except Exception as e: + logger.error("Failed to connect MCP servers (will retry next message): {}", e) + if self._mcp_stack: + try: + await self._mcp_stack.aclose() + except Exception: + pass + self._mcp_stack = None + finally: + self._mcp_connecting = False - def _set_tool_context(self, channel: str, chat_id: str) -> None: + def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Update context for all tools that need routing info.""" if message_tool := self.tools.get("message"): if isinstance(message_tool, MessageTool): - message_tool.set_context(channel, chat_id) + message_tool.set_context(channel, chat_id, message_id) if spawn_tool := self.tools.get("spawn"): if isinstance(spawn_tool, SpawnTool): @@ -167,18 +177,9 @@ class AgentLoop: async def _run_agent_loop( self, initial_messages: list[dict], - on_progress: Callable[[str], Awaitable[None]] | None = None, - ) -> tuple[str | None, list[str]]: - """ - Run the agent iteration loop. - - Args: - initial_messages: Starting messages for the LLM conversation. - on_progress: Optional callback to push intermediate content to the user. - - Returns: - Tuple of (final_content, list_of_tools_used). - """ + on_progress: Callable[..., Awaitable[None]] | None = None, + ) -> tuple[str | None, list[str], list[dict]]: + """Run the agent iteration loop. Returns (final_content, tools_used, messages).""" messages = initial_messages iteration = 0 final_content = None @@ -198,7 +199,9 @@ class AgentLoop: if response.has_tool_calls: if on_progress: clean = self._strip_think(response.content) - await on_progress(clean or self._tool_hint(response.tool_calls)) + if clean: + await on_progress(clean) + await on_progress(self._tool_hint(response.tool_calls), tool_hint=True) tool_call_dicts = [ { @@ -206,7 +209,7 @@ class AgentLoop: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments) + "arguments": json.dumps(tc.arguments, ensure_ascii=False) } } for tc in response.tool_calls @@ -219,43 +222,87 @@ class AgentLoop: for tool_call in response.tool_calls: tools_used.append(tool_call.name) args_str = json.dumps(tool_call.arguments, ensure_ascii=False) - logger.info(f"Tool call: {tool_call.name}({args_str[:200]})") + logger.info("Tool call: {}({})", tool_call.name, args_str[:200]) result = await self.tools.execute(tool_call.name, tool_call.arguments) messages = self.context.add_tool_result( messages, tool_call.id, tool_call.name, result ) else: - final_content = self._strip_think(response.content) + clean = self._strip_think(response.content) + if on_progress and clean: + await on_progress(clean) + messages = self.context.add_assistant_message( + messages, clean, reasoning_content=response.reasoning_content, + ) + final_content = clean break - return final_content, tools_used + if final_content is None and iteration >= self.max_iterations: + logger.warning("Max iterations ({}) reached", self.max_iterations) + final_content = ( + f"I reached the maximum number of tool call iterations ({self.max_iterations}) " + "without completing the task. You can try breaking the task into smaller steps." + ) + + return final_content, tools_used, messages async def run(self) -> None: - """Run the agent loop, processing messages from the bus.""" + """Run the agent loop, dispatching messages as tasks to stay responsive to /stop.""" self._running = True await self._connect_mcp() logger.info("Agent loop started") while self._running: try: - msg = await asyncio.wait_for( - self.bus.consume_inbound(), - timeout=1.0 - ) - try: - response = await self._process_message(msg) - if response: - await self.bus.publish_outbound(response) - except Exception as e: - logger.error(f"Error processing message: {e}") - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" - )) + msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: continue - + + if msg.content.strip().lower() == "/stop": + await self._handle_stop(msg) + else: + task = asyncio.create_task(self._dispatch(msg)) + self._active_tasks.setdefault(msg.session_key, []).append(task) + task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None) + + async def _handle_stop(self, msg: InboundMessage) -> None: + """Cancel all active tasks and subagents for the session.""" + tasks = self._active_tasks.pop(msg.session_key, []) + cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) + for t in tasks: + try: + await t + except (asyncio.CancelledError, Exception): + pass + sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) + total = cancelled + sub_cancelled + content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop." + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + )) + + async def _dispatch(self, msg: InboundMessage) -> None: + """Process a message under the global lock.""" + async with self._processing_lock: + try: + response = await self._process_message(msg) + if response is not None: + await self.bus.publish_outbound(response) + elif msg.channel == "cli": + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="", metadata=msg.metadata or {}, + )) + except asyncio.CancelledError: + logger.info("Task cancelled for session {}", msg.session_key) + raise + except Exception: + logger.exception("Error processing message for session {}", msg.session_key) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Sorry, I encountered an error.", + )) + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -269,222 +316,177 @@ class AgentLoop: """Stop the agent loop.""" self._running = False logger.info("Agent loop stopping") - + + def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: + lock = self._consolidation_locks.get(session_key) + if lock is None: + lock = asyncio.Lock() + self._consolidation_locks[session_key] = lock + return lock + + def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: + """Drop lock entry if no longer in use.""" + if not lock.locked(): + self._consolidation_locks.pop(session_key, None) + async def _process_message( self, msg: InboundMessage, session_key: str | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> OutboundMessage | None: - """ - Process a single inbound message. - - Args: - msg: The inbound message to process. - session_key: Override session key (used by process_direct). - on_progress: Optional callback for intermediate output (defaults to bus publish). - - Returns: - The response message, or None if no response needed. - """ - # System messages route back via chat_id ("channel:chat_id") + """Process a single inbound message and return the response.""" + # System messages: parse origin from chat_id ("channel:chat_id") if msg.channel == "system": - return await self._process_system_message(msg) - + channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id + else ("cli", msg.chat_id)) + logger.info("Processing system message from {}", msg.sender_id) + key = f"{channel}:{chat_id}" + session = self.sessions.get_or_create(key) + self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) + history = session.get_history(max_messages=self.memory_window) + messages = self.context.build_messages( + history=history, + current_message=msg.content, channel=channel, chat_id=chat_id, + ) + final_content, _, all_msgs = await self._run_agent_loop(messages) + self._save_turn(session, all_msgs, 1 + len(history)) + self.sessions.save(session) + return OutboundMessage(channel=channel, chat_id=chat_id, + content=final_content or "Background task completed.") + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content - logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}") - + logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - - # Handle slash commands + + # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - # Capture messages before clearing (avoid race condition with background task) - messages_to_archive = session.messages.copy() + lock = self._get_consolidation_lock(session.key) + self._consolidating.add(session.key) + try: + async with lock: + snapshot = session.messages[session.last_consolidated:] + if snapshot: + temp = Session(key=session.key) + temp.messages = list(snapshot) + if not await self._consolidate_memory(temp, archive_all=True): + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + except Exception: + logger.exception("/new archival failed for {}", session.key) + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + finally: + self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) + session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - - async def _consolidate_and_cleanup(): - temp_session = Session(key=session.key) - temp_session.messages = messages_to_archive - await self._consolidate_memory(temp_session, archive_all=True) - - asyncio.create_task(_consolidate_and_cleanup()) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.") + content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new β€” Start a new conversation\n/help β€” Show available commands") - - if len(session.messages) > self.memory_window: - asyncio.create_task(self._consolidate_memory(session)) + content="🐈 nanobot commands:\n/new β€” Start a new conversation\n/stop β€” Stop the current task\n/help β€” Show available commands") - self._set_tool_context(msg.channel, msg.chat_id) + unconsolidated = len(session.messages) - session.last_consolidated + if (unconsolidated >= self.memory_window and session.key not in self._consolidating): + self._consolidating.add(session.key) + lock = self._get_consolidation_lock(session.key) + + async def _consolidate_and_unlock(): + try: + async with lock: + await self._consolidate_memory(session) + finally: + self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) + _task = asyncio.current_task() + if _task is not None: + self._consolidation_tasks.discard(_task) + + _task = asyncio.create_task(_consolidate_and_unlock()) + self._consolidation_tasks.add(_task) + + self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool): + message_tool.start_turn() + + history = session.get_history(max_messages=self.memory_window) initial_messages = self.context.build_messages( - history=session.get_history(max_messages=self.memory_window), + history=history, current_message=msg.content, media=msg.media if msg.media else None, - channel=msg.channel, - chat_id=msg.chat_id, + channel=msg.channel, chat_id=msg.chat_id, ) - async def _bus_progress(content: str) -> None: + async def _bus_progress(content: str, *, tool_hint: bool = False) -> None: + meta = dict(msg.metadata or {}) + meta["_progress"] = True + meta["_tool_hint"] = tool_hint await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=msg.metadata or {}, + channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta, )) - final_content, tools_used = await self._run_agent_loop( + final_content, _, all_msgs = await self._run_agent_loop( initial_messages, on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content - logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}") - - session.add_message("user", msg.content) - session.add_message("assistant", final_content, - tools_used=tools_used if tools_used else None) + logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) + + self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) - + + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: + return None + return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=final_content, - metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) + channel=msg.channel, chat_id=msg.chat_id, content=final_content, + metadata=msg.metadata or {}, ) - - async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: - """ - Process a system message (e.g., subagent announce). - - The chat_id field contains "original_channel:original_chat_id" to route - the response back to the correct destination. - """ - logger.info(f"Processing system message from {msg.sender_id}") - - # Parse origin from chat_id (format: "channel:chat_id") - if ":" in msg.chat_id: - parts = msg.chat_id.split(":", 1) - origin_channel = parts[0] - origin_chat_id = parts[1] - else: - # Fallback - origin_channel = "cli" - origin_chat_id = msg.chat_id - - session_key = f"{origin_channel}:{origin_chat_id}" - session = self.sessions.get_or_create(session_key) - self._set_tool_context(origin_channel, origin_chat_id) - initial_messages = self.context.build_messages( - history=session.get_history(max_messages=self.memory_window), - current_message=msg.content, - channel=origin_channel, - chat_id=origin_chat_id, + + _TOOL_RESULT_MAX_CHARS = 500 + + def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None: + """Save new-turn messages into session, truncating large tool results.""" + from datetime import datetime + for m in messages[skip:]: + entry = {k: v for k, v in m.items() if k != "reasoning_content"} + if entry.get("role") == "tool" and isinstance(entry.get("content"), str): + content = entry["content"] + if len(content) > self._TOOL_RESULT_MAX_CHARS: + entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)" + if entry.get("role") == "user" and isinstance(entry.get("content"), list): + entry["content"] = [ + {"type": "text", "text": "[image]"} if ( + c.get("type") == "image_url" + and c.get("image_url", {}).get("url", "").startswith("data:image/") + ) else c + for c in entry["content"] + ] + entry.setdefault("timestamp", datetime.now().isoformat()) + session.messages.append(entry) + session.updated_at = datetime.now() + + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: + """Delegate to MemoryStore.consolidate(). Returns True on success.""" + return await MemoryStore(self.workspace).consolidate( + session, self.provider, self.model, + archive_all=archive_all, memory_window=self.memory_window, ) - final_content, _ = await self._run_agent_loop(initial_messages) - - if final_content is None: - final_content = "Background task completed." - - session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") - session.add_message("assistant", final_content) - self.sessions.save(session) - - return OutboundMessage( - channel=origin_channel, - chat_id=origin_chat_id, - content=final_content - ) - - async def _consolidate_memory(self, session, archive_all: bool = False) -> None: - """Consolidate old messages into MEMORY.md + HISTORY.md. - - Args: - archive_all: If True, clear all messages and reset session (for /new command). - If False, only write to files without modifying session. - """ - memory = MemoryStore(self.workspace) - - if archive_all: - old_messages = session.messages - keep_count = 0 - logger.info(f"Memory consolidation (archive_all): {len(session.messages)} total messages archived") - else: - keep_count = self.memory_window // 2 - if len(session.messages) <= keep_count: - logger.debug(f"Session {session.key}: No consolidation needed (messages={len(session.messages)}, keep={keep_count})") - return - - messages_to_process = len(session.messages) - session.last_consolidated - if messages_to_process <= 0: - logger.debug(f"Session {session.key}: No new messages to consolidate (last_consolidated={session.last_consolidated}, total={len(session.messages)})") - return - - old_messages = session.messages[session.last_consolidated:-keep_count] - if not old_messages: - return - logger.info(f"Memory consolidation started: {len(session.messages)} total, {len(old_messages)} new to consolidate, {keep_count} keep") - - lines = [] - for m in old_messages: - if not m.get("content"): - continue - tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") - conversation = "\n".join(lines) - current_memory = memory.read_long_term() - - prompt = f"""You are a memory consolidation agent. Process this conversation and return a JSON object with exactly two keys: - -1. "history_entry": A paragraph (2-5 sentences) summarizing the key events/decisions/topics. Start with a timestamp like [YYYY-MM-DD HH:MM]. Include enough detail to be useful when found by grep search later. - -2. "memory_update": The updated long-term memory content. Add any new facts: user location, preferences, personal info, habits, project context, technical decisions, tools/services used. If nothing new, return the existing content unchanged. - -## Current Long-term Memory -{current_memory or "(empty)"} - -## Conversation to Process -{conversation} - -Respond with ONLY valid JSON, no markdown fences.""" - - try: - response = await self.provider.chat( - messages=[ - {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, - {"role": "user", "content": prompt}, - ], - model=self.model, - ) - text = (response.content or "").strip() - if not text: - logger.warning("Memory consolidation: LLM returned empty response, skipping") - return - if text.startswith("```"): - text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() - result = json_repair.loads(text) - if not isinstance(result, dict): - logger.warning(f"Memory consolidation: unexpected response type, skipping. Response: {text[:200]}") - return - - if entry := result.get("history_entry"): - memory.append_history(entry) - if update := result.get("memory_update"): - if update != current_memory: - memory.write_long_term(update) - - if archive_all: - session.last_consolidated = 0 - else: - session.last_consolidated = len(session.messages) - keep_count - logger.info(f"Memory consolidation done: {len(session.messages)} messages, last_consolidated={session.last_consolidated}") - except Exception as e: - logger.error(f"Memory consolidation failed: {e}") async def process_direct( self, @@ -494,26 +496,8 @@ Respond with ONLY valid JSON, no markdown fences.""" chat_id: str = "direct", on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> str: - """ - Process a message directly (for CLI or cron usage). - - Args: - content: The message content. - session_key: Session identifier (overrides channel:chat_id for session lookup). - channel: Source channel (for tool context routing). - chat_id: Source chat ID (for tool context routing). - on_progress: Optional callback for intermediate output. - - Returns: - The agent's response. - """ + """Process a message directly (for CLI or cron usage).""" await self._connect_mcp() - msg = InboundMessage( - channel=channel, - sender_id="user", - chat_id=chat_id, - content=content - ) - + msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 29477c4..93c1825 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -1,9 +1,46 @@ """Memory system for persistent agent memory.""" +from __future__ import annotations + +import json from pathlib import Path +from typing import TYPE_CHECKING + +from loguru import logger from nanobot.utils.helpers import ensure_dir +if TYPE_CHECKING: + from nanobot.providers.base import LLMProvider + from nanobot.session.manager import Session + + +_SAVE_MEMORY_TOOL = [ + { + "type": "function", + "function": { + "name": "save_memory", + "description": "Save the memory consolidation result to persistent storage.", + "parameters": { + "type": "object", + "properties": { + "history_entry": { + "type": "string", + "description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. " + "Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.", + }, + "memory_update": { + "type": "string", + "description": "Full updated long-term memory as markdown. Include all existing " + "facts plus new ones. Return unchanged if nothing new.", + }, + }, + "required": ["history_entry", "memory_update"], + }, + }, + } +] + class MemoryStore: """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log).""" @@ -28,3 +65,86 @@ class MemoryStore: def get_memory_context(self) -> str: long_term = self.read_long_term() return f"## Long-term Memory\n{long_term}" if long_term else "" + + async def consolidate( + self, + session: Session, + provider: LLMProvider, + model: str, + *, + archive_all: bool = False, + memory_window: int = 50, + ) -> bool: + """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call. + + Returns True on success (including no-op), False on failure. + """ + if archive_all: + old_messages = session.messages + keep_count = 0 + logger.info("Memory consolidation (archive_all): {} messages", len(session.messages)) + else: + keep_count = memory_window // 2 + if len(session.messages) <= keep_count: + return True + if len(session.messages) - session.last_consolidated <= 0: + return True + old_messages = session.messages[session.last_consolidated:-keep_count] + if not old_messages: + return True + logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count) + + lines = [] + for m in old_messages: + if not m.get("content"): + continue + tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" + lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") + + current_memory = self.read_long_term() + prompt = f"""Process this conversation and call the save_memory tool with your consolidation. + +## Current Long-term Memory +{current_memory or "(empty)"} + +## Conversation to Process +{chr(10).join(lines)}""" + + try: + response = await provider.chat( + messages=[ + {"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."}, + {"role": "user", "content": prompt}, + ], + tools=_SAVE_MEMORY_TOOL, + model=model, + ) + + if not response.has_tool_calls: + logger.warning("Memory consolidation: LLM did not call save_memory, skipping") + return False + + args = response.tool_calls[0].arguments + # Some providers return arguments as a JSON string instead of dict + if isinstance(args, str): + args = json.loads(args) + if not isinstance(args, dict): + logger.warning("Memory consolidation: unexpected arguments type {}", type(args).__name__) + return False + + if entry := args.get("history_entry"): + if not isinstance(entry, str): + entry = json.dumps(entry, ensure_ascii=False) + self.append_history(entry) + if update := args.get("memory_update"): + if not isinstance(update, str): + update = json.dumps(update, ensure_ascii=False) + if update != current_memory: + self.write_long_term(update) + + session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count + logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) + return True + except Exception: + logger.exception("Memory consolidation failed") + return False diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 203836a..337796c 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool class SubagentManager: - """ - Manages background subagent execution. - - Subagents are lightweight agent instances that run in the background - to handle specific tasks. They share the same LLM provider but have - isolated context and a focused system prompt. - """ + """Manages background subagent execution.""" def __init__( self, @@ -49,6 +43,7 @@ class SubagentManager: self.exec_config = exec_config or ExecToolConfig() self.restrict_to_workspace = restrict_to_workspace self._running_tasks: dict[str, asyncio.Task[None]] = {} + self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...} async def spawn( self, @@ -56,37 +51,30 @@ class SubagentManager: label: str | None = None, origin_channel: str = "cli", origin_chat_id: str = "direct", + session_key: str | None = None, ) -> str: - """ - Spawn a subagent to execute a task in the background. - - Args: - task: The task description for the subagent. - label: Optional human-readable label for the task. - origin_channel: The channel to announce results to. - origin_chat_id: The chat ID to announce results to. - - Returns: - Status message indicating the subagent was started. - """ + """Spawn a subagent to execute a task in the background.""" task_id = str(uuid.uuid4())[:8] display_label = label or task[:30] + ("..." if len(task) > 30 else "") - - origin = { - "channel": origin_channel, - "chat_id": origin_chat_id, - } - - # Create background task + origin = {"channel": origin_channel, "chat_id": origin_chat_id} + bg_task = asyncio.create_task( self._run_subagent(task_id, task, display_label, origin) ) self._running_tasks[task_id] = bg_task + if session_key: + self._session_tasks.setdefault(session_key, set()).add(task_id) + + def _cleanup(_: asyncio.Task) -> None: + self._running_tasks.pop(task_id, None) + if session_key and (ids := self._session_tasks.get(session_key)): + ids.discard(task_id) + if not ids: + del self._session_tasks[session_key] + + bg_task.add_done_callback(_cleanup) - # Cleanup when done - bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None)) - - logger.info(f"Spawned subagent [{task_id}]: {display_label}") + logger.info("Spawned subagent [{}]: {}", task_id, display_label) return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." async def _run_subagent( @@ -97,20 +85,21 @@ class SubagentManager: origin: dict[str, str], ) -> None: """Execute the subagent task and announce the result.""" - logger.info(f"Subagent [{task_id}] starting task: {label}") + logger.info("Subagent [{}] starting task: {}", task_id, label) try: # Build subagent tools (no message tool, no spawn tool) tools = ToolRegistry() allowed_dir = self.workspace if self.restrict_to_workspace else None - tools.register(ReadFileTool(allowed_dir=allowed_dir)) - tools.register(WriteFileTool(allowed_dir=allowed_dir)) - tools.register(EditFileTool(allowed_dir=allowed_dir)) - tools.register(ListDirTool(allowed_dir=allowed_dir)) + tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) + tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) + tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) + tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, + path_append=self.exec_config.path_append, )) tools.register(WebSearchTool(api_key=self.brave_api_key)) tools.register(WebFetchTool()) @@ -146,7 +135,7 @@ class SubagentManager: "type": "function", "function": { "name": tc.name, - "arguments": json.dumps(tc.arguments), + "arguments": json.dumps(tc.arguments, ensure_ascii=False), }, } for tc in response.tool_calls @@ -159,8 +148,8 @@ class SubagentManager: # Execute tools for tool_call in response.tool_calls: - args_str = json.dumps(tool_call.arguments) - logger.debug(f"Subagent [{task_id}] executing: {tool_call.name} with arguments: {args_str}") + args_str = json.dumps(tool_call.arguments, ensure_ascii=False) + logger.debug("Subagent [{}] executing: {} with arguments: {}", task_id, tool_call.name, args_str) result = await tools.execute(tool_call.name, tool_call.arguments) messages.append({ "role": "tool", @@ -175,12 +164,12 @@ class SubagentManager: if final_result is None: final_result = "Task completed but no final response was generated." - logger.info(f"Subagent [{task_id}] completed successfully") + logger.info("Subagent [{}] completed successfully", task_id) await self._announce_result(task_id, label, task, final_result, origin, "ok") except Exception as e: error_msg = f"Error: {str(e)}" - logger.error(f"Subagent [{task_id}] failed: {e}") + logger.error("Subagent [{}] failed: {}", task_id, e) await self._announce_result(task_id, label, task, error_msg, origin, "error") async def _announce_result( @@ -213,7 +202,7 @@ Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not men ) await self.bus.publish_inbound(msg) - logger.debug(f"Subagent [{task_id}] announced result to {origin['channel']}:{origin['chat_id']}") + logger.debug("Subagent [{}] announced result to {}:{}", task_id, origin['channel'], origin['chat_id']) def _build_subagent_prompt(self, task: str) -> str: """Build a focused system prompt for the subagent.""" @@ -252,6 +241,16 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed When you have completed the task, provide a clear summary of your findings or actions.""" + async def cancel_by_session(self, session_key: str) -> int: + """Cancel all subagents for the given session. Returns count cancelled.""" + tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, []) + if tid in self._running_tasks and not self._running_tasks[tid].done()] + for t in tasks: + t.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + return len(tasks) + def get_running_count(self) -> int: """Return the number of currently running subagents.""" return len(self._running_tasks) diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 6b3254a..b87da11 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -1,23 +1,31 @@ """File system tools: read, write, edit.""" +import difflib from pathlib import Path from typing import Any from nanobot.agent.tools.base import Tool -def _resolve_path(path: str, allowed_dir: Path | None = None) -> Path: - """Resolve path and optionally enforce directory restriction.""" - resolved = Path(path).expanduser().resolve() - if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): - raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") +def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | None = None) -> Path: + """Resolve path against workspace (if relative) and enforce directory restriction.""" + p = Path(path).expanduser() + if not p.is_absolute() and workspace: + p = workspace / p + resolved = p.resolve() + if allowed_dir: + try: + resolved.relative_to(allowed_dir.resolve()) + except ValueError: + raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") return resolved class ReadFileTool(Tool): """Tool to read file contents.""" - - def __init__(self, allowed_dir: Path | None = None): + + def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None): + self._workspace = workspace self._allowed_dir = allowed_dir @property @@ -43,12 +51,12 @@ class ReadFileTool(Tool): async def execute(self, path: str, **kwargs: Any) -> str: try: - file_path = _resolve_path(path, self._allowed_dir) + file_path = _resolve_path(path, self._workspace, self._allowed_dir) if not file_path.exists(): return f"Error: File not found: {path}" if not file_path.is_file(): return f"Error: Not a file: {path}" - + content = file_path.read_text(encoding="utf-8") return content except PermissionError as e: @@ -59,8 +67,9 @@ class ReadFileTool(Tool): class WriteFileTool(Tool): """Tool to write content to a file.""" - - def __init__(self, allowed_dir: Path | None = None): + + def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None): + self._workspace = workspace self._allowed_dir = allowed_dir @property @@ -90,10 +99,10 @@ class WriteFileTool(Tool): async def execute(self, path: str, content: str, **kwargs: Any) -> str: try: - file_path = _resolve_path(path, self._allowed_dir) + file_path = _resolve_path(path, self._workspace, self._allowed_dir) file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(content, encoding="utf-8") - return f"Successfully wrote {len(content)} bytes to {path}" + return f"Successfully wrote {len(content)} bytes to {file_path}" except PermissionError as e: return f"Error: {e}" except Exception as e: @@ -102,8 +111,9 @@ class WriteFileTool(Tool): class EditFileTool(Tool): """Tool to edit a file by replacing text.""" - - def __init__(self, allowed_dir: Path | None = None): + + def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None): + self._workspace = workspace self._allowed_dir = allowed_dir @property @@ -137,34 +147,57 @@ class EditFileTool(Tool): async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str: try: - file_path = _resolve_path(path, self._allowed_dir) + file_path = _resolve_path(path, self._workspace, self._allowed_dir) if not file_path.exists(): return f"Error: File not found: {path}" - + content = file_path.read_text(encoding="utf-8") - + if old_text not in content: - return f"Error: old_text not found in file. Make sure it matches exactly." - + return self._not_found_message(old_text, content, path) + # Count occurrences count = content.count(old_text) if count > 1: return f"Warning: old_text appears {count} times. Please provide more context to make it unique." - + new_content = content.replace(old_text, new_text, 1) file_path.write_text(new_content, encoding="utf-8") - - return f"Successfully edited {path}" + + return f"Successfully edited {file_path}" except PermissionError as e: return f"Error: {e}" except Exception as e: return f"Error editing file: {str(e)}" + @staticmethod + def _not_found_message(old_text: str, content: str, path: str) -> str: + """Build a helpful error when old_text is not found.""" + lines = content.splitlines(keepends=True) + old_lines = old_text.splitlines(keepends=True) + window = len(old_lines) + + best_ratio, best_start = 0.0, 0 + for i in range(max(1, len(lines) - window + 1)): + ratio = difflib.SequenceMatcher(None, old_lines, lines[i : i + window]).ratio() + if ratio > best_ratio: + best_ratio, best_start = ratio, i + + if best_ratio > 0.5: + diff = "\n".join(difflib.unified_diff( + old_lines, lines[best_start : best_start + window], + fromfile="old_text (provided)", tofile=f"{path} (actual, line {best_start + 1})", + lineterm="", + )) + return f"Error: old_text not found in {path}.\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}" + return f"Error: old_text not found in {path}. No similar text found. Verify the file content." + class ListDirTool(Tool): """Tool to list directory contents.""" - - def __init__(self, allowed_dir: Path | None = None): + + def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None): + self._workspace = workspace self._allowed_dir = allowed_dir @property @@ -190,20 +223,20 @@ class ListDirTool(Tool): async def execute(self, path: str, **kwargs: Any) -> str: try: - dir_path = _resolve_path(path, self._allowed_dir) + dir_path = _resolve_path(path, self._workspace, self._allowed_dir) if not dir_path.exists(): return f"Error: Directory not found: {path}" if not dir_path.is_dir(): return f"Error: Not a directory: {path}" - + items = [] for item in sorted(dir_path.iterdir()): prefix = "πŸ“ " if item.is_dir() else "πŸ“„ " items.append(f"{prefix}{item.name}") - + if not items: return f"Directory {path} is empty" - + return "\n".join(items) except PermissionError as e: return f"Error: {e}" diff --git a/nanobot/agent/tools/mcp.py b/nanobot/agent/tools/mcp.py index 1c8eac4..37464e1 100644 --- a/nanobot/agent/tools/mcp.py +++ b/nanobot/agent/tools/mcp.py @@ -1,8 +1,10 @@ """MCP client: connects to MCP servers and wraps their tools as native nanobot tools.""" +import asyncio from contextlib import AsyncExitStack from typing import Any +import httpx from loguru import logger from nanobot.agent.tools.base import Tool @@ -12,12 +14,13 @@ from nanobot.agent.tools.registry import ToolRegistry class MCPToolWrapper(Tool): """Wraps a single MCP server tool as a nanobot Tool.""" - def __init__(self, session, server_name: str, tool_def): + def __init__(self, session, server_name: str, tool_def, tool_timeout: int = 30): self._session = session self._original_name = tool_def.name self._name = f"mcp_{server_name}_{tool_def.name}" self._description = tool_def.description or tool_def.name self._parameters = tool_def.inputSchema or {"type": "object", "properties": {}} + self._tool_timeout = tool_timeout @property def name(self) -> str: @@ -33,7 +36,14 @@ class MCPToolWrapper(Tool): async def execute(self, **kwargs: Any) -> str: from mcp import types - result = await self._session.call_tool(self._original_name, arguments=kwargs) + try: + result = await asyncio.wait_for( + self._session.call_tool(self._original_name, arguments=kwargs), + timeout=self._tool_timeout, + ) + except asyncio.TimeoutError: + logger.warning("MCP tool '{}' timed out after {}s", self._name, self._tool_timeout) + return f"(MCP tool call timed out after {self._tool_timeout}s)" parts = [] for block in result.content: if isinstance(block, types.TextContent): @@ -59,11 +69,20 @@ async def connect_mcp_servers( read, write = await stack.enter_async_context(stdio_client(params)) elif cfg.url: from mcp.client.streamable_http import streamable_http_client + # Always provide an explicit httpx client so MCP HTTP transport does not + # inherit httpx's default 5s timeout and preempt the higher-level tool timeout. + http_client = await stack.enter_async_context( + httpx.AsyncClient( + headers=cfg.headers or None, + follow_redirects=True, + timeout=None, + ) + ) read, write, _ = await stack.enter_async_context( - streamable_http_client(cfg.url) + streamable_http_client(cfg.url, http_client=http_client) ) else: - logger.warning(f"MCP server '{name}': no command or url configured, skipping") + logger.warning("MCP server '{}': no command or url configured, skipping", name) continue session = await stack.enter_async_context(ClientSession(read, write)) @@ -71,10 +90,10 @@ async def connect_mcp_servers( tools = await session.list_tools() for tool_def in tools.tools: - wrapper = MCPToolWrapper(session, name, tool_def) + wrapper = MCPToolWrapper(session, name, tool_def, tool_timeout=cfg.tool_timeout) registry.register(wrapper) - logger.debug(f"MCP: registered tool '{wrapper.name}' from server '{name}'") + logger.debug("MCP: registered tool '{}' from server '{}'", wrapper.name, name) - logger.info(f"MCP server '{name}': connected, {len(tools.tools)} tools registered") + logger.info("MCP server '{}': connected, {} tools registered", name, len(tools.tools)) except Exception as e: - logger.error(f"MCP server '{name}': failed to connect: {e}") + logger.error("MCP server '{}': failed to connect: {}", name, e) diff --git a/nanobot/agent/tools/message.py b/nanobot/agent/tools/message.py index 3853725..40e76e3 100644 --- a/nanobot/agent/tools/message.py +++ b/nanobot/agent/tools/message.py @@ -1,6 +1,6 @@ """Message tool for sending messages to users.""" -from typing import Any, Callable, Awaitable +from typing import Any, Awaitable, Callable from nanobot.agent.tools.base import Tool from nanobot.bus.events import OutboundMessage @@ -8,34 +8,42 @@ from nanobot.bus.events import OutboundMessage class MessageTool(Tool): """Tool to send messages to users on chat channels.""" - + def __init__( - self, + self, send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, default_channel: str = "", - default_chat_id: str = "" + default_chat_id: str = "", + default_message_id: str | None = None, ): self._send_callback = send_callback self._default_channel = default_channel self._default_chat_id = default_chat_id - - def set_context(self, channel: str, chat_id: str) -> None: + self._default_message_id = default_message_id + self._sent_in_turn: bool = False + + def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Set the current message context.""" self._default_channel = channel self._default_chat_id = chat_id - + self._default_message_id = message_id + def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None: """Set the callback for sending messages.""" self._send_callback = callback - + + def start_turn(self) -> None: + """Reset per-turn send tracking.""" + self._sent_in_turn = False + @property def name(self) -> str: return "message" - + @property def description(self) -> str: return "Send a message to the user. Use this when you want to communicate something." - + @property def parameters(self) -> dict[str, Any]: return { @@ -61,33 +69,39 @@ class MessageTool(Tool): }, "required": ["content"] } - + async def execute( - self, - content: str, - channel: str | None = None, + self, + content: str, + channel: str | None = None, chat_id: str | None = None, + message_id: str | None = None, media: list[str] | None = None, **kwargs: Any ) -> str: channel = channel or self._default_channel chat_id = chat_id or self._default_chat_id - + message_id = message_id or self._default_message_id + if not channel or not chat_id: return "Error: No target channel/chat specified" - + if not self._send_callback: return "Error: Message sending not configured" - + msg = OutboundMessage( channel=channel, chat_id=chat_id, content=content, - media=media or [] + media=media or [], + metadata={ + "message_id": message_id, + } ) - + try: await self._send_callback(msg) + self._sent_in_turn = True media_info = f" with {len(media)} attachments" if media else "" return f"Message sent to {channel}:{chat_id}{media_info}" except Exception as e: diff --git a/nanobot/agent/tools/registry.py b/nanobot/agent/tools/registry.py index d9b33ff..3af4aef 100644 --- a/nanobot/agent/tools/registry.py +++ b/nanobot/agent/tools/registry.py @@ -36,30 +36,23 @@ class ToolRegistry: return [tool.to_schema() for tool in self._tools.values()] async def execute(self, name: str, params: dict[str, Any]) -> str: - """ - Execute a tool by name with given parameters. - - Args: - name: Tool name. - params: Tool parameters. - - Returns: - Tool execution result as string. - - Raises: - KeyError: If tool not found. - """ + """Execute a tool by name with given parameters.""" + _HINT = "\n\n[Analyze the error above and try a different approach.]" + tool = self._tools.get(name) if not tool: - return f"Error: Tool '{name}' not found" + return f"Error: Tool '{name}' not found. Available: {', '.join(self.tool_names)}" try: errors = tool.validate_params(params) if errors: - return f"Error: Invalid parameters for tool '{name}': " + "; ".join(errors) - return await tool.execute(**params) + return f"Error: Invalid parameters for tool '{name}': " + "; ".join(errors) + _HINT + result = await tool.execute(**params) + if isinstance(result, str) and result.startswith("Error"): + return result + _HINT + return result except Exception as e: - return f"Error executing {name}: {str(e)}" + return f"Error executing {name}: {str(e)}" + _HINT @property def tool_names(self) -> list[str]: diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index e3592a7..c3810b2 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -19,6 +19,7 @@ class ExecTool(Tool): deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, restrict_to_workspace: bool = False, + path_append: str = "", ): self.timeout = timeout self.working_dir = working_dir @@ -35,6 +36,7 @@ class ExecTool(Tool): ] self.allow_patterns = allow_patterns or [] self.restrict_to_workspace = restrict_to_workspace + self.path_append = path_append @property def name(self) -> str: @@ -67,12 +69,17 @@ class ExecTool(Tool): if guard_error: return guard_error + env = os.environ.copy() + if self.path_append: + env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append + try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, + env=env, ) try: diff --git a/nanobot/agent/tools/spawn.py b/nanobot/agent/tools/spawn.py index 5884a07..fb816ca 100644 --- a/nanobot/agent/tools/spawn.py +++ b/nanobot/agent/tools/spawn.py @@ -9,22 +9,19 @@ if TYPE_CHECKING: class SpawnTool(Tool): - """ - Tool to spawn a subagent for background task execution. - - The subagent runs asynchronously and announces its result back - to the main agent when complete. - """ + """Tool to spawn a subagent for background task execution.""" def __init__(self, manager: "SubagentManager"): self._manager = manager self._origin_channel = "cli" self._origin_chat_id = "direct" + self._session_key = "cli:direct" def set_context(self, channel: str, chat_id: str) -> None: """Set the origin context for subagent announcements.""" self._origin_channel = channel self._origin_chat_id = chat_id + self._session_key = f"{channel}:{chat_id}" @property def name(self) -> str: @@ -62,4 +59,5 @@ class SpawnTool(Tool): label=label, origin_channel=self._origin_channel, origin_chat_id=self._origin_chat_id, + session_key=self._session_key, ) diff --git a/nanobot/agent/tools/web.py b/nanobot/agent/tools/web.py index 9de1d3c..56956c3 100644 --- a/nanobot/agent/tools/web.py +++ b/nanobot/agent/tools/web.py @@ -58,12 +58,21 @@ class WebSearchTool(Tool): } def __init__(self, api_key: str | None = None, max_results: int = 5): - self.api_key = api_key or os.environ.get("BRAVE_API_KEY", "") + self._init_api_key = api_key self.max_results = max_results - + + @property + def api_key(self) -> str: + """Resolve API key at call time so env/config changes are picked up.""" + return self._init_api_key or os.environ.get("BRAVE_API_KEY", "") + async def execute(self, query: str, count: int | None = None, **kwargs: Any) -> str: if not self.api_key: - return "Error: BRAVE_API_KEY not configured" + return ( + "Error: Brave Search API key not configured. " + "Set it in ~/.nanobot/config.json under tools.web.search.apiKey " + "(or export BRAVE_API_KEY), then restart the gateway." + ) try: n = min(max(count or self.max_results, 1), 10) @@ -71,7 +80,7 @@ class WebSearchTool(Tool): r = await client.get( "https://api.search.brave.com/res/v1/web/search", params={"q": query, "count": n}, - headers={"Accept": "application/json", "X-Subscription-Token": self.api_key}, + headers={"Accept": "application/json", "X-Subscription-Token": api_key}, timeout=10.0 ) r.raise_for_status() @@ -116,7 +125,7 @@ class WebFetchTool(Tool): # Validate URL before fetching is_valid, error_msg = _validate_url(url) if not is_valid: - return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url}) + return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url}, ensure_ascii=False) try: async with httpx.AsyncClient( @@ -131,7 +140,7 @@ class WebFetchTool(Tool): # JSON if "application/json" in ctype: - text, extractor = json.dumps(r.json(), indent=2), "json" + text, extractor = json.dumps(r.json(), indent=2, ensure_ascii=False), "json" # HTML elif "text/html" in ctype or r.text[:256].lower().startswith((" str: """Convert HTML to markdown.""" diff --git a/nanobot/bus/events.py b/nanobot/bus/events.py index a149e20..a48660d 100644 --- a/nanobot/bus/events.py +++ b/nanobot/bus/events.py @@ -16,11 +16,12 @@ class InboundMessage: timestamp: datetime = field(default_factory=datetime.now) media: list[str] = field(default_factory=list) # Media URLs metadata: dict[str, Any] = field(default_factory=dict) # Channel-specific data + session_key_override: str | None = None # Optional override for thread-scoped sessions @property def session_key(self) -> str: """Unique key for session identification.""" - return f"{self.channel}:{self.chat_id}" + return self.session_key_override or f"{self.channel}:{self.chat_id}" @dataclass diff --git a/nanobot/bus/queue.py b/nanobot/bus/queue.py index 4123d06..7c0616f 100644 --- a/nanobot/bus/queue.py +++ b/nanobot/bus/queue.py @@ -1,9 +1,6 @@ """Async message queue for decoupled channel-agent communication.""" import asyncio -from typing import Callable, Awaitable - -from loguru import logger from nanobot.bus.events import InboundMessage, OutboundMessage @@ -11,70 +8,36 @@ from nanobot.bus.events import InboundMessage, OutboundMessage class MessageBus: """ Async message bus that decouples chat channels from the agent core. - + Channels push messages to the inbound queue, and the agent processes them and pushes responses to the outbound queue. """ - + def __init__(self): self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue() self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue() - self._outbound_subscribers: dict[str, list[Callable[[OutboundMessage], Awaitable[None]]]] = {} - self._running = False - + async def publish_inbound(self, msg: InboundMessage) -> None: """Publish a message from a channel to the agent.""" await self.inbound.put(msg) - + async def consume_inbound(self) -> InboundMessage: """Consume the next inbound message (blocks until available).""" return await self.inbound.get() - + async def publish_outbound(self, msg: OutboundMessage) -> None: """Publish a response from the agent to channels.""" await self.outbound.put(msg) - + async def consume_outbound(self) -> OutboundMessage: """Consume the next outbound message (blocks until available).""" return await self.outbound.get() - - def subscribe_outbound( - self, - channel: str, - callback: Callable[[OutboundMessage], Awaitable[None]] - ) -> None: - """Subscribe to outbound messages for a specific channel.""" - if channel not in self._outbound_subscribers: - self._outbound_subscribers[channel] = [] - self._outbound_subscribers[channel].append(callback) - - async def dispatch_outbound(self) -> None: - """ - Dispatch outbound messages to subscribed channels. - Run this as a background task. - """ - self._running = True - while self._running: - try: - msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0) - subscribers = self._outbound_subscribers.get(msg.channel, []) - for callback in subscribers: - try: - await callback(msg) - except Exception as e: - logger.error(f"Error dispatching to {msg.channel}: {e}") - except asyncio.TimeoutError: - continue - - def stop(self) -> None: - """Stop the dispatcher loop.""" - self._running = False - + @property def inbound_size(self) -> int: """Number of pending inbound messages.""" return self.inbound.qsize() - + @property def outbound_size(self) -> int: """Number of pending outbound messages.""" diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 30fcd1a..3010373 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -89,7 +89,8 @@ class BaseChannel(ABC): chat_id: str, content: str, media: list[str] | None = None, - metadata: dict[str, Any] | None = None + metadata: dict[str, Any] | None = None, + session_key: str | None = None, ) -> None: """ Handle an incoming message from the chat platform. @@ -102,11 +103,13 @@ class BaseChannel(ABC): content: Message text content. media: Optional list of media URLs. metadata: Optional channel-specific metadata. + session_key: Optional session key override (e.g. thread-scoped sessions). """ if not self.is_allowed(sender_id): logger.warning( - f"Access denied for sender {sender_id} on channel {self.name}. " - f"Add them to allowFrom list in config to grant access." + "Access denied for sender {} on channel {}. " + "Add them to allowFrom list in config to grant access.", + sender_id, self.name, ) return @@ -116,7 +119,8 @@ class BaseChannel(ABC): chat_id=str(chat_id), content=content, media=media or [], - metadata=metadata or {} + metadata=metadata or {}, + session_key_override=session_key, ) await self.bus.publish_inbound(msg) diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py index 4a8cdd9..09c7714 100644 --- a/nanobot/channels/dingtalk.py +++ b/nanobot/channels/dingtalk.py @@ -58,14 +58,15 @@ class NanobotDingTalkHandler(CallbackHandler): if not content: logger.warning( - f"Received empty or unsupported message type: {chatbot_msg.message_type}" + "Received empty or unsupported message type: {}", + chatbot_msg.message_type, ) return AckMessage.STATUS_OK, "OK" sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id sender_name = chatbot_msg.sender_nick or "Unknown" - logger.info(f"Received DingTalk message from {sender_name} ({sender_id}): {content}") + logger.info("Received DingTalk message from {} ({}): {}", sender_name, sender_id, content) # Forward to Nanobot via _on_message (non-blocking). # Store reference to prevent GC before task completes. @@ -78,7 +79,7 @@ class NanobotDingTalkHandler(CallbackHandler): return AckMessage.STATUS_OK, "OK" except Exception as e: - logger.error(f"Error processing DingTalk message: {e}") + logger.error("Error processing DingTalk message: {}", e) # Return OK to avoid retry loop from DingTalk server return AckMessage.STATUS_OK, "Error" @@ -126,7 +127,8 @@ class DingTalkChannel(BaseChannel): self._http = httpx.AsyncClient() logger.info( - f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..." + "Initializing DingTalk Stream Client with Client ID: {}...", + self.config.client_id, ) credential = Credential(self.config.client_id, self.config.client_secret) self._client = DingTalkStreamClient(credential) @@ -142,13 +144,13 @@ class DingTalkChannel(BaseChannel): try: await self._client.start() except Exception as e: - logger.warning(f"DingTalk stream error: {e}") + logger.warning("DingTalk stream error: {}", e) if self._running: logger.info("Reconnecting DingTalk stream in 5 seconds...") await asyncio.sleep(5) except Exception as e: - logger.exception(f"Failed to start DingTalk channel: {e}") + logger.exception("Failed to start DingTalk channel: {}", e) async def stop(self) -> None: """Stop the DingTalk bot.""" @@ -186,7 +188,7 @@ class DingTalkChannel(BaseChannel): self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60 return self._access_token except Exception as e: - logger.error(f"Failed to get DingTalk access token: {e}") + logger.error("Failed to get DingTalk access token: {}", e) return None async def send(self, msg: OutboundMessage) -> None: @@ -208,7 +210,7 @@ class DingTalkChannel(BaseChannel): "msgParam": json.dumps({ "text": msg.content, "title": "Nanobot Reply", - }), + }, ensure_ascii=False), } if not self._http: @@ -218,11 +220,11 @@ class DingTalkChannel(BaseChannel): try: resp = await self._http.post(url, json=data, headers=headers) if resp.status_code != 200: - logger.error(f"DingTalk send failed: {resp.text}") + logger.error("DingTalk send failed: {}", resp.text) else: - logger.debug(f"DingTalk message sent to {msg.chat_id}") + logger.debug("DingTalk message sent to {}", msg.chat_id) except Exception as e: - logger.error(f"Error sending DingTalk message: {e}") + logger.error("Error sending DingTalk message: {}", e) async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None: """Handle incoming message (called by NanobotDingTalkHandler). @@ -231,7 +233,7 @@ class DingTalkChannel(BaseChannel): permission checks before publishing to the bus. """ try: - logger.info(f"DingTalk inbound: {content} from {sender_name}") + logger.info("DingTalk inbound: {} from {}", content, sender_name) await self._handle_message( sender_id=sender_id, chat_id=sender_id, # For private chat, chat_id == sender_id @@ -242,4 +244,4 @@ class DingTalkChannel(BaseChannel): }, ) except Exception as e: - logger.error(f"Error publishing DingTalk message: {e}") + logger.error("Error publishing DingTalk message: {}", e) diff --git a/nanobot/channels/discord.py b/nanobot/channels/discord.py index a76d6ac..b9227fb 100644 --- a/nanobot/channels/discord.py +++ b/nanobot/channels/discord.py @@ -17,6 +17,29 @@ from nanobot.config.schema import DiscordConfig DISCORD_API_BASE = "https://discord.com/api/v10" MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB +MAX_MESSAGE_LEN = 2000 # Discord message character limit + + +def _split_message(content: str, max_len: int = MAX_MESSAGE_LEN) -> list[str]: + """Split content into chunks within max_len, preferring line breaks.""" + if not content: + return [] + if len(content) <= max_len: + return [content] + chunks: list[str] = [] + while content: + if len(content) <= max_len: + chunks.append(content) + break + cut = content[:max_len] + pos = cut.rfind('\n') + if pos <= 0: + pos = cut.rfind(' ') + if pos <= 0: + pos = max_len + chunks.append(content[:pos]) + content = content[pos:].lstrip() + return chunks class DiscordChannel(BaseChannel): @@ -51,7 +74,7 @@ class DiscordChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Discord gateway error: {e}") + logger.warning("Discord gateway error: {}", e) if self._running: logger.info("Reconnecting to Discord gateway in 5 seconds...") await asyncio.sleep(5) @@ -79,34 +102,48 @@ class DiscordChannel(BaseChannel): return url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages" - payload: dict[str, Any] = {"content": msg.content} - - if msg.reply_to: - payload["message_reference"] = {"message_id": msg.reply_to} - payload["allowed_mentions"] = {"replied_user": False} - headers = {"Authorization": f"Bot {self.config.token}"} try: - for attempt in range(3): - try: - response = await self._http.post(url, headers=headers, json=payload) - if response.status_code == 429: - data = response.json() - retry_after = float(data.get("retry_after", 1.0)) - logger.warning(f"Discord rate limited, retrying in {retry_after}s") - await asyncio.sleep(retry_after) - continue - response.raise_for_status() - return - except Exception as e: - if attempt == 2: - logger.error(f"Error sending Discord message: {e}") - else: - await asyncio.sleep(1) + chunks = _split_message(msg.content or "") + if not chunks: + return + + for i, chunk in enumerate(chunks): + payload: dict[str, Any] = {"content": chunk} + + # Only set reply reference on the first chunk + if i == 0 and msg.reply_to: + payload["message_reference"] = {"message_id": msg.reply_to} + payload["allowed_mentions"] = {"replied_user": False} + + if not await self._send_payload(url, headers, payload): + break # Abort remaining chunks on failure finally: await self._stop_typing(msg.chat_id) + async def _send_payload( + self, url: str, headers: dict[str, str], payload: dict[str, Any] + ) -> bool: + """Send a single Discord API payload with retry on rate-limit. Returns True on success.""" + for attempt in range(3): + try: + response = await self._http.post(url, headers=headers, json=payload) + if response.status_code == 429: + data = response.json() + retry_after = float(data.get("retry_after", 1.0)) + logger.warning("Discord rate limited, retrying in {}s", retry_after) + await asyncio.sleep(retry_after) + continue + response.raise_for_status() + return True + except Exception as e: + if attempt == 2: + logger.error("Error sending Discord message: {}", e) + else: + await asyncio.sleep(1) + return False + async def _gateway_loop(self) -> None: """Main gateway loop: identify, heartbeat, dispatch events.""" if not self._ws: @@ -116,7 +153,7 @@ class DiscordChannel(BaseChannel): try: data = json.loads(raw) except json.JSONDecodeError: - logger.warning(f"Invalid JSON from Discord gateway: {raw[:100]}") + logger.warning("Invalid JSON from Discord gateway: {}", raw[:100]) continue op = data.get("op") @@ -175,7 +212,7 @@ class DiscordChannel(BaseChannel): try: await self._ws.send(json.dumps(payload)) except Exception as e: - logger.warning(f"Discord heartbeat failed: {e}") + logger.warning("Discord heartbeat failed: {}", e) break await asyncio.sleep(interval_s) @@ -219,7 +256,7 @@ class DiscordChannel(BaseChannel): media_paths.append(str(file_path)) content_parts.append(f"[attachment: {file_path}]") except Exception as e: - logger.warning(f"Failed to download Discord attachment: {e}") + logger.warning("Failed to download Discord attachment: {}", e) content_parts.append(f"[attachment: {filename} - download failed]") reply_to = (payload.get("referenced_message") or {}).get("id") @@ -248,8 +285,11 @@ class DiscordChannel(BaseChannel): while self._running: try: await self._http.post(url, headers=headers) - except Exception: - pass + except asyncio.CancelledError: + return + except Exception as e: + logger.debug("Discord typing indicator failed for {}: {}", channel_id, e) + return await asyncio.sleep(8) self._typing_tasks[channel_id] = asyncio.create_task(typing_loop()) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 0e47067..16771fb 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -94,7 +94,7 @@ class EmailChannel(BaseChannel): metadata=item.get("metadata", {}), ) except Exception as e: - logger.error(f"Email polling error: {e}") + logger.error("Email polling error: {}", e) await asyncio.sleep(poll_seconds) @@ -108,11 +108,6 @@ class EmailChannel(BaseChannel): logger.warning("Skip email send: consent_granted is false") return - force_send = bool((msg.metadata or {}).get("force_send")) - if not self.config.auto_reply_enabled and not force_send: - logger.info("Skip automatic email reply: auto_reply_enabled is false") - return - if not self.config.smtp_host: logger.warning("Email channel SMTP host not configured") return @@ -122,6 +117,15 @@ class EmailChannel(BaseChannel): logger.warning("Email channel missing recipient address") return + # Determine if this is a reply (recipient has sent us an email before) + is_reply = to_addr in self._last_subject_by_chat + force_send = bool((msg.metadata or {}).get("force_send")) + + # autoReplyEnabled only controls automatic replies, not proactive sends + if is_reply and not self.config.auto_reply_enabled and not force_send: + logger.info("Skip automatic email reply to {}: auto_reply_enabled is false", to_addr) + return + base_subject = self._last_subject_by_chat.get(to_addr, "nanobot reply") subject = self._reply_subject(base_subject) if msg.metadata and isinstance(msg.metadata.get("subject"), str): @@ -143,7 +147,7 @@ class EmailChannel(BaseChannel): try: await asyncio.to_thread(self._smtp_send, email_msg) except Exception as e: - logger.error(f"Error sending email to {to_addr}: {e}") + logger.error("Error sending email to {}: {}", to_addr, e) raise def _validate_config(self) -> bool: @@ -162,7 +166,7 @@ class EmailChannel(BaseChannel): missing.append("smtp_password") if missing: - logger.error(f"Email channel not configured, missing: {', '.join(missing)}") + logger.error("Email channel not configured, missing: {}", ', '.join(missing)) return False return True @@ -304,7 +308,8 @@ class EmailChannel(BaseChannel): self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: - self._processed_uids.clear() + # Evict a random half to cap memory; mark_seen is the primary dedup + self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:]) if mark_seen: client.store(imap_id, "+FLAGS", "\\Seen") diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 651d655..480bf7b 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -6,6 +6,7 @@ import os import re import threading from collections import OrderedDict +from pathlib import Path from typing import Any from loguru import logger @@ -27,6 +28,8 @@ try: CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji, + GetFileRequest, + GetMessageResourceRequest, P2ImMessageReceiveV1, ) FEISHU_AVAILABLE = True @@ -44,21 +47,158 @@ MSG_TYPE_MAP = { } -def _extract_post_text(content_json: dict) -> str: - """Extract plain text from Feishu post (rich text) message content. +def _extract_share_card_content(content_json: dict, msg_type: str) -> str: + """Extract text representation from share cards and interactive messages.""" + parts = [] + + if msg_type == "share_chat": + parts.append(f"[shared chat: {content_json.get('chat_id', '')}]") + elif msg_type == "share_user": + parts.append(f"[shared user: {content_json.get('user_id', '')}]") + elif msg_type == "interactive": + parts.extend(_extract_interactive_content(content_json)) + elif msg_type == "share_calendar_event": + parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]") + elif msg_type == "system": + parts.append("[system message]") + elif msg_type == "merge_forward": + parts.append("[merged forward messages]") + + return "\n".join(parts) if parts else f"[{msg_type}]" + + +def _extract_interactive_content(content: dict) -> list[str]: + """Recursively extract text and links from interactive card content.""" + parts = [] + + if isinstance(content, str): + try: + content = json.loads(content) + except (json.JSONDecodeError, TypeError): + return [content] if content.strip() else [] + + if not isinstance(content, dict): + return parts + + if "title" in content: + title = content["title"] + if isinstance(title, dict): + title_content = title.get("content", "") or title.get("text", "") + if title_content: + parts.append(f"title: {title_content}") + elif isinstance(title, str): + parts.append(f"title: {title}") + + for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []: + parts.extend(_extract_element_content(element)) + + card = content.get("card", {}) + if card: + parts.extend(_extract_interactive_content(card)) + + header = content.get("header", {}) + if header: + header_title = header.get("title", {}) + if isinstance(header_title, dict): + header_text = header_title.get("content", "") or header_title.get("text", "") + if header_text: + parts.append(f"title: {header_text}") + + return parts + + +def _extract_element_content(element: dict) -> list[str]: + """Extract content from a single card element.""" + parts = [] + + if not isinstance(element, dict): + return parts + + tag = element.get("tag", "") + + if tag in ("markdown", "lark_md"): + content = element.get("content", "") + if content: + parts.append(content) + + elif tag == "div": + text = element.get("text", {}) + if isinstance(text, dict): + text_content = text.get("content", "") or text.get("text", "") + if text_content: + parts.append(text_content) + elif isinstance(text, str): + parts.append(text) + for field in element.get("fields", []): + if isinstance(field, dict): + field_text = field.get("text", {}) + if isinstance(field_text, dict): + c = field_text.get("content", "") + if c: + parts.append(c) + + elif tag == "a": + href = element.get("href", "") + text = element.get("text", "") + if href: + parts.append(f"link: {href}") + if text: + parts.append(text) + + elif tag == "button": + text = element.get("text", {}) + if isinstance(text, dict): + c = text.get("content", "") + if c: + parts.append(c) + url = element.get("url", "") or element.get("multi_url", {}).get("url", "") + if url: + parts.append(f"link: {url}") + + elif tag == "img": + alt = element.get("alt", {}) + parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]") + + elif tag == "note": + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + elif tag == "column_set": + for col in element.get("columns", []): + for ce in col.get("elements", []): + parts.extend(_extract_element_content(ce)) + + elif tag == "plain_text": + content = element.get("content", "") + if content: + parts.append(content) + + else: + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + return parts + + +def _extract_post_content(content_json: dict) -> tuple[str, list[str]]: + """Extract text and image keys from Feishu post (rich text) message content. Supports two formats: 1. Direct format: {"title": "...", "content": [...]} 2. Localized format: {"zh_cn": {"title": "...", "content": [...]}} + + Returns: + (text, image_keys) - extracted text and list of image keys """ - def extract_from_lang(lang_content: dict) -> str | None: + def extract_from_lang(lang_content: dict) -> tuple[str | None, list[str]]: if not isinstance(lang_content, dict): - return None + return None, [] title = lang_content.get("title", "") content_blocks = lang_content.get("content", []) if not isinstance(content_blocks, list): - return None + return None, [] text_parts = [] + image_keys = [] if title: text_parts.append(title) for block in content_blocks: @@ -73,22 +213,36 @@ def _extract_post_text(content_json: dict) -> str: text_parts.append(element.get("text", "")) elif tag == "at": text_parts.append(f"@{element.get('user_name', 'user')}") - return " ".join(text_parts).strip() if text_parts else None + elif tag == "img": + img_key = element.get("image_key") + if img_key: + image_keys.append(img_key) + text = " ".join(text_parts).strip() if text_parts else None + return text, image_keys # Try direct format first if "content" in content_json: - result = extract_from_lang(content_json) - if result: - return result + text, images = extract_from_lang(content_json) + if text or images: + return text or "", images # Try localized format for lang_key in ("zh_cn", "en_us", "ja_jp"): lang_content = content_json.get(lang_key) - result = extract_from_lang(lang_content) - if result: - return result + text, images = extract_from_lang(lang_content) + if text or images: + return text or "", images - return "" + return "", [] + + +def _extract_post_text(content_json: dict) -> str: + """Extract plain text from Feishu post (rich text) message content. + + Legacy wrapper for _extract_post_content, returns only text. + """ + text, _ = _extract_post_content(content_json) + return text class FeishuChannel(BaseChannel): @@ -156,7 +310,7 @@ class FeishuChannel(BaseChannel): try: self._ws_client.start() except Exception as e: - logger.warning(f"Feishu WebSocket error: {e}") + logger.warning("Feishu WebSocket error: {}", e) if self._running: import time; time.sleep(5) @@ -177,7 +331,7 @@ class FeishuChannel(BaseChannel): try: self._ws_client.stop() except Exception as e: - logger.warning(f"Error stopping WebSocket client: {e}") + logger.warning("Error stopping WebSocket client: {}", e) logger.info("Feishu bot stopped") def _add_reaction_sync(self, message_id: str, emoji_type: str) -> None: @@ -194,11 +348,11 @@ class FeishuChannel(BaseChannel): response = self._client.im.v1.message_reaction.create(request) if not response.success(): - logger.warning(f"Failed to add reaction: code={response.code}, msg={response.msg}") + logger.warning("Failed to add reaction: code={}, msg={}", response.code, response.msg) else: - logger.debug(f"Added {emoji_type} reaction to message {message_id}") + logger.debug("Added {} reaction to message {}", emoji_type, message_id) except Exception as e: - logger.warning(f"Error adding reaction: {e}") + logger.warning("Error adding reaction: {}", e) async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> None: """ @@ -309,13 +463,13 @@ class FeishuChannel(BaseChannel): response = self._client.im.v1.image.create(request) if response.success(): image_key = response.data.image_key - logger.debug(f"Uploaded image {os.path.basename(file_path)}: {image_key}") + logger.debug("Uploaded image {}: {}", os.path.basename(file_path), image_key) return image_key else: - logger.error(f"Failed to upload image: code={response.code}, msg={response.msg}") + logger.error("Failed to upload image: code={}, msg={}", response.code, response.msg) return None except Exception as e: - logger.error(f"Error uploading image {file_path}: {e}") + logger.error("Error uploading image {}: {}", file_path, e) return None def _upload_file_sync(self, file_path: str) -> str | None: @@ -336,15 +490,107 @@ class FeishuChannel(BaseChannel): response = self._client.im.v1.file.create(request) if response.success(): file_key = response.data.file_key - logger.debug(f"Uploaded file {file_name}: {file_key}") + logger.debug("Uploaded file {}: {}", file_name, file_key) return file_key else: - logger.error(f"Failed to upload file: code={response.code}, msg={response.msg}") + logger.error("Failed to upload file: code={}, msg={}", response.code, response.msg) return None except Exception as e: - logger.error(f"Error uploading file {file_path}: {e}") + logger.error("Error uploading file {}: {}", file_path, e) return None + def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]: + """Download an image from Feishu message by message_id and image_key.""" + try: + request = GetMessageResourceRequest.builder() \ + .message_id(message_id) \ + .file_key(image_key) \ + .type("image") \ + .build() + response = self._client.im.v1.message_resource.get(request) + if response.success(): + file_data = response.file + # GetMessageResourceRequest returns BytesIO, need to read bytes + if hasattr(file_data, 'read'): + file_data = file_data.read() + return file_data, response.file_name + else: + logger.error("Failed to download image: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading image {}: {}", image_key, e) + return None, None + + def _download_file_sync( + self, message_id: str, file_key: str, resource_type: str = "file" + ) -> tuple[bytes | None, str | None]: + """Download a file/audio/media from a Feishu message by message_id and file_key.""" + try: + request = ( + GetMessageResourceRequest.builder() + .message_id(message_id) + .file_key(file_key) + .type(resource_type) + .build() + ) + response = self._client.im.v1.message_resource.get(request) + if response.success(): + file_data = response.file + if hasattr(file_data, "read"): + file_data = file_data.read() + return file_data, response.file_name + else: + logger.error("Failed to download {}: code={}, msg={}", resource_type, response.code, response.msg) + return None, None + except Exception: + logger.exception("Error downloading {} {}", resource_type, file_key) + return None, None + + async def _download_and_save_media( + self, + msg_type: str, + content_json: dict, + message_id: str | None = None + ) -> tuple[str | None, str]: + """ + Download media from Feishu and save to local disk. + + Returns: + (file_path, content_text) - file_path is None if download failed + """ + loop = asyncio.get_running_loop() + media_dir = Path.home() / ".nanobot" / "media" + media_dir.mkdir(parents=True, exist_ok=True) + + data, filename = None, None + + if msg_type == "image": + image_key = content_json.get("image_key") + if image_key and message_id: + data, filename = await loop.run_in_executor( + None, self._download_image_sync, message_id, image_key + ) + if not filename: + filename = f"{image_key[:16]}.jpg" + + elif msg_type in ("audio", "file", "media"): + file_key = content_json.get("file_key") + if file_key and message_id: + data, filename = await loop.run_in_executor( + None, self._download_file_sync, message_id, file_key, msg_type + ) + if not filename: + ext = {"audio": ".opus", "media": ".mp4"}.get(msg_type, "") + filename = f"{file_key[:16]}{ext}" + + if data and filename: + file_path = media_dir / filename + file_path.write_bytes(data) + logger.debug("Downloaded {} to {}", msg_type, file_path) + return str(file_path), f"[{msg_type}: {filename}]" + + return None, f"[{msg_type}: download failed]" + def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: """Send a single message (text/image/file/interactive) synchronously.""" try: @@ -360,14 +606,14 @@ class FeishuChannel(BaseChannel): response = self._client.im.v1.message.create(request) if not response.success(): logger.error( - f"Failed to send Feishu {msg_type} message: code={response.code}, " - f"msg={response.msg}, log_id={response.get_log_id()}" + "Failed to send Feishu {} message: code={}, msg={}, log_id={}", + msg_type, response.code, response.msg, response.get_log_id() ) return False - logger.debug(f"Feishu {msg_type} message sent to {receive_id}") + logger.debug("Feishu {} message sent to {}", msg_type, receive_id) return True except Exception as e: - logger.error(f"Error sending Feishu {msg_type} message: {e}") + logger.error("Error sending Feishu {} message: {}", msg_type, e) return False async def send(self, msg: OutboundMessage) -> None: @@ -382,7 +628,7 @@ class FeishuChannel(BaseChannel): for file_path in msg.media: if not os.path.isfile(file_path): - logger.warning(f"Media file not found: {file_path}") + logger.warning("Media file not found: {}", file_path) continue ext = os.path.splitext(file_path)[1].lower() if ext in self._IMAGE_EXTS: @@ -390,7 +636,7 @@ class FeishuChannel(BaseChannel): if key: await loop.run_in_executor( None, self._send_message_sync, - receive_id_type, msg.chat_id, "image", json.dumps({"image_key": key}), + receive_id_type, msg.chat_id, "image", json.dumps({"image_key": key}, ensure_ascii=False), ) else: key = await loop.run_in_executor(None, self._upload_file_sync, file_path) @@ -398,7 +644,7 @@ class FeishuChannel(BaseChannel): media_type = "audio" if ext in self._AUDIO_EXTS else "file" await loop.run_in_executor( None, self._send_message_sync, - receive_id_type, msg.chat_id, media_type, json.dumps({"file_key": key}), + receive_id_type, msg.chat_id, media_type, json.dumps({"file_key": key}, ensure_ascii=False), ) if msg.content and msg.content.strip(): @@ -409,7 +655,7 @@ class FeishuChannel(BaseChannel): ) except Exception as e: - logger.error(f"Error sending Feishu message: {e}") + logger.error("Error sending Feishu message: {}", e) def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None: """ @@ -425,60 +671,89 @@ class FeishuChannel(BaseChannel): event = data.event message = event.message sender = event.sender - + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: return self._processed_message_ids[message_id] = None - - # Trim cache: keep most recent 500 when exceeds 1000 + + # Trim cache while len(self._processed_message_ids) > 1000: self._processed_message_ids.popitem(last=False) - + # Skip bot messages - sender_type = sender.sender_type - if sender_type == "bot": + if sender.sender_type == "bot": return - + sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" chat_id = message.chat_id - chat_type = message.chat_type # "p2p" or "group" + chat_type = message.chat_type msg_type = message.message_type - - # Add reaction to indicate "seen" + + # Add reaction await self._add_reaction(message_id, "THUMBSUP") - - # Parse message content + + # Parse content + content_parts = [] + media_paths = [] + + try: + content_json = json.loads(message.content) if message.content else {} + except json.JSONDecodeError: + content_json = {} + if msg_type == "text": - try: - content = json.loads(message.content).get("text", "") - except json.JSONDecodeError: - content = message.content or "" + text = content_json.get("text", "") + if text: + content_parts.append(text) + elif msg_type == "post": - try: - content_json = json.loads(message.content) - content = _extract_post_text(content_json) - except (json.JSONDecodeError, TypeError): - content = message.content or "" + text, image_keys = _extract_post_content(content_json) + if text: + content_parts.append(text) + # Download images embedded in post + for img_key in image_keys: + file_path, content_text = await self._download_and_save_media( + "image", {"image_key": img_key}, message_id + ) + if file_path: + media_paths.append(file_path) + content_parts.append(content_text) + + elif msg_type in ("image", "audio", "file", "media"): + file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id) + if file_path: + media_paths.append(file_path) + content_parts.append(content_text) + + elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"): + # Handle share cards and interactive messages + text = _extract_share_card_content(content_json, msg_type) + if text: + content_parts.append(text) + else: - content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") - - if not content: + content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")) + + content = "\n".join(content_parts) if content_parts else "" + + if not content and not media_paths: return - + # Forward to message bus reply_to = chat_id if chat_type == "group" else sender_id await self._handle_message( sender_id=sender_id, chat_id=reply_to, content=content, + media=media_paths, metadata={ "message_id": message_id, "chat_type": chat_type, "msg_type": msg_type, } ) - + except Exception as e: - logger.error(f"Error processing Feishu message: {e}") + logger.error("Error processing Feishu message: {}", e) diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index e860d26..77b7294 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -45,7 +45,7 @@ class ChannelManager: ) logger.info("Telegram channel enabled") except ImportError as e: - logger.warning(f"Telegram channel not available: {e}") + logger.warning("Telegram channel not available: {}", e) # WhatsApp channel if self.config.channels.whatsapp.enabled: @@ -56,7 +56,7 @@ class ChannelManager: ) logger.info("WhatsApp channel enabled") except ImportError as e: - logger.warning(f"WhatsApp channel not available: {e}") + logger.warning("WhatsApp channel not available: {}", e) # Discord channel if self.config.channels.discord.enabled: @@ -67,7 +67,7 @@ class ChannelManager: ) logger.info("Discord channel enabled") except ImportError as e: - logger.warning(f"Discord channel not available: {e}") + logger.warning("Discord channel not available: {}", e) # Feishu channel if self.config.channels.feishu.enabled: @@ -78,7 +78,7 @@ class ChannelManager: ) logger.info("Feishu channel enabled") except ImportError as e: - logger.warning(f"Feishu channel not available: {e}") + logger.warning("Feishu channel not available: {}", e) # Mochat channel if self.config.channels.mochat.enabled: @@ -90,7 +90,7 @@ class ChannelManager: ) logger.info("Mochat channel enabled") except ImportError as e: - logger.warning(f"Mochat channel not available: {e}") + logger.warning("Mochat channel not available: {}", e) # DingTalk channel if self.config.channels.dingtalk.enabled: @@ -101,7 +101,7 @@ class ChannelManager: ) logger.info("DingTalk channel enabled") except ImportError as e: - logger.warning(f"DingTalk channel not available: {e}") + logger.warning("DingTalk channel not available: {}", e) # Email channel if self.config.channels.email.enabled: @@ -112,7 +112,7 @@ class ChannelManager: ) logger.info("Email channel enabled") except ImportError as e: - logger.warning(f"Email channel not available: {e}") + logger.warning("Email channel not available: {}", e) # Slack channel if self.config.channels.slack.enabled: @@ -123,7 +123,7 @@ class ChannelManager: ) logger.info("Slack channel enabled") except ImportError as e: - logger.warning(f"Slack channel not available: {e}") + logger.warning("Slack channel not available: {}", e) # QQ channel if self.config.channels.qq.enabled: @@ -135,14 +135,14 @@ class ChannelManager: ) logger.info("QQ channel enabled") except ImportError as e: - logger.warning(f"QQ channel not available: {e}") + logger.warning("QQ channel not available: {}", e) async def _start_channel(self, name: str, channel: BaseChannel) -> None: """Start a channel and log any exceptions.""" try: await channel.start() except Exception as e: - logger.error(f"Failed to start channel {name}: {e}") + logger.error("Failed to start channel {}: {}", name, e) async def start_all(self) -> None: """Start all channels and the outbound dispatcher.""" @@ -156,7 +156,7 @@ class ChannelManager: # Start channels tasks = [] for name, channel in self.channels.items(): - logger.info(f"Starting {name} channel...") + logger.info("Starting {} channel...", name) tasks.append(asyncio.create_task(self._start_channel(name, channel))) # Wait for all to complete (they should run forever) @@ -178,9 +178,9 @@ class ChannelManager: for name, channel in self.channels.items(): try: await channel.stop() - logger.info(f"Stopped {name} channel") + logger.info("Stopped {} channel", name) except Exception as e: - logger.error(f"Error stopping {name}: {e}") + logger.error("Error stopping {}: {}", name, e) async def _dispatch_outbound(self) -> None: """Dispatch outbound messages to the appropriate channel.""" @@ -193,14 +193,20 @@ class ChannelManager: timeout=1.0 ) + if msg.metadata.get("_progress"): + if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints: + continue + if not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress: + continue + channel = self.channels.get(msg.channel) if channel: try: await channel.send(msg) except Exception as e: - logger.error(f"Error sending to {msg.channel}: {e}") + logger.error("Error sending to {}: {}", msg.channel, e) else: - logger.warning(f"Unknown channel: {msg.channel}") + logger.warning("Unknown channel: {}", msg.channel) except asyncio.TimeoutError: continue diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py index 30c3dbf..e762dfd 100644 --- a/nanobot/channels/mochat.py +++ b/nanobot/channels/mochat.py @@ -322,7 +322,7 @@ class MochatChannel(BaseChannel): await self._api_send("/api/claw/sessions/send", "sessionId", target.id, content, msg.reply_to) except Exception as e: - logger.error(f"Failed to send Mochat message: {e}") + logger.error("Failed to send Mochat message: {}", e) # ---- config / init helpers --------------------------------------------- @@ -380,7 +380,7 @@ class MochatChannel(BaseChannel): @client.event async def connect_error(data: Any) -> None: - logger.error(f"Mochat websocket connect error: {data}") + logger.error("Mochat websocket connect error: {}", data) @client.on("claw.session.events") async def on_session_events(payload: dict[str, Any]) -> None: @@ -407,7 +407,7 @@ class MochatChannel(BaseChannel): ) return True except Exception as e: - logger.error(f"Failed to connect Mochat websocket: {e}") + logger.error("Failed to connect Mochat websocket: {}", e) try: await client.disconnect() except Exception: @@ -444,7 +444,7 @@ class MochatChannel(BaseChannel): "limit": self.config.watch_limit, }) if not ack.get("result"): - logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") + logger.error("Mochat subscribeSessions failed: {}", ack.get('message', 'unknown error')) return False data = ack.get("data") @@ -466,7 +466,7 @@ class MochatChannel(BaseChannel): return True ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids}) if not ack.get("result"): - logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") + logger.error("Mochat subscribePanels failed: {}", ack.get('message', 'unknown error')) return False return True @@ -488,7 +488,7 @@ class MochatChannel(BaseChannel): try: await self._refresh_targets(subscribe_new=self._ws_ready) except Exception as e: - logger.warning(f"Mochat refresh failed: {e}") + logger.warning("Mochat refresh failed: {}", e) if self._fallback_mode: await self._ensure_fallback_workers() @@ -502,7 +502,7 @@ class MochatChannel(BaseChannel): try: response = await self._post_json("/api/claw/sessions/list", {}) except Exception as e: - logger.warning(f"Mochat listSessions failed: {e}") + logger.warning("Mochat listSessions failed: {}", e) return sessions = response.get("sessions") @@ -536,7 +536,7 @@ class MochatChannel(BaseChannel): try: response = await self._post_json("/api/claw/groups/get", {}) except Exception as e: - logger.warning(f"Mochat getWorkspaceGroup failed: {e}") + logger.warning("Mochat getWorkspaceGroup failed: {}", e) return raw_panels = response.get("panels") @@ -598,7 +598,7 @@ class MochatChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Mochat watch fallback error ({session_id}): {e}") + logger.warning("Mochat watch fallback error ({}): {}", session_id, e) await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) async def _panel_poll_worker(self, panel_id: str) -> None: @@ -625,7 +625,7 @@ class MochatChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Mochat panel polling error ({panel_id}): {e}") + logger.warning("Mochat panel polling error ({}): {}", panel_id, e) await asyncio.sleep(sleep_s) # ---- inbound event processing ------------------------------------------ @@ -836,7 +836,7 @@ class MochatChannel(BaseChannel): try: data = json.loads(self._cursor_path.read_text("utf-8")) except Exception as e: - logger.warning(f"Failed to read Mochat cursor file: {e}") + logger.warning("Failed to read Mochat cursor file: {}", e) return cursors = data.get("cursors") if isinstance(data, dict) else None if isinstance(cursors, dict): @@ -852,7 +852,7 @@ class MochatChannel(BaseChannel): "cursors": self._session_cursor, }, ensure_ascii=False, indent=2) + "\n", "utf-8") except Exception as e: - logger.warning(f"Failed to save Mochat cursor file: {e}") + logger.warning("Failed to save Mochat cursor file: {}", e) # ---- HTTP helpers ------------------------------------------------------ diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 0e8fe66..5352a30 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -34,7 +34,7 @@ def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": super().__init__(intents=intents) async def on_ready(self): - logger.info(f"QQ bot ready: {self.robot.name}") + logger.info("QQ bot ready: {}", self.robot.name) async def on_c2c_message_create(self, message: "C2CMessage"): await channel._on_message(message) @@ -55,7 +55,6 @@ class QQChannel(BaseChannel): self.config: QQConfig = config self._client: "botpy.Client | None" = None self._processed_ids: deque = deque(maxlen=1000) - self._bot_task: asyncio.Task | None = None async def start(self) -> None: """Start the QQ bot.""" @@ -71,8 +70,8 @@ class QQChannel(BaseChannel): BotClass = _make_bot_class(self) self._client = BotClass() - self._bot_task = asyncio.create_task(self._run_bot()) logger.info("QQ bot started (C2C private message)") + await self._run_bot() async def _run_bot(self) -> None: """Run the bot connection with auto-reconnect.""" @@ -80,7 +79,7 @@ class QQChannel(BaseChannel): try: await self._client.start(appid=self.config.app_id, secret=self.config.secret) except Exception as e: - logger.warning(f"QQ bot error: {e}") + logger.warning("QQ bot error: {}", e) if self._running: logger.info("Reconnecting QQ bot in 5 seconds...") await asyncio.sleep(5) @@ -88,11 +87,10 @@ class QQChannel(BaseChannel): async def stop(self) -> None: """Stop the QQ bot.""" self._running = False - if self._bot_task: - self._bot_task.cancel() + if self._client: try: - await self._bot_task - except asyncio.CancelledError: + await self._client.close() + except Exception: pass logger.info("QQ bot stopped") @@ -108,7 +106,7 @@ class QQChannel(BaseChannel): content=msg.content, ) except Exception as e: - logger.error(f"Error sending QQ message: {e}") + logger.error("Error sending QQ message: {}", e) async def _on_message(self, data: "C2CMessage") -> None: """Handle incoming message from QQ.""" @@ -130,5 +128,5 @@ class QQChannel(BaseChannel): content=content, metadata={"message_id": data.id}, ) - except Exception as e: - logger.error(f"Error handling QQ message: {e}") + except Exception: + logger.exception("Error handling QQ message") diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index dca5055..57bfbcb 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -36,7 +36,7 @@ class SlackChannel(BaseChannel): logger.error("Slack bot/app token not configured") return if self.config.mode != "socket": - logger.error(f"Unsupported Slack mode: {self.config.mode}") + logger.error("Unsupported Slack mode: {}", self.config.mode) return self._running = True @@ -53,9 +53,9 @@ class SlackChannel(BaseChannel): try: auth = await self._web_client.auth_test() self._bot_user_id = auth.get("user_id") - logger.info(f"Slack bot connected as {self._bot_user_id}") + logger.info("Slack bot connected as {}", self._bot_user_id) except Exception as e: - logger.warning(f"Slack auth_test failed: {e}") + logger.warning("Slack auth_test failed: {}", e) logger.info("Starting Slack Socket Mode client...") await self._socket_client.connect() @@ -70,7 +70,7 @@ class SlackChannel(BaseChannel): try: await self._socket_client.close() except Exception as e: - logger.warning(f"Slack socket close failed: {e}") + logger.warning("Slack socket close failed: {}", e) self._socket_client = None async def send(self, msg: OutboundMessage) -> None: @@ -84,13 +84,26 @@ class SlackChannel(BaseChannel): channel_type = slack_meta.get("channel_type") # Only reply in thread for channel/group messages; DMs don't use threads use_thread = thread_ts and channel_type != "im" - await self._web_client.chat_postMessage( - channel=msg.chat_id, - text=self._to_mrkdwn(msg.content), - thread_ts=thread_ts if use_thread else None, - ) + thread_ts_param = thread_ts if use_thread else None + + if msg.content: + await self._web_client.chat_postMessage( + channel=msg.chat_id, + text=self._to_mrkdwn(msg.content), + thread_ts=thread_ts_param, + ) + + for media_path in msg.media or []: + try: + await self._web_client.files_upload_v2( + channel=msg.chat_id, + file=media_path, + thread_ts=thread_ts_param, + ) + except Exception as e: + logger.error("Failed to upload file {}: {}", media_path, e) except Exception as e: - logger.error(f"Error sending Slack message: {e}") + logger.error("Error sending Slack message: {}", e) async def _on_socket_request( self, @@ -164,20 +177,27 @@ class SlackChannel(BaseChannel): timestamp=event.get("ts"), ) except Exception as e: - logger.debug(f"Slack reactions_add failed: {e}") + logger.debug("Slack reactions_add failed: {}", e) - await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=text, - metadata={ - "slack": { - "event": event, - "thread_ts": thread_ts, - "channel_type": channel_type, - } - }, - ) + # Thread-scoped session key for channel/group messages + session_key = f"slack:{chat_id}:{thread_ts}" if thread_ts and channel_type != "im" else None + + try: + await self._handle_message( + sender_id=sender_id, + chat_id=chat_id, + content=text, + metadata={ + "slack": { + "event": event, + "thread_ts": thread_ts, + "channel_type": channel_type, + }, + }, + session_key=session_key, + ) + except Exception: + logger.exception("Error handling Slack message from {}", sender_id) def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: if channel_type == "im": @@ -209,6 +229,11 @@ class SlackChannel(BaseChannel): return re.sub(rf"<@{re.escape(self._bot_user_id)}>\s*", "", text).strip() _TABLE_RE = re.compile(r"(?m)^\|.*\|$(?:\n\|[\s:|-]*\|$)(?:\n\|.*\|$)*") + _CODE_FENCE_RE = re.compile(r"```[\s\S]*?```") + _INLINE_CODE_RE = re.compile(r"`[^`]+`") + _LEFTOVER_BOLD_RE = re.compile(r"\*\*(.+?)\*\*") + _LEFTOVER_HEADER_RE = re.compile(r"^#{1,6}\s+(.+)$", re.MULTILINE) + _BARE_URL_RE = re.compile(r"(? str: @@ -216,7 +241,26 @@ class SlackChannel(BaseChannel): if not text: return "" text = cls._TABLE_RE.sub(cls._convert_table, text) - return slackify_markdown(text) + return cls._fixup_mrkdwn(slackify_markdown(text)) + + @classmethod + def _fixup_mrkdwn(cls, text: str) -> str: + """Fix markdown artifacts that slackify_markdown misses.""" + code_blocks: list[str] = [] + + def _save_code(m: re.Match) -> str: + code_blocks.append(m.group(0)) + return f"\x00CB{len(code_blocks) - 1}\x00" + + text = cls._CODE_FENCE_RE.sub(_save_code, text) + text = cls._INLINE_CODE_RE.sub(_save_code, text) + text = cls._LEFTOVER_BOLD_RE.sub(r"*\1*", text) + text = cls._LEFTOVER_HEADER_RE.sub(r"*\1*", text) + text = cls._BARE_URL_RE.sub(lambda m: m.group(0).replace("&", "&"), text) + + for i, block in enumerate(code_blocks): + text = text.replace(f"\x00CB{i}\x00", block) + return text @staticmethod def _convert_table(match: re.Match) -> str: diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 39924b3..808f50c 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -5,7 +5,7 @@ from __future__ import annotations import asyncio import re from loguru import logger -from telegram import BotCommand, Update +from telegram import BotCommand, Update, ReplyParameters from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes from telegram.request import HTTPXRequest @@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel): BOT_COMMANDS = [ BotCommand("start", "Start the bot"), BotCommand("new", "Start a new conversation"), + BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] @@ -146,7 +147,7 @@ class TelegramChannel(BaseChannel): # Add command handlers self._app.add_handler(CommandHandler("start", self._on_start)) self._app.add_handler(CommandHandler("new", self._forward_command)) - self._app.add_handler(CommandHandler("help", self._forward_command)) + self._app.add_handler(CommandHandler("help", self._on_help)) # Add message handler for text, photos, voice, documents self._app.add_handler( @@ -165,13 +166,13 @@ class TelegramChannel(BaseChannel): # Get bot info and register command menu bot_info = await self._app.bot.get_me() - logger.info(f"Telegram bot @{bot_info.username} connected") + logger.info("Telegram bot @{} connected", bot_info.username) try: await self._app.bot.set_my_commands(self.BOT_COMMANDS) logger.debug("Telegram bot commands registered") except Exception as e: - logger.warning(f"Failed to register bot commands: {e}") + logger.warning("Failed to register bot commands: {}", e) # Start polling (this runs until stopped) await self._app.updater.start_polling( @@ -221,9 +222,18 @@ class TelegramChannel(BaseChannel): try: chat_id = int(msg.chat_id) except ValueError: - logger.error(f"Invalid chat_id: {msg.chat_id}") + logger.error("Invalid chat_id: {}", msg.chat_id) return + reply_params = None + if self.config.reply_to_message: + reply_to_message_id = msg.metadata.get("message_id") + if reply_to_message_id: + reply_params = ReplyParameters( + message_id=reply_to_message_id, + allow_sending_without_reply=True + ) + # Send media files for media_path in (msg.media or []): try: @@ -235,37 +245,65 @@ class TelegramChannel(BaseChannel): }.get(media_type, self._app.bot.send_document) param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document" with open(media_path, 'rb') as f: - await sender(chat_id=chat_id, **{param: f}) + await sender( + chat_id=chat_id, + **{param: f}, + reply_parameters=reply_params + ) except Exception as e: filename = media_path.rsplit("/", 1)[-1] - logger.error(f"Failed to send media {media_path}: {e}") - await self._app.bot.send_message(chat_id=chat_id, text=f"[Failed to send: {filename}]") + logger.error("Failed to send media {}: {}", media_path, e) + await self._app.bot.send_message( + chat_id=chat_id, + text=f"[Failed to send: {filename}]", + reply_parameters=reply_params + ) # Send text content if msg.content and msg.content != "[empty message]": for chunk in _split_message(msg.content): try: html = _markdown_to_telegram_html(chunk) - await self._app.bot.send_message(chat_id=chat_id, text=html, parse_mode="HTML") + await self._app.bot.send_message( + chat_id=chat_id, + text=html, + parse_mode="HTML", + reply_parameters=reply_params + ) except Exception as e: - logger.warning(f"HTML parse failed, falling back to plain text: {e}") + logger.warning("HTML parse failed, falling back to plain text: {}", e) try: - await self._app.bot.send_message(chat_id=chat_id, text=chunk) + await self._app.bot.send_message( + chat_id=chat_id, + text=chunk, + reply_parameters=reply_params + ) except Exception as e2: - logger.error(f"Error sending Telegram message: {e2}") + logger.error("Error sending Telegram message: {}", e2) async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" if not update.message or not update.effective_user: return - + user = update.effective_user await update.message.reply_text( f"πŸ‘‹ Hi {user.first_name}! I'm nanobot.\n\n" "Send me a message and I'll respond!\n" "Type /help to see available commands." ) - + + async def _on_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle /help command, bypassing ACL so all users can access it.""" + if not update.message: + return + await update.message.reply_text( + "🐈 nanobot commands:\n" + "/new β€” Start a new conversation\n" + "/stop β€” Stop the current task\n" + "/help β€” Show available commands" + ) + @staticmethod def _sender_id(user) -> str: """Build sender_id with username for allowlist matching.""" @@ -344,21 +382,21 @@ class TelegramChannel(BaseChannel): transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key) transcription = await transcriber.transcribe(file_path) if transcription: - logger.info(f"Transcribed {media_type}: {transcription[:50]}...") + logger.info("Transcribed {}: {}...", media_type, transcription[:50]) content_parts.append(f"[transcription: {transcription}]") else: content_parts.append(f"[{media_type}: {file_path}]") else: content_parts.append(f"[{media_type}: {file_path}]") - logger.debug(f"Downloaded {media_type} to {file_path}") + logger.debug("Downloaded {} to {}", media_type, file_path) except Exception as e: - logger.error(f"Failed to download media: {e}") + logger.error("Failed to download media: {}", e) content_parts.append(f"[{media_type}: download failed]") content = "\n".join(content_parts) if content_parts else "[empty message]" - logger.debug(f"Telegram message from {sender_id}: {content[:50]}...") + logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) str_chat_id = str(chat_id) @@ -401,11 +439,11 @@ class TelegramChannel(BaseChannel): except asyncio.CancelledError: pass except Exception as e: - logger.debug(f"Typing indicator stopped for {chat_id}: {e}") + logger.debug("Typing indicator stopped for {}: {}", chat_id, e) async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: """Log polling / handler errors instead of silently swallowing them.""" - logger.error(f"Telegram error: {context.error}") + logger.error("Telegram error: {}", context.error) def _get_extension(self, media_type: str, mime_type: str | None) -> str: """Get file extension based on media type.""" diff --git a/nanobot/channels/whatsapp.py b/nanobot/channels/whatsapp.py index 0cf2dd7..f5fb521 100644 --- a/nanobot/channels/whatsapp.py +++ b/nanobot/channels/whatsapp.py @@ -34,7 +34,7 @@ class WhatsAppChannel(BaseChannel): bridge_url = self.config.bridge_url - logger.info(f"Connecting to WhatsApp bridge at {bridge_url}...") + logger.info("Connecting to WhatsApp bridge at {}...", bridge_url) self._running = True @@ -53,14 +53,14 @@ class WhatsAppChannel(BaseChannel): try: await self._handle_bridge_message(message) except Exception as e: - logger.error(f"Error handling bridge message: {e}") + logger.error("Error handling bridge message: {}", e) except asyncio.CancelledError: break except Exception as e: self._connected = False self._ws = None - logger.warning(f"WhatsApp bridge connection error: {e}") + logger.warning("WhatsApp bridge connection error: {}", e) if self._running: logger.info("Reconnecting in 5 seconds...") @@ -87,16 +87,16 @@ class WhatsAppChannel(BaseChannel): "to": msg.chat_id, "text": msg.content } - await self._ws.send(json.dumps(payload)) + await self._ws.send(json.dumps(payload, ensure_ascii=False)) except Exception as e: - logger.error(f"Error sending WhatsApp message: {e}") + logger.error("Error sending WhatsApp message: {}", e) async def _handle_bridge_message(self, raw: str) -> None: """Handle a message from the bridge.""" try: data = json.loads(raw) except json.JSONDecodeError: - logger.warning(f"Invalid JSON from bridge: {raw[:100]}") + logger.warning("Invalid JSON from bridge: {}", raw[:100]) return msg_type = data.get("type") @@ -112,11 +112,11 @@ class WhatsAppChannel(BaseChannel): # Extract just the phone number or lid as chat_id user_id = pn if pn else sender sender_id = user_id.split("@")[0] if "@" in user_id else user_id - logger.info(f"Sender {sender}") + logger.info("Sender {}", sender) # Handle voice transcription if it's a voice message if content == "[Voice Message]": - logger.info(f"Voice message received from {sender_id}, but direct download from bridge is not yet supported.") + logger.info("Voice message received from {}, but direct download from bridge is not yet supported.", sender_id) content = "[Voice Message: Transcription not available for WhatsApp yet]" await self._handle_message( @@ -133,7 +133,7 @@ class WhatsAppChannel(BaseChannel): elif msg_type == "status": # Connection status update status = data.get("status") - logger.info(f"WhatsApp status: {status}") + logger.info("WhatsApp status: {}", status) if status == "connected": self._connected = True @@ -145,4 +145,4 @@ class WhatsAppChannel(BaseChannel): logger.info("Scan QR code in the bridge terminal to connect WhatsApp") elif msg_type == "error": - logger.error(f"WhatsApp bridge error: {data.get('error')}") + logger.error("WhatsApp bridge error: {}", data.get('error')) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 2f4ba7b..1c20b50 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -199,84 +199,34 @@ def onboard(): def _create_workspace_templates(workspace: Path): - """Create default workspace template files.""" - templates = { - "AGENTS.md": """# Agent Instructions + """Create default workspace template files from bundled templates.""" + from importlib.resources import files as pkg_files -You are a helpful AI assistant. Be concise, accurate, and friendly. + templates_dir = pkg_files("nanobot") / "templates" -## Guidelines + for item in templates_dir.iterdir(): + if not item.name.endswith(".md"): + continue + dest = workspace / item.name + if not dest.exists(): + dest.write_text(item.read_text(encoding="utf-8"), encoding="utf-8") + console.print(f" [dim]Created {item.name}[/dim]") -- Always explain what you're doing before taking actions -- Ask for clarification when the request is ambiguous -- Use tools to help accomplish tasks -- Remember important information in memory/MEMORY.md; past events are logged in memory/HISTORY.md -""", - "SOUL.md": """# Soul - -I am nanobot, a lightweight AI assistant. - -## Personality - -- Helpful and friendly -- Concise and to the point -- Curious and eager to learn - -## Values - -- Accuracy over speed -- User privacy and safety -- Transparency in actions -""", - "USER.md": """# User - -Information about the user goes here. - -## Preferences - -- Communication style: (casual/formal) -- Timezone: (your timezone) -- Language: (your preferred language) -""", - } - - for filename, content in templates.items(): - file_path = workspace / filename - if not file_path.exists(): - file_path.write_text(content) - console.print(f" [dim]Created {filename}[/dim]") - - # Create memory directory and MEMORY.md memory_dir = workspace / "memory" memory_dir.mkdir(exist_ok=True) + + memory_template = templates_dir / "memory" / "MEMORY.md" memory_file = memory_dir / "MEMORY.md" if not memory_file.exists(): - memory_file.write_text("""# Long-term Memory - -This file stores important information that should persist across sessions. - -## User Information - -(Important facts about the user) - -## Preferences - -(User preferences learned over time) - -## Important Notes - -(Things to remember) -""") + memory_file.write_text(memory_template.read_text(encoding="utf-8"), encoding="utf-8") console.print(" [dim]Created memory/MEMORY.md[/dim]") - + history_file = memory_dir / "HISTORY.md" if not history_file.exists(): - history_file.write_text("") + history_file.write_text("", encoding="utf-8") console.print(" [dim]Created memory/HISTORY.md[/dim]") - # Create skills directory for custom user skills - skills_dir = workspace / "skills" - skills_dir.mkdir(exist_ok=True) + (workspace / "skills").mkdir(exist_ok=True) def _make_provider(config: Config): @@ -368,6 +318,7 @@ def gateway( restrict_to_workspace=config.tools.restrict_to_workspace, session_manager=session_manager, mcp_servers=config.tools.mcp_servers, + channels_config=config.channels, ) # Set cron callback (needs agent) @@ -389,20 +340,59 @@ def gateway( return response cron.on_job = on_cron_job - # Create heartbeat service - async def on_heartbeat(prompt: str) -> str: - """Execute heartbeat through the agent.""" - return await agent.process_direct(prompt, session_key="heartbeat") - - heartbeat = HeartbeatService( - workspace=config.workspace_path, - on_heartbeat=on_heartbeat, - interval_s=30 * 60, # 30 minutes - enabled=True - ) - # Create channel manager channels = ChannelManager(config, bus) + + def _pick_heartbeat_target() -> tuple[str, str]: + """Pick a routable channel/chat target for heartbeat-triggered messages.""" + enabled = set(channels.enabled_channels) + # Prefer the most recently updated non-internal session on an enabled channel. + for item in session_manager.list_sessions(): + key = item.get("key") or "" + if ":" not in key: + continue + channel, chat_id = key.split(":", 1) + if channel in {"cli", "system"}: + continue + if channel in enabled and chat_id: + return channel, chat_id + # Fallback keeps prior behavior but remains explicit. + return "cli", "direct" + + # Create heartbeat service + async def on_heartbeat_execute(tasks: str) -> str: + """Phase 2: execute heartbeat tasks through the full agent loop.""" + channel, chat_id = _pick_heartbeat_target() + + async def _silent(*_args, **_kwargs): + pass + + return await agent.process_direct( + tasks, + session_key="heartbeat", + channel=channel, + chat_id=chat_id, + on_progress=_silent, + ) + + async def on_heartbeat_notify(response: str) -> None: + """Deliver a heartbeat response to the user's channel.""" + from nanobot.bus.events import OutboundMessage + channel, chat_id = _pick_heartbeat_target() + if channel == "cli": + return # No external channel available to deliver to + await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) + + hb_cfg = config.gateway.heartbeat + heartbeat = HeartbeatService( + workspace=config.workspace_path, + provider=provider, + model=agent.model, + on_execute=on_heartbeat_execute, + on_notify=on_heartbeat_notify, + interval_s=hb_cfg.interval_s, + enabled=hb_cfg.enabled, + ) if channels.enabled_channels: console.print(f"[green]βœ“[/green] Channels enabled: {', '.join(channels.enabled_channels)}") @@ -413,7 +403,7 @@ def gateway( if cron_status["jobs"] > 0: console.print(f"[green]βœ“[/green] Cron: {cron_status['jobs']} scheduled jobs") - console.print(f"[green]βœ“[/green] Heartbeat: every 30m") + console.print(f"[green]βœ“[/green] Heartbeat: every {hb_cfg.interval_s}s") async def run(): try: @@ -484,6 +474,7 @@ def agent( cron_service=cron, restrict_to_workspace=config.tools.restrict_to_workspace, mcp_servers=config.tools.mcp_servers, + channels_config=config.channels, ) # Show spinner when logs are off (no output to miss); skip when logs are on @@ -494,31 +485,74 @@ def agent( # Animated spinner is safe to use with prompt_toolkit input handling return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots") - async def _cli_progress(content: str) -> None: + async def _cli_progress(content: str, *, tool_hint: bool = False) -> None: + ch = agent_loop.channels_config + if ch and tool_hint and not ch.send_tool_hints: + return + if ch and not tool_hint and not ch.send_progress: + return console.print(f" [dim]↳ {content}[/dim]") if message: - # Single message mode + # Single message mode β€” direct call, no bus needed async def run_once(): with _thinking_ctx(): response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress) _print_agent_response(response, render_markdown=markdown) await agent_loop.close_mcp() - + asyncio.run(run_once()) else: - # Interactive mode + # Interactive mode β€” route through bus like other channels + from nanobot.bus.events import InboundMessage _init_prompt_session() console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") + if ":" in session_id: + cli_channel, cli_chat_id = session_id.split(":", 1) + else: + cli_channel, cli_chat_id = "cli", session_id + def _exit_on_sigint(signum, frame): _restore_terminal() console.print("\nGoodbye!") os._exit(0) signal.signal(signal.SIGINT, _exit_on_sigint) - + async def run_interactive(): + bus_task = asyncio.create_task(agent_loop.run()) + turn_done = asyncio.Event() + turn_done.set() + turn_response: list[str] = [] + + async def _consume_outbound(): + while True: + try: + msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + if msg.metadata.get("_progress"): + is_tool_hint = msg.metadata.get("_tool_hint", False) + ch = agent_loop.channels_config + if ch and is_tool_hint and not ch.send_tool_hints: + pass + elif ch and not is_tool_hint and not ch.send_progress: + pass + else: + console.print(f" [dim]↳ {msg.content}[/dim]") + elif not turn_done.is_set(): + if msg.content: + turn_response.append(msg.content) + turn_done.set() + elif msg.content: + console.print() + _print_agent_response(msg.content, render_markdown=markdown) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + + outbound_task = asyncio.create_task(_consume_outbound()) + try: while True: try: @@ -532,10 +566,22 @@ def agent( _restore_terminal() console.print("\nGoodbye!") break - + + turn_done.clear() + turn_response.clear() + + await bus.publish_inbound(InboundMessage( + channel=cli_channel, + sender_id="user", + chat_id=cli_chat_id, + content=user_input, + )) + with _thinking_ctx(): - response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress) - _print_agent_response(response, render_markdown=markdown) + await turn_done.wait() + + if turn_response: + _print_agent_response(turn_response[0], render_markdown=markdown) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") @@ -545,8 +591,11 @@ def agent( console.print("\nGoodbye!") break finally: + agent_loop.stop() + outbound_task.cancel() + await asyncio.gather(bus_task, outbound_task, return_exceptions=True) await agent_loop.close_mcp() - + asyncio.run(run_interactive()) @@ -622,6 +671,33 @@ def channels_status(): slack_config ) + # DingTalk + dt = config.channels.dingtalk + dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]" + table.add_row( + "DingTalk", + "βœ“" if dt.enabled else "βœ—", + dt_config + ) + + # QQ + qq = config.channels.qq + qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]" + table.add_row( + "QQ", + "βœ“" if qq.enabled else "βœ—", + qq_config + ) + + # Email + em = config.channels.email + em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]" + table.add_row( + "Email", + "βœ“" if em.enabled else "βœ—", + em_config + ) + console.print(table) @@ -805,15 +881,19 @@ def cron_add( store_path = get_data_dir() / "cron" / "jobs.json" service = CronService(store_path) - job = service.add_job( - name=name, - schedule=schedule, - message=message, - deliver=deliver, - to=to, - channel=channel, - ) - + try: + job = service.add_job( + name=name, + schedule=schedule, + message=message, + deliver=deliver, + to=to, + channel=channel, + ) + except ValueError as e: + console.print(f"[red]Error: {e}[/red]") + raise typer.Exit(1) from e + console.print(f"[green]βœ“[/green] Added job '{job.name}' ({job.id})") @@ -860,17 +940,57 @@ def cron_run( force: bool = typer.Option(False, "--force", "-f", help="Run even if disabled"), ): """Manually run a job.""" - from nanobot.config.loader import get_data_dir + from loguru import logger + from nanobot.config.loader import load_config, get_data_dir from nanobot.cron.service import CronService - + from nanobot.cron.types import CronJob + from nanobot.bus.queue import MessageBus + from nanobot.agent.loop import AgentLoop + logger.disable("nanobot") + + config = load_config() + provider = _make_provider(config) + bus = MessageBus() + agent_loop = AgentLoop( + bus=bus, + provider=provider, + workspace=config.workspace_path, + model=config.agents.defaults.model, + temperature=config.agents.defaults.temperature, + max_tokens=config.agents.defaults.max_tokens, + max_iterations=config.agents.defaults.max_tool_iterations, + memory_window=config.agents.defaults.memory_window, + brave_api_key=config.tools.web.search.api_key or None, + exec_config=config.tools.exec, + restrict_to_workspace=config.tools.restrict_to_workspace, + mcp_servers=config.tools.mcp_servers, + channels_config=config.channels, + ) + store_path = get_data_dir() / "cron" / "jobs.json" service = CronService(store_path) - + + result_holder = [] + + async def on_job(job: CronJob) -> str | None: + response = await agent_loop.process_direct( + job.payload.message, + session_key=f"cron:{job.id}", + channel=job.payload.channel or "cli", + chat_id=job.payload.to or "direct", + ) + result_holder.append(response) + return response + + service.on_job = on_job + async def run(): return await service.run_job(job_id, force=force) - + if asyncio.run(run()): - console.print(f"[green]βœ“[/green] Job executed") + console.print("[green]βœ“[/green] Job executed") + if result_holder: + _print_agent_response(result_holder[0], render_markdown=True) else: console.print(f"[red]Failed to run job {job_id}[/red]") diff --git a/nanobot/config/loader.py b/nanobot/config/loader.py index 560c1f5..c789efd 100644 --- a/nanobot/config/loader.py +++ b/nanobot/config/loader.py @@ -31,7 +31,7 @@ def load_config(config_path: Path | None = None) -> Config: if path.exists(): try: - with open(path) as f: + with open(path, encoding="utf-8") as f: data = json.load(f) data = _migrate_config(data) return Config.model_validate(data) @@ -55,8 +55,8 @@ def save_config(config: Config, config_path: Path | None = None) -> None: data = config.model_dump(by_alias=True) - with open(path, "w") as f: - json.dump(data, f, indent=2) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) def _migrate_config(data: dict) -> dict: diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 27bba4d..c577606 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -30,6 +30,7 @@ class TelegramConfig(Base): token: str = "" # Bot token from @BotFather allow_from: list[str] = Field(default_factory=list) # Allowed user IDs or usernames proxy: str | None = None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080" + reply_to_message: bool = False # If true, bot replies quote the original message class FeishuConfig(Base): @@ -189,6 +190,8 @@ class QQConfig(Base): class ChannelsConfig(Base): """Configuration for chat channels.""" + send_progress: bool = True # stream agent's text progress to the channel + send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…")) whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig) telegram: TelegramConfig = Field(default_factory=TelegramConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig) @@ -206,10 +209,11 @@ class AgentDefaults(Base): workspace: str = "~/.nanobot/workspace" model: str = "anthropic/claude-opus-4-5" + provider: str = "auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection max_tokens: int = 8192 - temperature: float = 0.7 - max_tool_iterations: int = 20 - memory_window: int = 50 + temperature: float = 0.1 + max_tool_iterations: int = 40 + memory_window: int = 100 class AgentsConfig(Base): @@ -243,15 +247,24 @@ class ProvidersConfig(Base): minimax: ProviderConfig = Field(default_factory=ProviderConfig) aihubmix: ProviderConfig = Field(default_factory=ProviderConfig) # AiHubMix API gateway siliconflow: ProviderConfig = Field(default_factory=ProviderConfig) # SiliconFlow (η‘…εŸΊζ΅εŠ¨) API gateway + volcengine: ProviderConfig = Field(default_factory=ProviderConfig) # VolcEngine (η«ε±±εΌ•ζ“Ž) API gateway openai_codex: ProviderConfig = Field(default_factory=ProviderConfig) # OpenAI Codex (OAuth) github_copilot: ProviderConfig = Field(default_factory=ProviderConfig) # Github Copilot (OAuth) +class HeartbeatConfig(Base): + """Heartbeat service configuration.""" + + enabled: bool = True + interval_s: int = 30 * 60 # 30 minutes + + class GatewayConfig(Base): """Gateway/server configuration.""" host: str = "0.0.0.0" port: int = 18790 + heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig) class WebSearchConfig(Base): @@ -271,6 +284,7 @@ class ExecToolConfig(Base): """Shell exec tool configuration.""" timeout: int = 60 + path_append: str = "" class MCPServerConfig(Base): @@ -280,6 +294,8 @@ class MCPServerConfig(Base): args: list[str] = Field(default_factory=list) # Stdio: command arguments env: dict[str, str] = Field(default_factory=dict) # Stdio: extra env vars url: str = "" # HTTP: streamable HTTP endpoint URL + headers: dict[str, str] = Field(default_factory=dict) # HTTP: Custom HTTP Headers + tool_timeout: int = 30 # Seconds before a tool call is cancelled class ToolsConfig(Base): @@ -309,6 +325,11 @@ class Config(BaseSettings): """Match provider config and its registry name. Returns (config, spec_name).""" from nanobot.providers.registry import PROVIDERS + forced = self.agents.defaults.provider + if forced != "auto": + p = getattr(self.providers, forced, None) + return (p, forced) if p else (None, None) + model_lower = (model or self.agents.defaults.model).lower() model_normalized = model_lower.replace("-", "_") model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else "" diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 14666e8..6889a10 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -45,6 +45,20 @@ def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None: return None +def _validate_schedule_for_add(schedule: CronSchedule) -> None: + """Validate schedule fields that would otherwise create non-runnable jobs.""" + if schedule.tz and schedule.kind != "cron": + raise ValueError("tz can only be used with cron schedules") + + if schedule.kind == "cron" and schedule.tz: + try: + from zoneinfo import ZoneInfo + + ZoneInfo(schedule.tz) + except Exception: + raise ValueError(f"unknown timezone '{schedule.tz}'") from None + + class CronService: """Service for managing and executing scheduled jobs.""" @@ -66,7 +80,7 @@ class CronService: if self.store_path.exists(): try: - data = json.loads(self.store_path.read_text()) + data = json.loads(self.store_path.read_text(encoding="utf-8")) jobs = [] for j in data.get("jobs", []): jobs.append(CronJob( @@ -99,7 +113,7 @@ class CronService: )) self._store = CronStore(jobs=jobs) except Exception as e: - logger.warning(f"Failed to load cron store: {e}") + logger.warning("Failed to load cron store: {}", e) self._store = CronStore() else: self._store = CronStore() @@ -148,7 +162,7 @@ class CronService: ] } - self.store_path.write_text(json.dumps(data, indent=2)) + self.store_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") async def start(self) -> None: """Start the cron service.""" @@ -157,7 +171,7 @@ class CronService: self._recompute_next_runs() self._save_store() self._arm_timer() - logger.info(f"Cron service started with {len(self._store.jobs if self._store else [])} jobs") + logger.info("Cron service started with {} jobs", len(self._store.jobs if self._store else [])) def stop(self) -> None: """Stop the cron service.""" @@ -222,7 +236,7 @@ class CronService: async def _execute_job(self, job: CronJob) -> None: """Execute a single job.""" start_ms = _now_ms() - logger.info(f"Cron: executing job '{job.name}' ({job.id})") + logger.info("Cron: executing job '{}' ({})", job.name, job.id) try: response = None @@ -231,12 +245,12 @@ class CronService: job.state.last_status = "ok" job.state.last_error = None - logger.info(f"Cron: job '{job.name}' completed") + logger.info("Cron: job '{}' completed", job.name) except Exception as e: job.state.last_status = "error" job.state.last_error = str(e) - logger.error(f"Cron: job '{job.name}' failed: {e}") + logger.error("Cron: job '{}' failed: {}", job.name, e) job.state.last_run_at_ms = start_ms job.updated_at_ms = _now_ms() @@ -272,6 +286,7 @@ class CronService: ) -> CronJob: """Add a new job.""" store = self._load_store() + _validate_schedule_for_add(schedule) now = _now_ms() job = CronJob( @@ -296,7 +311,7 @@ class CronService: self._save_store() self._arm_timer() - logger.info(f"Cron: added job '{name}' ({job.id})") + logger.info("Cron: added job '{}' ({})", name, job.id) return job def remove_job(self, job_id: str) -> bool: @@ -309,7 +324,7 @@ class CronService: if removed: self._save_store() self._arm_timer() - logger.info(f"Cron: removed job {job_id}") + logger.info("Cron: removed job {}", job_id) return removed diff --git a/nanobot/heartbeat/service.py b/nanobot/heartbeat/service.py index 221ed27..e534017 100644 --- a/nanobot/heartbeat/service.py +++ b/nanobot/heartbeat/service.py @@ -1,92 +1,130 @@ """Heartbeat service - periodic agent wake-up to check for tasks.""" +from __future__ import annotations + import asyncio from pathlib import Path -from typing import Any, Callable, Coroutine +from typing import TYPE_CHECKING, Any, Callable, Coroutine from loguru import logger -# Default interval: 30 minutes -DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60 +if TYPE_CHECKING: + from nanobot.providers.base import LLMProvider -# The prompt sent to agent during heartbeat -HEARTBEAT_PROMPT = """Read HEARTBEAT.md in your workspace (if it exists). -Follow any instructions or tasks listed there. -If nothing needs attention, reply with just: HEARTBEAT_OK""" - -# Token that indicates "nothing to do" -HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK" - - -def _is_heartbeat_empty(content: str | None) -> bool: - """Check if HEARTBEAT.md has no actionable content.""" - if not content: - return True - - # Lines to skip: empty, headers, HTML comments, empty checkboxes - skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"} - - for line in content.split("\n"): - line = line.strip() - if not line or line.startswith("#") or line.startswith("