refactor: simplify /stop dispatch, inline commands, trim verbose docstrings

This commit is contained in:
Re-bin
2026-02-25 17:04:08 +00:00
parent 149f26af32
commit cdbede2fa8
7 changed files with 159 additions and 529 deletions

View File

@@ -1,59 +0,0 @@
"""Command definitions and dispatch for the agent loop.
Commands are slash-prefixed messages (e.g. /stop, /new, /help) that are
handled specially — either immediately in the run() loop or inside
_process_message before the LLM is called.
To add a new command:
1. Add a CommandDef to COMMANDS
2. If immediate=True, add a handler in AgentLoop._handle_immediate_command
3. If immediate=False, add handling in AgentLoop._process_message
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class CommandDef:
"""Definition of a slash command."""
name: str
description: str
immediate: bool = False # True = handled in run() loop, bypasses message processing
# Registry of all known commands.
# "immediate" commands are handled while the agent may be busy (e.g. /stop).
# Non-immediate commands go through normal _process_message flow.
COMMANDS: dict[str, CommandDef] = {
"/stop": CommandDef("/stop", "Stop the current task", immediate=True),
"/new": CommandDef("/new", "Start a new conversation"),
"/help": CommandDef("/help", "Show available commands"),
}
def parse_command(text: str) -> str | None:
"""Extract a slash command from message text.
Returns the command string (e.g. "/stop") or None if not a command.
"""
stripped = text.strip()
if not stripped.startswith("/"):
return None
return stripped.split()[0].lower()
def is_immediate_command(cmd: str) -> bool:
"""Check if a command should be handled immediately, bypassing processing."""
defn = COMMANDS.get(cmd)
return defn.immediate if defn else False
def get_help_text() -> str:
"""Generate help text from registered commands."""
lines = ["🐈 nanobot commands:"]
for defn in COMMANDS.values():
lines.append(f"{defn.name}{defn.description}")
return "\n".join(lines)

View File

@@ -13,12 +13,7 @@ from nanobot.agent.skills import SkillsLoader
class ContextBuilder: class ContextBuilder:
""" """Builds the context (system prompt + messages) for the agent."""
Builds the context (system prompt + messages) for the agent.
Assembles bootstrap files, memory, skills, and conversation history
into a coherent prompt for the LLM.
"""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
@@ -29,39 +24,23 @@ class ContextBuilder:
self.skills = SkillsLoader(workspace) self.skills = SkillsLoader(workspace)
def build_system_prompt(self, skill_names: list[str] | None = None) -> str: def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
""" """Build the system prompt from identity, bootstrap files, memory, and skills."""
Build the system prompt from bootstrap files, memory, and skills. parts = [self._get_identity()]
Args:
skill_names: Optional list of skills to include.
Returns:
Complete system prompt.
"""
parts = []
# Core identity
parts.append(self._get_identity())
# Bootstrap files
bootstrap = self._load_bootstrap_files() bootstrap = self._load_bootstrap_files()
if bootstrap: if bootstrap:
parts.append(bootstrap) parts.append(bootstrap)
# Memory context
memory = self.memory.get_memory_context() memory = self.memory.get_memory_context()
if memory: if memory:
parts.append(f"# Memory\n\n{memory}") parts.append(f"# Memory\n\n{memory}")
# Skills - progressive loading
# 1. Always-loaded skills: include full content
always_skills = self.skills.get_always_skills() always_skills = self.skills.get_always_skills()
if always_skills: if always_skills:
always_content = self.skills.load_skills_for_context(always_skills) always_content = self.skills.load_skills_for_context(always_skills)
if always_content: if always_content:
parts.append(f"# Active Skills\n\n{always_content}") parts.append(f"# Active Skills\n\n{always_content}")
# 2. Available skills: only show summary (agent uses read_file to load)
skills_summary = self.skills.build_skills_summary() skills_summary = self.skills.build_skills_summary()
if skills_summary: if skills_summary:
parts.append(f"""# Skills parts.append(f"""# Skills
@@ -88,22 +67,18 @@ You are nanobot, a helpful AI assistant.
## Workspace ## Workspace
Your workspace is at: {workspace_path} Your workspace is at: {workspace_path}
- Long-term memory: {workspace_path}/memory/MEMORY.md - Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here)
- History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - History log: {workspace_path}/memory/HISTORY.md (grep-searchable)
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel. ## nanobot Guidelines
- State intent before tool calls, but NEVER predict or claim results before receiving them.
## Tool Call Guidelines - Before modifying a file, read it first. Do not assume files or directories exist.
- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it.
- Before modifying a file, read it first to confirm its current content.
- Do not assume a file or directory exists — use list_dir or read_file to verify.
- After writing or editing a file, re-read it if accuracy matters. - After writing or editing a file, re-read it if accuracy matters.
- If a tool call fails, analyze the error before retrying with a different approach. - If a tool call fails, analyze the error before retrying with a different approach.
- Ask for clarification when the request is ambiguous.
## Memory Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel."""
- Remember important facts: write to {workspace_path}/memory/MEMORY.md
- Recall past events: grep {workspace_path}/memory/HISTORY.md"""
@staticmethod @staticmethod
def _build_runtime_context(channel: str | None, chat_id: str | None) -> str: def _build_runtime_context(channel: str | None, chat_id: str | None) -> str:
@@ -136,37 +111,13 @@ Reply directly with text for conversations. Only use the 'message' tool to send
channel: str | None = None, channel: str | None = None,
chat_id: str | None = None, chat_id: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """Build the complete message list for an LLM call."""
Build the complete message list for an LLM call. return [
{"role": "system", "content": self.build_system_prompt(skill_names)},
Args: *history,
history: Previous conversation messages. {"role": "user", "content": self._build_runtime_context(channel, chat_id)},
current_message: The new user message. {"role": "user", "content": self._build_user_content(current_message, media)},
skill_names: Optional skills to include. ]
media: Optional list of local file paths for images/media.
channel: Current channel (telegram, feishu, etc.).
chat_id: Current chat/user ID.
Returns:
List of messages including system prompt.
"""
messages = []
# System prompt
system_prompt = self.build_system_prompt(skill_names)
messages.append({"role": "system", "content": system_prompt})
# History
messages.extend(history)
# Inject runtime metadata as a separate user message before the actual user message.
messages.append({"role": "user", "content": self._build_runtime_context(channel, chat_id)})
# Current user message
user_content = self._build_user_content(current_message, media)
messages.append({"role": "user", "content": user_content})
return messages
def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]: def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
"""Build user message content with optional base64-encoded images.""" """Build user message content with optional base64-encoded images."""
@@ -187,63 +138,24 @@ Reply directly with text for conversations. Only use the 'message' tool to send
return images + [{"type": "text", "text": text}] return images + [{"type": "text", "text": text}]
def add_tool_result( def add_tool_result(
self, self, messages: list[dict[str, Any]],
messages: list[dict[str, Any]], tool_call_id: str, tool_name: str, result: str,
tool_call_id: str,
tool_name: str,
result: str
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """Add a tool result to the message list."""
Add a tool result to the message list. messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result})
Args:
messages: Current message list.
tool_call_id: ID of the tool call.
tool_name: Name of the tool.
result: Tool execution result.
Returns:
Updated message list.
"""
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": tool_name,
"content": result
})
return messages return messages
def add_assistant_message( def add_assistant_message(
self, self, messages: list[dict[str, Any]],
messages: list[dict[str, Any]],
content: str | None, content: str | None,
tool_calls: list[dict[str, Any]] | None = None, tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None, reasoning_content: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """Add an assistant message to the message list."""
Add an assistant message to the message list. msg: dict[str, Any] = {"role": "assistant", "content": content}
Args:
messages: Current message list.
content: Message content.
tool_calls: Optional tool calls.
reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns:
Updated message list.
"""
msg: dict[str, Any] = {"role": "assistant"}
# Always include content — some providers (e.g. StepFun) reject
# assistant messages that omit the key entirely.
msg["content"] = content
if tool_calls: if tool_calls:
msg["tool_calls"] = tool_calls msg["tool_calls"] = tool_calls
# Include reasoning content when provided (required by some thinking models)
if reasoning_content is not None: if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content msg["reasoning_content"] = reasoning_content
messages.append(msg) messages.append(msg)
return messages return messages

