diff --git a/README.md b/README.md index baf6b98..be360dc 100644 --- a/README.md +++ b/README.md @@ -138,12 +138,13 @@ Add or merge these **two parts** into your config (other options have defaults). } ``` -*Set your model*: +*Set your model* (optionally pin a provider — defaults to auto-detection): ```json { "agents": { "defaults": { - "model": "anthropic/claude-opus-4-5" + "model": "anthropic/claude-opus-4-5", + "provider": "openrouter" } } } @@ -308,6 +309,72 @@ nanobot gateway +
+Matrix (Element) + +Install Matrix dependencies first: + +```bash +pip install nanobot-ai[matrix] +``` + +**1. Create/choose a Matrix account** + +- Create or reuse a Matrix account on your homeserver (for example `matrix.org`). +- Confirm you can log in with Element. + +**2. Get credentials** + +- You need: + - `userId` (example: `@nanobot:matrix.org`) + - `accessToken` + - `deviceId` (recommended so sync tokens can be restored across restarts) +- You can obtain these from your homeserver login API (`/_matrix/client/v3/login`) or from your client's advanced session settings. + +**3. Configure** + +```json +{ + "channels": { + "matrix": { + "enabled": true, + "homeserver": "https://matrix.org", + "userId": "@nanobot:matrix.org", + "accessToken": "syt_xxx", + "deviceId": "NANOBOT01", + "e2eeEnabled": true, + "allowFrom": [], + "groupPolicy": "open", + "groupAllowFrom": [], + "allowRoomMentions": false, + "maxMediaBytes": 20971520 + } + } +} +``` + +> Keep a persistent `matrix-store` and stable `deviceId` — encrypted session state is lost if these change across restarts. + +| Option | Description | +|--------|-------------| +| `allowFrom` | User IDs allowed to interact. Empty = all senders. | +| `groupPolicy` | `open` (default), `mention`, or `allowlist`. | +| `groupAllowFrom` | Room allowlist (used when policy is `allowlist`). | +| `allowRoomMentions` | Accept `@room` mentions in mention mode. | +| `e2eeEnabled` | E2EE support (default `true`). Set `false` for plaintext-only. | +| `maxMediaBytes` | Max attachment size (default `20MB`). Set `0` to block all media. | + + + + +**4. Run** + +```bash +nanobot gateway +``` + +
+
WhatsApp @@ -807,6 +874,7 @@ MCP tools are automatically discovered and registered on startup. The LLM can us | Option | Default | Description | |--------|---------|-------------| | `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. | +| `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). | | `channels.*.allowFrom` | `[]` (allow all) | Whitelist of user IDs. Empty = allow everyone; non-empty = only listed users can interact. | diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 088d4c5..03a9a89 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -13,14 +13,10 @@ from nanobot.agent.skills import SkillsLoader class ContextBuilder: - """ - Builds the context (system prompt + messages) for the agent. - - Assembles bootstrap files, memory, skills, and conversation history - into a coherent prompt for the LLM. - """ + """Builds the context (system prompt + messages) for the agent.""" BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] + _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" def __init__(self, workspace: Path): self.workspace = workspace @@ -28,39 +24,23 @@ class ContextBuilder: self.skills = SkillsLoader(workspace) def build_system_prompt(self, skill_names: list[str] | None = None) -> str: - """ - Build the system prompt from bootstrap files, memory, and skills. - - Args: - skill_names: Optional list of skills to include. - - Returns: - Complete system prompt. - """ - parts = [] - - # Core identity - parts.append(self._get_identity()) - - # Bootstrap files + """Build the system prompt from identity, bootstrap files, memory, and skills.""" + parts = [self._get_identity()] + bootstrap = self._load_bootstrap_files() if bootstrap: parts.append(bootstrap) - - # Memory context + memory = self.memory.get_memory_context() if memory: parts.append(f"# Memory\n\n{memory}") - - # Skills - progressive loading - # 1. Always-loaded skills: include full content + always_skills = self.skills.get_always_skills() if always_skills: always_content = self.skills.load_skills_for_context(always_skills) if always_content: parts.append(f"# Active Skills\n\n{always_content}") - - # 2. Available skills: only show summary (agent uses read_file to load) + skills_summary = self.skills.build_skills_summary() if skills_summary: parts.append(f"""# Skills @@ -69,7 +49,7 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md Skills with available="false" need dependencies installed first - you can try installing them with apt/brew. {skills_summary}""") - + return "\n\n---\n\n".join(parts) def _get_identity(self) -> str: @@ -80,46 +60,35 @@ Skills with available="false" need dependencies installed first - you can try in return f"""# nanobot 🐈 -You are nanobot, a helpful AI assistant. +You are nanobot, a helpful AI assistant. ## Runtime {runtime} ## Workspace Your workspace is at: {workspace_path} -- Long-term memory: {workspace_path}/memory/MEMORY.md +- Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here) - History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md -Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel. - -## Tool Call Guidelines -- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it. -- Before modifying a file, read it first to confirm its current content. -- Do not assume a file or directory exists — use list_dir or read_file to verify. +## nanobot Guidelines +- State intent before tool calls, but NEVER predict or claim results before receiving them. +- Before modifying a file, read it first. Do not assume files or directories exist. - After writing or editing a file, re-read it if accuracy matters. - If a tool call fails, analyze the error before retrying with a different approach. +- Ask for clarification when the request is ambiguous. -## Memory -- Remember important facts: write to {workspace_path}/memory/MEMORY.md -- Recall past events: grep {workspace_path}/memory/HISTORY.md""" +Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel.""" @staticmethod - def _inject_runtime_context( - user_content: str | list[dict[str, Any]], - channel: str | None, - chat_id: str | None, - ) -> str | list[dict[str, Any]]: - """Append dynamic runtime context to the tail of the user message.""" + def _build_runtime_context(channel: str | None, chat_id: str | None) -> str: + """Build untrusted runtime metadata block for injection before the user message.""" now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)") tz = time.strftime("%Z") or "UTC" lines = [f"Current Time: {now} ({tz})"] if channel and chat_id: lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"] - block = "[Runtime Context]\n" + "\n".join(lines) - if isinstance(user_content, str): - return f"{user_content}\n\n{block}" - return [*user_content, {"type": "text", "text": block}] + return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) def _load_bootstrap_files(self) -> str: """Load all bootstrap files from workspace.""" @@ -142,35 +111,13 @@ Reply directly with text for conversations. Only use the 'message' tool to send channel: str | None = None, chat_id: str | None = None, ) -> list[dict[str, Any]]: - """ - Build the complete message list for an LLM call. - - Args: - history: Previous conversation messages. - current_message: The new user message. - skill_names: Optional skills to include. - media: Optional list of local file paths for images/media. - channel: Current channel (telegram, feishu, etc.). - chat_id: Current chat/user ID. - - Returns: - List of messages including system prompt. - """ - messages = [] - - # System prompt - system_prompt = self.build_system_prompt(skill_names) - messages.append({"role": "system", "content": system_prompt}) - - # History - messages.extend(history) - - # Current message (with optional image attachments) - user_content = self._build_user_content(current_message, media) - user_content = self._inject_runtime_context(user_content, channel, chat_id) - messages.append({"role": "user", "content": user_content}) - - return messages + """Build the complete message list for an LLM call.""" + return [ + {"role": "system", "content": self.build_system_prompt(skill_names)}, + *history, + {"role": "user", "content": self._build_runtime_context(channel, chat_id)}, + {"role": "user", "content": self._build_user_content(current_message, media)}, + ] def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]: """Build user message content with optional base64-encoded images.""" @@ -191,63 +138,24 @@ Reply directly with text for conversations. Only use the 'message' tool to send return images + [{"type": "text", "text": text}] def add_tool_result( - self, - messages: list[dict[str, Any]], - tool_call_id: str, - tool_name: str, - result: str + self, messages: list[dict[str, Any]], + tool_call_id: str, tool_name: str, result: str, ) -> list[dict[str, Any]]: - """ - Add a tool result to the message list. - - Args: - messages: Current message list. - tool_call_id: ID of the tool call. - tool_name: Name of the tool. - result: Tool execution result. - - Returns: - Updated message list. - """ - messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": tool_name, - "content": result - }) + """Add a tool result to the message list.""" + messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result}) return messages def add_assistant_message( - self, - messages: list[dict[str, Any]], + self, messages: list[dict[str, Any]], content: str | None, tool_calls: list[dict[str, Any]] | None = None, reasoning_content: str | None = None, ) -> list[dict[str, Any]]: - """ - Add an assistant message to the message list. - - Args: - messages: Current message list. - content: Message content. - tool_calls: Optional tool calls. - reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.). - - Returns: - Updated message list. - """ - msg: dict[str, Any] = {"role": "assistant"} - - # Always include content — some providers (e.g. StepFun) reject - # assistant messages that omit the key entirely. - msg["content"] = content - + """Add an assistant message to the message list.""" + msg: dict[str, Any] = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls - - # Include reasoning content when provided (required by some thinking models) if reasoning_content is not None: msg["reasoning_content"] = reasoning_content - messages.append(msg) return messages diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 2a998d4..c6e565b 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -99,6 +99,8 @@ class AgentLoop: self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} + self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks + self._processing_lock = asyncio.Lock() self._register_default_tools() def _register_default_tools(self) -> None: @@ -110,6 +112,7 @@ class AgentLoop: working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, + path_append=self.exec_config.path_append, )) self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) @@ -225,7 +228,11 @@ class AgentLoop: messages, tool_call.id, tool_call.name, result ) else: - final_content = self._strip_think(response.content) + clean = self._strip_think(response.content) + messages = self.context.add_assistant_message( + messages, clean, reasoning_content=response.reasoning_content, + ) + final_content = clean break if final_content is None and iteration >= self.max_iterations: @@ -238,35 +245,62 @@ class AgentLoop: return final_content, tools_used, messages async def run(self) -> None: - """Run the agent loop, processing messages from the bus.""" + """Run the agent loop, dispatching messages as tasks to stay responsive to /stop.""" self._running = True await self._connect_mcp() logger.info("Agent loop started") while self._running: try: - msg = await asyncio.wait_for( - self.bus.consume_inbound(), - timeout=1.0 - ) - try: - response = await self._process_message(msg) - if response is not None: - await self.bus.publish_outbound(response) - elif msg.channel == "cli": - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {}, - )) - except Exception as e: - logger.error("Error processing message: {}", e) - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" - )) + msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: continue + if msg.content.strip().lower() == "/stop": + await self._handle_stop(msg) + else: + task = asyncio.create_task(self._dispatch(msg)) + self._active_tasks.setdefault(msg.session_key, []).append(task) + task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None) + + async def _handle_stop(self, msg: InboundMessage) -> None: + """Cancel all active tasks and subagents for the session.""" + tasks = self._active_tasks.pop(msg.session_key, []) + cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) + for t in tasks: + try: + await t + except (asyncio.CancelledError, Exception): + pass + sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) + total = cancelled + sub_cancelled + content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop." + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + )) + + async def _dispatch(self, msg: InboundMessage) -> None: + """Process a message under the global lock.""" + async with self._processing_lock: + try: + response = await self._process_message(msg) + if response is not None: + await self.bus.publish_outbound(response) + elif msg.channel == "cli": + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="", metadata=msg.metadata or {}, + )) + except asyncio.CancelledError: + logger.info("Task cancelled for session {}", msg.session_key) + raise + except Exception: + logger.exception("Error processing message for session {}", msg.session_key) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Sorry, I encountered an error.", + )) + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -358,7 +392,7 @@ class AgentLoop: content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") + content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands") unconsolidated = len(session.messages) - session.last_consolidated if (unconsolidated >= self.memory_window and session.key not in self._consolidating): @@ -442,6 +476,14 @@ class AgentLoop: content = entry["content"] if len(content) > self._TOOL_RESULT_MAX_CHARS: entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)" + if entry.get("role") == "user" and isinstance(entry.get("content"), list): + entry["content"] = [ + {"type": "text", "text": "[image]"} if ( + c.get("type") == "image_url" + and c.get("image_url", {}).get("url", "").startswith("data:image/") + ) else c + for c in entry["content"] + ] entry.setdefault("timestamp", datetime.now().isoformat()) session.messages.append(entry) session.updated_at = datetime.now() diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index d87c61a..337796c 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool class SubagentManager: - """ - Manages background subagent execution. - - Subagents are lightweight agent instances that run in the background - to handle specific tasks. They share the same LLM provider but have - isolated context and a focused system prompt. - """ + """Manages background subagent execution.""" def __init__( self, @@ -49,6 +43,7 @@ class SubagentManager: self.exec_config = exec_config or ExecToolConfig() self.restrict_to_workspace = restrict_to_workspace self._running_tasks: dict[str, asyncio.Task[None]] = {} + self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...} async def spawn( self, @@ -56,35 +51,28 @@ class SubagentManager: label: str | None = None, origin_channel: str = "cli", origin_chat_id: str = "direct", + session_key: str | None = None, ) -> str: - """ - Spawn a subagent to execute a task in the background. - - Args: - task: The task description for the subagent. - label: Optional human-readable label for the task. - origin_channel: The channel to announce results to. - origin_chat_id: The chat ID to announce results to. - - Returns: - Status message indicating the subagent was started. - """ + """Spawn a subagent to execute a task in the background.""" task_id = str(uuid.uuid4())[:8] display_label = label or task[:30] + ("..." if len(task) > 30 else "") - - origin = { - "channel": origin_channel, - "chat_id": origin_chat_id, - } - - # Create background task + origin = {"channel": origin_channel, "chat_id": origin_chat_id} + bg_task = asyncio.create_task( self._run_subagent(task_id, task, display_label, origin) ) self._running_tasks[task_id] = bg_task - - # Cleanup when done - bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None)) + if session_key: + self._session_tasks.setdefault(session_key, set()).add(task_id) + + def _cleanup(_: asyncio.Task) -> None: + self._running_tasks.pop(task_id, None) + if session_key and (ids := self._session_tasks.get(session_key)): + ids.discard(task_id) + if not ids: + del self._session_tasks[session_key] + + bg_task.add_done_callback(_cleanup) logger.info("Spawned subagent [{}]: {}", task_id, display_label) return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." @@ -111,6 +99,7 @@ class SubagentManager: working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, + path_append=self.exec_config.path_append, )) tools.register(WebSearchTool(api_key=self.brave_api_key)) tools.register(WebFetchTool()) @@ -252,6 +241,16 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed When you have completed the task, provide a clear summary of your findings or actions.""" + async def cancel_by_session(self, session_key: str) -> int: + """Cancel all subagents for the given session. Returns count cancelled.""" + tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, []) + if tid in self._running_tasks and not self._running_tasks[tid].done()] + for t in tasks: + t.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + return len(tasks) + def get_running_count(self) -> int: """Return the number of currently running subagents.""" return len(self._running_tasks) diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index e3592a7..c3810b2 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -19,6 +19,7 @@ class ExecTool(Tool): deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, restrict_to_workspace: bool = False, + path_append: str = "", ): self.timeout = timeout self.working_dir = working_dir @@ -35,6 +36,7 @@ class ExecTool(Tool): ] self.allow_patterns = allow_patterns or [] self.restrict_to_workspace = restrict_to_workspace + self.path_append = path_append @property def name(self) -> str: @@ -67,12 +69,17 @@ class ExecTool(Tool): if guard_error: return guard_error + env = os.environ.copy() + if self.path_append: + env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append + try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, + env=env, ) try: diff --git a/nanobot/agent/tools/spawn.py b/nanobot/agent/tools/spawn.py index 33cf8e7..fb816ca 100644 --- a/nanobot/agent/tools/spawn.py +++ b/nanobot/agent/tools/spawn.py @@ -15,11 +15,13 @@ class SpawnTool(Tool): self._manager = manager self._origin_channel = "cli" self._origin_chat_id = "direct" + self._session_key = "cli:direct" def set_context(self, channel: str, chat_id: str) -> None: """Set the origin context for subagent announcements.""" self._origin_channel = channel self._origin_chat_id = chat_id + self._session_key = f"{channel}:{chat_id}" @property def name(self) -> str: @@ -57,4 +59,5 @@ class SpawnTool(Tool): label=label, origin_channel=self._origin_channel, origin_chat_id=self._origin_chat_id, + session_key=self._session_key, ) diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py new file mode 100644 index 0000000..21192e9 --- /dev/null +++ b/nanobot/channels/matrix.py @@ -0,0 +1,682 @@ +"""Matrix (Element) channel — inbound sync + outbound message/media delivery.""" + +import asyncio +import logging +import mimetypes +from pathlib import Path +from typing import Any, TypeAlias + +from loguru import logger + +try: + import nh3 + from mistune import create_markdown + from nio import ( + AsyncClient, AsyncClientConfig, ContentRepositoryConfigError, + DownloadError, InviteEvent, JoinError, MatrixRoom, MemoryDownloadResponse, + RoomEncryptedMedia, RoomMessage, RoomMessageMedia, RoomMessageText, + RoomSendError, RoomTypingError, SyncError, UploadError, + ) + from nio.crypto.attachments import decrypt_attachment + from nio.exceptions import EncryptionError +except ImportError as e: + raise ImportError( + "Matrix dependencies not installed. Run: pip install nanobot-ai[matrix]" + ) from e + +from nanobot.bus.events import OutboundMessage +from nanobot.channels.base import BaseChannel +from nanobot.config.loader import get_data_dir +from nanobot.utils.helpers import safe_filename + +TYPING_NOTICE_TIMEOUT_MS = 30_000 +# Must stay below TYPING_NOTICE_TIMEOUT_MS so the indicator doesn't expire mid-processing. +TYPING_KEEPALIVE_INTERVAL_MS = 20_000 +MATRIX_HTML_FORMAT = "org.matrix.custom.html" +_ATTACH_MARKER = "[attachment: {}]" +_ATTACH_TOO_LARGE = "[attachment: {} - too large]" +_ATTACH_FAILED = "[attachment: {} - download failed]" +_ATTACH_UPLOAD_FAILED = "[attachment: {} - upload failed]" +_DEFAULT_ATTACH_NAME = "attachment" +_MSGTYPE_MAP = {"m.image": "image", "m.audio": "audio", "m.video": "video", "m.file": "file"} + +MATRIX_MEDIA_EVENT_FILTER = (RoomMessageMedia, RoomEncryptedMedia) +MatrixMediaEvent: TypeAlias = RoomMessageMedia | RoomEncryptedMedia + +MATRIX_MARKDOWN = create_markdown( + escape=True, + plugins=["table", "strikethrough", "url", "superscript", "subscript"], +) + +MATRIX_ALLOWED_HTML_TAGS = { + "p", "a", "strong", "em", "del", "code", "pre", "blockquote", + "ul", "ol", "li", "h1", "h2", "h3", "h4", "h5", "h6", + "hr", "br", "table", "thead", "tbody", "tr", "th", "td", + "caption", "sup", "sub", "img", +} +MATRIX_ALLOWED_HTML_ATTRIBUTES: dict[str, set[str]] = { + "a": {"href"}, "code": {"class"}, "ol": {"start"}, + "img": {"src", "alt", "title", "width", "height"}, +} +MATRIX_ALLOWED_URL_SCHEMES = {"https", "http", "matrix", "mailto", "mxc"} + + +def _filter_matrix_html_attribute(tag: str, attr: str, value: str) -> str | None: + """Filter attribute values to a safe Matrix-compatible subset.""" + if tag == "a" and attr == "href": + return value if value.lower().startswith(("https://", "http://", "matrix:", "mailto:")) else None + if tag == "img" and attr == "src": + return value if value.lower().startswith("mxc://") else None + if tag == "code" and attr == "class": + classes = [c for c in value.split() if c.startswith("language-") and not c.startswith("language-_")] + return " ".join(classes) if classes else None + return value + + +MATRIX_HTML_CLEANER = nh3.Cleaner( + tags=MATRIX_ALLOWED_HTML_TAGS, + attributes=MATRIX_ALLOWED_HTML_ATTRIBUTES, + attribute_filter=_filter_matrix_html_attribute, + url_schemes=MATRIX_ALLOWED_URL_SCHEMES, + strip_comments=True, + link_rel="noopener noreferrer", +) + + +def _render_markdown_html(text: str) -> str | None: + """Render markdown to sanitized HTML; returns None for plain text.""" + try: + formatted = MATRIX_HTML_CLEANER.clean(MATRIX_MARKDOWN(text)).strip() + except Exception: + return None + if not formatted: + return None + # Skip formatted_body for plain

