diff --git a/README.md b/README.md index be360dc..251181b 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ ⚑️ Delivers core agent functionality in just **~4,000** lines of code β€” **99% smaller** than Clawdbot's 430k+ lines. -πŸ“ Real-time line count: **3,966 lines** (run `bash core_agent_lines.sh` to verify anytime) +πŸ“ Real-time line count: **3,922 lines** (run `bash core_agent_lines.sh` to verify anytime) ## πŸ“’ News diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 7ae2634..69c2916 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -43,6 +43,8 @@ class AgentLoop: 5. Sends responses back """ + _TOOL_RESULT_MAX_CHARS = 500 + def __init__( self, bus: MessageBus, @@ -145,17 +147,10 @@ class AgentLoop: def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Update context for all tools that need routing info.""" - if message_tool := self.tools.get("message"): - if isinstance(message_tool, MessageTool): - message_tool.set_context(channel, chat_id, message_id) - - if spawn_tool := self.tools.get("spawn"): - if isinstance(spawn_tool, SpawnTool): - spawn_tool.set_context(channel, chat_id) - - if cron_tool := self.tools.get("cron"): - if isinstance(cron_tool, CronTool): - cron_tool.set_context(channel, chat_id) + for name in ("message", "spawn", "cron"): + if tool := self.tools.get(name): + if hasattr(tool, "set_context"): + tool.set_context(channel, chat_id, *([message_id] if name == "message" else [])) @staticmethod def _strip_think(text: str | None) -> str | None: @@ -315,18 +310,6 @@ class AgentLoop: self._running = False logger.info("Agent loop stopping") - def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: - lock = self._consolidation_locks.get(session_key) - if lock is None: - lock = asyncio.Lock() - self._consolidation_locks[session_key] = lock - return lock - - def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: - """Drop lock entry if no longer in use.""" - if not lock.locked(): - self._consolidation_locks.pop(session_key, None) - async def _process_message( self, msg: InboundMessage, @@ -362,7 +345,7 @@ class AgentLoop: # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - lock = self._get_consolidation_lock(session.key) + lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock()) self._consolidating.add(session.key) try: async with lock: @@ -383,7 +366,8 @@ class AgentLoop: ) finally: self._consolidating.discard(session.key) - self._prune_consolidation_lock(session.key, lock) + if not lock.locked(): + self._consolidation_locks.pop(session.key, None) session.clear() self.sessions.save(session) @@ -397,7 +381,7 @@ class AgentLoop: unconsolidated = len(session.messages) - session.last_consolidated if (unconsolidated >= self.memory_window and session.key not in self._consolidating): self._consolidating.add(session.key) - lock = self._get_consolidation_lock(session.key) + lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock()) async def _consolidate_and_unlock(): try: @@ -405,7 +389,8 @@ class AgentLoop: await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) - self._prune_consolidation_lock(session.key, lock) + if not lock.locked(): + self._consolidation_locks.pop(session.key, None) _task = asyncio.current_task() if _task is not None: self._consolidation_tasks.discard(_task) @@ -441,23 +426,19 @@ class AgentLoop: if final_content is None: final_content = "I've completed processing but have no response to give." - preview = final_content[:120] + "..." if len(final_content) > 120 else final_content - logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) - if message_tool := self.tools.get("message"): - if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: - return None + if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: + return None + preview = final_content[:120] + "..." if len(final_content) > 120 else final_content + logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, metadata=msg.metadata or {}, ) - _TOOL_RESULT_MAX_CHARS = 500 - def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None: """Save new-turn messages into session, truncating large tool results.""" from datetime import datetime diff --git a/nanobot/agent/tools/message.py b/nanobot/agent/tools/message.py index 40e76e3..35e519a 100644 --- a/nanobot/agent/tools/message.py +++ b/nanobot/agent/tools/message.py @@ -101,7 +101,8 @@ class MessageTool(Tool): try: await self._send_callback(msg) - self._sent_in_turn = True + if channel == self._default_channel and chat_id == self._default_chat_id: + self._sent_in_turn = True media_info = f" with {len(media)} attachments" if media else "" return f"Message sent to {channel}:{chat_id}{media_info}" except Exception as e: diff --git a/nanobot/agent/tools/web.py b/nanobot/agent/tools/web.py index 56956c3..7860f12 100644 --- a/nanobot/agent/tools/web.py +++ b/nanobot/agent/tools/web.py @@ -80,7 +80,7 @@ class WebSearchTool(Tool): r = await client.get( "https://api.search.brave.com/res/v1/web/search", params={"q": query, "count": n}, - headers={"Accept": "application/json", "X-Subscription-Token": api_key}, + headers={"Accept": "application/json", "X-Subscription-Token": self.api_key}, timeout=10.0 ) r.raise_for_status() diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 480bf7b..4a6312e 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -692,7 +692,7 @@ class FeishuChannel(BaseChannel): msg_type = message.message_type # Add reaction - await self._add_reaction(message_id, "THUMBSUP") + await self._add_reaction(message_id, self.config.react_emoji) # Parse content content_parts = [] diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 77b7294..c8df6b2 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -136,6 +136,18 @@ class ChannelManager: logger.info("QQ channel enabled") except ImportError as e: logger.warning("QQ channel not available: {}", e) + + # Matrix channel + if self.config.channels.matrix.enabled: + try: + from nanobot.channels.matrix import MatrixChannel + self.channels["matrix"] = MatrixChannel( + self.config.channels.matrix, + self.bus, + ) + logger.info("Matrix channel enabled") + except ImportError as e: + logger.warning("Matrix channel not available: {}", e) async def _start_channel(self, name: str, channel: BaseChannel) -> None: """Start a channel and log any exceptions.""" diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 808f50c..969d853 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -127,6 +127,8 @@ class TelegramChannel(BaseChannel): self._app: Application | None = None self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task + self._media_group_buffers: dict[str, dict] = {} + self._media_group_tasks: dict[str, asyncio.Task] = {} async def start(self) -> None: """Start the Telegram bot with long polling.""" @@ -191,6 +193,11 @@ class TelegramChannel(BaseChannel): # Cancel all typing indicators for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) + + for task in self._media_group_tasks.values(): + task.cancel() + self._media_group_tasks.clear() + self._media_group_buffers.clear() if self._app: logger.info("Stopping Telegram bot...") @@ -399,6 +406,28 @@ class TelegramChannel(BaseChannel): logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) str_chat_id = str(chat_id) + + # Telegram media groups: buffer briefly, forward as one aggregated turn. + if media_group_id := getattr(message, "media_group_id", None): + key = f"{str_chat_id}:{media_group_id}" + if key not in self._media_group_buffers: + self._media_group_buffers[key] = { + "sender_id": sender_id, "chat_id": str_chat_id, + "contents": [], "media": [], + "metadata": { + "message_id": message.message_id, "user_id": user.id, + "username": user.username, "first_name": user.first_name, + "is_group": message.chat.type != "private", + }, + } + self._start_typing(str_chat_id) + buf = self._media_group_buffers[key] + if content and content != "[empty message]": + buf["contents"].append(content) + buf["media"].extend(media_paths) + if key not in self._media_group_tasks: + self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key)) + return # Start typing indicator before processing self._start_typing(str_chat_id) @@ -418,6 +447,21 @@ class TelegramChannel(BaseChannel): } ) + async def _flush_media_group(self, key: str) -> None: + """Wait briefly, then forward buffered media-group as one turn.""" + try: + await asyncio.sleep(0.6) + if not (buf := self._media_group_buffers.pop(key, None)): + return + content = "\n".join(buf["contents"]) or "[empty message]" + await self._handle_message( + sender_id=buf["sender_id"], chat_id=buf["chat_id"], + content=content, media=list(dict.fromkeys(buf["media"])), + metadata=buf["metadata"], + ) + finally: + self._media_group_tasks.pop(key, None) + def _start_typing(self, chat_id: str) -> None: """Start sending 'typing...' indicator for a chat.""" # Cancel any existing typing task for this chat diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 1c20b50..fc4c261 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -20,6 +20,7 @@ from prompt_toolkit.patch_stdout import patch_stdout from nanobot import __version__, __logo__ from nanobot.config.schema import Config +from nanobot.utils.helpers import sync_workspace_templates app = typer.Typer( name="nanobot", @@ -185,8 +186,7 @@ def onboard(): workspace.mkdir(parents=True, exist_ok=True) console.print(f"[green]βœ“[/green] Created workspace at {workspace}") - # Create default bootstrap files - _create_workspace_templates(workspace) + sync_workspace_templates(workspace) console.print(f"\n{__logo__} nanobot is ready!") console.print("\nNext steps:") @@ -198,36 +198,6 @@ def onboard(): -def _create_workspace_templates(workspace: Path): - """Create default workspace template files from bundled templates.""" - from importlib.resources import files as pkg_files - - templates_dir = pkg_files("nanobot") / "templates" - - for item in templates_dir.iterdir(): - if not item.name.endswith(".md"): - continue - dest = workspace / item.name - if not dest.exists(): - dest.write_text(item.read_text(encoding="utf-8"), encoding="utf-8") - console.print(f" [dim]Created {item.name}[/dim]") - - memory_dir = workspace / "memory" - memory_dir.mkdir(exist_ok=True) - - memory_template = templates_dir / "memory" / "MEMORY.md" - memory_file = memory_dir / "MEMORY.md" - if not memory_file.exists(): - memory_file.write_text(memory_template.read_text(encoding="utf-8"), encoding="utf-8") - console.print(" [dim]Created memory/MEMORY.md[/dim]") - - history_file = memory_dir / "HISTORY.md" - if not history_file.exists(): - history_file.write_text("", encoding="utf-8") - console.print(" [dim]Created memory/HISTORY.md[/dim]") - - (workspace / "skills").mkdir(exist_ok=True) - def _make_provider(config: Config): """Create the appropriate LLM provider from config.""" @@ -294,6 +264,7 @@ def gateway( console.print(f"{__logo__} Starting nanobot gateway on port {port}...") config = load_config() + sync_workspace_templates(config.workspace_path) bus = MessageBus() provider = _make_provider(config) session_manager = SessionManager(config.workspace_path) @@ -447,6 +418,7 @@ def agent( from loguru import logger config = load_config() + sync_workspace_templates(config.workspace_path) bus = MessageBus() provider = _make_provider(config) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 61aee96..1ff9782 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -42,6 +42,7 @@ class FeishuConfig(Base): encrypt_key: str = "" # Encrypt Key for event subscription (optional) verification_token: str = "" # Verification Token for event subscription (optional) allow_from: list[str] = Field(default_factory=list) # Allowed user open_ids + react_emoji: str = "THUMBSUP" # Emoji type for message reactions (e.g. THUMBSUP, OK, DONE, SMILE) class DingTalkConfig(Base): @@ -183,6 +184,20 @@ class QQConfig(Base): secret: str = "" # ζœΊε™¨δΊΊε―†ι’₯ (AppSecret) from q.qq.com allow_from: list[str] = Field(default_factory=list) # Allowed user openids (empty = public access) +class MatrixConfig(Base): + """Matrix (Element) channel configuration.""" + enabled: bool = False + homeserver: str = "https://matrix.org" + access_token: str = "" + user_id: str = "" # e.g. @bot:matrix.org + device_id: str = "" + e2ee_enabled: bool = True # end-to-end encryption support + sync_stop_grace_seconds: int = 2 # graceful sync_forever shutdown timeout + max_media_bytes: int = 20 * 1024 * 1024 # inbound + outbound attachment limit + allow_from: list[str] = Field(default_factory=list) + group_policy: Literal["open", "mention", "allowlist"] = "open" + group_allow_from: list[str] = Field(default_factory=list) + allow_room_mentions: bool = False class ChannelsConfig(Base): """Configuration for chat channels.""" diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index 06d8fd5..8322bc8 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -1,79 +1,67 @@ """Utility functions for nanobot.""" +import re from pathlib import Path from datetime import datetime + def ensure_dir(path: Path) -> Path: - """Ensure a directory exists, creating it if necessary.""" + """Ensure directory exists, return it.""" path.mkdir(parents=True, exist_ok=True) return path def get_data_path() -> Path: - """Get the nanobot data directory (~/.nanobot).""" + """~/.nanobot data directory.""" return ensure_dir(Path.home() / ".nanobot") def get_workspace_path(workspace: str | None = None) -> Path: - """ - Get the workspace path. - - Args: - workspace: Optional workspace path. Defaults to ~/.nanobot/workspace. - - Returns: - Expanded and ensured workspace path. - """ - if workspace: - path = Path(workspace).expanduser() - else: - path = Path.home() / ".nanobot" / "workspace" + """Resolve and ensure workspace path. Defaults to ~/.nanobot/workspace.""" + path = Path(workspace).expanduser() if workspace else Path.home() / ".nanobot" / "workspace" return ensure_dir(path) -def get_sessions_path() -> Path: - """Get the sessions storage directory.""" - return ensure_dir(get_data_path() / "sessions") - - -def get_skills_path(workspace: Path | None = None) -> Path: - """Get the skills directory within the workspace.""" - ws = workspace or get_workspace_path() - return ensure_dir(ws / "skills") - - def timestamp() -> str: - """Get current timestamp in ISO format.""" + """Current ISO timestamp.""" return datetime.now().isoformat() -def truncate_string(s: str, max_len: int = 100, suffix: str = "...") -> str: - """Truncate a string to max length, adding suffix if truncated.""" - if len(s) <= max_len: - return s - return s[: max_len - len(suffix)] + suffix - +_UNSAFE_CHARS = re.compile(r'[<>:"/\\|?*]') def safe_filename(name: str) -> str: - """Convert a string to a safe filename.""" - # Replace unsafe characters - unsafe = '<>:"/\\|?*' - for char in unsafe: - name = name.replace(char, "_") - return name.strip() + """Replace unsafe path characters with underscores.""" + return _UNSAFE_CHARS.sub("_", name).strip() -def parse_session_key(key: str) -> tuple[str, str]: - """ - Parse a session key into channel and chat_id. - - Args: - key: Session key in format "channel:chat_id" - - Returns: - Tuple of (channel, chat_id) - """ - parts = key.split(":", 1) - if len(parts) != 2: - raise ValueError(f"Invalid session key: {key}") - return parts[0], parts[1] \ No newline at end of file +def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]: + """Sync bundled templates to workspace. Only creates missing files.""" + from importlib.resources import files as pkg_files + try: + tpl = pkg_files("nanobot") / "templates" + except Exception: + return [] + if not tpl.is_dir(): + return [] + + added: list[str] = [] + + def _write(src, dest: Path): + if dest.exists(): + return + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_text(src.read_text(encoding="utf-8") if src else "", encoding="utf-8") + added.append(str(dest.relative_to(workspace))) + + for item in tpl.iterdir(): + if item.name.endswith(".md"): + _write(item, workspace / item.name) + _write(tpl / "memory" / "MEMORY.md", workspace / "memory" / "MEMORY.md") + _write(None, workspace / "memory" / "HISTORY.md") + (workspace / "skills").mkdir(exist_ok=True) + + if added and not silent: + from rich.console import Console + for name in added: + Console().print(f" [dim]Created {name}[/dim]") + return added diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index 323519e..6755124 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -812,7 +812,7 @@ class TestConsolidationDeduplicationGuard: loop.sessions.save(session) # Ensure lock exists before /new. - _ = loop._get_consolidation_lock(session.key) + loop._consolidation_locks.setdefault(session.key, asyncio.Lock()) assert session.key in loop._consolidation_locks async def _ok_consolidate(sess, archive_all: bool = False) -> bool: diff --git a/tests/test_message_tool_suppress.py b/tests/test_message_tool_suppress.py new file mode 100644 index 0000000..26b8a16 --- /dev/null +++ b/tests/test_message_tool_suppress.py @@ -0,0 +1,103 @@ +"""Test message tool suppress logic for final replies.""" + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.agent.loop import AgentLoop +from nanobot.agent.tools.message import MessageTool +from nanobot.bus.events import InboundMessage, OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.providers.base import LLMResponse, ToolCallRequest + + +def _make_loop(tmp_path: Path) -> AgentLoop: + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + return AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10) + + +class TestMessageToolSuppressLogic: + """Final reply suppressed only when message tool sends to the same target.""" + + @pytest.mark.asyncio + async def test_suppress_when_sent_to_same_target(self, tmp_path: Path) -> None: + loop = _make_loop(tmp_path) + tool_call = ToolCallRequest( + id="call1", name="message", + arguments={"content": "Hello", "channel": "feishu", "chat_id": "chat123"}, + ) + calls = iter([ + LLMResponse(content="", tool_calls=[tool_call]), + LLMResponse(content="Done", tool_calls=[]), + ]) + loop.provider.chat = AsyncMock(side_effect=lambda *a, **kw: next(calls)) + loop.tools.get_definitions = MagicMock(return_value=[]) + + sent: list[OutboundMessage] = [] + mt = loop.tools.get("message") + if isinstance(mt, MessageTool): + mt.set_send_callback(AsyncMock(side_effect=lambda m: sent.append(m))) + + msg = InboundMessage(channel="feishu", sender_id="user1", chat_id="chat123", content="Send") + result = await loop._process_message(msg) + + assert len(sent) == 1 + assert result is None # suppressed + + @pytest.mark.asyncio + async def test_not_suppress_when_sent_to_different_target(self, tmp_path: Path) -> None: + loop = _make_loop(tmp_path) + tool_call = ToolCallRequest( + id="call1", name="message", + arguments={"content": "Email content", "channel": "email", "chat_id": "user@example.com"}, + ) + calls = iter([ + LLMResponse(content="", tool_calls=[tool_call]), + LLMResponse(content="I've sent the email.", tool_calls=[]), + ]) + loop.provider.chat = AsyncMock(side_effect=lambda *a, **kw: next(calls)) + loop.tools.get_definitions = MagicMock(return_value=[]) + + sent: list[OutboundMessage] = [] + mt = loop.tools.get("message") + if isinstance(mt, MessageTool): + mt.set_send_callback(AsyncMock(side_effect=lambda m: sent.append(m))) + + msg = InboundMessage(channel="feishu", sender_id="user1", chat_id="chat123", content="Send email") + result = await loop._process_message(msg) + + assert len(sent) == 1 + assert sent[0].channel == "email" + assert result is not None # not suppressed + assert result.channel == "feishu" + + @pytest.mark.asyncio + async def test_not_suppress_when_no_message_tool_used(self, tmp_path: Path) -> None: + loop = _make_loop(tmp_path) + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="Hello!", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + msg = InboundMessage(channel="feishu", sender_id="user1", chat_id="chat123", content="Hi") + result = await loop._process_message(msg) + + assert result is not None + assert "Hello" in result.content + + +class TestMessageToolTurnTracking: + + def test_sent_in_turn_tracks_same_target(self) -> None: + tool = MessageTool() + tool.set_context("feishu", "chat1") + assert not tool._sent_in_turn + tool._sent_in_turn = True + assert tool._sent_in_turn + + def test_start_turn_resets(self) -> None: + tool = MessageTool() + tool._sent_in_turn = True + tool.start_turn() + assert not tool._sent_in_turn