diff --git a/README.md b/README.md index baf6b98..ec12cfe 100644 --- a/README.md +++ b/README.md @@ -138,12 +138,13 @@ Add or merge these **two parts** into your config (other options have defaults). } ``` -*Set your model*: +*Set your model* (optionally pin a provider — defaults to auto-detection): ```json { "agents": { "defaults": { - "model": "anthropic/claude-opus-4-5" + "model": "anthropic/claude-opus-4-5", + "provider": "openrouter" } } } @@ -807,6 +808,7 @@ MCP tools are automatically discovered and registered on startup. The LLM can us | Option | Default | Description | |--------|---------|-------------| | `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. | +| `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). | | `channels.*.allowFrom` | `[]` (allow all) | Whitelist of user IDs. Empty = allow everyone; non-empty = only listed users can interact. | diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 088d4c5..03a9a89 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -13,14 +13,10 @@ from nanobot.agent.skills import SkillsLoader class ContextBuilder: - """ - Builds the context (system prompt + messages) for the agent. - - Assembles bootstrap files, memory, skills, and conversation history - into a coherent prompt for the LLM. - """ + """Builds the context (system prompt + messages) for the agent.""" BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] + _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" def __init__(self, workspace: Path): self.workspace = workspace @@ -28,39 +24,23 @@ class ContextBuilder: self.skills = SkillsLoader(workspace) def build_system_prompt(self, skill_names: list[str] | None = None) -> str: - """ - Build the system prompt from bootstrap files, memory, and skills. - - Args: - skill_names: Optional list of skills to include. - - Returns: - Complete system prompt. - """ - parts = [] - - # Core identity - parts.append(self._get_identity()) - - # Bootstrap files + """Build the system prompt from identity, bootstrap files, memory, and skills.""" + parts = [self._get_identity()] + bootstrap = self._load_bootstrap_files() if bootstrap: parts.append(bootstrap) - - # Memory context + memory = self.memory.get_memory_context() if memory: parts.append(f"# Memory\n\n{memory}") - - # Skills - progressive loading - # 1. Always-loaded skills: include full content + always_skills = self.skills.get_always_skills() if always_skills: always_content = self.skills.load_skills_for_context(always_skills) if always_content: parts.append(f"# Active Skills\n\n{always_content}") - - # 2. Available skills: only show summary (agent uses read_file to load) + skills_summary = self.skills.build_skills_summary() if skills_summary: parts.append(f"""# Skills @@ -69,7 +49,7 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md Skills with available="false" need dependencies installed first - you can try installing them with apt/brew. {skills_summary}""") - + return "\n\n---\n\n".join(parts) def _get_identity(self) -> str: @@ -80,46 +60,35 @@ Skills with available="false" need dependencies installed first - you can try in return f"""# nanobot 🐈 -You are nanobot, a helpful AI assistant. +You are nanobot, a helpful AI assistant. ## Runtime {runtime} ## Workspace Your workspace is at: {workspace_path} -- Long-term memory: {workspace_path}/memory/MEMORY.md +- Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here) - History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md -Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel. - -## Tool Call Guidelines -- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it. -- Before modifying a file, read it first to confirm its current content. -- Do not assume a file or directory exists — use list_dir or read_file to verify. +## nanobot Guidelines +- State intent before tool calls, but NEVER predict or claim results before receiving them. +- Before modifying a file, read it first. Do not assume files or directories exist. - After writing or editing a file, re-read it if accuracy matters. - If a tool call fails, analyze the error before retrying with a different approach. +- Ask for clarification when the request is ambiguous. -## Memory -- Remember important facts: write to {workspace_path}/memory/MEMORY.md -- Recall past events: grep {workspace_path}/memory/HISTORY.md""" +Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel.""" @staticmethod - def _inject_runtime_context( - user_content: str | list[dict[str, Any]], - channel: str | None, - chat_id: str | None, - ) -> str | list[dict[str, Any]]: - """Append dynamic runtime context to the tail of the user message.""" + def _build_runtime_context(channel: str | None, chat_id: str | None) -> str: + """Build untrusted runtime metadata block for injection before the user message.""" now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)") tz = time.strftime("%Z") or "UTC" lines = [f"Current Time: {now} ({tz})"] if channel and chat_id: lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"] - block = "[Runtime Context]\n" + "\n".join(lines) - if isinstance(user_content, str): - return f"{user_content}\n\n{block}" - return [*user_content, {"type": "text", "text": block}] + return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) def _load_bootstrap_files(self) -> str: """Load all bootstrap files from workspace.""" @@ -142,35 +111,13 @@ Reply directly with text for conversations. Only use the 'message' tool to send channel: str | None = None, chat_id: str | None = None, ) -> list[dict[str, Any]]: - """ - Build the complete message list for an LLM call. - - Args: - history: Previous conversation messages. - current_message: The new user message. - skill_names: Optional skills to include. - media: Optional list of local file paths for images/media. - channel: Current channel (telegram, feishu, etc.). - chat_id: Current chat/user ID. - - Returns: - List of messages including system prompt. - """ - messages = [] - - # System prompt - system_prompt = self.build_system_prompt(skill_names) - messages.append({"role": "system", "content": system_prompt}) - - # History - messages.extend(history) - - # Current message (with optional image attachments) - user_content = self._build_user_content(current_message, media) - user_content = self._inject_runtime_context(user_content, channel, chat_id) - messages.append({"role": "user", "content": user_content}) - - return messages + """Build the complete message list for an LLM call.""" + return [ + {"role": "system", "content": self.build_system_prompt(skill_names)}, + *history, + {"role": "user", "content": self._build_runtime_context(channel, chat_id)}, + {"role": "user", "content": self._build_user_content(current_message, media)}, + ] def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]: """Build user message content with optional base64-encoded images.""" @@ -191,63 +138,24 @@ Reply directly with text for conversations. Only use the 'message' tool to send return images + [{"type": "text", "text": text}] def add_tool_result( - self, - messages: list[dict[str, Any]], - tool_call_id: str, - tool_name: str, - result: str + self, messages: list[dict[str, Any]], + tool_call_id: str, tool_name: str, result: str, ) -> list[dict[str, Any]]: - """ - Add a tool result to the message list. - - Args: - messages: Current message list. - tool_call_id: ID of the tool call. - tool_name: Name of the tool. - result: Tool execution result. - - Returns: - Updated message list. - """ - messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": tool_name, - "content": result - }) + """Add a tool result to the message list.""" + messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result}) return messages def add_assistant_message( - self, - messages: list[dict[str, Any]], + self, messages: list[dict[str, Any]], content: str | None, tool_calls: list[dict[str, Any]] | None = None, reasoning_content: str | None = None, ) -> list[dict[str, Any]]: - """ - Add an assistant message to the message list. - - Args: - messages: Current message list. - content: Message content. - tool_calls: Optional tool calls. - reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.). - - Returns: - Updated message list. - """ - msg: dict[str, Any] = {"role": "assistant"} - - # Always include content — some providers (e.g. StepFun) reject - # assistant messages that omit the key entirely. - msg["content"] = content - + """Add an assistant message to the message list.""" + msg: dict[str, Any] = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls - - # Include reasoning content when provided (required by some thinking models) if reasoning_content is not None: msg["reasoning_content"] = reasoning_content - messages.append(msg) return messages diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 5d3c492..51f965d 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -99,6 +99,8 @@ class AgentLoop: self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} + self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks + self._processing_lock = asyncio.Lock() self._register_default_tools() def _register_default_tools(self) -> None: @@ -110,6 +112,7 @@ class AgentLoop: working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, + path_append=self.exec_config.path_append, )) self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) @@ -225,7 +228,13 @@ class AgentLoop: messages, tool_call.id, tool_call.name, result ) else: - final_content = self._strip_think(response.content) + clean = self._strip_think(response.content) + if on_progress and clean: + await on_progress(clean) + messages = self.context.add_assistant_message( + messages, clean, reasoning_content=response.reasoning_content, + ) + final_content = clean break if final_content is None and iteration >= self.max_iterations: @@ -238,35 +247,62 @@ class AgentLoop: return final_content, tools_used, messages async def run(self) -> None: - """Run the agent loop, processing messages from the bus.""" + """Run the agent loop, dispatching messages as tasks to stay responsive to /stop.""" self._running = True await self._connect_mcp() logger.info("Agent loop started") while self._running: try: - msg = await asyncio.wait_for( - self.bus.consume_inbound(), - timeout=1.0 - ) - try: - response = await self._process_message(msg) - if response is not None: - await self.bus.publish_outbound(response) - elif msg.channel == "cli": - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {}, - )) - except Exception as e: - logger.error("Error processing message: {}", e) - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" - )) + msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: continue + if msg.content.strip().lower() == "/stop": + await self._handle_stop(msg) + else: + task = asyncio.create_task(self._dispatch(msg)) + self._active_tasks.setdefault(msg.session_key, []).append(task) + task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None) + + async def _handle_stop(self, msg: InboundMessage) -> None: + """Cancel all active tasks and subagents for the session.""" + tasks = self._active_tasks.pop(msg.session_key, []) + cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) + for t in tasks: + try: + await t + except (asyncio.CancelledError, Exception): + pass + sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) + total = cancelled + sub_cancelled + content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop." + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + )) + + async def _dispatch(self, msg: InboundMessage) -> None: + """Process a message under the global lock.""" + async with self._processing_lock: + try: + response = await self._process_message(msg) + if response is not None: + await self.bus.publish_outbound(response) + elif msg.channel == "cli": + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="", metadata=msg.metadata or {}, + )) + except asyncio.CancelledError: + logger.info("Task cancelled for session {}", msg.session_key) + raise + except Exception: + logger.exception("Error processing message for session {}", msg.session_key) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Sorry, I encountered an error.", + )) + async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: @@ -358,7 +394,7 @@ class AgentLoop: content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") + content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands") unconsolidated = len(session.messages) - session.last_consolidated if (unconsolidated >= self.memory_window and session.key not in self._consolidating): diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index d87c61a..337796c 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool class SubagentManager: - """ - Manages background subagent execution. - - Subagents are lightweight agent instances that run in the background - to handle specific tasks. They share the same LLM provider but have - isolated context and a focused system prompt. - """ + """Manages background subagent execution.""" def __init__( self, @@ -49,6 +43,7 @@ class SubagentManager: self.exec_config = exec_config or ExecToolConfig() self.restrict_to_workspace = restrict_to_workspace self._running_tasks: dict[str, asyncio.Task[None]] = {} + self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...} async def spawn( self, @@ -56,35 +51,28 @@ class SubagentManager: label: str | None = None, origin_channel: str = "cli", origin_chat_id: str = "direct", + session_key: str | None = None, ) -> str: - """ - Spawn a subagent to execute a task in the background. - - Args: - task: The task description for the subagent. - label: Optional human-readable label for the task. - origin_channel: The channel to announce results to. - origin_chat_id: The chat ID to announce results to. - - Returns: - Status message indicating the subagent was started. - """ + """Spawn a subagent to execute a task in the background.""" task_id = str(uuid.uuid4())[:8] display_label = label or task[:30] + ("..." if len(task) > 30 else "") - - origin = { - "channel": origin_channel, - "chat_id": origin_chat_id, - } - - # Create background task + origin = {"channel": origin_channel, "chat_id": origin_chat_id} + bg_task = asyncio.create_task( self._run_subagent(task_id, task, display_label, origin) ) self._running_tasks[task_id] = bg_task - - # Cleanup when done - bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None)) + if session_key: + self._session_tasks.setdefault(session_key, set()).add(task_id) + + def _cleanup(_: asyncio.Task) -> None: + self._running_tasks.pop(task_id, None) + if session_key and (ids := self._session_tasks.get(session_key)): + ids.discard(task_id) + if not ids: + del self._session_tasks[session_key] + + bg_task.add_done_callback(_cleanup) logger.info("Spawned subagent [{}]: {}", task_id, display_label) return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." @@ -111,6 +99,7 @@ class SubagentManager: working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, + path_append=self.exec_config.path_append, )) tools.register(WebSearchTool(api_key=self.brave_api_key)) tools.register(WebFetchTool()) @@ -252,6 +241,16 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed When you have completed the task, provide a clear summary of your findings or actions.""" + async def cancel_by_session(self, session_key: str) -> int: + """Cancel all subagents for the given session. Returns count cancelled.""" + tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, []) + if tid in self._running_tasks and not self._running_tasks[tid].done()] + for t in tasks: + t.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + return len(tasks) + def get_running_count(self) -> int: """Return the number of currently running subagents.""" return len(self._running_tasks) diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index e3592a7..c3810b2 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -19,6 +19,7 @@ class ExecTool(Tool): deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, restrict_to_workspace: bool = False, + path_append: str = "", ): self.timeout = timeout self.working_dir = working_dir @@ -35,6 +36,7 @@ class ExecTool(Tool): ] self.allow_patterns = allow_patterns or [] self.restrict_to_workspace = restrict_to_workspace + self.path_append = path_append @property def name(self) -> str: @@ -67,12 +69,17 @@ class ExecTool(Tool): if guard_error: return guard_error + env = os.environ.copy() + if self.path_append: + env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append + try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, + env=env, ) try: diff --git a/nanobot/agent/tools/spawn.py b/nanobot/agent/tools/spawn.py index 33cf8e7..fb816ca 100644 --- a/nanobot/agent/tools/spawn.py +++ b/nanobot/agent/tools/spawn.py @@ -15,11 +15,13 @@ class SpawnTool(Tool): self._manager = manager self._origin_channel = "cli" self._origin_chat_id = "direct" + self._session_key = "cli:direct" def set_context(self, channel: str, chat_id: str) -> None: """Set the origin context for subagent announcements.""" self._origin_channel = channel self._origin_chat_id = chat_id + self._session_key = f"{channel}:{chat_id}" @property def name(self) -> str: @@ -57,4 +59,5 @@ class SpawnTool(Tool): label=label, origin_channel=self._origin_channel, origin_chat_id=self._origin_chat_id, + session_key=self._session_key, ) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 6cd98e7..808f50c 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel): BOT_COMMANDS = [ BotCommand("start", "Start the bot"), BotCommand("new", "Start a new conversation"), + BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] @@ -299,6 +300,7 @@ class TelegramChannel(BaseChannel): await update.message.reply_text( "🐈 nanobot commands:\n" "/new — Start a new conversation\n" + "/stop — Stop the current task\n" "/help — Show available commands" ) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 215f38d..b030dac 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -186,6 +186,7 @@ class AgentDefaults(Base): workspace: str = "~/.nanobot/workspace" model: str = "anthropic/claude-opus-4-5" + provider: str = "auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection max_tokens: int = 8192 temperature: float = 0.1 max_tool_iterations: int = 40 @@ -260,6 +261,7 @@ class ExecToolConfig(Base): """Shell exec tool configuration.""" timeout: int = 60 + path_append: str = "" class MCPServerConfig(Base): @@ -300,6 +302,11 @@ class Config(BaseSettings): """Match provider config and its registry name. Returns (config, spec_name).""" from nanobot.providers.registry import PROVIDERS + forced = self.agents.defaults.provider + if forced != "auto": + p = getattr(self.providers, forced, None) + return (p, forced) if p else (None, None) + model_lower = (model or self.agents.defaults.model).lower() model_normalized = model_lower.replace("-", "_") model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else "" diff --git a/nanobot/templates/AGENTS.md b/nanobot/templates/AGENTS.md index 84ba657..4c3e5b1 100644 --- a/nanobot/templates/AGENTS.md +++ b/nanobot/templates/AGENTS.md @@ -2,14 +2,6 @@ You are a helpful AI assistant. Be concise, accurate, and friendly. -## Guidelines - -- Before calling tools, briefly state your intent — but NEVER predict results before receiving them -- Use precise tense: "I will run X" before the call, "X returned Y" after -- NEVER claim success before a tool result confirms it -- Ask for clarification when the request is ambiguous -- Remember important information in `memory/MEMORY.md`; past events are logged in `memory/HISTORY.md` - ## Scheduled Reminders When user asks for a reminder at a specific time, use `exec` to run: diff --git a/tests/test_context_prompt_cache.py b/tests/test_context_prompt_cache.py index 8e2333c..9afcc7d 100644 --- a/tests/test_context_prompt_cache.py +++ b/tests/test_context_prompt_cache.py @@ -39,8 +39,8 @@ def test_system_prompt_stays_stable_when_clock_changes(tmp_path, monkeypatch) -> assert prompt1 == prompt2 -def test_runtime_context_is_appended_to_current_user_message(tmp_path) -> None: - """Dynamic runtime details should be added at the tail user message, not system.""" +def test_runtime_context_is_separate_untrusted_user_message(tmp_path) -> None: + """Runtime metadata should be a separate user message before the actual user message.""" workspace = _make_workspace(tmp_path) builder = ContextBuilder(workspace) @@ -54,10 +54,13 @@ def test_runtime_context_is_appended_to_current_user_message(tmp_path) -> None: assert messages[0]["role"] == "system" assert "## Current Session" not in messages[0]["content"] + assert messages[-2]["role"] == "user" + runtime_content = messages[-2]["content"] + assert isinstance(runtime_content, str) + assert ContextBuilder._RUNTIME_CONTEXT_TAG in runtime_content + assert "Current Time:" in runtime_content + assert "Channel: cli" in runtime_content + assert "Chat ID: direct" in runtime_content + assert messages[-1]["role"] == "user" - user_content = messages[-1]["content"] - assert isinstance(user_content, str) - assert "Return exactly: OK" in user_content - assert "Current Time:" in user_content - assert "Channel: cli" in user_content - assert "Chat ID: direct" in user_content + assert messages[-1]["content"] == "Return exactly: OK" diff --git a/tests/test_task_cancel.py b/tests/test_task_cancel.py new file mode 100644 index 0000000..27a2d73 --- /dev/null +++ b/tests/test_task_cancel.py @@ -0,0 +1,167 @@ +"""Tests for /stop task cancellation.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +def _make_loop(): + """Create a minimal AgentLoop with mocked dependencies.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + workspace = MagicMock() + workspace.__truediv__ = MagicMock(return_value=MagicMock()) + + with patch("nanobot.agent.loop.ContextBuilder"), \ + patch("nanobot.agent.loop.SessionManager"), \ + patch("nanobot.agent.loop.SubagentManager") as MockSubMgr: + MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0) + loop = AgentLoop(bus=bus, provider=provider, workspace=workspace) + return loop, bus + + +class TestHandleStop: + @pytest.mark.asyncio + async def test_stop_no_active_task(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "No active task" in out.content + + @pytest.mark.asyncio + async def test_stop_cancels_active_task(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + cancelled = asyncio.Event() + + async def slow_task(): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + cancelled.set() + raise + + task = asyncio.create_task(slow_task()) + await asyncio.sleep(0) + loop._active_tasks["test:c1"] = [task] + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + + assert cancelled.is_set() + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "stopped" in out.content.lower() + + @pytest.mark.asyncio + async def test_stop_cancels_multiple_tasks(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + events = [asyncio.Event(), asyncio.Event()] + + async def slow(idx): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + events[idx].set() + raise + + tasks = [asyncio.create_task(slow(i)) for i in range(2)] + await asyncio.sleep(0) + loop._active_tasks["test:c1"] = tasks + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + + assert all(e.is_set() for e in events) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "2 task" in out.content + + +class TestDispatch: + @pytest.mark.asyncio + async def test_dispatch_processes_and_publishes(self): + from nanobot.bus.events import InboundMessage, OutboundMessage + + loop, bus = _make_loop() + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="hello") + loop._process_message = AsyncMock( + return_value=OutboundMessage(channel="test", chat_id="c1", content="hi") + ) + await loop._dispatch(msg) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert out.content == "hi" + + @pytest.mark.asyncio + async def test_processing_lock_serializes(self): + from nanobot.bus.events import InboundMessage, OutboundMessage + + loop, bus = _make_loop() + order = [] + + async def mock_process(m, **kwargs): + order.append(f"start-{m.content}") + await asyncio.sleep(0.05) + order.append(f"end-{m.content}") + return OutboundMessage(channel="test", chat_id="c1", content=m.content) + + loop._process_message = mock_process + msg1 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a") + msg2 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b") + + t1 = asyncio.create_task(loop._dispatch(msg1)) + t2 = asyncio.create_task(loop._dispatch(msg2)) + await asyncio.gather(t1, t2) + assert order == ["start-a", "end-a", "start-b", "end-b"] + + +class TestSubagentCancellation: + @pytest.mark.asyncio + async def test_cancel_by_session(self): + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) + + cancelled = asyncio.Event() + + async def slow(): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + cancelled.set() + raise + + task = asyncio.create_task(slow()) + await asyncio.sleep(0) + mgr._running_tasks["sub-1"] = task + mgr._session_tasks["test:c1"] = {"sub-1"} + + count = await mgr.cancel_by_session("test:c1") + assert count == 1 + assert cancelled.is_set() + + @pytest.mark.asyncio + async def test_cancel_by_session_no_tasks(self): + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) + assert await mgr.cancel_by_session("nonexistent") == 0