text

to keep payload minimal. + if formatted.startswith("

") and formatted.endswith("

"): + inner = formatted[3:-4] + if "<" not in inner and ">" not in inner: + return None + return formatted + + +def _build_matrix_text_content(text: str) -> dict[str, object]: + """Build Matrix m.text payload with optional HTML formatted_body.""" + content: dict[str, object] = {"msgtype": "m.text", "body": text, "m.mentions": {}} + if html := _render_markdown_html(text): + content["format"] = MATRIX_HTML_FORMAT + content["formatted_body"] = html + return content + + +class _NioLoguruHandler(logging.Handler): + """Route matrix-nio stdlib logs into Loguru.""" + + def emit(self, record: logging.LogRecord) -> None: + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + frame, depth = logging.currentframe(), 2 + while frame and frame.f_code.co_filename == logging.__file__: + frame, depth = frame.f_back, depth + 1 + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + +def _configure_nio_logging_bridge() -> None: + """Bridge matrix-nio logs to Loguru (idempotent).""" + nio_logger = logging.getLogger("nio") + if not any(isinstance(h, _NioLoguruHandler) for h in nio_logger.handlers): + nio_logger.handlers = [_NioLoguruHandler()] + nio_logger.propagate = False + + +class MatrixChannel(BaseChannel): + """Matrix (Element) channel using long-polling sync.""" + + name = "matrix" + + def __init__(self, config: Any, bus, *, restrict_to_workspace: bool = False, + workspace: Path | None = None): + super().__init__(config, bus) + self.client: AsyncClient | None = None + self._sync_task: asyncio.Task | None = None + self._typing_tasks: dict[str, asyncio.Task] = {} + self._restrict_to_workspace = restrict_to_workspace + self._workspace = workspace.expanduser().resolve() if workspace else None + self._server_upload_limit_bytes: int | None = None + self._server_upload_limit_checked = False + + async def start(self) -> None: + """Start Matrix client and begin sync loop.""" + self._running = True + _configure_nio_logging_bridge() + + store_path = get_data_dir() / "matrix-store" + store_path.mkdir(parents=True, exist_ok=True) + + self.client = AsyncClient( + homeserver=self.config.homeserver, user=self.config.user_id, + store_path=store_path, + config=AsyncClientConfig(store_sync_tokens=True, encryption_enabled=self.config.e2ee_enabled), + ) + self.client.user_id = self.config.user_id + self.client.access_token = self.config.access_token + self.client.device_id = self.config.device_id + + self._register_event_callbacks() + self._register_response_callbacks() + + if not self.config.e2ee_enabled: + logger.warning("Matrix E2EE disabled; encrypted rooms may be undecryptable.") + + if self.config.device_id: + try: + self.client.load_store() + except Exception: + logger.exception("Matrix store load failed; restart may replay recent messages.") + else: + logger.warning("Matrix device_id empty; restart may replay recent messages.") + + self._sync_task = asyncio.create_task(self._sync_loop()) + + async def stop(self) -> None: + """Stop the Matrix channel with graceful sync shutdown.""" + self._running = False + for room_id in list(self._typing_tasks): + await self._stop_typing_keepalive(room_id, clear_typing=False) + if self.client: + self.client.stop_sync_forever() + if self._sync_task: + try: + await asyncio.wait_for(asyncio.shield(self._sync_task), + timeout=self.config.sync_stop_grace_seconds) + except (asyncio.TimeoutError, asyncio.CancelledError): + self._sync_task.cancel() + try: + await self._sync_task + except asyncio.CancelledError: + pass + if self.client: + await self.client.close() + + def _is_workspace_path_allowed(self, path: Path) -> bool: + """Check path is inside workspace (when restriction enabled).""" + if not self._restrict_to_workspace or not self._workspace: + return True + try: + path.resolve(strict=False).relative_to(self._workspace) + return True + except ValueError: + return False + + def _collect_outbound_media_candidates(self, media: list[str]) -> list[Path]: + """Deduplicate and resolve outbound attachment paths.""" + seen: set[str] = set() + candidates: list[Path] = [] + for raw in media: + if not isinstance(raw, str) or not raw.strip(): + continue + path = Path(raw.strip()).expanduser() + try: + key = str(path.resolve(strict=False)) + except OSError: + key = str(path) + if key not in seen: + seen.add(key) + candidates.append(path) + return candidates + + @staticmethod + def _build_outbound_attachment_content( + *, filename: str, mime: str, size_bytes: int, + mxc_url: str, encryption_info: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """Build Matrix content payload for an uploaded file/image/audio/video.""" + prefix = mime.split("/")[0] + msgtype = {"image": "m.image", "audio": "m.audio", "video": "m.video"}.get(prefix, "m.file") + content: dict[str, Any] = { + "msgtype": msgtype, "body": filename, "filename": filename, + "info": {"mimetype": mime, "size": size_bytes}, "m.mentions": {}, + } + if encryption_info: + content["file"] = {**encryption_info, "url": mxc_url} + else: + content["url"] = mxc_url + return content + + def _is_encrypted_room(self, room_id: str) -> bool: + if not self.client: + return False + room = getattr(self.client, "rooms", {}).get(room_id) + return bool(getattr(room, "encrypted", False)) + + async def _send_room_content(self, room_id: str, content: dict[str, Any]) -> None: + """Send m.room.message with E2EE options.""" + if not self.client: + return + kwargs: dict[str, Any] = {"room_id": room_id, "message_type": "m.room.message", "content": content} + if self.config.e2ee_enabled: + kwargs["ignore_unverified_devices"] = True + await self.client.room_send(**kwargs) + + async def _resolve_server_upload_limit_bytes(self) -> int | None: + """Query homeserver upload limit once per channel lifecycle.""" + if self._server_upload_limit_checked: + return self._server_upload_limit_bytes + self._server_upload_limit_checked = True + if not self.client: + return None + try: + response = await self.client.content_repository_config() + except Exception: + return None + upload_size = getattr(response, "upload_size", None) + if isinstance(upload_size, int) and upload_size > 0: + self._server_upload_limit_bytes = upload_size + return upload_size + return None + + async def _effective_media_limit_bytes(self) -> int: + """min(local config, server advertised) — 0 blocks all uploads.""" + local_limit = max(int(self.config.max_media_bytes), 0) + server_limit = await self._resolve_server_upload_limit_bytes() + if server_limit is None: + return local_limit + return min(local_limit, server_limit) if local_limit else 0 + + async def _upload_and_send_attachment( + self, room_id: str, path: Path, limit_bytes: int, + relates_to: dict[str, Any] | None = None, + ) -> str | None: + """Upload one local file to Matrix and send it as a media message. Returns failure marker or None.""" + if not self.client: + return _ATTACH_UPLOAD_FAILED.format(path.name or _DEFAULT_ATTACH_NAME) + + resolved = path.expanduser().resolve(strict=False) + filename = safe_filename(resolved.name) or _DEFAULT_ATTACH_NAME + fail = _ATTACH_UPLOAD_FAILED.format(filename) + + if not resolved.is_file() or not self._is_workspace_path_allowed(resolved): + return fail + try: + size_bytes = resolved.stat().st_size + except OSError: + return fail + if limit_bytes <= 0 or size_bytes > limit_bytes: + return _ATTACH_TOO_LARGE.format(filename) + + mime = mimetypes.guess_type(filename, strict=False)[0] or "application/octet-stream" + try: + with resolved.open("rb") as f: + upload_result = await self.client.upload( + f, content_type=mime, filename=filename, + encrypt=self.config.e2ee_enabled and self._is_encrypted_room(room_id), + filesize=size_bytes, + ) + except Exception: + return fail + + upload_response = upload_result[0] if isinstance(upload_result, tuple) else upload_result + encryption_info = upload_result[1] if isinstance(upload_result, tuple) and isinstance(upload_result[1], dict) else None + if isinstance(upload_response, UploadError): + return fail + mxc_url = getattr(upload_response, "content_uri", None) + if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): + return fail + + content = self._build_outbound_attachment_content( + filename=filename, mime=mime, size_bytes=size_bytes, + mxc_url=mxc_url, encryption_info=encryption_info, + ) + if relates_to: + content["m.relates_to"] = relates_to + try: + await self._send_room_content(room_id, content) + except Exception: + return fail + return None + + async def send(self, msg: OutboundMessage) -> None: + """Send outbound content; clear typing for non-progress messages.""" + if not self.client: + return + text = msg.content or "" + candidates = self._collect_outbound_media_candidates(msg.media) + relates_to = self._build_thread_relates_to(msg.metadata) + is_progress = bool((msg.metadata or {}).get("_progress")) + try: + failures: list[str] = [] + if candidates: + limit_bytes = await self._effective_media_limit_bytes() + for path in candidates: + if fail := await self._upload_and_send_attachment( + msg.chat_id, path, limit_bytes, relates_to): + failures.append(fail) + if failures: + text = f"{text.rstrip()}\n{chr(10).join(failures)}" if text.strip() else "\n".join(failures) + if text or not candidates: + content = _build_matrix_text_content(text) + if relates_to: + content["m.relates_to"] = relates_to + await self._send_room_content(msg.chat_id, content) + finally: + if not is_progress: + await self._stop_typing_keepalive(msg.chat_id, clear_typing=True) + + def _register_event_callbacks(self) -> None: + self.client.add_event_callback(self._on_message, RoomMessageText) + self.client.add_event_callback(self._on_media_message, MATRIX_MEDIA_EVENT_FILTER) + self.client.add_event_callback(self._on_room_invite, InviteEvent) + + def _register_response_callbacks(self) -> None: + self.client.add_response_callback(self._on_sync_error, SyncError) + self.client.add_response_callback(self._on_join_error, JoinError) + self.client.add_response_callback(self._on_send_error, RoomSendError) + + def _log_response_error(self, label: str, response: Any) -> None: + """Log Matrix response errors — auth errors at ERROR level, rest at WARNING.""" + code = getattr(response, "status_code", None) + is_auth = code in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"} + is_fatal = is_auth or getattr(response, "soft_logout", False) + (logger.error if is_fatal else logger.warning)("Matrix {} failed: {}", label, response) + + async def _on_sync_error(self, response: SyncError) -> None: + self._log_response_error("sync", response) + + async def _on_join_error(self, response: JoinError) -> None: + self._log_response_error("join", response) + + async def _on_send_error(self, response: RoomSendError) -> None: + self._log_response_error("send", response) + + async def _set_typing(self, room_id: str, typing: bool) -> None: + """Best-effort typing indicator update.""" + if not self.client: + return + try: + response = await self.client.room_typing(room_id=room_id, typing_state=typing, + timeout=TYPING_NOTICE_TIMEOUT_MS) + if isinstance(response, RoomTypingError): + logger.debug("Matrix typing failed for {}: {}", room_id, response) + except Exception: + pass + + async def _start_typing_keepalive(self, room_id: str) -> None: + """Start periodic typing refresh (spec-recommended keepalive).""" + await self._stop_typing_keepalive(room_id, clear_typing=False) + await self._set_typing(room_id, True) + if not self._running: + return + + async def loop() -> None: + try: + while self._running: + await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_MS / 1000) + await self._set_typing(room_id, True) + except asyncio.CancelledError: + pass + + self._typing_tasks[room_id] = asyncio.create_task(loop()) + + async def _stop_typing_keepalive(self, room_id: str, *, clear_typing: bool) -> None: + if task := self._typing_tasks.pop(room_id, None): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + if clear_typing: + await self._set_typing(room_id, False) + + async def _sync_loop(self) -> None: + while self._running: + try: + await self.client.sync_forever(timeout=30000, full_state=True) + except asyncio.CancelledError: + break + except Exception: + await asyncio.sleep(2) + + async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None: + allow_from = self.config.allow_from or [] + if not allow_from or event.sender in allow_from: + await self.client.join(room.room_id) + + def _is_direct_room(self, room: MatrixRoom) -> bool: + count = getattr(room, "member_count", None) + return isinstance(count, int) and count <= 2 + + def _is_bot_mentioned(self, event: RoomMessage) -> bool: + """Check m.mentions payload for bot mention.""" + source = getattr(event, "source", None) + if not isinstance(source, dict): + return False + mentions = (source.get("content") or {}).get("m.mentions") + if not isinstance(mentions, dict): + return False + user_ids = mentions.get("user_ids") + if isinstance(user_ids, list) and self.config.user_id in user_ids: + return True + return bool(self.config.allow_room_mentions and mentions.get("room") is True) + + def _should_process_message(self, room: MatrixRoom, event: RoomMessage) -> bool: + """Apply sender and room policy checks.""" + if not self.is_allowed(event.sender): + return False + if self._is_direct_room(room): + return True + policy = self.config.group_policy + if policy == "open": + return True + if policy == "allowlist": + return room.room_id in (self.config.group_allow_from or []) + if policy == "mention": + return self._is_bot_mentioned(event) + return False + + def _media_dir(self) -> Path: + d = get_data_dir() / "media" / "matrix" + d.mkdir(parents=True, exist_ok=True) + return d + + @staticmethod + def _event_source_content(event: RoomMessage) -> dict[str, Any]: + source = getattr(event, "source", None) + if not isinstance(source, dict): + return {} + content = source.get("content") + return content if isinstance(content, dict) else {} + + def _event_thread_root_id(self, event: RoomMessage) -> str | None: + relates_to = self._event_source_content(event).get("m.relates_to") + if not isinstance(relates_to, dict) or relates_to.get("rel_type") != "m.thread": + return None + root_id = relates_to.get("event_id") + return root_id if isinstance(root_id, str) and root_id else None + + def _thread_metadata(self, event: RoomMessage) -> dict[str, str] | None: + if not (root_id := self._event_thread_root_id(event)): + return None + meta: dict[str, str] = {"thread_root_event_id": root_id} + if isinstance(reply_to := getattr(event, "event_id", None), str) and reply_to: + meta["thread_reply_to_event_id"] = reply_to + return meta + + @staticmethod + def _build_thread_relates_to(metadata: dict[str, Any] | None) -> dict[str, Any] | None: + if not metadata: + return None + root_id = metadata.get("thread_root_event_id") + if not isinstance(root_id, str) or not root_id: + return None + reply_to = metadata.get("thread_reply_to_event_id") or metadata.get("event_id") + if not isinstance(reply_to, str) or not reply_to: + return None + return {"rel_type": "m.thread", "event_id": root_id, + "m.in_reply_to": {"event_id": reply_to}, "is_falling_back": True} + + def _event_attachment_type(self, event: MatrixMediaEvent) -> str: + msgtype = self._event_source_content(event).get("msgtype") + return _MSGTYPE_MAP.get(msgtype, "file") + + @staticmethod + def _is_encrypted_media_event(event: MatrixMediaEvent) -> bool: + return (isinstance(getattr(event, "key", None), dict) + and isinstance(getattr(event, "hashes", None), dict) + and isinstance(getattr(event, "iv", None), str)) + + def _event_declared_size_bytes(self, event: MatrixMediaEvent) -> int | None: + info = self._event_source_content(event).get("info") + size = info.get("size") if isinstance(info, dict) else None + return size if isinstance(size, int) and size >= 0 else None + + def _event_mime(self, event: MatrixMediaEvent) -> str | None: + info = self._event_source_content(event).get("info") + if isinstance(info, dict) and isinstance(m := info.get("mimetype"), str) and m: + return m + m = getattr(event, "mimetype", None) + return m if isinstance(m, str) and m else None + + def _event_filename(self, event: MatrixMediaEvent, attachment_type: str) -> str: + body = getattr(event, "body", None) + if isinstance(body, str) and body.strip(): + if candidate := safe_filename(Path(body).name): + return candidate + return _DEFAULT_ATTACH_NAME if attachment_type == "file" else attachment_type + + def _build_attachment_path(self, event: MatrixMediaEvent, attachment_type: str, + filename: str, mime: str | None) -> Path: + safe_name = safe_filename(Path(filename).name) or _DEFAULT_ATTACH_NAME + suffix = Path(safe_name).suffix + if not suffix and mime: + if guessed := mimetypes.guess_extension(mime, strict=False): + safe_name, suffix = f"{safe_name}{guessed}", guessed + stem = (Path(safe_name).stem or attachment_type)[:72] + suffix = suffix[:16] + event_id = safe_filename(str(getattr(event, "event_id", "") or "evt").lstrip("$")) + event_prefix = (event_id[:24] or "evt").strip("_") + return self._media_dir() / f"{event_prefix}_{stem}{suffix}" + + async def _download_media_bytes(self, mxc_url: str) -> bytes | None: + if not self.client: + return None + response = await self.client.download(mxc=mxc_url) + if isinstance(response, DownloadError): + logger.warning("Matrix download failed for {}: {}", mxc_url, response) + return None + body = getattr(response, "body", None) + if isinstance(body, (bytes, bytearray)): + return bytes(body) + if isinstance(response, MemoryDownloadResponse): + return bytes(response.body) + if isinstance(body, (str, Path)): + path = Path(body) + if path.is_file(): + try: + return path.read_bytes() + except OSError: + return None + return None + + def _decrypt_media_bytes(self, event: MatrixMediaEvent, ciphertext: bytes) -> bytes | None: + key_obj, hashes, iv = getattr(event, "key", None), getattr(event, "hashes", None), getattr(event, "iv", None) + key = key_obj.get("k") if isinstance(key_obj, dict) else None + sha256 = hashes.get("sha256") if isinstance(hashes, dict) else None + if not all(isinstance(v, str) for v in (key, sha256, iv)): + return None + try: + return decrypt_attachment(ciphertext, key, sha256, iv) + except (EncryptionError, ValueError, TypeError): + logger.warning("Matrix decrypt failed for event {}", getattr(event, "event_id", "")) + return None + + async def _fetch_media_attachment( + self, room: MatrixRoom, event: MatrixMediaEvent, + ) -> tuple[dict[str, Any] | None, str]: + """Download, decrypt if needed, and persist a Matrix attachment.""" + atype = self._event_attachment_type(event) + mime = self._event_mime(event) + filename = self._event_filename(event, atype) + mxc_url = getattr(event, "url", None) + fail = _ATTACH_FAILED.format(filename) + + if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): + return None, fail + + limit_bytes = await self._effective_media_limit_bytes() + declared = self._event_declared_size_bytes(event) + if declared is not None and declared > limit_bytes: + return None, _ATTACH_TOO_LARGE.format(filename) + + downloaded = await self._download_media_bytes(mxc_url) + if downloaded is None: + return None, fail + + encrypted = self._is_encrypted_media_event(event) + data = downloaded + if encrypted: + if (data := self._decrypt_media_bytes(event, downloaded)) is None: + return None, fail + + if len(data) > limit_bytes: + return None, _ATTACH_TOO_LARGE.format(filename) + + path = self._build_attachment_path(event, atype, filename, mime) + try: + path.write_bytes(data) + except OSError: + return None, fail + + attachment = { + "type": atype, "mime": mime, "filename": filename, + "event_id": str(getattr(event, "event_id", "") or ""), + "encrypted": encrypted, "size_bytes": len(data), + "path": str(path), "mxc_url": mxc_url, + } + return attachment, _ATTACH_MARKER.format(path) + + def _base_metadata(self, room: MatrixRoom, event: RoomMessage) -> dict[str, Any]: + """Build common metadata for text and media handlers.""" + meta: dict[str, Any] = {"room": getattr(room, "display_name", room.room_id)} + if isinstance(eid := getattr(event, "event_id", None), str) and eid: + meta["event_id"] = eid + if thread := self._thread_metadata(event): + meta.update(thread) + return meta + + async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None: + if event.sender == self.config.user_id or not self._should_process_message(room, event): + return + await self._start_typing_keepalive(room.room_id) + try: + await self._handle_message( + sender_id=event.sender, chat_id=room.room_id, + content=event.body, metadata=self._base_metadata(room, event), + ) + except Exception: + await self._stop_typing_keepalive(room.room_id, clear_typing=True) + raise + + async def _on_media_message(self, room: MatrixRoom, event: MatrixMediaEvent) -> None: + if event.sender == self.config.user_id or not self._should_process_message(room, event): + return + attachment, marker = await self._fetch_media_attachment(room, event) + parts: list[str] = [] + if isinstance(body := getattr(event, "body", None), str) and body.strip(): + parts.append(body.strip()) + parts.append(marker) + + await self._start_typing_keepalive(room.room_id) + try: + meta = self._base_metadata(room, event) + if attachment: + meta["attachments"] = [attachment] + await self._handle_message( + sender_id=event.sender, chat_id=room.room_id, + content="\n".join(parts), + media=[attachment["path"]] if attachment else [], + metadata=meta, + ) + except Exception: + await self._stop_typing_keepalive(room.room_id, clear_typing=True) + raise diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 6cd98e7..808f50c 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel): BOT_COMMANDS = [ BotCommand("start", "Start the bot"), BotCommand("new", "Start a new conversation"), + BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] @@ -299,6 +300,7 @@ class TelegramChannel(BaseChannel): await update.message.reply_text( "🐈 nanobot commands:\n" "/new — Start a new conversation\n" + "/stop — Stop the current task\n" "/help — Show available commands" ) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 215f38d..61aee96 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -1,6 +1,8 @@ """Configuration schema using Pydantic.""" from pathlib import Path +from typing import Literal + from pydantic import BaseModel, Field, ConfigDict from pydantic.alias_generators import to_camel from pydantic_settings import BaseSettings @@ -61,6 +63,23 @@ class DiscordConfig(Base): intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT +class MatrixConfig(Base): + """Matrix (Element) channel configuration.""" + + enabled: bool = False + homeserver: str = "https://matrix.org" + access_token: str = "" + user_id: str = "" # @bot:matrix.org + device_id: str = "" + e2ee_enabled: bool = True # Enable Matrix E2EE support (encryption + encrypted room handling). + sync_stop_grace_seconds: int = 2 # Max seconds to wait for sync_forever to stop gracefully before cancellation fallback. + max_media_bytes: int = 20 * 1024 * 1024 # Max attachment size accepted for Matrix media handling (inbound + outbound). + allow_from: list[str] = Field(default_factory=list) + group_policy: Literal["open", "mention", "allowlist"] = "open" + group_allow_from: list[str] = Field(default_factory=list) + allow_room_mentions: bool = False + + class EmailConfig(Base): """Email channel configuration (IMAP inbound + SMTP outbound).""" @@ -179,6 +198,7 @@ class ChannelsConfig(Base): email: EmailConfig = Field(default_factory=EmailConfig) slack: SlackConfig = Field(default_factory=SlackConfig) qq: QQConfig = Field(default_factory=QQConfig) + matrix: MatrixConfig = Field(default_factory=MatrixConfig) class AgentDefaults(Base): @@ -186,6 +206,7 @@ class AgentDefaults(Base): workspace: str = "~/.nanobot/workspace" model: str = "anthropic/claude-opus-4-5" + provider: str = "auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection max_tokens: int = 8192 temperature: float = 0.1 max_tool_iterations: int = 40 @@ -260,6 +281,7 @@ class ExecToolConfig(Base): """Shell exec tool configuration.""" timeout: int = 60 + path_append: str = "" class MCPServerConfig(Base): @@ -300,6 +322,11 @@ class Config(BaseSettings): """Match provider config and its registry name. Returns (config, spec_name).""" from nanobot.providers.registry import PROVIDERS + forced = self.agents.defaults.provider + if forced != "auto": + p = getattr(self.providers, forced, None) + return (p, forced) if p else (None, None) + model_lower = (model or self.agents.defaults.model).lower() model_normalized = model_lower.replace("-", "_") model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else "" diff --git a/nanobot/templates/AGENTS.md b/nanobot/templates/AGENTS.md index 84ba657..4c3e5b1 100644 --- a/nanobot/templates/AGENTS.md +++ b/nanobot/templates/AGENTS.md @@ -2,14 +2,6 @@ You are a helpful AI assistant. Be concise, accurate, and friendly. -## Guidelines - -- Before calling tools, briefly state your intent — but NEVER predict results before receiving them -- Use precise tense: "I will run X" before the call, "X returned Y" after -- NEVER claim success before a tool result confirms it -- Ask for clarification when the request is ambiguous -- Remember important information in `memory/MEMORY.md`; past events are logged in `memory/HISTORY.md` - ## Scheduled Reminders When user asks for a reminder at a specific time, use `exec` to run: diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index 62f80ac..06d8fd5 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -3,7 +3,6 @@ from pathlib import Path from datetime import datetime - def ensure_dir(path: Path) -> Path: """Ensure a directory exists, creating it if necessary.""" path.mkdir(parents=True, exist_ok=True) @@ -77,4 +76,4 @@ def parse_session_key(key: str) -> tuple[str, str]: parts = key.split(":", 1) if len(parts) != 2: raise ValueError(f"Invalid session key: {key}") - return parts[0], parts[1] + return parts[0], parts[1] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index d15d18a..20dcb1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,11 @@ dependencies = [ ] [project.optional-dependencies] +matrix = [ + "matrix-nio[e2e]>=0.25.2", + "mistune>=3.0.0,<4.0.0", + "nh3>=0.2.17,<1.0.0", +] dev = [ "pytest>=9.0.0,<10.0.0", "pytest-asyncio>=1.3.0,<2.0.0", diff --git a/tests/test_context_prompt_cache.py b/tests/test_context_prompt_cache.py index 8e2333c..9afcc7d 100644 --- a/tests/test_context_prompt_cache.py +++ b/tests/test_context_prompt_cache.py @@ -39,8 +39,8 @@ def test_system_prompt_stays_stable_when_clock_changes(tmp_path, monkeypatch) -> assert prompt1 == prompt2 -def test_runtime_context_is_appended_to_current_user_message(tmp_path) -> None: - """Dynamic runtime details should be added at the tail user message, not system.""" +def test_runtime_context_is_separate_untrusted_user_message(tmp_path) -> None: + """Runtime metadata should be a separate user message before the actual user message.""" workspace = _make_workspace(tmp_path) builder = ContextBuilder(workspace) @@ -54,10 +54,13 @@ def test_runtime_context_is_appended_to_current_user_message(tmp_path) -> None: assert messages[0]["role"] == "system" assert "## Current Session" not in messages[0]["content"] + assert messages[-2]["role"] == "user" + runtime_content = messages[-2]["content"] + assert isinstance(runtime_content, str) + assert ContextBuilder._RUNTIME_CONTEXT_TAG in runtime_content + assert "Current Time:" in runtime_content + assert "Channel: cli" in runtime_content + assert "Chat ID: direct" in runtime_content + assert messages[-1]["role"] == "user" - user_content = messages[-1]["content"] - assert isinstance(user_content, str) - assert "Return exactly: OK" in user_content - assert "Current Time:" in user_content - assert "Channel: cli" in user_content - assert "Chat ID: direct" in user_content + assert messages[-1]["content"] == "Return exactly: OK" diff --git a/tests/test_matrix_channel.py b/tests/test_matrix_channel.py new file mode 100644 index 0000000..c6714c2 --- /dev/null +++ b/tests/test_matrix_channel.py @@ -0,0 +1,1302 @@ +import asyncio +from pathlib import Path +from types import SimpleNamespace + +import pytest + +import nanobot.channels.matrix as matrix_module +from nanobot.bus.events import OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.channels.matrix import ( + MATRIX_HTML_FORMAT, + TYPING_NOTICE_TIMEOUT_MS, + MatrixChannel, +) +from nanobot.config.schema import MatrixConfig + +_ROOM_SEND_UNSET = object() + + +class _DummyTask: + def __init__(self) -> None: + self.cancelled = False + + def cancel(self) -> None: + self.cancelled = True + + def __await__(self): + async def _done(): + return None + + return _done().__await__() + + +class _FakeAsyncClient: + def __init__(self, homeserver, user, store_path, config) -> None: + self.homeserver = homeserver + self.user = user + self.store_path = store_path + self.config = config + self.user_id: str | None = None + self.access_token: str | None = None + self.device_id: str | None = None + self.load_store_called = False + self.stop_sync_forever_called = False + self.join_calls: list[str] = [] + self.callbacks: list[tuple[object, object]] = [] + self.response_callbacks: list[tuple[object, object]] = [] + self.rooms: dict[str, object] = {} + self.room_send_calls: list[dict[str, object]] = [] + self.typing_calls: list[tuple[str, bool, int]] = [] + self.download_calls: list[dict[str, object]] = [] + self.upload_calls: list[dict[str, object]] = [] + self.download_response: object | None = None + self.download_bytes: bytes = b"media" + self.download_content_type: str = "application/octet-stream" + self.download_filename: str | None = None + self.upload_response: object | None = None + self.content_repository_config_response: object = SimpleNamespace(upload_size=None) + self.raise_on_send = False + self.raise_on_typing = False + self.raise_on_upload = False + + def add_event_callback(self, callback, event_type) -> None: + self.callbacks.append((callback, event_type)) + + def add_response_callback(self, callback, response_type) -> None: + self.response_callbacks.append((callback, response_type)) + + def load_store(self) -> None: + self.load_store_called = True + + def stop_sync_forever(self) -> None: + self.stop_sync_forever_called = True + + async def join(self, room_id: str) -> None: + self.join_calls.append(room_id) + + async def room_send( + self, + room_id: str, + message_type: str, + content: dict[str, object], + ignore_unverified_devices: object = _ROOM_SEND_UNSET, + ) -> None: + call: dict[str, object] = { + "room_id": room_id, + "message_type": message_type, + "content": content, + } + if ignore_unverified_devices is not _ROOM_SEND_UNSET: + call["ignore_unverified_devices"] = ignore_unverified_devices + self.room_send_calls.append(call) + if self.raise_on_send: + raise RuntimeError("send failed") + + async def room_typing( + self, + room_id: str, + typing_state: bool = True, + timeout: int = 30_000, + ) -> None: + self.typing_calls.append((room_id, typing_state, timeout)) + if self.raise_on_typing: + raise RuntimeError("typing failed") + + async def download(self, **kwargs): + self.download_calls.append(kwargs) + if self.download_response is not None: + return self.download_response + return matrix_module.MemoryDownloadResponse( + body=self.download_bytes, + content_type=self.download_content_type, + filename=self.download_filename, + ) + + async def upload( + self, + data_provider, + content_type: str | None = None, + filename: str | None = None, + filesize: int | None = None, + encrypt: bool = False, + ): + if self.raise_on_upload: + raise RuntimeError("upload failed") + if isinstance(data_provider, (bytes, bytearray)): + raise TypeError( + f"data_provider type {type(data_provider)!r} is not of a usable type " + "(Callable, IOBase)" + ) + self.upload_calls.append( + { + "data_provider": data_provider, + "content_type": content_type, + "filename": filename, + "filesize": filesize, + "encrypt": encrypt, + } + ) + if self.upload_response is not None: + return self.upload_response + if encrypt: + return ( + SimpleNamespace(content_uri="mxc://example.org/uploaded"), + { + "v": "v2", + "iv": "iv", + "hashes": {"sha256": "hash"}, + "key": {"alg": "A256CTR", "k": "key"}, + }, + ) + return SimpleNamespace(content_uri="mxc://example.org/uploaded"), None + + async def content_repository_config(self): + return self.content_repository_config_response + + async def close(self) -> None: + return None + + +def _make_config(**kwargs) -> MatrixConfig: + return MatrixConfig( + enabled=True, + homeserver="https://matrix.org", + access_token="token", + user_id="@bot:matrix.org", + **kwargs, + ) + + +@pytest.mark.asyncio +async def test_start_skips_load_store_when_device_id_missing( + monkeypatch, tmp_path +) -> None: + clients: list[_FakeAsyncClient] = [] + + def _fake_client(*args, **kwargs) -> _FakeAsyncClient: + client = _FakeAsyncClient(*args, **kwargs) + clients.append(client) + return client + + def _fake_create_task(coro): + coro.close() + return _DummyTask() + + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + monkeypatch.setattr( + "nanobot.channels.matrix.AsyncClientConfig", + lambda **kwargs: SimpleNamespace(**kwargs), + ) + monkeypatch.setattr("nanobot.channels.matrix.AsyncClient", _fake_client) + monkeypatch.setattr( + "nanobot.channels.matrix.asyncio.create_task", _fake_create_task + ) + + channel = MatrixChannel(_make_config(device_id=""), MessageBus()) + await channel.start() + + assert len(clients) == 1 + assert clients[0].config.encryption_enabled is True + assert clients[0].load_store_called is False + assert len(clients[0].callbacks) == 3 + assert len(clients[0].response_callbacks) == 3 + + await channel.stop() + + +@pytest.mark.asyncio +async def test_register_event_callbacks_uses_media_base_filter() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + channel._register_event_callbacks() + + assert len(client.callbacks) == 3 + assert client.callbacks[1][0] == channel._on_media_message + assert client.callbacks[1][1] == matrix_module.MATRIX_MEDIA_EVENT_FILTER + + +def test_media_event_filter_does_not_match_text_events() -> None: + assert not issubclass(matrix_module.RoomMessageText, matrix_module.MATRIX_MEDIA_EVENT_FILTER) + + +@pytest.mark.asyncio +async def test_start_disables_e2ee_when_configured( + monkeypatch, tmp_path +) -> None: + clients: list[_FakeAsyncClient] = [] + + def _fake_client(*args, **kwargs) -> _FakeAsyncClient: + client = _FakeAsyncClient(*args, **kwargs) + clients.append(client) + return client + + def _fake_create_task(coro): + coro.close() + return _DummyTask() + + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + monkeypatch.setattr( + "nanobot.channels.matrix.AsyncClientConfig", + lambda **kwargs: SimpleNamespace(**kwargs), + ) + monkeypatch.setattr("nanobot.channels.matrix.AsyncClient", _fake_client) + monkeypatch.setattr( + "nanobot.channels.matrix.asyncio.create_task", _fake_create_task + ) + + channel = MatrixChannel(_make_config(device_id="", e2ee_enabled=False), MessageBus()) + await channel.start() + + assert len(clients) == 1 + assert clients[0].config.encryption_enabled is False + + await channel.stop() + + +@pytest.mark.asyncio +async def test_stop_stops_sync_forever_before_close(monkeypatch) -> None: + channel = MatrixChannel(_make_config(device_id="DEVICE"), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + task = _DummyTask() + + channel.client = client + channel._sync_task = task + channel._running = True + + await channel.stop() + + assert channel._running is False + assert client.stop_sync_forever_called is True + assert task.cancelled is False + + +@pytest.mark.asyncio +async def test_room_invite_joins_when_allow_list_is_empty() -> None: + channel = MatrixChannel(_make_config(allow_from=[]), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + room = SimpleNamespace(room_id="!room:matrix.org") + event = SimpleNamespace(sender="@alice:matrix.org") + + await channel._on_room_invite(room, event) + + assert client.join_calls == ["!room:matrix.org"] + + +@pytest.mark.asyncio +async def test_room_invite_respects_allow_list_when_configured() -> None: + channel = MatrixChannel(_make_config(allow_from=["@bob:matrix.org"]), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + room = SimpleNamespace(room_id="!room:matrix.org") + event = SimpleNamespace(sender="@alice:matrix.org") + + await channel._on_room_invite(room, event) + + assert client.join_calls == [] + + +@pytest.mark.asyncio +async def test_on_message_sets_typing_for_allowed_sender() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room") + event = SimpleNamespace(sender="@alice:matrix.org", body="Hello", source={}) + + await channel._on_message(room, event) + + assert handled == ["@alice:matrix.org"] + assert client.typing_calls == [ + ("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS), + ] + + +@pytest.mark.asyncio +async def test_typing_keepalive_refreshes_periodically(monkeypatch) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._running = True + + monkeypatch.setattr(matrix_module, "TYPING_KEEPALIVE_INTERVAL_MS", 10) + + await channel._start_typing_keepalive("!room:matrix.org") + await asyncio.sleep(0.03) + await channel._stop_typing_keepalive("!room:matrix.org", clear_typing=True) + + true_updates = [call for call in client.typing_calls if call[1] is True] + assert len(true_updates) >= 2 + assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) + + +@pytest.mark.asyncio +async def test_on_message_skips_typing_for_self_message() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room") + event = SimpleNamespace(sender="@bot:matrix.org", body="Hello", source={}) + + await channel._on_message(room, event) + + assert client.typing_calls == [] + + +@pytest.mark.asyncio +async def test_on_message_skips_typing_for_denied_sender() -> None: + channel = MatrixChannel(_make_config(allow_from=["@bob:matrix.org"]), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room") + event = SimpleNamespace(sender="@alice:matrix.org", body="Hello", source={}) + + await channel._on_message(room, event) + + assert handled == [] + assert client.typing_calls == [] + + +@pytest.mark.asyncio +async def test_on_message_mention_policy_requires_mx_mentions() -> None: + channel = MatrixChannel(_make_config(group_policy="mention"), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=3) + event = SimpleNamespace(sender="@alice:matrix.org", body="Hello", source={"content": {}}) + + await channel._on_message(room, event) + + assert handled == [] + assert client.typing_calls == [] + + +@pytest.mark.asyncio +async def test_on_message_mention_policy_accepts_bot_user_mentions() -> None: + channel = MatrixChannel(_make_config(group_policy="mention"), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=3) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="Hello", + source={"content": {"m.mentions": {"user_ids": ["@bot:matrix.org"]}}}, + ) + + await channel._on_message(room, event) + + assert handled == ["@alice:matrix.org"] + assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] + + +@pytest.mark.asyncio +async def test_on_message_mention_policy_allows_direct_room_without_mentions() -> None: + channel = MatrixChannel(_make_config(group_policy="mention"), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!dm:matrix.org", display_name="DM", member_count=2) + event = SimpleNamespace(sender="@alice:matrix.org", body="Hello", source={"content": {}}) + + await channel._on_message(room, event) + + assert handled == ["@alice:matrix.org"] + assert client.typing_calls == [("!dm:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] + + +@pytest.mark.asyncio +async def test_on_message_allowlist_policy_requires_room_id() -> None: + channel = MatrixChannel( + _make_config(group_policy="allowlist", group_allow_from=["!allowed:matrix.org"]), + MessageBus(), + ) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["chat_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + denied_room = SimpleNamespace(room_id="!denied:matrix.org", display_name="Denied", member_count=3) + event = SimpleNamespace(sender="@alice:matrix.org", body="Hello", source={"content": {}}) + await channel._on_message(denied_room, event) + + allowed_room = SimpleNamespace( + room_id="!allowed:matrix.org", + display_name="Allowed", + member_count=3, + ) + await channel._on_message(allowed_room, event) + + assert handled == ["!allowed:matrix.org"] + assert client.typing_calls == [("!allowed:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] + + +@pytest.mark.asyncio +async def test_on_message_room_mention_requires_opt_in() -> None: + channel = MatrixChannel(_make_config(group_policy="mention"), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=3) + room_mention_event = SimpleNamespace( + sender="@alice:matrix.org", + body="Hello everyone", + source={"content": {"m.mentions": {"room": True}}}, + ) + + await channel._on_message(room, room_mention_event) + assert handled == [] + assert client.typing_calls == [] + + channel.config.allow_room_mentions = True + await channel._on_message(room, room_mention_event) + assert handled == ["@alice:matrix.org"] + assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] + + +@pytest.mark.asyncio +async def test_on_message_sets_thread_metadata_when_threaded_event() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=3) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="Hello", + event_id="$reply1", + source={ + "content": { + "m.relates_to": { + "rel_type": "m.thread", + "event_id": "$root1", + } + } + }, + ) + + await channel._on_message(room, event) + + assert len(handled) == 1 + metadata = handled[0]["metadata"] + assert metadata["thread_root_event_id"] == "$root1" + assert metadata["thread_reply_to_event_id"] == "$reply1" + assert metadata["event_id"] == "$reply1" + + +@pytest.mark.asyncio +async def test_on_media_message_downloads_attachment_and_sets_metadata( + monkeypatch, tmp_path +) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"image" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="photo.png", + url="mxc://example.org/mediaid", + event_id="$event1", + source={ + "content": { + "msgtype": "m.image", + "info": {"mimetype": "image/png", "size": 5}, + } + }, + ) + + await channel._on_media_message(room, event) + + assert len(client.download_calls) == 1 + assert len(handled) == 1 + assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] + + media_paths = handled[0]["media"] + assert isinstance(media_paths, list) and len(media_paths) == 1 + media_path = Path(media_paths[0]) + assert media_path.is_file() + assert media_path.read_bytes() == b"image" + + metadata = handled[0]["metadata"] + attachments = metadata["attachments"] + assert isinstance(attachments, list) and len(attachments) == 1 + assert attachments[0]["type"] == "image" + assert attachments[0]["mxc_url"] == "mxc://example.org/mediaid" + assert attachments[0]["path"] == str(media_path) + assert "[attachment: " in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_sets_thread_metadata_when_threaded_event( + monkeypatch, tmp_path +) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"image" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="photo.png", + url="mxc://example.org/mediaid", + event_id="$event1", + source={ + "content": { + "msgtype": "m.image", + "info": {"mimetype": "image/png", "size": 5}, + "m.relates_to": { + "rel_type": "m.thread", + "event_id": "$root1", + }, + } + }, + ) + + await channel._on_media_message(room, event) + + assert len(handled) == 1 + metadata = handled[0]["metadata"] + assert metadata["thread_root_event_id"] == "$root1" + assert metadata["thread_reply_to_event_id"] == "$event1" + assert metadata["event_id"] == "$event1" + + +@pytest.mark.asyncio +async def test_on_media_message_respects_declared_size_limit( + monkeypatch, tmp_path +) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(max_media_bytes=3), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="large.bin", + url="mxc://example.org/large", + event_id="$event2", + source={"content": {"msgtype": "m.file", "info": {"size": 10}}}, + ) + + await channel._on_media_message(room, event) + + assert client.download_calls == [] + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: large.bin - too large]" in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_uses_server_limit_when_smaller_than_local_limit( + monkeypatch, tmp_path +) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(max_media_bytes=10), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.content_repository_config_response = SimpleNamespace(upload_size=3) + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="large.bin", + url="mxc://example.org/large", + event_id="$event2_server", + source={"content": {"msgtype": "m.file", "info": {"size": 5}}}, + ) + + await channel._on_media_message(room, event) + + assert client.download_calls == [] + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: large.bin - too large]" in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_handles_download_error(monkeypatch, tmp_path) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_response = matrix_module.DownloadError("download failed") + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="photo.png", + url="mxc://example.org/mediaid", + event_id="$event3", + source={"content": {"msgtype": "m.image"}}, + ) + + await channel._on_media_message(room, event) + + assert len(client.download_calls) == 1 + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: photo.png - download failed]" in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_decrypts_encrypted_media(monkeypatch, tmp_path) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + monkeypatch.setattr( + matrix_module, + "decrypt_attachment", + lambda ciphertext, key, sha256, iv: b"plain", + ) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"cipher" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="secret.txt", + url="mxc://example.org/encrypted", + event_id="$event4", + key={"k": "key"}, + hashes={"sha256": "hash"}, + iv="iv", + source={"content": {"msgtype": "m.file", "info": {"size": 6}}}, + ) + + await channel._on_media_message(room, event) + + assert len(handled) == 1 + media_path = Path(handled[0]["media"][0]) + assert media_path.read_bytes() == b"plain" + attachment = handled[0]["metadata"]["attachments"][0] + assert attachment["encrypted"] is True + assert attachment["size_bytes"] == 5 + + +@pytest.mark.asyncio +async def test_on_media_message_handles_decrypt_error(monkeypatch, tmp_path) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + def _raise(*args, **kwargs): + raise matrix_module.EncryptionError("boom") + + monkeypatch.setattr(matrix_module, "decrypt_attachment", _raise) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"cipher" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="secret.txt", + url="mxc://example.org/encrypted", + event_id="$event5", + key={"k": "key"}, + hashes={"sha256": "hash"}, + iv="iv", + source={"content": {"msgtype": "m.file"}}, + ) + + await channel._on_media_message(room, event) + + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: secret.txt - download failed]" in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_send_clears_typing_after_send() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content="Hi") + ) + + assert len(client.room_send_calls) == 1 + assert client.room_send_calls[0]["content"] == { + "msgtype": "m.text", + "body": "Hi", + "m.mentions": {}, + } + assert client.room_send_calls[0]["ignore_unverified_devices"] is True + assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) + + +@pytest.mark.asyncio +async def test_send_uploads_media_and_sends_file_event(tmp_path) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + file_path = tmp_path / "test.txt" + file_path.write_text("hello", encoding="utf-8") + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="Please review.", + media=[str(file_path)], + ) + ) + + assert len(client.upload_calls) == 1 + assert not isinstance(client.upload_calls[0]["data_provider"], (bytes, bytearray)) + assert hasattr(client.upload_calls[0]["data_provider"], "read") + assert client.upload_calls[0]["filename"] == "test.txt" + assert client.upload_calls[0]["filesize"] == 5 + assert len(client.room_send_calls) == 2 + assert client.room_send_calls[0]["content"]["msgtype"] == "m.file" + assert client.room_send_calls[0]["content"]["url"] == "mxc://example.org/uploaded" + assert client.room_send_calls[1]["content"]["body"] == "Please review." + + +@pytest.mark.asyncio +async def test_send_adds_thread_relates_to_for_thread_metadata() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + metadata = { + "thread_root_event_id": "$root1", + "thread_reply_to_event_id": "$reply1", + } + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="Hi", + metadata=metadata, + ) + ) + + content = client.room_send_calls[0]["content"] + assert content["m.relates_to"] == { + "rel_type": "m.thread", + "event_id": "$root1", + "m.in_reply_to": {"event_id": "$reply1"}, + "is_falling_back": True, + } + + +@pytest.mark.asyncio +async def test_send_uses_encrypted_media_payload_in_encrypted_room(tmp_path) -> None: + channel = MatrixChannel(_make_config(e2ee_enabled=True), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.rooms["!encrypted:matrix.org"] = SimpleNamespace(encrypted=True) + channel.client = client + + file_path = tmp_path / "secret.txt" + file_path.write_text("topsecret", encoding="utf-8") + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!encrypted:matrix.org", + content="", + media=[str(file_path)], + ) + ) + + assert len(client.upload_calls) == 1 + assert client.upload_calls[0]["encrypt"] is True + assert len(client.room_send_calls) == 1 + content = client.room_send_calls[0]["content"] + assert content["msgtype"] == "m.file" + assert "file" in content + assert "url" not in content + assert content["file"]["url"] == "mxc://example.org/uploaded" + assert content["file"]["hashes"]["sha256"] == "hash" + + +@pytest.mark.asyncio +async def test_send_does_not_parse_attachment_marker_without_media(tmp_path) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + missing_path = tmp_path / "missing.txt" + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content=f"[attachment: {missing_path}]", + ) + ) + + assert client.upload_calls == [] + assert len(client.room_send_calls) == 1 + assert client.room_send_calls[0]["content"]["body"] == f"[attachment: {missing_path}]" + + +@pytest.mark.asyncio +async def test_send_passes_thread_relates_to_to_attachment_upload(monkeypatch) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._server_upload_limit_checked = True + channel._server_upload_limit_bytes = None + + captured: dict[str, object] = {} + + async def _fake_upload_and_send_attachment( + *, + room_id: str, + path: Path, + limit_bytes: int, + relates_to: dict[str, object] | None = None, + ) -> str | None: + captured["relates_to"] = relates_to + return None + + monkeypatch.setattr(channel, "_upload_and_send_attachment", _fake_upload_and_send_attachment) + + metadata = { + "thread_root_event_id": "$root1", + "thread_reply_to_event_id": "$reply1", + } + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="Hi", + media=["/tmp/fake.txt"], + metadata=metadata, + ) + ) + + assert captured["relates_to"] == { + "rel_type": "m.thread", + "event_id": "$root1", + "m.in_reply_to": {"event_id": "$reply1"}, + "is_falling_back": True, + } + + +@pytest.mark.asyncio +async def test_send_workspace_restriction_blocks_external_attachment(tmp_path) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir() + file_path = tmp_path / "external.txt" + file_path.write_text("outside", encoding="utf-8") + + channel = MatrixChannel( + _make_config(), + MessageBus(), + restrict_to_workspace=True, + workspace=workspace, + ) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="", + media=[str(file_path)], + ) + ) + + assert client.upload_calls == [] + assert len(client.room_send_calls) == 1 + assert client.room_send_calls[0]["content"]["body"] == "[attachment: external.txt - upload failed]" + + +@pytest.mark.asyncio +async def test_send_handles_upload_exception_and_reports_failure(tmp_path) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.raise_on_upload = True + channel.client = client + + file_path = tmp_path / "broken.txt" + file_path.write_text("hello", encoding="utf-8") + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="Please review.", + media=[str(file_path)], + ) + ) + + assert len(client.upload_calls) == 0 + assert len(client.room_send_calls) == 1 + assert ( + client.room_send_calls[0]["content"]["body"] + == "Please review.\n[attachment: broken.txt - upload failed]" + ) + + +@pytest.mark.asyncio +async def test_send_uses_server_upload_limit_when_smaller_than_local_limit(tmp_path) -> None: + channel = MatrixChannel(_make_config(max_media_bytes=10), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.content_repository_config_response = SimpleNamespace(upload_size=3) + channel.client = client + + file_path = tmp_path / "tiny.txt" + file_path.write_text("hello", encoding="utf-8") + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="", + media=[str(file_path)], + ) + ) + + assert client.upload_calls == [] + assert len(client.room_send_calls) == 1 + assert client.room_send_calls[0]["content"]["body"] == "[attachment: tiny.txt - too large]" + + +@pytest.mark.asyncio +async def test_send_blocks_all_outbound_media_when_limit_is_zero(tmp_path) -> None: + channel = MatrixChannel(_make_config(max_media_bytes=0), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + file_path = tmp_path / "empty.txt" + file_path.write_bytes(b"") + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="", + media=[str(file_path)], + ) + ) + + assert client.upload_calls == [] + assert len(client.room_send_calls) == 1 + assert client.room_send_calls[0]["content"]["body"] == "[attachment: empty.txt - too large]" + + +@pytest.mark.asyncio +async def test_send_omits_ignore_unverified_devices_when_e2ee_disabled() -> None: + channel = MatrixChannel(_make_config(e2ee_enabled=False), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content="Hi") + ) + + assert len(client.room_send_calls) == 1 + assert "ignore_unverified_devices" not in client.room_send_calls[0] + + +@pytest.mark.asyncio +async def test_send_stops_typing_keepalive_task() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._running = True + + await channel._start_typing_keepalive("!room:matrix.org") + assert "!room:matrix.org" in channel._typing_tasks + + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content="Hi") + ) + + assert "!room:matrix.org" not in channel._typing_tasks + assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) + + +@pytest.mark.asyncio +async def test_send_progress_keeps_typing_keepalive_running() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._running = True + + await channel._start_typing_keepalive("!room:matrix.org") + assert "!room:matrix.org" in channel._typing_tasks + + await channel.send( + OutboundMessage( + channel="matrix", + chat_id="!room:matrix.org", + content="working...", + metadata={"_progress": True, "_progress_kind": "reasoning"}, + ) + ) + + assert "!room:matrix.org" in channel._typing_tasks + assert client.typing_calls[-1] == ("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS) + + +@pytest.mark.asyncio +async def test_send_clears_typing_when_send_fails() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.raise_on_send = True + channel.client = client + + with pytest.raises(RuntimeError, match="send failed"): + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content="Hi") + ) + + assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) + + +@pytest.mark.asyncio +async def test_send_adds_formatted_body_for_markdown() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + markdown_text = "# Headline\n\n- [x] done\n\n| A | B |\n| - | - |\n| 1 | 2 |" + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content=markdown_text) + ) + + content = client.room_send_calls[0]["content"] + assert content["msgtype"] == "m.text" + assert content["body"] == markdown_text + assert content["m.mentions"] == {} + assert content["format"] == MATRIX_HTML_FORMAT + assert "

Headline

" in str(content["formatted_body"]) + assert "" in str(content["formatted_body"]) + assert "
  • [x] done
  • " in str(content["formatted_body"]) + + +@pytest.mark.asyncio +async def test_send_adds_formatted_body_for_inline_url_superscript_subscript() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + markdown_text = "Visit https://example.com and x^2^ plus H~2~O." + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content=markdown_text) + ) + + content = client.room_send_calls[0]["content"] + assert content["msgtype"] == "m.text" + assert content["body"] == markdown_text + assert content["m.mentions"] == {} + assert content["format"] == MATRIX_HTML_FORMAT + assert '' in str( + content["formatted_body"] + ) + assert "2" in str(content["formatted_body"]) + assert "2" in str(content["formatted_body"]) + + +@pytest.mark.asyncio +async def test_send_sanitizes_disallowed_link_scheme() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + markdown_text = "[click](javascript:alert(1))" + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content=markdown_text) + ) + + formatted_body = str(client.room_send_calls[0]["content"]["formatted_body"]) + assert "javascript:" not in formatted_body + assert "x' + cleaned_html = matrix_module.MATRIX_HTML_CLEANER.clean(dirty_html) + + assert " None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + markdown_text = "![ok](mxc://example.org/mediaid) ![no](https://example.com/a.png)" + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content=markdown_text) + ) + + formatted_body = str(client.room_send_calls[0]["content"]["formatted_body"]) + assert 'src="mxc://example.org/mediaid"' in formatted_body + assert 'src="https://example.com/a.png"' not in formatted_body + + +@pytest.mark.asyncio +async def test_send_falls_back_to_plaintext_when_markdown_render_fails(monkeypatch) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + def _raise(text: str) -> str: + raise RuntimeError("boom") + + monkeypatch.setattr(matrix_module, "MATRIX_MARKDOWN", _raise) + markdown_text = "# Headline" + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content=markdown_text) + ) + + content = client.room_send_calls[0]["content"] + assert content == {"msgtype": "m.text", "body": markdown_text, "m.mentions": {}} + + +@pytest.mark.asyncio +async def test_send_keeps_plaintext_only_for_plain_text() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + text = "just a normal sentence without markdown markers" + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content=text) + ) + + assert client.room_send_calls[0]["content"] == { + "msgtype": "m.text", + "body": text, + "m.mentions": {}, + } diff --git a/tests/test_message_tool.py b/tests/test_message_tool.py new file mode 100644 index 0000000..dc8e11d --- /dev/null +++ b/tests/test_message_tool.py @@ -0,0 +1,10 @@ +import pytest + +from nanobot.agent.tools.message import MessageTool + + +@pytest.mark.asyncio +async def test_message_tool_returns_error_when_no_target_context() -> None: + tool = MessageTool() + result = await tool.execute(content="test") + assert result == "Error: No target channel/chat specified" diff --git a/tests/test_task_cancel.py b/tests/test_task_cancel.py new file mode 100644 index 0000000..27a2d73 --- /dev/null +++ b/tests/test_task_cancel.py @@ -0,0 +1,167 @@ +"""Tests for /stop task cancellation.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +def _make_loop(): + """Create a minimal AgentLoop with mocked dependencies.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + workspace = MagicMock() + workspace.__truediv__ = MagicMock(return_value=MagicMock()) + + with patch("nanobot.agent.loop.ContextBuilder"), \ + patch("nanobot.agent.loop.SessionManager"), \ + patch("nanobot.agent.loop.SubagentManager") as MockSubMgr: + MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0) + loop = AgentLoop(bus=bus, provider=provider, workspace=workspace) + return loop, bus + + +class TestHandleStop: + @pytest.mark.asyncio + async def test_stop_no_active_task(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "No active task" in out.content + + @pytest.mark.asyncio + async def test_stop_cancels_active_task(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + cancelled = asyncio.Event() + + async def slow_task(): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + cancelled.set() + raise + + task = asyncio.create_task(slow_task()) + await asyncio.sleep(0) + loop._active_tasks["test:c1"] = [task] + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + + assert cancelled.is_set() + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "stopped" in out.content.lower() + + @pytest.mark.asyncio + async def test_stop_cancels_multiple_tasks(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + events = [asyncio.Event(), asyncio.Event()] + + async def slow(idx): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + events[idx].set() + raise + + tasks = [asyncio.create_task(slow(i)) for i in range(2)] + await asyncio.sleep(0) + loop._active_tasks["test:c1"] = tasks + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + + assert all(e.is_set() for e in events) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "2 task" in out.content + + +class TestDispatch: + @pytest.mark.asyncio + async def test_dispatch_processes_and_publishes(self): + from nanobot.bus.events import InboundMessage, OutboundMessage + + loop, bus = _make_loop() + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="hello") + loop._process_message = AsyncMock( + return_value=OutboundMessage(channel="test", chat_id="c1", content="hi") + ) + await loop._dispatch(msg) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert out.content == "hi" + + @pytest.mark.asyncio + async def test_processing_lock_serializes(self): + from nanobot.bus.events import InboundMessage, OutboundMessage + + loop, bus = _make_loop() + order = [] + + async def mock_process(m, **kwargs): + order.append(f"start-{m.content}") + await asyncio.sleep(0.05) + order.append(f"end-{m.content}") + return OutboundMessage(channel="test", chat_id="c1", content=m.content) + + loop._process_message = mock_process + msg1 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a") + msg2 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b") + + t1 = asyncio.create_task(loop._dispatch(msg1)) + t2 = asyncio.create_task(loop._dispatch(msg2)) + await asyncio.gather(t1, t2) + assert order == ["start-a", "end-a", "start-b", "end-b"] + + +class TestSubagentCancellation: + @pytest.mark.asyncio + async def test_cancel_by_session(self): + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) + + cancelled = asyncio.Event() + + async def slow(): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + cancelled.set() + raise + + task = asyncio.create_task(slow()) + await asyncio.sleep(0) + mgr._running_tasks["sub-1"] = task + mgr._session_tasks["test:c1"] = {"sub-1"} + + count = await mgr.cancel_by_session("test:c1") + assert count == 1 + assert cancelled.is_set() + + @pytest.mark.asyncio + async def test_cancel_by_session_no_tasks(self): + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) + assert await mgr.cancel_by_session("nonexistent") == 0