diff --git a/README.md b/README.md index 68ad5a9..cb751ba 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,14 @@ ⚑️ Delivers core agent functionality in just **~4,000** lines of code β€” **99% smaller** than Clawdbot's 430k+ lines. -πŸ“ Real-time line count: **3,827 lines** (run `bash core_agent_lines.sh` to verify anytime) +πŸ“ Real-time line count: **3,806 lines** (run `bash core_agent_lines.sh` to verify anytime) ## πŸ“’ News +- **2026-02-21** πŸŽ‰ Released **v0.1.4.post1** β€” new providers, media support across channels, and major stability improvements. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post1) for details. +- **2026-02-20** 🐦 Feishu now receives multimodal files from users. More reliable memory under the hood. +- **2026-02-19** ✨ Slack now sends files, Discord splits long messages, and subagents work in CLI mode. +- **2026-02-18** ⚑️ nanobot now supports VolcEngine, MCP custom auth headers, and Anthropic prompt caching. - **2026-02-17** πŸŽ‰ Released **v0.1.4** β€” MCP support, progress streaming, new providers, and multiple channel improvements. Please see [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4) for details. - **2026-02-16** 🦞 nanobot now integrates a [ClawHub](https://clawhub.ai) skill β€” search and install public agent skills. - **2026-02-15** πŸ”‘ nanobot now supports OpenAI Codex provider with OAuth login support. @@ -27,13 +31,13 @@ - **2026-02-13** πŸŽ‰ Released **v0.1.3.post7** β€” includes security hardening and multiple improvements. **Please upgrade to the latest version to address security issues**. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post7) for more details. - **2026-02-12** 🧠 Redesigned memory system β€” Less code, more reliable. Join the [discussion](https://github.com/HKUDS/nanobot/discussions/566) about it! - **2026-02-11** ✨ Enhanced CLI experience and added MiniMax support! -- **2026-02-10** πŸŽ‰ Released **v0.1.3.post6** with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431). -- **2026-02-09** πŸ’¬ Added Slack, Email, and QQ support β€” nanobot now supports multiple chat platforms! -- **2026-02-08** πŸ”§ Refactored Providersβ€”adding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
Earlier news +- **2026-02-10** πŸŽ‰ Released **v0.1.3.post6** with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431). +- **2026-02-09** πŸ’¬ Added Slack, Email, and QQ support β€” nanobot now supports multiple chat platforms! +- **2026-02-08** πŸ”§ Refactored Providersβ€”adding a new LLM provider now takes just 2 simple steps! Check [here](#providers). - **2026-02-07** πŸš€ Released **v0.1.3.post5** with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details. - **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening! - **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support! diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 67aad0c..c5869f3 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -82,12 +82,7 @@ Skills with available="false" need dependencies installed first - you can try in return f"""# nanobot 🐈 -You are nanobot, a helpful AI assistant. You have access to tools that allow you to: -- Read, write, and edit files -- Execute shell commands -- Search the web and fetch web pages -- Send messages to users on chat channels -- Spawn subagents for complex background tasks +You are nanobot, a helpful AI assistant. ## Current Time {now} ({tz}) @@ -106,6 +101,7 @@ Only use the 'message' tool when you need to send a message to a specific chat c For normal conversation, just respond with text - do not call the message tool. Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language). +If you need to use tools, call them directly β€” never send a preliminary message like "Let me check" without actually calling a tool. When remembering something important, write to {workspace_path}/memory/MEMORY.md To recall past events, grep {workspace_path}/memory/HISTORY.md""" @@ -235,7 +231,7 @@ To recall past events, grep {workspace_path}/memory/HISTORY.md""" msg["tool_calls"] = tool_calls # Include reasoning content when provided (required by some thinking models) - if reasoning_content: + if reasoning_content is not None: msg["reasoning_content"] = reasoning_content messages.append(msg) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 06ab1f7..5762fa9 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -1,29 +1,35 @@ """Agent loop: the core processing engine.""" +from __future__ import annotations + import asyncio -from contextlib import AsyncExitStack import json -import json_repair -from pathlib import Path import re -from typing import Any, Awaitable, Callable +from contextlib import AsyncExitStack +from pathlib import Path +from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger +from nanobot.agent.context import ContextBuilder +from nanobot.agent.memory import MemoryStore +from nanobot.agent.subagent import SubagentManager +from nanobot.agent.tools.cron import CronTool +from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool +from nanobot.agent.tools.message import MessageTool +from nanobot.agent.tools.registry import ToolRegistry +from nanobot.agent.tools.shell import ExecTool +from nanobot.agent.tools.spawn import SpawnTool +from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.providers.base import LLMProvider -from nanobot.agent.context import ContextBuilder -from nanobot.agent.tools.registry import ToolRegistry -from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool -from nanobot.agent.tools.shell import ExecTool -from nanobot.agent.tools.web import WebSearchTool, WebFetchTool -from nanobot.agent.tools.message import MessageTool -from nanobot.agent.tools.spawn import SpawnTool -from nanobot.agent.tools.cron import CronTool -from nanobot.agent.subagent import SubagentManager from nanobot.session.manager import Session, SessionManager +if TYPE_CHECKING: + from nanobot.config.schema import ExecToolConfig + from nanobot.cron.service import CronService + class AgentLoop: """ @@ -48,14 +54,13 @@ class AgentLoop: max_tokens: int = 4096, memory_window: int = 50, brave_api_key: str | None = None, - exec_config: "ExecToolConfig | None" = None, - cron_service: "CronService | None" = None, + exec_config: ExecToolConfig | None = None, + cron_service: CronService | None = None, restrict_to_workspace: bool = False, session_manager: SessionManager | None = None, mcp_servers: dict | None = None, ): from nanobot.config.schema import ExecToolConfig - from nanobot.cron.service import CronService self.bus = bus self.provider = provider self.workspace = workspace @@ -83,57 +88,55 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False + self._mcp_connecting = False self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" - # File tools (workspace for relative paths, restrict if configured) allowed_dir = self.workspace if self.restrict_to_workspace else None - self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) - self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) - self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) - self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - - # Shell tool + for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool): + self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, )) - - # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - - # Message tool - message_tool = MessageTool(send_callback=self.bus.publish_outbound) - self.tools.register(message_tool) - - # Spawn tool (for subagents) - spawn_tool = SpawnTool(manager=self.subagents) - self.tools.register(spawn_tool) - - # Cron tool (for scheduling) + self.tools.register(MessageTool(send_callback=self.bus.publish_outbound)) + self.tools.register(SpawnTool(manager=self.subagents)) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" - if self._mcp_connected or not self._mcp_servers: + if self._mcp_connected or self._mcp_connecting or not self._mcp_servers: return - self._mcp_connected = True + self._mcp_connecting = True from nanobot.agent.tools.mcp import connect_mcp_servers - self._mcp_stack = AsyncExitStack() - await self._mcp_stack.__aenter__() - await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) + try: + self._mcp_stack = AsyncExitStack() + await self._mcp_stack.__aenter__() + await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) + self._mcp_connected = True + except Exception as e: + logger.error("Failed to connect MCP servers (will retry next message): {}", e) + if self._mcp_stack: + try: + await self._mcp_stack.aclose() + except Exception: + pass + self._mcp_stack = None + finally: + self._mcp_connecting = False def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Update context for all tools that need routing info.""" @@ -171,21 +174,11 @@ class AgentLoop: initial_messages: list[dict], on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> tuple[str | None, list[str]]: - """ - Run the agent iteration loop. - - Args: - initial_messages: Starting messages for the LLM conversation. - on_progress: Optional callback to push intermediate content to the user. - - Returns: - Tuple of (final_content, list_of_tools_used). - """ + """Run the agent iteration loop. Returns (final_content, tools_used).""" messages = initial_messages iteration = 0 final_content = None tools_used: list[str] = [] - text_only_retried = False while iteration < self.max_iterations: iteration += 1 @@ -231,17 +224,6 @@ class AgentLoop: ) else: final_content = self._strip_think(response.content) - # Some models send an interim text response before tool calls. - # Give them one retry; don't forward the text to avoid duplicates. - if not tools_used and not text_only_retried and final_content: - text_only_retried = True - logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) - messages = self.context.add_assistant_message( - messages, response.content, - reasoning_content=response.reasoning_content, - ) - final_content = None - continue break return final_content, tools_used @@ -260,8 +242,12 @@ class AgentLoop: ) try: response = await self._process_message(msg) - if response: + if response is not None: await self.bus.publish_outbound(response) + elif msg.channel == "cli": + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {}, + )) except Exception as e: logger.error("Error processing message: {}", e) await self.bus.publish_outbound(OutboundMessage( @@ -271,7 +257,7 @@ class AgentLoop: )) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -294,76 +280,77 @@ class AgentLoop: return lock def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: - """Drop unused per-session lock entries to avoid unbounded growth.""" - waiters = getattr(lock, "_waiters", None) - has_waiters = bool(waiters) - if lock.locked() or has_waiters: - return - self._consolidation_locks.pop(session_key, None) - + """Drop lock entry if no longer in use.""" + if not lock.locked(): + self._consolidation_locks.pop(session_key, None) + async def _process_message( self, msg: InboundMessage, session_key: str | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> OutboundMessage | None: - """ - Process a single inbound message. - - Args: - msg: The inbound message to process. - session_key: Override session key (used by process_direct). - on_progress: Optional callback for intermediate output (defaults to bus publish). - - Returns: - The response message, or None if no response needed. - """ - # System messages route back via chat_id ("channel:chat_id") + """Process a single inbound message and return the response.""" + # System messages: parse origin from chat_id ("channel:chat_id") if msg.channel == "system": - return await self._process_system_message(msg) - + channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id + else ("cli", msg.chat_id)) + logger.info("Processing system message from {}", msg.sender_id) + key = f"{channel}:{chat_id}" + session = self.sessions.get_or_create(key) + self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) + messages = self.context.build_messages( + history=session.get_history(max_messages=self.memory_window), + current_message=msg.content, channel=channel, chat_id=chat_id, + ) + final_content, _ = await self._run_agent_loop(messages) + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") + session.add_message("assistant", final_content or "Background task completed.") + self.sessions.save(session) + return OutboundMessage(channel=channel, chat_id=chat_id, + content=final_content or "Background task completed.") + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - - # Handle slash commands + + # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": lock = self._get_consolidation_lock(session.key) - messages_to_archive: list[dict[str, Any]] = [] + self._consolidating.add(session.key) try: async with lock: - messages_to_archive = session.messages[session.last_consolidated :].copy() - temp_session = Session(key=session.key) - temp_session.messages = messages_to_archive - archived = await self._consolidate_memory(temp_session, archive_all=True) - except Exception as e: - logger.error("/new archival failed for {}: {}", session.key, e) + snapshot = session.messages[session.last_consolidated:] + if snapshot: + temp = Session(key=session.key) + temp.messages = list(snapshot) + if not await self._consolidate_memory(temp, archive_all=True): + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + except Exception: + logger.exception("/new archival failed for {}", session.key) return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="Could not start a new session because memory archival failed. Please try again.", - ) - - if messages_to_archive and not archived: - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content="Could not start a new session because memory archival failed. Please try again.", + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", ) + finally: + self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - self._prune_consolidation_lock(session.key, lock) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.") + content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="🐈 nanobot commands:\n/new β€” Start a new conversation\n/help β€” Show available commands") - + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) lock = self._get_consolidation_lock(session.key) @@ -383,18 +370,22 @@ class AgentLoop: self._consolidation_tasks.add(_task) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool): + message_tool.start_turn() + initial_messages = self.context.build_messages( history=session.get_history(max_messages=self.memory_window), current_message=msg.content, media=msg.media if msg.media else None, - channel=msg.channel, - chat_id=msg.chat_id, + channel=msg.channel, chat_id=msg.chat_id, ) async def _bus_progress(content: str) -> None: + meta = dict(msg.metadata or {}) + meta["_progress"] = True await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=msg.metadata or {}, + channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta, )) final_content, tools_used = await self._run_agent_loop( @@ -403,165 +394,30 @@ class AgentLoop: if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) session.add_message("assistant", final_content, tools_used=tools_used if tools_used else None) self.sessions.save(session) - - return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=final_content, - metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) - ) - - async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: - """ - Process a system message (e.g., subagent announce). - - The chat_id field contains "original_channel:original_chat_id" to route - the response back to the correct destination. - """ - logger.info("Processing system message from {}", msg.sender_id) - - # Parse origin from chat_id (format: "channel:chat_id") - if ":" in msg.chat_id: - parts = msg.chat_id.split(":", 1) - origin_channel = parts[0] - origin_chat_id = parts[1] - else: - # Fallback - origin_channel = "cli" - origin_chat_id = msg.chat_id - - session_key = f"{origin_channel}:{origin_chat_id}" - session = self.sessions.get_or_create(session_key) - self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) - initial_messages = self.context.build_messages( - history=session.get_history(max_messages=self.memory_window), - current_message=msg.content, - channel=origin_channel, - chat_id=origin_chat_id, - ) - final_content, _ = await self._run_agent_loop(initial_messages) - if final_content is None: - final_content = "Background task completed." - - session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") - session.add_message("assistant", final_content) - self.sessions.save(session) - + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: + return None + return OutboundMessage( - channel=origin_channel, - chat_id=origin_chat_id, - content=final_content + channel=msg.channel, chat_id=msg.chat_id, content=final_content, + metadata=msg.metadata or {}, ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: - """Consolidate old messages into MEMORY.md + HISTORY.md. - - Args: - archive_all: If True, clear all messages and reset session (for /new command). - If False, only write to files without modifying session. - """ - memory = self.context.memory - - if archive_all: - old_messages = session.messages - keep_count = 0 - logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) - else: - keep_count = self.memory_window // 2 - if len(session.messages) <= keep_count: - logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) - return True - - messages_to_process = len(session.messages) - session.last_consolidated - if messages_to_process <= 0: - logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) - return True - - old_messages = session.messages[session.last_consolidated:-keep_count] - if not old_messages: - return True - logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) - - lines = [] - for m in old_messages: - if not m.get("content"): - continue - tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") - conversation = "\n".join(lines) - current_memory = memory.read_long_term() - - prompt = f"""You are a memory consolidation agent. Process this conversation and return a JSON object with exactly two keys: - -1. "history_entry": A paragraph (2-5 sentences) summarizing the key events/decisions/topics. Start with a timestamp like [YYYY-MM-DD HH:MM]. Include enough detail to be useful when found by grep search later. - -2. "memory_update": The updated long-term memory content. Add any new facts: user location, preferences, personal info, habits, project context, technical decisions, tools/services used. If nothing new, return the existing content unchanged. - -## Current Long-term Memory -{current_memory or "(empty)"} - -## Conversation to Process -{conversation} - -**IMPORTANT**: Both values MUST be strings, not objects or arrays. - -Example: -{{ - "history_entry": "[2026-02-14 22:50] User asked about...", - "memory_update": "- Host: HARRYBOOK-T14P\n- Name: Nado" -}} - -Respond with ONLY valid JSON, no markdown fences.""" - - try: - response = await self.provider.chat( - messages=[ - {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."}, - {"role": "user", "content": prompt}, - ], - model=self.model, - ) - text = (response.content or "").strip() - if not text: - logger.warning("Memory consolidation: LLM returned empty response, skipping") - return False - if text.startswith("```"): - text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() - result = json_repair.loads(text) - if not isinstance(result, dict): - logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200]) - return False - - if entry := result.get("history_entry"): - # Defensive: ensure entry is a string (LLM may return dict) - if not isinstance(entry, str): - entry = json.dumps(entry, ensure_ascii=False) - memory.append_history(entry) - if update := result.get("memory_update"): - # Defensive: ensure update is a string - if not isinstance(update, str): - update = json.dumps(update, ensure_ascii=False) - if update != current_memory: - memory.write_long_term(update) - - if archive_all: - session.last_consolidated = 0 - else: - session.last_consolidated = len(session.messages) - keep_count - logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) - return True - except Exception as e: - logger.error("Memory consolidation failed: {}", e) - return False + """Delegate to MemoryStore.consolidate(). Returns True on success.""" + return await MemoryStore(self.workspace).consolidate( + session, self.provider, self.model, + archive_all=archive_all, memory_window=self.memory_window, + ) async def process_direct( self, @@ -571,26 +427,8 @@ Respond with ONLY valid JSON, no markdown fences.""" chat_id: str = "direct", on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> str: - """ - Process a message directly (for CLI or cron usage). - - Args: - content: The message content. - session_key: Session identifier (overrides channel:chat_id for session lookup). - channel: Source channel (for tool context routing). - chat_id: Source chat ID (for tool context routing). - on_progress: Optional callback for intermediate output. - - Returns: - The agent's response. - """ + """Process a message directly (for CLI or cron usage).""" await self._connect_mcp() - msg = InboundMessage( - channel=channel, - sender_id="user", - chat_id=chat_id, - content=content - ) - + msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 29477c4..cdbc49f 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -1,9 +1,46 @@ """Memory system for persistent agent memory.""" +from __future__ import annotations + +import json from pathlib import Path +from typing import TYPE_CHECKING + +from loguru import logger from nanobot.utils.helpers import ensure_dir +if TYPE_CHECKING: + from nanobot.providers.base import LLMProvider + from nanobot.session.manager import Session + + +_SAVE_MEMORY_TOOL = [ + { + "type": "function", + "function": { + "name": "save_memory", + "description": "Save the memory consolidation result to persistent storage.", + "parameters": { + "type": "object", + "properties": { + "history_entry": { + "type": "string", + "description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. " + "Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.", + }, + "memory_update": { + "type": "string", + "description": "Full updated long-term memory as markdown. Include all existing " + "facts plus new ones. Return unchanged if nothing new.", + }, + }, + "required": ["history_entry", "memory_update"], + }, + }, + } +] + class MemoryStore: """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log).""" @@ -28,3 +65,79 @@ class MemoryStore: def get_memory_context(self) -> str: long_term = self.read_long_term() return f"## Long-term Memory\n{long_term}" if long_term else "" + + async def consolidate( + self, + session: Session, + provider: LLMProvider, + model: str, + *, + archive_all: bool = False, + memory_window: int = 50, + ) -> bool: + """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call. + + Returns True on success (including no-op), False on failure. + """ + if archive_all: + old_messages = session.messages + keep_count = 0 + logger.info("Memory consolidation (archive_all): {} messages", len(session.messages)) + else: + keep_count = memory_window // 2 + if len(session.messages) <= keep_count: + return True + if len(session.messages) - session.last_consolidated <= 0: + return True + old_messages = session.messages[session.last_consolidated:-keep_count] + if not old_messages: + return True + logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count) + + lines = [] + for m in old_messages: + if not m.get("content"): + continue + tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" + lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") + + current_memory = self.read_long_term() + prompt = f"""Process this conversation and call the save_memory tool with your consolidation. + +## Current Long-term Memory +{current_memory or "(empty)"} + +## Conversation to Process +{chr(10).join(lines)}""" + + try: + response = await provider.chat( + messages=[ + {"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."}, + {"role": "user", "content": prompt}, + ], + tools=_SAVE_MEMORY_TOOL, + model=model, + ) + + if not response.has_tool_calls: + logger.warning("Memory consolidation: LLM did not call save_memory, skipping") + return False + + args = response.tool_calls[0].arguments + if entry := args.get("history_entry"): + if not isinstance(entry, str): + entry = json.dumps(entry, ensure_ascii=False) + self.append_history(entry) + if update := args.get("memory_update"): + if not isinstance(update, str): + update = json.dumps(update, ensure_ascii=False) + if update != current_memory: + self.write_long_term(update) + + session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count + logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) + return True + except Exception: + logger.exception("Memory consolidation failed") + return False diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 419b088..b87da11 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -1,5 +1,6 @@ """File system tools: read, write, edit.""" +import difflib from pathlib import Path from typing import Any @@ -12,8 +13,11 @@ def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | if not p.is_absolute() and workspace: p = workspace / p resolved = p.resolve() - if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): - raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") + if allowed_dir: + try: + resolved.relative_to(allowed_dir.resolve()) + except ValueError: + raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") return resolved @@ -150,7 +154,7 @@ class EditFileTool(Tool): content = file_path.read_text(encoding="utf-8") if old_text not in content: - return f"Error: old_text not found in file. Make sure it matches exactly." + return self._not_found_message(old_text, content, path) # Count occurrences count = content.count(old_text) @@ -166,6 +170,28 @@ class EditFileTool(Tool): except Exception as e: return f"Error editing file: {str(e)}" + @staticmethod + def _not_found_message(old_text: str, content: str, path: str) -> str: + """Build a helpful error when old_text is not found.""" + lines = content.splitlines(keepends=True) + old_lines = old_text.splitlines(keepends=True) + window = len(old_lines) + + best_ratio, best_start = 0.0, 0 + for i in range(max(1, len(lines) - window + 1)): + ratio = difflib.SequenceMatcher(None, old_lines, lines[i : i + window]).ratio() + if ratio > best_ratio: + best_ratio, best_start = ratio, i + + if best_ratio > 0.5: + diff = "\n".join(difflib.unified_diff( + old_lines, lines[best_start : best_start + window], + fromfile="old_text (provided)", tofile=f"{path} (actual, line {best_start + 1})", + lineterm="", + )) + return f"Error: old_text not found in {path}.\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}" + return f"Error: old_text not found in {path}. No similar text found. Verify the file content." + class ListDirTool(Tool): """Tool to list directory contents.""" diff --git a/nanobot/agent/tools/message.py b/nanobot/agent/tools/message.py index 10947c4..40e76e3 100644 --- a/nanobot/agent/tools/message.py +++ b/nanobot/agent/tools/message.py @@ -1,6 +1,6 @@ """Message tool for sending messages to users.""" -from typing import Any, Callable, Awaitable +from typing import Any, Awaitable, Callable from nanobot.agent.tools.base import Tool from nanobot.bus.events import OutboundMessage @@ -8,37 +8,42 @@ from nanobot.bus.events import OutboundMessage class MessageTool(Tool): """Tool to send messages to users on chat channels.""" - + def __init__( - self, + self, send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, default_channel: str = "", default_chat_id: str = "", - default_message_id: str | None = None + default_message_id: str | None = None, ): self._send_callback = send_callback self._default_channel = default_channel self._default_chat_id = default_chat_id self._default_message_id = default_message_id - + self._sent_in_turn: bool = False + def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Set the current message context.""" self._default_channel = channel self._default_chat_id = chat_id self._default_message_id = message_id - + def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None: """Set the callback for sending messages.""" self._send_callback = callback - + + def start_turn(self) -> None: + """Reset per-turn send tracking.""" + self._sent_in_turn = False + @property def name(self) -> str: return "message" - + @property def description(self) -> str: return "Send a message to the user. Use this when you want to communicate something." - + @property def parameters(self) -> dict[str, Any]: return { @@ -64,11 +69,11 @@ class MessageTool(Tool): }, "required": ["content"] } - + async def execute( - self, - content: str, - channel: str | None = None, + self, + content: str, + channel: str | None = None, chat_id: str | None = None, message_id: str | None = None, media: list[str] | None = None, @@ -77,13 +82,13 @@ class MessageTool(Tool): channel = channel or self._default_channel chat_id = chat_id or self._default_chat_id message_id = message_id or self._default_message_id - + if not channel or not chat_id: return "Error: No target channel/chat specified" - + if not self._send_callback: return "Error: Message sending not configured" - + msg = OutboundMessage( channel=channel, chat_id=chat_id, @@ -93,9 +98,10 @@ class MessageTool(Tool): "message_id": message_id, } ) - + try: await self._send_callback(msg) + self._sent_in_turn = True media_info = f" with {len(media)} attachments" if media else "" return f"Message sent to {channel}:{chat_id}{media_info}" except Exception as e: diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 30fcd1a..3a5a785 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -105,8 +105,9 @@ class BaseChannel(ABC): """ if not self.is_allowed(sender_id): logger.warning( - f"Access denied for sender {sender_id} on channel {self.name}. " - f"Add them to allowFrom list in config to grant access." + "Access denied for sender {} on channel {}. " + "Add them to allowFrom list in config to grant access.", + sender_id, self.name, ) return diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py index b7263b3..09c7714 100644 --- a/nanobot/channels/dingtalk.py +++ b/nanobot/channels/dingtalk.py @@ -58,7 +58,8 @@ class NanobotDingTalkHandler(CallbackHandler): if not content: logger.warning( - f"Received empty or unsupported message type: {chatbot_msg.message_type}" + "Received empty or unsupported message type: {}", + chatbot_msg.message_type, ) return AckMessage.STATUS_OK, "OK" @@ -126,7 +127,8 @@ class DingTalkChannel(BaseChannel): self._http = httpx.AsyncClient() logger.info( - f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..." + "Initializing DingTalk Stream Client with Client ID: {}...", + self.config.client_id, ) credential = Credential(self.config.client_id, self.config.client_secret) self._client = DingTalkStreamClient(credential) diff --git a/nanobot/channels/discord.py b/nanobot/channels/discord.py index 8baecbf..1d2d7a6 100644 --- a/nanobot/channels/discord.py +++ b/nanobot/channels/discord.py @@ -17,6 +17,29 @@ from nanobot.config.schema import DiscordConfig DISCORD_API_BASE = "https://discord.com/api/v10" MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB +MAX_MESSAGE_LEN = 2000 # Discord message character limit + + +def _split_message(content: str, max_len: int = MAX_MESSAGE_LEN) -> list[str]: + """Split content into chunks within max_len, preferring line breaks.""" + if not content: + return [] + if len(content) <= max_len: + return [content] + chunks: list[str] = [] + while content: + if len(content) <= max_len: + chunks.append(content) + break + cut = content[:max_len] + pos = cut.rfind('\n') + if pos <= 0: + pos = cut.rfind(' ') + if pos <= 0: + pos = max_len + chunks.append(content[:pos]) + content = content[pos:].lstrip() + return chunks class DiscordChannel(BaseChannel): @@ -79,34 +102,48 @@ class DiscordChannel(BaseChannel): return url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages" - payload: dict[str, Any] = {"content": msg.content} - - if msg.reply_to: - payload["message_reference"] = {"message_id": msg.reply_to} - payload["allowed_mentions"] = {"replied_user": False} - headers = {"Authorization": f"Bot {self.config.token}"} try: - for attempt in range(3): - try: - response = await self._http.post(url, headers=headers, json=payload) - if response.status_code == 429: - data = response.json() - retry_after = float(data.get("retry_after", 1.0)) - logger.warning("Discord rate limited, retrying in {}s", retry_after) - await asyncio.sleep(retry_after) - continue - response.raise_for_status() - return - except Exception as e: - if attempt == 2: - logger.error("Error sending Discord message: {}", e) - else: - await asyncio.sleep(1) + chunks = _split_message(msg.content or "") + if not chunks: + return + + for i, chunk in enumerate(chunks): + payload: dict[str, Any] = {"content": chunk} + + # Only set reply reference on the first chunk + if i == 0 and msg.reply_to: + payload["message_reference"] = {"message_id": msg.reply_to} + payload["allowed_mentions"] = {"replied_user": False} + + if not await self._send_payload(url, headers, payload): + break # Abort remaining chunks on failure finally: await self._stop_typing(msg.chat_id) + async def _send_payload( + self, url: str, headers: dict[str, str], payload: dict[str, Any] + ) -> bool: + """Send a single Discord API payload with retry on rate-limit. Returns True on success.""" + for attempt in range(3): + try: + response = await self._http.post(url, headers=headers, json=payload) + if response.status_code == 429: + data = response.json() + retry_after = float(data.get("retry_after", 1.0)) + logger.warning("Discord rate limited, retrying in {}s", retry_after) + await asyncio.sleep(retry_after) + continue + response.raise_for_status() + return True + except Exception as e: + if attempt == 2: + logger.error("Error sending Discord message: {}", e) + else: + await asyncio.sleep(1) + return False + async def _gateway_loop(self) -> None: """Main gateway loop: identify, heartbeat, dispatch events.""" if not self._ws: diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 1b6f46b..5dc05fb 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -304,7 +304,8 @@ class EmailChannel(BaseChannel): self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: - self._processed_uids.clear() + # Evict a random half to cap memory; mark_seen is the primary dedup + self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:]) if mark_seen: client.store(imap_id, "+FLAGS", "\\Seen") diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index a8ca1fa..815d853 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -6,6 +6,7 @@ import os import re import threading from collections import OrderedDict +from pathlib import Path from typing import Any from loguru import logger @@ -27,6 +28,8 @@ try: CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji, + GetFileRequest, + GetMessageResourceRequest, P2ImMessageReceiveV1, ) FEISHU_AVAILABLE = True @@ -44,6 +47,139 @@ MSG_TYPE_MAP = { } +def _extract_share_card_content(content_json: dict, msg_type: str) -> str: + """Extract text representation from share cards and interactive messages.""" + parts = [] + + if msg_type == "share_chat": + parts.append(f"[shared chat: {content_json.get('chat_id', '')}]") + elif msg_type == "share_user": + parts.append(f"[shared user: {content_json.get('user_id', '')}]") + elif msg_type == "interactive": + parts.extend(_extract_interactive_content(content_json)) + elif msg_type == "share_calendar_event": + parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]") + elif msg_type == "system": + parts.append("[system message]") + elif msg_type == "merge_forward": + parts.append("[merged forward messages]") + + return "\n".join(parts) if parts else f"[{msg_type}]" + + +def _extract_interactive_content(content: dict) -> list[str]: + """Recursively extract text and links from interactive card content.""" + parts = [] + + if isinstance(content, str): + try: + content = json.loads(content) + except (json.JSONDecodeError, TypeError): + return [content] if content.strip() else [] + + if not isinstance(content, dict): + return parts + + if "title" in content: + title = content["title"] + if isinstance(title, dict): + title_content = title.get("content", "") or title.get("text", "") + if title_content: + parts.append(f"title: {title_content}") + elif isinstance(title, str): + parts.append(f"title: {title}") + + for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []: + parts.extend(_extract_element_content(element)) + + card = content.get("card", {}) + if card: + parts.extend(_extract_interactive_content(card)) + + header = content.get("header", {}) + if header: + header_title = header.get("title", {}) + if isinstance(header_title, dict): + header_text = header_title.get("content", "") or header_title.get("text", "") + if header_text: + parts.append(f"title: {header_text}") + + return parts + + +def _extract_element_content(element: dict) -> list[str]: + """Extract content from a single card element.""" + parts = [] + + if not isinstance(element, dict): + return parts + + tag = element.get("tag", "") + + if tag in ("markdown", "lark_md"): + content = element.get("content", "") + if content: + parts.append(content) + + elif tag == "div": + text = element.get("text", {}) + if isinstance(text, dict): + text_content = text.get("content", "") or text.get("text", "") + if text_content: + parts.append(text_content) + elif isinstance(text, str): + parts.append(text) + for field in element.get("fields", []): + if isinstance(field, dict): + field_text = field.get("text", {}) + if isinstance(field_text, dict): + c = field_text.get("content", "") + if c: + parts.append(c) + + elif tag == "a": + href = element.get("href", "") + text = element.get("text", "") + if href: + parts.append(f"link: {href}") + if text: + parts.append(text) + + elif tag == "button": + text = element.get("text", {}) + if isinstance(text, dict): + c = text.get("content", "") + if c: + parts.append(c) + url = element.get("url", "") or element.get("multi_url", {}).get("url", "") + if url: + parts.append(f"link: {url}") + + elif tag == "img": + alt = element.get("alt", {}) + parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]") + + elif tag == "note": + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + elif tag == "column_set": + for col in element.get("columns", []): + for ce in col.get("elements", []): + parts.extend(_extract_element_content(ce)) + + elif tag == "plain_text": + content = element.get("content", "") + if content: + parts.append(content) + + else: + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + return parts + + def _extract_post_text(content_json: dict) -> str: """Extract plain text from Feishu post (rich text) message content. @@ -345,6 +481,87 @@ class FeishuChannel(BaseChannel): logger.error("Error uploading file {}: {}", file_path, e) return None + def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]: + """Download an image from Feishu message by message_id and image_key.""" + try: + request = GetMessageResourceRequest.builder() \ + .message_id(message_id) \ + .file_key(image_key) \ + .type("image") \ + .build() + response = self._client.im.v1.message_resource.get(request) + if response.success(): + file_data = response.file + # GetMessageResourceRequest returns BytesIO, need to read bytes + if hasattr(file_data, 'read'): + file_data = file_data.read() + return file_data, response.file_name + else: + logger.error("Failed to download image: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading image {}: {}", image_key, e) + return None, None + + def _download_file_sync(self, file_key: str) -> tuple[bytes | None, str | None]: + """Download a file from Feishu by file_key.""" + try: + request = GetFileRequest.builder().file_key(file_key).build() + response = self._client.im.v1.file.get(request) + if response.success(): + return response.file, response.file_name + else: + logger.error("Failed to download file: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading file {}: {}", file_key, e) + return None, None + + async def _download_and_save_media( + self, + msg_type: str, + content_json: dict, + message_id: str | None = None + ) -> tuple[str | None, str]: + """ + Download media from Feishu and save to local disk. + + Returns: + (file_path, content_text) - file_path is None if download failed + """ + loop = asyncio.get_running_loop() + media_dir = Path.home() / ".nanobot" / "media" + media_dir.mkdir(parents=True, exist_ok=True) + + data, filename = None, None + + if msg_type == "image": + image_key = content_json.get("image_key") + if image_key and message_id: + data, filename = await loop.run_in_executor( + None, self._download_image_sync, message_id, image_key + ) + if not filename: + filename = f"{image_key[:16]}.jpg" + + elif msg_type in ("audio", "file"): + file_key = content_json.get("file_key") + if file_key: + data, filename = await loop.run_in_executor( + None, self._download_file_sync, file_key + ) + if not filename: + ext = ".opus" if msg_type == "audio" else "" + filename = f"{file_key[:16]}{ext}" + + if data and filename: + file_path = media_dir / filename + file_path.write_bytes(data) + logger.debug("Downloaded {} to {}", msg_type, file_path) + return str(file_path), f"[{msg_type}: {filename}]" + + return None, f"[{msg_type}: download failed]" + def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: """Send a single message (text/image/file/interactive) synchronously.""" try: @@ -425,60 +642,81 @@ class FeishuChannel(BaseChannel): event = data.event message = event.message sender = event.sender - + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: return self._processed_message_ids[message_id] = None - - # Trim cache: keep most recent 500 when exceeds 1000 + + # Trim cache while len(self._processed_message_ids) > 1000: self._processed_message_ids.popitem(last=False) - + # Skip bot messages - sender_type = sender.sender_type - if sender_type == "bot": + if sender.sender_type == "bot": return - + sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" chat_id = message.chat_id - chat_type = message.chat_type # "p2p" or "group" + chat_type = message.chat_type msg_type = message.message_type - - # Add reaction to indicate "seen" + + # Add reaction await self._add_reaction(message_id, "THUMBSUP") - - # Parse message content + + # Parse content + content_parts = [] + media_paths = [] + + try: + content_json = json.loads(message.content) if message.content else {} + except json.JSONDecodeError: + content_json = {} + if msg_type == "text": - try: - content = json.loads(message.content).get("text", "") - except json.JSONDecodeError: - content = message.content or "" + text = content_json.get("text", "") + if text: + content_parts.append(text) + elif msg_type == "post": - try: - content_json = json.loads(message.content) - content = _extract_post_text(content_json) - except (json.JSONDecodeError, TypeError): - content = message.content or "" + text = _extract_post_text(content_json) + if text: + content_parts.append(text) + + elif msg_type in ("image", "audio", "file"): + file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id) + if file_path: + media_paths.append(file_path) + content_parts.append(content_text) + + elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"): + # Handle share cards and interactive messages + text = _extract_share_card_content(content_json, msg_type) + if text: + content_parts.append(text) + else: - content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") - - if not content: + content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")) + + content = "\n".join(content_parts) if content_parts else "" + + if not content and not media_paths: return - + # Forward to message bus reply_to = chat_id if chat_type == "group" else sender_id await self._handle_message( sender_id=sender_id, chat_id=reply_to, content=content, + media=media_paths, metadata={ "message_id": message_id, "chat_type": chat_type, "msg_type": msg_type, } ) - + except Exception as e: logger.error("Error processing Feishu message: {}", e) diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 79cbe76..b0f9bbb 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -84,11 +84,24 @@ class SlackChannel(BaseChannel): channel_type = slack_meta.get("channel_type") # Only reply in thread for channel/group messages; DMs don't use threads use_thread = thread_ts and channel_type != "im" - await self._web_client.chat_postMessage( - channel=msg.chat_id, - text=self._to_mrkdwn(msg.content), - thread_ts=thread_ts if use_thread else None, - ) + thread_ts_param = thread_ts if use_thread else None + + if msg.content: + await self._web_client.chat_postMessage( + channel=msg.chat_id, + text=self._to_mrkdwn(msg.content), + thread_ts=thread_ts_param, + ) + + for media_path in msg.media or []: + try: + await self._web_client.files_upload_v2( + channel=msg.chat_id, + file=media_path, + thread_ts=thread_ts_param, + ) + except Exception as e: + logger.error("Failed to upload file {}: {}", media_path, e) except Exception as e: logger.error("Error sending Slack message: {}", e) @@ -166,18 +179,21 @@ class SlackChannel(BaseChannel): except Exception as e: logger.debug("Slack reactions_add failed: {}", e) - await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=text, - metadata={ - "slack": { - "event": event, - "thread_ts": thread_ts, - "channel_type": channel_type, - } - }, - ) + try: + await self._handle_message( + sender_id=sender_id, + chat_id=chat_id, + content=text, + metadata={ + "slack": { + "event": event, + "thread_ts": thread_ts, + "channel_type": channel_type, + } + }, + ) + except Exception: + logger.exception("Error handling Slack message from {}", sender_id) def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: if channel_type == "im": diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index a135349..f1f9b30 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -498,27 +498,58 @@ def agent( console.print(f" [dim]↳ {content}[/dim]") if message: - # Single message mode + # Single message mode β€” direct call, no bus needed async def run_once(): with _thinking_ctx(): response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress) _print_agent_response(response, render_markdown=markdown) await agent_loop.close_mcp() - + asyncio.run(run_once()) else: - # Interactive mode + # Interactive mode β€” route through bus like other channels + from nanobot.bus.events import InboundMessage _init_prompt_session() console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") + if ":" in session_id: + cli_channel, cli_chat_id = session_id.split(":", 1) + else: + cli_channel, cli_chat_id = "cli", session_id + def _exit_on_sigint(signum, frame): _restore_terminal() console.print("\nGoodbye!") os._exit(0) signal.signal(signal.SIGINT, _exit_on_sigint) - + async def run_interactive(): + bus_task = asyncio.create_task(agent_loop.run()) + turn_done = asyncio.Event() + turn_done.set() + turn_response: list[str] = [] + + async def _consume_outbound(): + while True: + try: + msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + if msg.metadata.get("_progress"): + console.print(f" [dim]↳ {msg.content}[/dim]") + elif not turn_done.is_set(): + if msg.content: + turn_response.append(msg.content) + turn_done.set() + elif msg.content: + console.print() + _print_agent_response(msg.content, render_markdown=markdown) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + + outbound_task = asyncio.create_task(_consume_outbound()) + try: while True: try: @@ -532,10 +563,22 @@ def agent( _restore_terminal() console.print("\nGoodbye!") break - + + turn_done.clear() + turn_response.clear() + + await bus.publish_inbound(InboundMessage( + channel=cli_channel, + sender_id="user", + chat_id=cli_chat_id, + content=user_input, + )) + with _thinking_ctx(): - response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress) - _print_agent_response(response, render_markdown=markdown) + await turn_done.wait() + + if turn_response: + _print_agent_response(turn_response[0], render_markdown=markdown) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") @@ -545,8 +588,11 @@ def agent( console.print("\nGoodbye!") break finally: + agent_loop.stop() + outbound_task.cancel() + await asyncio.gather(bus_task, outbound_task, return_exceptions=True) await agent_loop.close_mcp() - + asyncio.run(run_interactive()) @@ -622,6 +668,33 @@ def channels_status(): slack_config ) + # DingTalk + dt = config.channels.dingtalk + dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]" + table.add_row( + "DingTalk", + "βœ“" if dt.enabled else "βœ—", + dt_config + ) + + # QQ + qq = config.channels.qq + qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]" + table.add_row( + "QQ", + "βœ“" if qq.enabled else "βœ—", + qq_config + ) + + # Email + em = config.channels.email + em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]" + table.add_row( + "Email", + "βœ“" if em.enabled else "βœ—", + em_config + ) + console.print(table) diff --git a/nanobot/providers/litellm_provider.py b/nanobot/providers/litellm_provider.py index edeb5c6..58c9ac2 100644 --- a/nanobot/providers/litellm_provider.py +++ b/nanobot/providers/litellm_provider.py @@ -111,7 +111,7 @@ class LiteLLMProvider(LLMProvider): def _supports_cache_control(self, model: str) -> bool: """Return True when the provider supports cache_control on content blocks.""" if self._gateway is not None: - return False + return self._gateway.supports_prompt_caching spec = find_by_model(model) return spec is not None and spec.supports_prompt_caching diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py index 445d977..2766929 100644 --- a/nanobot/providers/registry.py +++ b/nanobot/providers/registry.py @@ -100,6 +100,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( default_api_base="https://openrouter.ai/api/v1", strip_model_prefix=False, model_overrides=(), + supports_prompt_caching=True, ), # AiHubMix: global gateway, OpenAI-compatible interface. @@ -146,7 +147,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( keywords=("volcengine", "volces", "ark"), env_key="OPENAI_API_KEY", display_name="VolcEngine", - litellm_prefix="openai", + litellm_prefix="volcengine", skip_prefixes=(), env_extras=(), is_gateway=True, diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 9c1e427..5f23dc2 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -1,6 +1,7 @@ """Session management for conversation history.""" import json +import shutil from pathlib import Path from dataclasses import dataclass, field from datetime import datetime @@ -108,9 +109,11 @@ class SessionManager: if not path.exists(): legacy_path = self._get_legacy_session_path(key) if legacy_path.exists(): - import shutil - shutil.move(str(legacy_path), str(path)) - logger.info("Migrated session {} from legacy path", key) + try: + shutil.move(str(legacy_path), str(path)) + logger.info("Migrated session {} from legacy path", key) + except Exception: + logger.exception("Failed to migrate session {}", key) if not path.exists(): return None @@ -154,6 +157,7 @@ class SessionManager: with open(path, "w", encoding="utf-8") as f: metadata_line = { "_type": "metadata", + "key": session.key, "created_at": session.created_at.isoformat(), "updated_at": session.updated_at.isoformat(), "metadata": session.metadata, @@ -186,8 +190,9 @@ class SessionManager: if first_line: data = json.loads(first_line) if data.get("_type") == "metadata": + key = data.get("key") or path.stem.replace("_", ":", 1) sessions.append({ - "key": path.stem.replace("_", ":"), + "key": key, "created_at": data.get("created_at"), "updated_at": data.get("updated_at"), "path": str(path) diff --git a/pyproject.toml b/pyproject.toml index 64a884d..c337d02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "nanobot-ai" -version = "0.1.4" +version = "0.1.4.post1" description = "A lightweight personal AI assistant framework" requires-python = ">=3.11" license = {text = "MIT"}