View File

@@ -11,7 +11,6 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger from loguru import logger
from nanobot.agent.commands import get_help_text, is_immediate_command, parse_command
from nanobot.agent.context import ContextBuilder from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager
@@ -100,9 +99,8 @@ class AgentLoop:
self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: dict[str, asyncio.Lock] = {} self._consolidation_locks: dict[str, asyncio.Lock] = {}
self._active_tasks: dict[str, asyncio.Task] = {} # session_key -> running task self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._pending_tasks: set[asyncio.Task] = set() # Strong refs until dispatch starts self._processing_lock = asyncio.Lock()
self._processing_lock = asyncio.Lock() # Serialize message processing
self._register_default_tools() self._register_default_tools()
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
@@ -243,97 +241,61 @@ class AgentLoop:
return final_content, tools_used, messages return final_content, tools_used, messages
async def run(self) -> None: async def run(self) -> None:
"""Run the agent loop, processing messages from the bus. """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
Regular messages are dispatched as asyncio tasks so the loop stays
responsive to immediate commands like /stop. A global processing
lock serializes message handling to avoid shared-state races.
"""
self._running = True self._running = True
await self._connect_mcp() await self._connect_mcp()
logger.info("Agent loop started") logger.info("Agent loop started")
while self._running: while self._running:
try: try:
msg = await asyncio.wait_for( msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
self.bus.consume_inbound(),
timeout=1.0
)
# Immediate commands (/stop) are handled inline
cmd = parse_command(msg.content)
if cmd and is_immediate_command(cmd):
await self._handle_immediate_command(cmd, msg)
continue
# Regular messages (including non-immediate commands) are
# dispatched as tasks so the loop keeps consuming.
task = asyncio.create_task(self._dispatch(msg))
self._pending_tasks.add(task)
task.add_done_callback(self._pending_tasks.discard)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
async def _handle_immediate_command(self, cmd: str, msg: InboundMessage) -> None: if msg.content.strip().lower() == "/stop":
"""Handle a command that must be processed while the agent may be busy.""" await self._handle_stop(msg)
if cmd == "/stop":
task = self._active_tasks.get(msg.session_key)
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
if task and not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
parts = ["⏹ Task stopped."]
if sub_cancelled:
parts.append(f"Also stopped {sub_cancelled} background task(s).")
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content=" ".join(parts),
))
elif sub_cancelled:
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content=f"⏹ Stopped {sub_cancelled} background task(s).",
))
else: else:
await self.bus.publish_outbound(OutboundMessage( task = asyncio.create_task(self._dispatch(msg))
channel=msg.channel, chat_id=msg.chat_id, self._active_tasks.setdefault(msg.session_key, []).append(task)
content="No active task to stop.", task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
))
async def _handle_stop(self, msg: InboundMessage) -> None:
"""Cancel all active tasks and subagents for the session."""
tasks = self._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
total = cancelled + sub_cancelled
content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop."
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
))
async def _dispatch(self, msg: InboundMessage) -> None: async def _dispatch(self, msg: InboundMessage) -> None:
"""Dispatch a message for processing under the global lock. """Process a message under the global lock."""
async with self._processing_lock:
The task is registered in _active_tasks *before* acquiring the lock try:
so that /stop can find (and cancel) tasks that are still queued. response = await self._process_message(msg)
""" if response is not None:
self._active_tasks[msg.session_key] = asyncio.current_task() # type: ignore[arg-type] await self.bus.publish_outbound(response)
try: elif msg.channel == "cli":
async with self._processing_lock:
try:
response = await self._process_message(msg)
if response is not None:
await self.bus.publish_outbound(response)
elif msg.channel == "cli":
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="", metadata=msg.metadata or {},
))
except asyncio.CancelledError:
logger.info("Task cancelled for session {}", msg.session_key)
# Response already sent by _handle_immediate_command
except Exception as e:
logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage( await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, channel=msg.channel, chat_id=msg.chat_id,
chat_id=msg.chat_id, content="", metadata=msg.metadata or {},
content=f"Sorry, I encountered an error: {str(e)}"
)) ))
finally: except asyncio.CancelledError:
self._active_tasks.pop(msg.session_key, None) logger.info("Task cancelled for session {}", msg.session_key)
raise
except Exception:
logger.exception("Error processing message for session {}", msg.session_key)
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Sorry, I encountered an error.",
))
async def close_mcp(self) -> None: async def close_mcp(self) -> None:
"""Close MCP connections.""" """Close MCP connections."""
@@ -426,7 +388,7 @@ class AgentLoop:
content="New session started.") content="New session started.")
if cmd == "/help": if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content=get_help_text()) content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands")
unconsolidated = len(session.messages) - session.last_consolidated unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and session.key not in self._consolidating): if (unconsolidated >= self.memory_window and session.key not in self._consolidating):

View File

@@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
class SubagentManager: class SubagentManager:
""" """Manages background subagent execution."""
Manages background subagent execution.
Subagents are lightweight agent instances that run in the background
to handle specific tasks. They share the same LLM provider but have
isolated context and a focused system prompt.
"""
def __init__( def __init__(
self, self,
@@ -59,43 +53,24 @@ class SubagentManager:
origin_chat_id: str = "direct", origin_chat_id: str = "direct",
session_key: str | None = None, session_key: str | None = None,
) -> str: ) -> str:
""" """Spawn a subagent to execute a task in the background."""
Spawn a subagent to execute a task in the background.
Args:
task: The task description for the subagent.
label: Optional human-readable label for the task.
origin_channel: The channel to announce results to.
origin_chat_id: The chat ID to announce results to.
Returns:
Status message indicating the subagent was started.
"""
task_id = str(uuid.uuid4())[:8] task_id = str(uuid.uuid4())[:8]
display_label = label or task[:30] + ("..." if len(task) > 30 else "") display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {"channel": origin_channel, "chat_id": origin_chat_id}
origin = {
"channel": origin_channel,
"chat_id": origin_chat_id,
}
# Create background task
bg_task = asyncio.create_task( bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin) self._run_subagent(task_id, task, display_label, origin)
) )
self._running_tasks[task_id] = bg_task self._running_tasks[task_id] = bg_task
if session_key: if session_key:
self._session_tasks.setdefault(session_key, set()).add(task_id) self._session_tasks.setdefault(session_key, set()).add(task_id)
def _cleanup(_: asyncio.Task) -> None: def _cleanup(_: asyncio.Task) -> None:
self._running_tasks.pop(task_id, None) self._running_tasks.pop(task_id, None)
if session_key: if session_key and (ids := self._session_tasks.get(session_key)):
ids = self._session_tasks.get(session_key) ids.discard(task_id)
if ids: if not ids:
ids.discard(task_id) del self._session_tasks[session_key]
if not ids:
self._session_tasks.pop(session_key, None)
bg_task.add_done_callback(_cleanup) bg_task.add_done_callback(_cleanup)
@@ -267,17 +242,14 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed
When you have completed the task, provide a clear summary of your findings or actions.""" When you have completed the task, provide a clear summary of your findings or actions."""
async def cancel_by_session(self, session_key: str) -> int: async def cancel_by_session(self, session_key: str) -> int:
"""Cancel all subagents spawned under the given session. Returns count cancelled.""" """Cancel all subagents for the given session. Returns count cancelled."""
task_ids = list(self._session_tasks.get(session_key, [])) tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, [])
to_cancel: list[asyncio.Task] = [] if tid in self._running_tasks and not self._running_tasks[tid].done()]
for tid in task_ids: for t in tasks:
t = self._running_tasks.get(tid) t.cancel()
if t and not t.done(): if tasks:
t.cancel() await asyncio.gather(*tasks, return_exceptions=True)
to_cancel.append(t) return len(tasks)
if to_cancel:
await asyncio.gather(*to_cancel, return_exceptions=True)
return len(to_cancel)
def get_running_count(self) -> int: def get_running_count(self) -> int:
"""Return the number of currently running subagents.""" """Return the number of currently running subagents."""

View File

@@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel):
BOT_COMMANDS = [ BOT_COMMANDS = [
BotCommand("start", "Start the bot"), BotCommand("start", "Start the bot"),
BotCommand("new", "Start a new conversation"), BotCommand("new", "Start a new conversation"),
BotCommand("stop", "Stop the current task"),
BotCommand("help", "Show available commands"), BotCommand("help", "Show available commands"),
] ]
@@ -299,6 +300,7 @@ class TelegramChannel(BaseChannel):
await update.message.reply_text( await update.message.reply_text(
"🐈 nanobot commands:\n" "🐈 nanobot commands:\n"
"/new — Start a new conversation\n" "/new — Start a new conversation\n"
"/stop — Stop the current task\n"
"/help — Show available commands" "/help — Show available commands"
) )

View File

@@ -2,14 +2,6 @@
You are a helpful AI assistant. Be concise, accurate, and friendly. You are a helpful AI assistant. Be concise, accurate, and friendly.
## Guidelines
- Before calling tools, briefly state your intent — but NEVER predict results before receiving them
- Use precise tense: "I will run X" before the call, "X returned Y" after
- NEVER claim success before a tool result confirms it
- Ask for clarification when the request is ambiguous
- Remember important information in `memory/MEMORY.md`; past events are logged in `memory/HISTORY.md`
## Scheduled Reminders ## Scheduled Reminders
When user asks for a reminder at a specific time, use `exec` to run: When user asks for a reminder at a specific time, use `exec` to run:

View File

@@ -1,4 +1,4 @@
"""Tests for the command system and task cancellation.""" """Tests for /stop task cancellation."""
from __future__ import annotations from __future__ import annotations
@@ -7,117 +7,42 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
from nanobot.agent.commands import (
COMMANDS, def _make_loop():
get_help_text, """Create a minimal AgentLoop with mocked dependencies."""
is_immediate_command, from nanobot.agent.loop import AgentLoop
parse_command, from nanobot.bus.queue import MessageBus
)
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
workspace = MagicMock()
workspace.__truediv__ = MagicMock(return_value=MagicMock())
with patch("nanobot.agent.loop.ContextBuilder"), \
patch("nanobot.agent.loop.SessionManager"), \
patch("nanobot.agent.loop.SubagentManager") as MockSubMgr:
MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0)
loop = AgentLoop(bus=bus, provider=provider, workspace=workspace)
return loop, bus
# --------------------------------------------------------------------------- class TestHandleStop:
# commands.py unit tests
# ---------------------------------------------------------------------------
class TestParseCommand:
def test_slash_command(self):
assert parse_command("/stop") == "/stop"
def test_slash_command_with_args(self):
assert parse_command("/new some args") == "/new"
def test_not_a_command(self):
assert parse_command("hello world") is None
def test_empty_string(self):
assert parse_command("") is None
def test_leading_whitespace(self):
assert parse_command(" /help") == "/help"
def test_uppercase_normalized(self):
assert parse_command("/STOP") == "/stop"
class TestIsImmediateCommand:
def test_stop_is_immediate(self):
assert is_immediate_command("/stop") is True
def test_new_is_not_immediate(self):
assert is_immediate_command("/new") is False
def test_help_is_not_immediate(self):
assert is_immediate_command("/help") is False
def test_unknown_command(self):
assert is_immediate_command("/unknown") is False
class TestGetHelpText:
def test_contains_all_commands(self):
text = get_help_text()
for cmd in COMMANDS:
assert cmd in text
def test_contains_descriptions(self):
text = get_help_text()
for defn in COMMANDS.values():
assert defn.description in text
def test_starts_with_header(self):
assert get_help_text().startswith("🐈")
# ---------------------------------------------------------------------------
# Task cancellation integration tests
# ---------------------------------------------------------------------------
class TestTaskCancellation:
"""Tests for /stop cancelling an active task in AgentLoop."""
def _make_loop(self):
"""Create a minimal AgentLoop with mocked dependencies."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
workspace = MagicMock()
workspace.__truediv__ = MagicMock(return_value=MagicMock())
with patch("nanobot.agent.loop.ContextBuilder"), \
patch("nanobot.agent.loop.SessionManager"), \
patch("nanobot.agent.loop.SubagentManager") as MockSubMgr:
MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0)
loop = AgentLoop(
bus=bus,
provider=provider,
workspace=workspace,
)
return loop, bus
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stop_no_active_task(self): async def test_stop_no_active_task(self):
"""'/stop' when nothing is running returns 'No active task'."""
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage
loop, bus = self._make_loop() loop, bus = _make_loop()
msg = InboundMessage( msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
channel="test", sender_id="u1", chat_id="c1", content="/stop" await loop._handle_stop(msg)
)
await loop._handle_immediate_command("/stop", msg)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "No active task" in out.content assert "No active task" in out.content
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stop_cancels_active_task(self): async def test_stop_cancels_active_task(self):
"""'/stop' cancels a running task."""
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage
loop, bus = self._make_loop() loop, bus = _make_loop()
session_key = "test:c1"
cancelled = asyncio.Event() cancelled = asyncio.Event()
async def slow_task(): async def slow_task():
@@ -128,74 +53,61 @@ class TestTaskCancellation:
raise raise
task = asyncio.create_task(slow_task()) task = asyncio.create_task(slow_task())
await asyncio.sleep(0) # Let task enter its await await asyncio.sleep(0)
loop._active_tasks[session_key] = task loop._active_tasks["test:c1"] = [task]
msg = InboundMessage( msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
channel="test", sender_id="u1", chat_id="c1", content="/stop" await loop._handle_stop(msg)
)
await loop._handle_immediate_command("/stop", msg)
assert cancelled.is_set() assert cancelled.is_set()
assert task.cancelled()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "stopped" in out.content.lower() assert "stopped" in out.content.lower()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_dispatch_registers_and_clears_task(self): async def test_stop_cancels_multiple_tasks(self):
"""_dispatch registers the task in _active_tasks and clears it after.""" from nanobot.bus.events import InboundMessage
loop, bus = _make_loop()
events = [asyncio.Event(), asyncio.Event()]
async def slow(idx):
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
events[idx].set()
raise
tasks = [asyncio.create_task(slow(i)) for i in range(2)]
await asyncio.sleep(0)
loop._active_tasks["test:c1"] = tasks
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
assert all(e.is_set() for e in events)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "2 task" in out.content
class TestDispatch:
@pytest.mark.asyncio
async def test_dispatch_processes_and_publishes(self):
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
loop, bus = self._make_loop() loop, bus = _make_loop()
msg = InboundMessage( msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="hello")
channel="test", sender_id="u1", chat_id="c1", content="hello"
)
# Mock _process_message to return a simple response
loop._process_message = AsyncMock( loop._process_message = AsyncMock(
return_value=OutboundMessage(channel="test", chat_id="c1", content="hi") return_value=OutboundMessage(channel="test", chat_id="c1", content="hi")
) )
await loop._dispatch(msg)
task = asyncio.create_task(loop._dispatch(msg)) out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
await task assert out.content == "hi"
# Task should be cleaned up
assert msg.session_key not in loop._active_tasks
@pytest.mark.asyncio
async def test_dispatch_handles_cancelled_error(self):
"""_dispatch catches CancelledError gracefully."""
from nanobot.bus.events import InboundMessage
loop, bus = self._make_loop()
msg = InboundMessage(
channel="test", sender_id="u1", chat_id="c1", content="hello"
)
async def mock_process(m, **kwargs):
await asyncio.sleep(60)
loop._process_message = mock_process
task = asyncio.create_task(loop._dispatch(msg))
await asyncio.sleep(0.05) # Let task start
assert msg.session_key in loop._active_tasks
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Task should be cleaned up even after cancel
assert msg.session_key not in loop._active_tasks
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_processing_lock_serializes(self): async def test_processing_lock_serializes(self):
"""Only one message processes at a time due to _processing_lock."""
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
loop, bus = self._make_loop() loop, bus = _make_loop()
order = [] order = []
async def mock_process(m, **kwargs): async def mock_process(m, **kwargs):
@@ -205,27 +117,18 @@ class TestTaskCancellation:
return OutboundMessage(channel="test", chat_id="c1", content=m.content) return OutboundMessage(channel="test", chat_id="c1", content=m.content)
loop._process_message = mock_process loop._process_message = mock_process
msg1 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a") msg1 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a")
msg2 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b") msg2 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b")
t1 = asyncio.create_task(loop._dispatch(msg1)) t1 = asyncio.create_task(loop._dispatch(msg1))
t2 = asyncio.create_task(loop._dispatch(msg2)) t2 = asyncio.create_task(loop._dispatch(msg2))
await asyncio.gather(t1, t2) await asyncio.gather(t1, t2)
# Should be serialized: start-a, end-a, start-b, end-b
assert order == ["start-a", "end-a", "start-b", "end-b"] assert order == ["start-a", "end-a", "start-b", "end-b"]
# ---------------------------------------------------------------------------
class TestSubagentCancellation: class TestSubagentCancellation:
"""Tests for /stop cancelling subagents spawned under a session."""
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_cancel_by_session(self): async def test_cancel_by_session(self):
"""cancel_by_session cancels all tasks for that session."""
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
@@ -236,28 +139,24 @@ class TestSubagentCancellation:
cancelled = asyncio.Event() cancelled = asyncio.Event()
async def slow_subagent(): async def slow():
try: try:
await asyncio.sleep(60) await asyncio.sleep(60)
except asyncio.CancelledError: except asyncio.CancelledError:
cancelled.set() cancelled.set()
raise raise
task = asyncio.create_task(slow_subagent()) task = asyncio.create_task(slow())
await asyncio.sleep(0) await asyncio.sleep(0)
tid = "sub-1" mgr._running_tasks["sub-1"] = task
session_key = "test:c1" mgr._session_tasks["test:c1"] = {"sub-1"}
mgr._running_tasks[tid] = task
mgr._session_tasks[session_key] = {tid}
count = await mgr.cancel_by_session(session_key) count = await mgr.cancel_by_session("test:c1")
assert count == 1 assert count == 1
assert cancelled.is_set() assert cancelled.is_set()
assert task.cancelled()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_cancel_by_session_no_tasks(self): async def test_cancel_by_session_no_tasks(self):
"""cancel_by_session returns 0 when no subagents for session."""
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
@@ -265,54 +164,4 @@ class TestSubagentCancellation:
provider = MagicMock() provider = MagicMock()
provider.get_default_model.return_value = "test-model" provider.get_default_model.return_value = "test-model"
mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus)
assert await mgr.cancel_by_session("nonexistent") == 0
count = await mgr.cancel_by_session("nonexistent:session")
assert count == 0
@pytest.mark.asyncio
async def test_stop_cancels_subagents_via_loop(self):
"""/stop on AgentLoop also cancels subagents for that session."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
workspace = MagicMock()
workspace.__truediv__ = MagicMock(return_value=MagicMock())
with patch("nanobot.agent.loop.ContextBuilder"), \
patch("nanobot.agent.loop.SessionManager"), \
patch("nanobot.agent.loop.SubagentManager"):
loop = AgentLoop(bus=bus, provider=provider, workspace=workspace)
# Replace subagents with a real SubagentManager
from nanobot.agent.subagent import SubagentManager
loop.subagents = SubagentManager(
provider=provider, workspace=MagicMock(), bus=bus
)
cancelled = asyncio.Event()
session_key = "test:c1"
async def slow_sub():
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
cancelled.set()
raise
task = asyncio.create_task(slow_sub())
await asyncio.sleep(0)
loop.subagents._running_tasks["sub-1"] = task
loop.subagents._session_tasks[session_key] = {"sub-1"}
msg = InboundMessage(
channel="test", sender_id="u1", chat_id="c1", content="/stop"
)
await loop._handle_immediate_command("/stop", msg)
assert cancelled.is_set()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "stopped" in out.content.lower() or "background" in out.content.lower()