diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 177302e..2348df7 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -1,30 +1,36 @@ """Agent loop: the core processing engine.""" -import asyncio -from contextlib import AsyncExitStack -import json -import json_repair -from pathlib import Path -import re -from typing import Any, Awaitable, Callable +from __future__ import annotations +import asyncio +import json +import re +from contextlib import AsyncExitStack +from pathlib import Path +from typing import TYPE_CHECKING, Awaitable, Callable + +import json_repair from loguru import logger +from nanobot.agent.context import ContextBuilder +from nanobot.agent.memory import MemoryStore +from nanobot.agent.subagent import SubagentManager +from nanobot.agent.tools.cron import CronTool +from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool +from nanobot.agent.tools.message import MessageTool +from nanobot.agent.tools.registry import ToolRegistry +from nanobot.agent.tools.shell import ExecTool +from nanobot.agent.tools.spawn import SpawnTool +from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.providers.base import LLMProvider -from nanobot.agent.context import ContextBuilder -from nanobot.agent.tools.registry import ToolRegistry -from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool -from nanobot.agent.tools.shell import ExecTool -from nanobot.agent.tools.web import WebSearchTool, WebFetchTool -from nanobot.agent.tools.message import MessageTool -from nanobot.agent.tools.spawn import SpawnTool -from nanobot.agent.tools.cron import CronTool -from nanobot.agent.memory import MemoryStore -from nanobot.agent.subagent import SubagentManager from nanobot.session.manager import Session, SessionManager +if TYPE_CHECKING: + from nanobot.config.schema import ExecToolConfig + from nanobot.cron.service import CronService + class AgentLoop: """ @@ -49,14 +55,13 @@ class AgentLoop: max_tokens: int = 4096, memory_window: int = 50, brave_api_key: str | None = None, - exec_config: "ExecToolConfig | None" = None, - cron_service: "CronService | None" = None, + exec_config: ExecToolConfig | None = None, + cron_service: CronService | None = None, restrict_to_workspace: bool = False, session_manager: SessionManager | None = None, mcp_servers: dict | None = None, ): from nanobot.config.schema import ExecToolConfig - from nanobot.cron.service import CronService self.bus = bus self.provider = provider self.workspace = workspace @@ -84,14 +89,15 @@ class AgentLoop: exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) - + self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False + self._mcp_connecting = False self._consolidating: set[str] = set() # Session keys with consolidation in progress self._register_default_tools() - + def _register_default_tools(self) -> None: """Register the default set of tools.""" # File tools (workspace for relative paths, restrict if configured) @@ -100,39 +106,51 @@ class AgentLoop: self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - + # Shell tool self.tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, )) - + # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - + # Message tool message_tool = MessageTool(send_callback=self.bus.publish_outbound) self.tools.register(message_tool) - + # Spawn tool (for subagents) spawn_tool = SpawnTool(manager=self.subagents) self.tools.register(spawn_tool) - + # Cron tool (for scheduling) if self.cron_service: self.tools.register(CronTool(self.cron_service)) - + async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" - if self._mcp_connected or not self._mcp_servers: + if self._mcp_connected or self._mcp_connecting or not self._mcp_servers: return - self._mcp_connected = True + self._mcp_connecting = True from nanobot.agent.tools.mcp import connect_mcp_servers - self._mcp_stack = AsyncExitStack() - await self._mcp_stack.__aenter__() - await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) + try: + self._mcp_stack = AsyncExitStack() + await self._mcp_stack.__aenter__() + await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) + self._mcp_connected = True + except Exception as e: + logger.error("Failed to connect MCP servers (will retry next message): {}", e) + if self._mcp_stack: + try: + await self._mcp_stack.aclose() + except Exception: + pass + self._mcp_stack = None + finally: + self._mcp_connecting = False def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Update context for all tools that need routing info.""" @@ -238,10 +256,6 @@ class AgentLoop: text_only_retried = True interim_content = final_content logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80]) - messages = self.context.add_assistant_message( - messages, response.content, - reasoning_content=response.reasoning_content, - ) final_content = None continue # Fall back to interim content if retry produced nothing @@ -266,8 +280,9 @@ class AgentLoop: ) try: response = await self._process_message(msg) - if response: - await self.bus.publish_outbound(response) + await self.bus.publish_outbound(response or OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content="", + )) except Exception as e: logger.error("Error processing message: {}", e) await self.bus.publish_outbound(OutboundMessage( @@ -277,7 +292,7 @@ class AgentLoop: )) except asyncio.TimeoutError: continue - + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -291,7 +306,7 @@ class AgentLoop: """Stop the agent loop.""" self._running = False logger.info("Agent loop stopping") - + async def _process_message( self, msg: InboundMessage, @@ -300,25 +315,25 @@ class AgentLoop: ) -> OutboundMessage | None: """ Process a single inbound message. - + Args: msg: The inbound message to process. session_key: Override session key (used by process_direct). on_progress: Optional callback for intermediate output (defaults to bus publish). - + Returns: The response message, or None if no response needed. """ # System messages route back via chat_id ("channel:chat_id") if msg.channel == "system": return await self._process_system_message(msg) - + preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) - + key = session_key or msg.session_key session = self.sessions.get_or_create(key) - + # Handle slash commands cmd = msg.content.strip().lower() if cmd == "/new": @@ -339,7 +354,7 @@ class AgentLoop: if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") - + if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) @@ -352,6 +367,10 @@ class AgentLoop: asyncio.create_task(_consolidate_and_unlock()) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool): + message_tool.start_turn() + initial_messages = self.context.build_messages( history=session.get_history(max_messages=self.memory_window), current_message=msg.content, @@ -361,9 +380,11 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: + meta = dict(msg.metadata or {}) + meta["_progress"] = True await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=msg.metadata or {}, + metadata=meta, )) final_content, tools_used = await self._run_agent_loop( @@ -372,31 +393,35 @@ class AgentLoop: if final_content is None: final_content = "I've completed processing but have no response to give." - + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - + session.add_message("user", msg.content) session.add_message("assistant", final_content, tools_used=tools_used if tools_used else None) self.sessions.save(session) - + + if message_tool := self.tools.get("message"): + if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: + return None + return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) ) - + async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: """ Process a system message (e.g., subagent announce). - + The chat_id field contains "original_channel:original_chat_id" to route the response back to the correct destination. """ logger.info("Processing system message from {}", msg.sender_id) - + # Parse origin from chat_id (format: "channel:chat_id") if ":" in msg.chat_id: parts = msg.chat_id.split(":", 1) @@ -406,7 +431,7 @@ class AgentLoop: # Fallback origin_channel = "cli" origin_chat_id = msg.chat_id - + session_key = f"{origin_channel}:{origin_chat_id}" session = self.sessions.get_or_create(session_key) self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) @@ -420,17 +445,17 @@ class AgentLoop: if final_content is None: final_content = "Background task completed." - + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") session.add_message("assistant", final_content) self.sessions.save(session) - + return OutboundMessage( channel=origin_channel, chat_id=origin_chat_id, content=final_content ) - + async def _consolidate_memory(self, session, archive_all: bool = False) -> None: """Consolidate old messages into MEMORY.md + HISTORY.md. @@ -540,14 +565,14 @@ Respond with ONLY valid JSON, no markdown fences.""" ) -> str: """ Process a message directly (for CLI or cron usage). - + Args: content: The message content. session_key: Session identifier (overrides channel:chat_id for session lookup). channel: Source channel (for tool context routing). chat_id: Source chat ID (for tool context routing). on_progress: Optional callback for intermediate output. - + Returns: The agent's response. """ @@ -558,6 +583,6 @@ Respond with ONLY valid JSON, no markdown fences.""" chat_id=chat_id, content=content ) - + response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 419b088..9c169e4 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -1,5 +1,6 @@ """File system tools: read, write, edit.""" +import difflib from pathlib import Path from typing import Any @@ -150,7 +151,7 @@ class EditFileTool(Tool): content = file_path.read_text(encoding="utf-8") if old_text not in content: - return f"Error: old_text not found in file. Make sure it matches exactly." + return self._not_found_message(old_text, content, path) # Count occurrences count = content.count(old_text) @@ -166,6 +167,28 @@ class EditFileTool(Tool): except Exception as e: return f"Error editing file: {str(e)}" + @staticmethod + def _not_found_message(old_text: str, content: str, path: str) -> str: + """Build a helpful error when old_text is not found.""" + lines = content.splitlines(keepends=True) + old_lines = old_text.splitlines(keepends=True) + window = len(old_lines) + + best_ratio, best_start = 0.0, 0 + for i in range(max(1, len(lines) - window + 1)): + ratio = difflib.SequenceMatcher(None, old_lines, lines[i : i + window]).ratio() + if ratio > best_ratio: + best_ratio, best_start = ratio, i + + if best_ratio > 0.5: + diff = "\n".join(difflib.unified_diff( + old_lines, lines[best_start : best_start + window], + fromfile="old_text (provided)", tofile=f"{path} (actual, line {best_start + 1})", + lineterm="", + )) + return f"Error: old_text not found in {path}.\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}" + return f"Error: old_text not found in {path}. No similar text found. Verify the file content." + class ListDirTool(Tool): """Tool to list directory contents.""" diff --git a/nanobot/agent/tools/message.py b/nanobot/agent/tools/message.py index 10947c4..40e76e3 100644 --- a/nanobot/agent/tools/message.py +++ b/nanobot/agent/tools/message.py @@ -1,6 +1,6 @@ """Message tool for sending messages to users.""" -from typing import Any, Callable, Awaitable +from typing import Any, Awaitable, Callable from nanobot.agent.tools.base import Tool from nanobot.bus.events import OutboundMessage @@ -8,37 +8,42 @@ from nanobot.bus.events import OutboundMessage class MessageTool(Tool): """Tool to send messages to users on chat channels.""" - + def __init__( - self, + self, send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, default_channel: str = "", default_chat_id: str = "", - default_message_id: str | None = None + default_message_id: str | None = None, ): self._send_callback = send_callback self._default_channel = default_channel self._default_chat_id = default_chat_id self._default_message_id = default_message_id - + self._sent_in_turn: bool = False + def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Set the current message context.""" self._default_channel = channel self._default_chat_id = chat_id self._default_message_id = message_id - + def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None: """Set the callback for sending messages.""" self._send_callback = callback - + + def start_turn(self) -> None: + """Reset per-turn send tracking.""" + self._sent_in_turn = False + @property def name(self) -> str: return "message" - + @property def description(self) -> str: return "Send a message to the user. Use this when you want to communicate something." - + @property def parameters(self) -> dict[str, Any]: return { @@ -64,11 +69,11 @@ class MessageTool(Tool): }, "required": ["content"] } - + async def execute( - self, - content: str, - channel: str | None = None, + self, + content: str, + channel: str | None = None, chat_id: str | None = None, message_id: str | None = None, media: list[str] | None = None, @@ -77,13 +82,13 @@ class MessageTool(Tool): channel = channel or self._default_channel chat_id = chat_id or self._default_chat_id message_id = message_id or self._default_message_id - + if not channel or not chat_id: return "Error: No target channel/chat specified" - + if not self._send_callback: return "Error: Message sending not configured" - + msg = OutboundMessage( channel=channel, chat_id=chat_id, @@ -93,9 +98,10 @@ class MessageTool(Tool): "message_id": message_id, } ) - + try: await self._send_callback(msg) + self._sent_in_turn = True media_info = f" with {len(media)} attachments" if media else "" return f"Message sent to {channel}:{chat_id}{media_info}" except Exception as e: diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 30fcd1a..3a5a785 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -105,8 +105,9 @@ class BaseChannel(ABC): """ if not self.is_allowed(sender_id): logger.warning( - f"Access denied for sender {sender_id} on channel {self.name}. " - f"Add them to allowFrom list in config to grant access." + "Access denied for sender {} on channel {}. " + "Add them to allowFrom list in config to grant access.", + sender_id, self.name, ) return diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py index b7263b3..09c7714 100644 --- a/nanobot/channels/dingtalk.py +++ b/nanobot/channels/dingtalk.py @@ -58,7 +58,8 @@ class NanobotDingTalkHandler(CallbackHandler): if not content: logger.warning( - f"Received empty or unsupported message type: {chatbot_msg.message_type}" + "Received empty or unsupported message type: {}", + chatbot_msg.message_type, ) return AckMessage.STATUS_OK, "OK" @@ -126,7 +127,8 @@ class DingTalkChannel(BaseChannel): self._http = httpx.AsyncClient() logger.info( - f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..." + "Initializing DingTalk Stream Client with Client ID: {}...", + self.config.client_id, ) credential = Credential(self.config.client_id, self.config.client_secret) self._client = DingTalkStreamClient(credential) diff --git a/nanobot/channels/discord.py b/nanobot/channels/discord.py index 8baecbf..1d2d7a6 100644 --- a/nanobot/channels/discord.py +++ b/nanobot/channels/discord.py @@ -17,6 +17,29 @@ from nanobot.config.schema import DiscordConfig DISCORD_API_BASE = "https://discord.com/api/v10" MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB +MAX_MESSAGE_LEN = 2000 # Discord message character limit + + +def _split_message(content: str, max_len: int = MAX_MESSAGE_LEN) -> list[str]: + """Split content into chunks within max_len, preferring line breaks.""" + if not content: + return [] + if len(content) <= max_len: + return [content] + chunks: list[str] = [] + while content: + if len(content) <= max_len: + chunks.append(content) + break + cut = content[:max_len] + pos = cut.rfind('\n') + if pos <= 0: + pos = cut.rfind(' ') + if pos <= 0: + pos = max_len + chunks.append(content[:pos]) + content = content[pos:].lstrip() + return chunks class DiscordChannel(BaseChannel): @@ -79,34 +102,48 @@ class DiscordChannel(BaseChannel): return url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages" - payload: dict[str, Any] = {"content": msg.content} - - if msg.reply_to: - payload["message_reference"] = {"message_id": msg.reply_to} - payload["allowed_mentions"] = {"replied_user": False} - headers = {"Authorization": f"Bot {self.config.token}"} try: - for attempt in range(3): - try: - response = await self._http.post(url, headers=headers, json=payload) - if response.status_code == 429: - data = response.json() - retry_after = float(data.get("retry_after", 1.0)) - logger.warning("Discord rate limited, retrying in {}s", retry_after) - await asyncio.sleep(retry_after) - continue - response.raise_for_status() - return - except Exception as e: - if attempt == 2: - logger.error("Error sending Discord message: {}", e) - else: - await asyncio.sleep(1) + chunks = _split_message(msg.content or "") + if not chunks: + return + + for i, chunk in enumerate(chunks): + payload: dict[str, Any] = {"content": chunk} + + # Only set reply reference on the first chunk + if i == 0 and msg.reply_to: + payload["message_reference"] = {"message_id": msg.reply_to} + payload["allowed_mentions"] = {"replied_user": False} + + if not await self._send_payload(url, headers, payload): + break # Abort remaining chunks on failure finally: await self._stop_typing(msg.chat_id) + async def _send_payload( + self, url: str, headers: dict[str, str], payload: dict[str, Any] + ) -> bool: + """Send a single Discord API payload with retry on rate-limit. Returns True on success.""" + for attempt in range(3): + try: + response = await self._http.post(url, headers=headers, json=payload) + if response.status_code == 429: + data = response.json() + retry_after = float(data.get("retry_after", 1.0)) + logger.warning("Discord rate limited, retrying in {}s", retry_after) + await asyncio.sleep(retry_after) + continue + response.raise_for_status() + return True + except Exception as e: + if attempt == 2: + logger.error("Error sending Discord message: {}", e) + else: + await asyncio.sleep(1) + return False + async def _gateway_loop(self) -> None: """Main gateway loop: identify, heartbeat, dispatch events.""" if not self._ws: diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index a8ca1fa..815d853 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -6,6 +6,7 @@ import os import re import threading from collections import OrderedDict +from pathlib import Path from typing import Any from loguru import logger @@ -27,6 +28,8 @@ try: CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji, + GetFileRequest, + GetMessageResourceRequest, P2ImMessageReceiveV1, ) FEISHU_AVAILABLE = True @@ -44,6 +47,139 @@ MSG_TYPE_MAP = { } +def _extract_share_card_content(content_json: dict, msg_type: str) -> str: + """Extract text representation from share cards and interactive messages.""" + parts = [] + + if msg_type == "share_chat": + parts.append(f"[shared chat: {content_json.get('chat_id', '')}]") + elif msg_type == "share_user": + parts.append(f"[shared user: {content_json.get('user_id', '')}]") + elif msg_type == "interactive": + parts.extend(_extract_interactive_content(content_json)) + elif msg_type == "share_calendar_event": + parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]") + elif msg_type == "system": + parts.append("[system message]") + elif msg_type == "merge_forward": + parts.append("[merged forward messages]") + + return "\n".join(parts) if parts else f"[{msg_type}]" + + +def _extract_interactive_content(content: dict) -> list[str]: + """Recursively extract text and links from interactive card content.""" + parts = [] + + if isinstance(content, str): + try: + content = json.loads(content) + except (json.JSONDecodeError, TypeError): + return [content] if content.strip() else [] + + if not isinstance(content, dict): + return parts + + if "title" in content: + title = content["title"] + if isinstance(title, dict): + title_content = title.get("content", "") or title.get("text", "") + if title_content: + parts.append(f"title: {title_content}") + elif isinstance(title, str): + parts.append(f"title: {title}") + + for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []: + parts.extend(_extract_element_content(element)) + + card = content.get("card", {}) + if card: + parts.extend(_extract_interactive_content(card)) + + header = content.get("header", {}) + if header: + header_title = header.get("title", {}) + if isinstance(header_title, dict): + header_text = header_title.get("content", "") or header_title.get("text", "") + if header_text: + parts.append(f"title: {header_text}") + + return parts + + +def _extract_element_content(element: dict) -> list[str]: + """Extract content from a single card element.""" + parts = [] + + if not isinstance(element, dict): + return parts + + tag = element.get("tag", "") + + if tag in ("markdown", "lark_md"): + content = element.get("content", "") + if content: + parts.append(content) + + elif tag == "div": + text = element.get("text", {}) + if isinstance(text, dict): + text_content = text.get("content", "") or text.get("text", "") + if text_content: + parts.append(text_content) + elif isinstance(text, str): + parts.append(text) + for field in element.get("fields", []): + if isinstance(field, dict): + field_text = field.get("text", {}) + if isinstance(field_text, dict): + c = field_text.get("content", "") + if c: + parts.append(c) + + elif tag == "a": + href = element.get("href", "") + text = element.get("text", "") + if href: + parts.append(f"link: {href}") + if text: + parts.append(text) + + elif tag == "button": + text = element.get("text", {}) + if isinstance(text, dict): + c = text.get("content", "") + if c: + parts.append(c) + url = element.get("url", "") or element.get("multi_url", {}).get("url", "") + if url: + parts.append(f"link: {url}") + + elif tag == "img": + alt = element.get("alt", {}) + parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]") + + elif tag == "note": + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + elif tag == "column_set": + for col in element.get("columns", []): + for ce in col.get("elements", []): + parts.extend(_extract_element_content(ce)) + + elif tag == "plain_text": + content = element.get("content", "") + if content: + parts.append(content) + + else: + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + return parts + + def _extract_post_text(content_json: dict) -> str: """Extract plain text from Feishu post (rich text) message content. @@ -345,6 +481,87 @@ class FeishuChannel(BaseChannel): logger.error("Error uploading file {}: {}", file_path, e) return None + def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]: + """Download an image from Feishu message by message_id and image_key.""" + try: + request = GetMessageResourceRequest.builder() \ + .message_id(message_id) \ + .file_key(image_key) \ + .type("image") \ + .build() + response = self._client.im.v1.message_resource.get(request) + if response.success(): + file_data = response.file + # GetMessageResourceRequest returns BytesIO, need to read bytes + if hasattr(file_data, 'read'): + file_data = file_data.read() + return file_data, response.file_name + else: + logger.error("Failed to download image: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading image {}: {}", image_key, e) + return None, None + + def _download_file_sync(self, file_key: str) -> tuple[bytes | None, str | None]: + """Download a file from Feishu by file_key.""" + try: + request = GetFileRequest.builder().file_key(file_key).build() + response = self._client.im.v1.file.get(request) + if response.success(): + return response.file, response.file_name + else: + logger.error("Failed to download file: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading file {}: {}", file_key, e) + return None, None + + async def _download_and_save_media( + self, + msg_type: str, + content_json: dict, + message_id: str | None = None + ) -> tuple[str | None, str]: + """ + Download media from Feishu and save to local disk. + + Returns: + (file_path, content_text) - file_path is None if download failed + """ + loop = asyncio.get_running_loop() + media_dir = Path.home() / ".nanobot" / "media" + media_dir.mkdir(parents=True, exist_ok=True) + + data, filename = None, None + + if msg_type == "image": + image_key = content_json.get("image_key") + if image_key and message_id: + data, filename = await loop.run_in_executor( + None, self._download_image_sync, message_id, image_key + ) + if not filename: + filename = f"{image_key[:16]}.jpg" + + elif msg_type in ("audio", "file"): + file_key = content_json.get("file_key") + if file_key: + data, filename = await loop.run_in_executor( + None, self._download_file_sync, file_key + ) + if not filename: + ext = ".opus" if msg_type == "audio" else "" + filename = f"{file_key[:16]}{ext}" + + if data and filename: + file_path = media_dir / filename + file_path.write_bytes(data) + logger.debug("Downloaded {} to {}", msg_type, file_path) + return str(file_path), f"[{msg_type}: {filename}]" + + return None, f"[{msg_type}: download failed]" + def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: """Send a single message (text/image/file/interactive) synchronously.""" try: @@ -425,60 +642,81 @@ class FeishuChannel(BaseChannel): event = data.event message = event.message sender = event.sender - + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: return self._processed_message_ids[message_id] = None - - # Trim cache: keep most recent 500 when exceeds 1000 + + # Trim cache while len(self._processed_message_ids) > 1000: self._processed_message_ids.popitem(last=False) - + # Skip bot messages - sender_type = sender.sender_type - if sender_type == "bot": + if sender.sender_type == "bot": return - + sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" chat_id = message.chat_id - chat_type = message.chat_type # "p2p" or "group" + chat_type = message.chat_type msg_type = message.message_type - - # Add reaction to indicate "seen" + + # Add reaction await self._add_reaction(message_id, "THUMBSUP") - - # Parse message content + + # Parse content + content_parts = [] + media_paths = [] + + try: + content_json = json.loads(message.content) if message.content else {} + except json.JSONDecodeError: + content_json = {} + if msg_type == "text": - try: - content = json.loads(message.content).get("text", "") - except json.JSONDecodeError: - content = message.content or "" + text = content_json.get("text", "") + if text: + content_parts.append(text) + elif msg_type == "post": - try: - content_json = json.loads(message.content) - content = _extract_post_text(content_json) - except (json.JSONDecodeError, TypeError): - content = message.content or "" + text = _extract_post_text(content_json) + if text: + content_parts.append(text) + + elif msg_type in ("image", "audio", "file"): + file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id) + if file_path: + media_paths.append(file_path) + content_parts.append(content_text) + + elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"): + # Handle share cards and interactive messages + text = _extract_share_card_content(content_json, msg_type) + if text: + content_parts.append(text) + else: - content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") - - if not content: + content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")) + + content = "\n".join(content_parts) if content_parts else "" + + if not content and not media_paths: return - + # Forward to message bus reply_to = chat_id if chat_type == "group" else sender_id await self._handle_message( sender_id=sender_id, chat_id=reply_to, content=content, + media=media_paths, metadata={ "message_id": message_id, "chat_type": chat_type, "msg_type": msg_type, } ) - + except Exception as e: logger.error("Error processing Feishu message: {}", e) diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 79cbe76..4fc1f41 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -84,11 +84,24 @@ class SlackChannel(BaseChannel): channel_type = slack_meta.get("channel_type") # Only reply in thread for channel/group messages; DMs don't use threads use_thread = thread_ts and channel_type != "im" - await self._web_client.chat_postMessage( - channel=msg.chat_id, - text=self._to_mrkdwn(msg.content), - thread_ts=thread_ts if use_thread else None, - ) + thread_ts_param = thread_ts if use_thread else None + + if msg.content: + await self._web_client.chat_postMessage( + channel=msg.chat_id, + text=self._to_mrkdwn(msg.content), + thread_ts=thread_ts_param, + ) + + for media_path in msg.media or []: + try: + await self._web_client.files_upload_v2( + channel=msg.chat_id, + file=media_path, + thread_ts=thread_ts_param, + ) + except Exception as e: + logger.error("Failed to upload file {}: {}", media_path, e) except Exception as e: logger.error("Error sending Slack message: {}", e) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index a135349..6155463 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -498,27 +498,58 @@ def agent( console.print(f" [dim]↳ {content}[/dim]") if message: - # Single message mode + # Single message mode — direct call, no bus needed async def run_once(): with _thinking_ctx(): response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress) _print_agent_response(response, render_markdown=markdown) await agent_loop.close_mcp() - + asyncio.run(run_once()) else: - # Interactive mode + # Interactive mode — route through bus like other channels + from nanobot.bus.events import InboundMessage _init_prompt_session() console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") + if ":" in session_id: + cli_channel, cli_chat_id = session_id.split(":", 1) + else: + cli_channel, cli_chat_id = "cli", session_id + def _exit_on_sigint(signum, frame): _restore_terminal() console.print("\nGoodbye!") os._exit(0) signal.signal(signal.SIGINT, _exit_on_sigint) - + async def run_interactive(): + bus_task = asyncio.create_task(agent_loop.run()) + turn_done = asyncio.Event() + turn_done.set() + turn_response: list[str] = [] + + async def _consume_outbound(): + while True: + try: + msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + if msg.metadata.get("_progress"): + console.print(f" [dim]↳ {msg.content}[/dim]") + elif not turn_done.is_set(): + if msg.content: + turn_response.append(msg.content) + turn_done.set() + elif msg.content: + console.print() + _print_agent_response(msg.content, render_markdown=markdown) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + + outbound_task = asyncio.create_task(_consume_outbound()) + try: while True: try: @@ -532,10 +563,22 @@ def agent( _restore_terminal() console.print("\nGoodbye!") break - + + turn_done.clear() + turn_response.clear() + + await bus.publish_inbound(InboundMessage( + channel=cli_channel, + sender_id="user", + chat_id=cli_chat_id, + content=user_input, + )) + with _thinking_ctx(): - response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress) - _print_agent_response(response, render_markdown=markdown) + await turn_done.wait() + + if turn_response: + _print_agent_response(turn_response[0], render_markdown=markdown) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") @@ -545,8 +588,11 @@ def agent( console.print("\nGoodbye!") break finally: + agent_loop.stop() + outbound_task.cancel() + await asyncio.gather(bus_task, outbound_task, return_exceptions=True) await agent_loop.close_mcp() - + asyncio.run(run_interactive()) diff --git a/nanobot/providers/litellm_provider.py b/nanobot/providers/litellm_provider.py index edeb5c6..58c9ac2 100644 --- a/nanobot/providers/litellm_provider.py +++ b/nanobot/providers/litellm_provider.py @@ -111,7 +111,7 @@ class LiteLLMProvider(LLMProvider): def _supports_cache_control(self, model: str) -> bool: """Return True when the provider supports cache_control on content blocks.""" if self._gateway is not None: - return False + return self._gateway.supports_prompt_caching spec = find_by_model(model) return spec is not None and spec.supports_prompt_caching diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py index 445d977..ecf092f 100644 --- a/nanobot/providers/registry.py +++ b/nanobot/providers/registry.py @@ -100,6 +100,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( default_api_base="https://openrouter.ai/api/v1", strip_model_prefix=False, model_overrides=(), + supports_prompt_caching=True, ), # AiHubMix: global gateway, OpenAI-compatible interface. diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 9c1e427..18e23b2 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -154,6 +154,7 @@ class SessionManager: with open(path, "w", encoding="utf-8") as f: metadata_line = { "_type": "metadata", + "key": session.key, "created_at": session.created_at.isoformat(), "updated_at": session.updated_at.isoformat(), "metadata": session.metadata, @@ -186,8 +187,9 @@ class SessionManager: if first_line: data = json.loads(first_line) if data.get("_type") == "metadata": + key = data.get("key") or path.stem.replace("_", ":", 1) sessions.append({ - "key": path.stem.replace("_", ":"), + "key": key, "created_at": data.get("created_at"), "updated_at": data.get("updated_at"), "path": str(path)