From cdbede2fa89eb8c6c31ebbaab2f9276463e5231e Mon Sep 17 00:00:00 2001 From: Re-bin Date: Wed, 25 Feb 2026 17:04:08 +0000 Subject: [PATCH] refactor: simplify /stop dispatch, inline commands, trim verbose docstrings --- nanobot/agent/commands.py | 59 ------- nanobot/agent/context.py | 146 ++++-------------- nanobot/agent/loop.py | 126 ++++++--------- nanobot/agent/subagent.py | 60 ++------ nanobot/channels/telegram.py | 2 + nanobot/templates/AGENTS.md | 8 - tests/test_task_cancel.py | 287 +++++++++-------------------------- 7 files changed, 159 insertions(+), 529 deletions(-) delete mode 100644 nanobot/agent/commands.py diff --git a/nanobot/agent/commands.py b/nanobot/agent/commands.py deleted file mode 100644 index 2b4fd9b..0000000 --- a/nanobot/agent/commands.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Command definitions and dispatch for the agent loop. - -Commands are slash-prefixed messages (e.g. /stop, /new, /help) that are -handled specially — either immediately in the run() loop or inside -_process_message before the LLM is called. - -To add a new command: -1. Add a CommandDef to COMMANDS -2. If immediate=True, add a handler in AgentLoop._handle_immediate_command -3. If immediate=False, add handling in AgentLoop._process_message -""" - -from __future__ import annotations - -from dataclasses import dataclass - - -@dataclass(frozen=True) -class CommandDef: - """Definition of a slash command.""" - - name: str - description: str - immediate: bool = False # True = handled in run() loop, bypasses message processing - - -# Registry of all known commands. -# "immediate" commands are handled while the agent may be busy (e.g. /stop). -# Non-immediate commands go through normal _process_message flow. -COMMANDS: dict[str, CommandDef] = { - "/stop": CommandDef("/stop", "Stop the current task", immediate=True), - "/new": CommandDef("/new", "Start a new conversation"), - "/help": CommandDef("/help", "Show available commands"), -} - - -def parse_command(text: str) -> str | None: - """Extract a slash command from message text. - - Returns the command string (e.g. "/stop") or None if not a command. - """ - stripped = text.strip() - if not stripped.startswith("/"): - return None - return stripped.split()[0].lower() - - -def is_immediate_command(cmd: str) -> bool: - """Check if a command should be handled immediately, bypassing processing.""" - defn = COMMANDS.get(cmd) - return defn.immediate if defn else False - - -def get_help_text() -> str: - """Generate help text from registered commands.""" - lines = ["🐈 nanobot commands:"] - for defn in COMMANDS.values(): - lines.append(f"{defn.name} — {defn.description}") - return "\n".join(lines) diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index a771981..03a9a89 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -13,12 +13,7 @@ from nanobot.agent.skills import SkillsLoader class ContextBuilder: - """ - Builds the context (system prompt + messages) for the agent. - - Assembles bootstrap files, memory, skills, and conversation history - into a coherent prompt for the LLM. - """ + """Builds the context (system prompt + messages) for the agent.""" BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" @@ -29,39 +24,23 @@ class ContextBuilder: self.skills = SkillsLoader(workspace) def build_system_prompt(self, skill_names: list[str] | None = None) -> str: - """ - Build the system prompt from bootstrap files, memory, and skills. - - Args: - skill_names: Optional list of skills to include. - - Returns: - Complete system prompt. - """ - parts = [] - - # Core identity - parts.append(self._get_identity()) - - # Bootstrap files + """Build the system prompt from identity, bootstrap files, memory, and skills.""" + parts = [self._get_identity()] + bootstrap = self._load_bootstrap_files() if bootstrap: parts.append(bootstrap) - - # Memory context + memory = self.memory.get_memory_context() if memory: parts.append(f"# Memory\n\n{memory}") - - # Skills - progressive loading - # 1. Always-loaded skills: include full content + always_skills = self.skills.get_always_skills() if always_skills: always_content = self.skills.load_skills_for_context(always_skills) if always_content: parts.append(f"# Active Skills\n\n{always_content}") - - # 2. Available skills: only show summary (agent uses read_file to load) + skills_summary = self.skills.build_skills_summary() if skills_summary: parts.append(f"""# Skills @@ -70,7 +49,7 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md Skills with available="false" need dependencies installed first - you can try installing them with apt/brew. {skills_summary}""") - + return "\n\n---\n\n".join(parts) def _get_identity(self) -> str: @@ -81,29 +60,25 @@ Skills with available="false" need dependencies installed first - you can try in return f"""# nanobot 🐈 -You are nanobot, a helpful AI assistant. +You are nanobot, a helpful AI assistant. ## Runtime {runtime} ## Workspace Your workspace is at: {workspace_path} -- Long-term memory: {workspace_path}/memory/MEMORY.md +- Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here) - History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md -Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel. - -## Tool Call Guidelines -- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it. -- Before modifying a file, read it first to confirm its current content. -- Do not assume a file or directory exists — use list_dir or read_file to verify. +## nanobot Guidelines +- State intent before tool calls, but NEVER predict or claim results before receiving them. +- Before modifying a file, read it first. Do not assume files or directories exist. - After writing or editing a file, re-read it if accuracy matters. - If a tool call fails, analyze the error before retrying with a different approach. +- Ask for clarification when the request is ambiguous. -## Memory -- Remember important facts: write to {workspace_path}/memory/MEMORY.md -- Recall past events: grep {workspace_path}/memory/HISTORY.md""" +Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel.""" @staticmethod def _build_runtime_context(channel: str | None, chat_id: str | None) -> str: @@ -136,37 +111,13 @@ Reply directly with text for conversations. Only use the 'message' tool to send channel: str | None = None, chat_id: str | None = None, ) -> list[dict[str, Any]]: - """ - Build the complete message list for an LLM call. - - Args: - history: Previous conversation messages. - current_message: The new user message. - skill_names: Optional skills to include. - media: Optional list of local file paths for images/media. - channel: Current channel (telegram, feishu, etc.). - chat_id: Current chat/user ID. - - Returns: - List of messages including system prompt. - """ - messages = [] - - # System prompt - system_prompt = self.build_system_prompt(skill_names) - messages.append({"role": "system", "content": system_prompt}) - - # History - messages.extend(history) - - # Inject runtime metadata as a separate user message before the actual user message. - messages.append({"role": "user", "content": self._build_runtime_context(channel, chat_id)}) - - # Current user message - user_content = self._build_user_content(current_message, media) - messages.append({"role": "user", "content": user_content}) - - return messages + """Build the complete message list for an LLM call.""" + return [ + {"role": "system", "content": self.build_system_prompt(skill_names)}, + *history, + {"role": "user", "content": self._build_runtime_context(channel, chat_id)}, + {"role": "user", "content": self._build_user_content(current_message, media)}, + ] def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]: """Build user message content with optional base64-encoded images.""" @@ -187,63 +138,24 @@ Reply directly with text for conversations. Only use the 'message' tool to send return images + [{"type": "text", "text": text}] def add_tool_result( - self, - messages: list[dict[str, Any]], - tool_call_id: str, - tool_name: str, - result: str + self, messages: list[dict[str, Any]], + tool_call_id: str, tool_name: str, result: str, ) -> list[dict[str, Any]]: - """ - Add a tool result to the message list. - - Args: - messages: Current message list. - tool_call_id: ID of the tool call. - tool_name: Name of the tool. - result: Tool execution result. - - Returns: - Updated message list. - """ - messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": tool_name, - "content": result - }) + """Add a tool result to the message list.""" + messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result}) return messages def add_assistant_message( - self, - messages: list[dict[str, Any]], + self, messages: list[dict[str, Any]], content: str | None, tool_calls: list[dict[str, Any]] | None = None, reasoning_content: str | None = None, ) -> list[dict[str, Any]]: - """ - Add an assistant message to the message list. - - Args: - messages: Current message list. - content: Message content. - tool_calls: Optional tool calls. - reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.). - - Returns: - Updated message list. - """ - msg: dict[str, Any] = {"role": "assistant"} - - # Always include content — some providers (e.g. StepFun) reject - # assistant messages that omit the key entirely. - msg["content"] = content - + """Add an assistant message to the message list.""" + msg: dict[str, Any] = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls - - # Include reasoning content when provided (required by some thinking models) if reasoning_content is not None: msg["reasoning_content"] = reasoning_content - messages.append(msg) return messages diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index e03f0e6..4155127 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -11,7 +11,6 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger -from nanobot.agent.commands import get_help_text, is_immediate_command, parse_command from nanobot.agent.context import ContextBuilder from nanobot.agent.memory import MemoryStore from nanobot.agent.subagent import SubagentManager @@ -100,9 +99,8 @@ class AgentLoop: self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_locks: dict[str, asyncio.Lock] = {} - self._active_tasks: dict[str, asyncio.Task] = {} # session_key -> running task - self._pending_tasks: set[asyncio.Task] = set() # Strong refs until dispatch starts - self._processing_lock = asyncio.Lock() # Serialize message processing + self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks + self._processing_lock = asyncio.Lock() self._register_default_tools() def _register_default_tools(self) -> None: @@ -243,97 +241,61 @@ class AgentLoop: return final_content, tools_used, messages async def run(self) -> None: - """Run the agent loop, processing messages from the bus. - - Regular messages are dispatched as asyncio tasks so the loop stays - responsive to immediate commands like /stop. A global processing - lock serializes message handling to avoid shared-state races. - """ + """Run the agent loop, dispatching messages as tasks to stay responsive to /stop.""" self._running = True await self._connect_mcp() logger.info("Agent loop started") while self._running: try: - msg = await asyncio.wait_for( - self.bus.consume_inbound(), - timeout=1.0 - ) - - # Immediate commands (/stop) are handled inline - cmd = parse_command(msg.content) - if cmd and is_immediate_command(cmd): - await self._handle_immediate_command(cmd, msg) - continue - - # Regular messages (including non-immediate commands) are - # dispatched as tasks so the loop keeps consuming. - task = asyncio.create_task(self._dispatch(msg)) - self._pending_tasks.add(task) - task.add_done_callback(self._pending_tasks.discard) - + msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: continue - async def _handle_immediate_command(self, cmd: str, msg: InboundMessage) -> None: - """Handle a command that must be processed while the agent may be busy.""" - if cmd == "/stop": - task = self._active_tasks.get(msg.session_key) - sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) - if task and not task.done(): - task.cancel() - try: - await task - except (asyncio.CancelledError, Exception): - pass - parts = ["⏹ Task stopped."] - if sub_cancelled: - parts.append(f"Also stopped {sub_cancelled} background task(s).") - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, - content=" ".join(parts), - )) - elif sub_cancelled: - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, - content=f"⏹ Stopped {sub_cancelled} background task(s).", - )) + if msg.content.strip().lower() == "/stop": + await self._handle_stop(msg) else: - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, - content="No active task to stop.", - )) + task = asyncio.create_task(self._dispatch(msg)) + self._active_tasks.setdefault(msg.session_key, []).append(task) + task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None) + + async def _handle_stop(self, msg: InboundMessage) -> None: + """Cancel all active tasks and subagents for the session.""" + tasks = self._active_tasks.pop(msg.session_key, []) + cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) + for t in tasks: + try: + await t + except (asyncio.CancelledError, Exception): + pass + sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) + total = cancelled + sub_cancelled + content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop." + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content=content, + )) async def _dispatch(self, msg: InboundMessage) -> None: - """Dispatch a message for processing under the global lock. - - The task is registered in _active_tasks *before* acquiring the lock - so that /stop can find (and cancel) tasks that are still queued. - """ - self._active_tasks[msg.session_key] = asyncio.current_task() # type: ignore[arg-type] - try: - async with self._processing_lock: - try: - response = await self._process_message(msg) - if response is not None: - await self.bus.publish_outbound(response) - elif msg.channel == "cli": - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, - content="", metadata=msg.metadata or {}, - )) - except asyncio.CancelledError: - logger.info("Task cancelled for session {}", msg.session_key) - # Response already sent by _handle_immediate_command - except Exception as e: - logger.error("Error processing message: {}", e) + """Process a message under the global lock.""" + async with self._processing_lock: + try: + response = await self._process_message(msg) + if response is not None: + await self.bus.publish_outbound(response) + elif msg.channel == "cli": await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" + channel=msg.channel, chat_id=msg.chat_id, + content="", metadata=msg.metadata or {}, )) - finally: - self._active_tasks.pop(msg.session_key, None) + except asyncio.CancelledError: + logger.info("Task cancelled for session {}", msg.session_key) + raise + except Exception: + logger.exception("Error processing message for session {}", msg.session_key) + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Sorry, I encountered an error.", + )) async def close_mcp(self) -> None: """Close MCP connections.""" @@ -426,7 +388,7 @@ class AgentLoop: content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content=get_help_text()) + content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands") unconsolidated = len(session.messages) - session.last_consolidated if (unconsolidated >= self.memory_window and session.key not in self._consolidating): diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 1c1557e..337796c 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool class SubagentManager: - """ - Manages background subagent execution. - - Subagents are lightweight agent instances that run in the background - to handle specific tasks. They share the same LLM provider but have - isolated context and a focused system prompt. - """ + """Manages background subagent execution.""" def __init__( self, @@ -59,43 +53,24 @@ class SubagentManager: origin_chat_id: str = "direct", session_key: str | None = None, ) -> str: - """ - Spawn a subagent to execute a task in the background. - - Args: - task: The task description for the subagent. - label: Optional human-readable label for the task. - origin_channel: The channel to announce results to. - origin_chat_id: The chat ID to announce results to. - - Returns: - Status message indicating the subagent was started. - """ + """Spawn a subagent to execute a task in the background.""" task_id = str(uuid.uuid4())[:8] display_label = label or task[:30] + ("..." if len(task) > 30 else "") - - origin = { - "channel": origin_channel, - "chat_id": origin_chat_id, - } - - # Create background task + origin = {"channel": origin_channel, "chat_id": origin_chat_id} + bg_task = asyncio.create_task( self._run_subagent(task_id, task, display_label, origin) ) self._running_tasks[task_id] = bg_task - if session_key: self._session_tasks.setdefault(session_key, set()).add(task_id) def _cleanup(_: asyncio.Task) -> None: self._running_tasks.pop(task_id, None) - if session_key: - ids = self._session_tasks.get(session_key) - if ids: - ids.discard(task_id) - if not ids: - self._session_tasks.pop(session_key, None) + if session_key and (ids := self._session_tasks.get(session_key)): + ids.discard(task_id) + if not ids: + del self._session_tasks[session_key] bg_task.add_done_callback(_cleanup) @@ -267,17 +242,14 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed When you have completed the task, provide a clear summary of your findings or actions.""" async def cancel_by_session(self, session_key: str) -> int: - """Cancel all subagents spawned under the given session. Returns count cancelled.""" - task_ids = list(self._session_tasks.get(session_key, [])) - to_cancel: list[asyncio.Task] = [] - for tid in task_ids: - t = self._running_tasks.get(tid) - if t and not t.done(): - t.cancel() - to_cancel.append(t) - if to_cancel: - await asyncio.gather(*to_cancel, return_exceptions=True) - return len(to_cancel) + """Cancel all subagents for the given session. Returns count cancelled.""" + tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, []) + if tid in self._running_tasks and not self._running_tasks[tid].done()] + for t in tasks: + t.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + return len(tasks) def get_running_count(self) -> int: """Return the number of currently running subagents.""" diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 6cd98e7..808f50c 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel): BOT_COMMANDS = [ BotCommand("start", "Start the bot"), BotCommand("new", "Start a new conversation"), + BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] @@ -299,6 +300,7 @@ class TelegramChannel(BaseChannel): await update.message.reply_text( "🐈 nanobot commands:\n" "/new — Start a new conversation\n" + "/stop — Stop the current task\n" "/help — Show available commands" ) diff --git a/nanobot/templates/AGENTS.md b/nanobot/templates/AGENTS.md index 84ba657..4c3e5b1 100644 --- a/nanobot/templates/AGENTS.md +++ b/nanobot/templates/AGENTS.md @@ -2,14 +2,6 @@ You are a helpful AI assistant. Be concise, accurate, and friendly. -## Guidelines - -- Before calling tools, briefly state your intent — but NEVER predict results before receiving them -- Use precise tense: "I will run X" before the call, "X returned Y" after -- NEVER claim success before a tool result confirms it -- Ask for clarification when the request is ambiguous -- Remember important information in `memory/MEMORY.md`; past events are logged in `memory/HISTORY.md` - ## Scheduled Reminders When user asks for a reminder at a specific time, use `exec` to run: diff --git a/tests/test_task_cancel.py b/tests/test_task_cancel.py index 5c0c4b7..27a2d73 100644 --- a/tests/test_task_cancel.py +++ b/tests/test_task_cancel.py @@ -1,4 +1,4 @@ -"""Tests for the command system and task cancellation.""" +"""Tests for /stop task cancellation.""" from __future__ import annotations @@ -7,117 +7,42 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from nanobot.agent.commands import ( - COMMANDS, - get_help_text, - is_immediate_command, - parse_command, -) + +def _make_loop(): + """Create a minimal AgentLoop with mocked dependencies.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + workspace = MagicMock() + workspace.__truediv__ = MagicMock(return_value=MagicMock()) + + with patch("nanobot.agent.loop.ContextBuilder"), \ + patch("nanobot.agent.loop.SessionManager"), \ + patch("nanobot.agent.loop.SubagentManager") as MockSubMgr: + MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0) + loop = AgentLoop(bus=bus, provider=provider, workspace=workspace) + return loop, bus -# --------------------------------------------------------------------------- -# commands.py unit tests -# --------------------------------------------------------------------------- - -class TestParseCommand: - def test_slash_command(self): - assert parse_command("/stop") == "/stop" - - def test_slash_command_with_args(self): - assert parse_command("/new some args") == "/new" - - def test_not_a_command(self): - assert parse_command("hello world") is None - - def test_empty_string(self): - assert parse_command("") is None - - def test_leading_whitespace(self): - assert parse_command(" /help") == "/help" - - def test_uppercase_normalized(self): - assert parse_command("/STOP") == "/stop" - - -class TestIsImmediateCommand: - def test_stop_is_immediate(self): - assert is_immediate_command("/stop") is True - - def test_new_is_not_immediate(self): - assert is_immediate_command("/new") is False - - def test_help_is_not_immediate(self): - assert is_immediate_command("/help") is False - - def test_unknown_command(self): - assert is_immediate_command("/unknown") is False - - -class TestGetHelpText: - def test_contains_all_commands(self): - text = get_help_text() - for cmd in COMMANDS: - assert cmd in text - - def test_contains_descriptions(self): - text = get_help_text() - for defn in COMMANDS.values(): - assert defn.description in text - - def test_starts_with_header(self): - assert get_help_text().startswith("🐈") - - -# --------------------------------------------------------------------------- -# Task cancellation integration tests -# --------------------------------------------------------------------------- - -class TestTaskCancellation: - """Tests for /stop cancelling an active task in AgentLoop.""" - - def _make_loop(self): - """Create a minimal AgentLoop with mocked dependencies.""" - from nanobot.agent.loop import AgentLoop - from nanobot.bus.queue import MessageBus - - bus = MessageBus() - provider = MagicMock() - provider.get_default_model.return_value = "test-model" - workspace = MagicMock() - workspace.__truediv__ = MagicMock(return_value=MagicMock()) - - with patch("nanobot.agent.loop.ContextBuilder"), \ - patch("nanobot.agent.loop.SessionManager"), \ - patch("nanobot.agent.loop.SubagentManager") as MockSubMgr: - MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0) - loop = AgentLoop( - bus=bus, - provider=provider, - workspace=workspace, - ) - return loop, bus - +class TestHandleStop: @pytest.mark.asyncio async def test_stop_no_active_task(self): - """'/stop' when nothing is running returns 'No active task'.""" from nanobot.bus.events import InboundMessage - loop, bus = self._make_loop() - msg = InboundMessage( - channel="test", sender_id="u1", chat_id="c1", content="/stop" - ) - await loop._handle_immediate_command("/stop", msg) + loop, bus = _make_loop() + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) assert "No active task" in out.content @pytest.mark.asyncio async def test_stop_cancels_active_task(self): - """'/stop' cancels a running task.""" from nanobot.bus.events import InboundMessage - loop, bus = self._make_loop() - session_key = "test:c1" - + loop, bus = _make_loop() cancelled = asyncio.Event() async def slow_task(): @@ -128,74 +53,61 @@ class TestTaskCancellation: raise task = asyncio.create_task(slow_task()) - await asyncio.sleep(0) # Let task enter its await - loop._active_tasks[session_key] = task + await asyncio.sleep(0) + loop._active_tasks["test:c1"] = [task] - msg = InboundMessage( - channel="test", sender_id="u1", chat_id="c1", content="/stop" - ) - await loop._handle_immediate_command("/stop", msg) + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) assert cancelled.is_set() - assert task.cancelled() out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) assert "stopped" in out.content.lower() @pytest.mark.asyncio - async def test_dispatch_registers_and_clears_task(self): - """_dispatch registers the task in _active_tasks and clears it after.""" + async def test_stop_cancels_multiple_tasks(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + events = [asyncio.Event(), asyncio.Event()] + + async def slow(idx): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + events[idx].set() + raise + + tasks = [asyncio.create_task(slow(i)) for i in range(2)] + await asyncio.sleep(0) + loop._active_tasks["test:c1"] = tasks + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop") + await loop._handle_stop(msg) + + assert all(e.is_set() for e in events) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "2 task" in out.content + + +class TestDispatch: + @pytest.mark.asyncio + async def test_dispatch_processes_and_publishes(self): from nanobot.bus.events import InboundMessage, OutboundMessage - loop, bus = self._make_loop() - msg = InboundMessage( - channel="test", sender_id="u1", chat_id="c1", content="hello" - ) - - # Mock _process_message to return a simple response + loop, bus = _make_loop() + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="hello") loop._process_message = AsyncMock( return_value=OutboundMessage(channel="test", chat_id="c1", content="hi") ) - - task = asyncio.create_task(loop._dispatch(msg)) - await task - - # Task should be cleaned up - assert msg.session_key not in loop._active_tasks - - @pytest.mark.asyncio - async def test_dispatch_handles_cancelled_error(self): - """_dispatch catches CancelledError gracefully.""" - from nanobot.bus.events import InboundMessage - - loop, bus = self._make_loop() - msg = InboundMessage( - channel="test", sender_id="u1", chat_id="c1", content="hello" - ) - - async def mock_process(m, **kwargs): - await asyncio.sleep(60) - - loop._process_message = mock_process - - task = asyncio.create_task(loop._dispatch(msg)) - await asyncio.sleep(0.05) # Let task start - - assert msg.session_key in loop._active_tasks - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - # Task should be cleaned up even after cancel - assert msg.session_key not in loop._active_tasks + await loop._dispatch(msg) + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert out.content == "hi" @pytest.mark.asyncio async def test_processing_lock_serializes(self): - """Only one message processes at a time due to _processing_lock.""" from nanobot.bus.events import InboundMessage, OutboundMessage - loop, bus = self._make_loop() + loop, bus = _make_loop() order = [] async def mock_process(m, **kwargs): @@ -205,27 +117,18 @@ class TestTaskCancellation: return OutboundMessage(channel="test", chat_id="c1", content=m.content) loop._process_message = mock_process - msg1 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a") msg2 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b") t1 = asyncio.create_task(loop._dispatch(msg1)) t2 = asyncio.create_task(loop._dispatch(msg2)) await asyncio.gather(t1, t2) - - # Should be serialized: start-a, end-a, start-b, end-b assert order == ["start-a", "end-a", "start-b", "end-b"] -# --------------------------------------------------------------------------- - - class TestSubagentCancellation: - """Tests for /stop cancelling subagents spawned under a session.""" - @pytest.mark.asyncio async def test_cancel_by_session(self): - """cancel_by_session cancels all tasks for that session.""" from nanobot.agent.subagent import SubagentManager from nanobot.bus.queue import MessageBus @@ -236,28 +139,24 @@ class TestSubagentCancellation: cancelled = asyncio.Event() - async def slow_subagent(): + async def slow(): try: await asyncio.sleep(60) except asyncio.CancelledError: cancelled.set() raise - task = asyncio.create_task(slow_subagent()) + task = asyncio.create_task(slow()) await asyncio.sleep(0) - tid = "sub-1" - session_key = "test:c1" - mgr._running_tasks[tid] = task - mgr._session_tasks[session_key] = {tid} + mgr._running_tasks["sub-1"] = task + mgr._session_tasks["test:c1"] = {"sub-1"} - count = await mgr.cancel_by_session(session_key) + count = await mgr.cancel_by_session("test:c1") assert count == 1 assert cancelled.is_set() - assert task.cancelled() @pytest.mark.asyncio async def test_cancel_by_session_no_tasks(self): - """cancel_by_session returns 0 when no subagents for session.""" from nanobot.agent.subagent import SubagentManager from nanobot.bus.queue import MessageBus @@ -265,54 +164,4 @@ class TestSubagentCancellation: provider = MagicMock() provider.get_default_model.return_value = "test-model" mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) - - count = await mgr.cancel_by_session("nonexistent:session") - assert count == 0 - - @pytest.mark.asyncio - async def test_stop_cancels_subagents_via_loop(self): - """/stop on AgentLoop also cancels subagents for that session.""" - from nanobot.agent.loop import AgentLoop - from nanobot.bus.events import InboundMessage - from nanobot.bus.queue import MessageBus - - bus = MessageBus() - provider = MagicMock() - provider.get_default_model.return_value = "test-model" - workspace = MagicMock() - workspace.__truediv__ = MagicMock(return_value=MagicMock()) - - with patch("nanobot.agent.loop.ContextBuilder"), \ - patch("nanobot.agent.loop.SessionManager"), \ - patch("nanobot.agent.loop.SubagentManager"): - loop = AgentLoop(bus=bus, provider=provider, workspace=workspace) - - # Replace subagents with a real SubagentManager - from nanobot.agent.subagent import SubagentManager - loop.subagents = SubagentManager( - provider=provider, workspace=MagicMock(), bus=bus - ) - - cancelled = asyncio.Event() - session_key = "test:c1" - - async def slow_sub(): - try: - await asyncio.sleep(60) - except asyncio.CancelledError: - cancelled.set() - raise - - task = asyncio.create_task(slow_sub()) - await asyncio.sleep(0) - loop.subagents._running_tasks["sub-1"] = task - loop.subagents._session_tasks[session_key] = {"sub-1"} - - msg = InboundMessage( - channel="test", sender_id="u1", chat_id="c1", content="/stop" - ) - await loop._handle_immediate_command("/stop", msg) - - assert cancelled.is_set() - out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) - assert "stopped" in out.content.lower() or "background" in out.content.lower() + assert await mgr.cancel_by_session("nonexistent") == 0