Merge PR #1180: feat: /stop command with task-based dispatch

This commit is contained in:
Re-bin
2026-02-25 17:04:19 +00:00
7 changed files with 278 additions and 175 deletions

View File

@@ -13,12 +13,7 @@ from nanobot.agent.skills import SkillsLoader
class ContextBuilder: class ContextBuilder:
""" """Builds the context (system prompt + messages) for the agent."""
Builds the context (system prompt + messages) for the agent.
Assembles bootstrap files, memory, skills, and conversation history
into a coherent prompt for the LLM.
"""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
@@ -29,39 +24,23 @@ class ContextBuilder:
self.skills = SkillsLoader(workspace) self.skills = SkillsLoader(workspace)
def build_system_prompt(self, skill_names: list[str] | None = None) -> str: def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
""" """Build the system prompt from identity, bootstrap files, memory, and skills."""
Build the system prompt from bootstrap files, memory, and skills. parts = [self._get_identity()]
Args:
skill_names: Optional list of skills to include.
Returns:
Complete system prompt.
"""
parts = []
# Core identity
parts.append(self._get_identity())
# Bootstrap files
bootstrap = self._load_bootstrap_files() bootstrap = self._load_bootstrap_files()
if bootstrap: if bootstrap:
parts.append(bootstrap) parts.append(bootstrap)
# Memory context
memory = self.memory.get_memory_context() memory = self.memory.get_memory_context()
if memory: if memory:
parts.append(f"# Memory\n\n{memory}") parts.append(f"# Memory\n\n{memory}")
# Skills - progressive loading
# 1. Always-loaded skills: include full content
always_skills = self.skills.get_always_skills() always_skills = self.skills.get_always_skills()
if always_skills: if always_skills:
always_content = self.skills.load_skills_for_context(always_skills) always_content = self.skills.load_skills_for_context(always_skills)
if always_content: if always_content:
parts.append(f"# Active Skills\n\n{always_content}") parts.append(f"# Active Skills\n\n{always_content}")
# 2. Available skills: only show summary (agent uses read_file to load)
skills_summary = self.skills.build_skills_summary() skills_summary = self.skills.build_skills_summary()
if skills_summary: if skills_summary:
parts.append(f"""# Skills parts.append(f"""# Skills
@@ -88,22 +67,18 @@ You are nanobot, a helpful AI assistant.
## Workspace ## Workspace
Your workspace is at: {workspace_path} Your workspace is at: {workspace_path}
- Long-term memory: {workspace_path}/memory/MEMORY.md - Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here)
- History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - History log: {workspace_path}/memory/HISTORY.md (grep-searchable)
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel. ## nanobot Guidelines
- State intent before tool calls, but NEVER predict or claim results before receiving them.
## Tool Call Guidelines - Before modifying a file, read it first. Do not assume files or directories exist.
- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it.
- Before modifying a file, read it first to confirm its current content.
- Do not assume a file or directory exists — use list_dir or read_file to verify.
- After writing or editing a file, re-read it if accuracy matters. - After writing or editing a file, re-read it if accuracy matters.
- If a tool call fails, analyze the error before retrying with a different approach. - If a tool call fails, analyze the error before retrying with a different approach.
- Ask for clarification when the request is ambiguous.
## Memory Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel."""
- Remember important facts: write to {workspace_path}/memory/MEMORY.md
- Recall past events: grep {workspace_path}/memory/HISTORY.md"""
@staticmethod @staticmethod
def _build_runtime_context(channel: str | None, chat_id: str | None) -> str: def _build_runtime_context(channel: str | None, chat_id: str | None) -> str:
@@ -136,37 +111,13 @@ Reply directly with text for conversations. Only use the 'message' tool to send
channel: str | None = None, channel: str | None = None,
chat_id: str | None = None, chat_id: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """Build the complete message list for an LLM call."""
Build the complete message list for an LLM call. return [
{"role": "system", "content": self.build_system_prompt(skill_names)},
Args: *history,
history: Previous conversation messages. {"role": "user", "content": self._build_runtime_context(channel, chat_id)},
current_message: The new user message. {"role": "user", "content": self._build_user_content(current_message, media)},
skill_names: Optional skills to include. ]
media: Optional list of local file paths for images/media.
channel: Current channel (telegram, feishu, etc.).
chat_id: Current chat/user ID.
Returns:
List of messages including system prompt.
"""
messages = []
# System prompt
system_prompt = self.build_system_prompt(skill_names)
messages.append({"role": "system", "content": system_prompt})
# History
messages.extend(history)
# Inject runtime metadata as a separate user message before the actual user message.
messages.append({"role": "user", "content": self._build_runtime_context(channel, chat_id)})
# Current user message
user_content = self._build_user_content(current_message, media)
messages.append({"role": "user", "content": user_content})
return messages
def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]: def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
"""Build user message content with optional base64-encoded images.""" """Build user message content with optional base64-encoded images."""
@@ -187,63 +138,24 @@ Reply directly with text for conversations. Only use the 'message' tool to send
return images + [{"type": "text", "text": text}] return images + [{"type": "text", "text": text}]
def add_tool_result( def add_tool_result(
self, self, messages: list[dict[str, Any]],
messages: list[dict[str, Any]], tool_call_id: str, tool_name: str, result: str,
tool_call_id: str,
tool_name: str,
result: str
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """Add a tool result to the message list."""
Add a tool result to the message list. messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result})
Args:
messages: Current message list.
tool_call_id: ID of the tool call.
tool_name: Name of the tool.
result: Tool execution result.
Returns:
Updated message list.
"""
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": tool_name,
"content": result
})
return messages return messages
def add_assistant_message( def add_assistant_message(
self, self, messages: list[dict[str, Any]],
messages: list[dict[str, Any]],
content: str | None, content: str | None,
tool_calls: list[dict[str, Any]] | None = None, tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None, reasoning_content: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """Add an assistant message to the message list."""
Add an assistant message to the message list. msg: dict[str, Any] = {"role": "assistant", "content": content}
Args:
messages: Current message list.
content: Message content.
tool_calls: Optional tool calls.
reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns:
Updated message list.
"""
msg: dict[str, Any] = {"role": "assistant"}
# Always include content — some providers (e.g. StepFun) reject
# assistant messages that omit the key entirely.
msg["content"] = content
if tool_calls: if tool_calls:
msg["tool_calls"] = tool_calls msg["tool_calls"] = tool_calls
# Include reasoning content when provided (required by some thinking models)
if reasoning_content is not None: if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content msg["reasoning_content"] = reasoning_content
messages.append(msg) messages.append(msg)
return messages return messages

View File

@@ -99,6 +99,8 @@ class AgentLoop:
self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: dict[str, asyncio.Lock] = {} self._consolidation_locks: dict[str, asyncio.Lock] = {}
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._processing_lock = asyncio.Lock()
self._register_default_tools() self._register_default_tools()
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
@@ -239,34 +241,61 @@ class AgentLoop:
return final_content, tools_used, messages return final_content, tools_used, messages
async def run(self) -> None: async def run(self) -> None:
"""Run the agent loop, processing messages from the bus.""" """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
self._running = True self._running = True
await self._connect_mcp() await self._connect_mcp()
logger.info("Agent loop started") logger.info("Agent loop started")
while self._running: while self._running:
try: try:
msg = await asyncio.wait_for( msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
self.bus.consume_inbound(), except asyncio.TimeoutError:
timeout=1.0 continue
)
if msg.content.strip().lower() == "/stop":
await self._handle_stop(msg)
else:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
async def _handle_stop(self, msg: InboundMessage) -> None:
"""Cancel all active tasks and subagents for the session."""
tasks = self._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
total = cancelled + sub_cancelled
content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop."
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
))
async def _dispatch(self, msg: InboundMessage) -> None:
"""Process a message under the global lock."""
async with self._processing_lock:
try: try:
response = await self._process_message(msg) response = await self._process_message(msg)
if response is not None: if response is not None:
await self.bus.publish_outbound(response) await self.bus.publish_outbound(response)
elif msg.channel == "cli": elif msg.channel == "cli":
await self.bus.publish_outbound(OutboundMessage( await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {}, channel=msg.channel, chat_id=msg.chat_id,
content="", metadata=msg.metadata or {},
)) ))
except Exception as e: except asyncio.CancelledError:
logger.error("Error processing message: {}", e) logger.info("Task cancelled for session {}", msg.session_key)
raise
except Exception:
logger.exception("Error processing message for session {}", msg.session_key)
await self.bus.publish_outbound(OutboundMessage( await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, channel=msg.channel, chat_id=msg.chat_id,
chat_id=msg.chat_id, content="Sorry, I encountered an error.",
content=f"Sorry, I encountered an error: {str(e)}"
)) ))
except asyncio.TimeoutError:
continue
async def close_mcp(self) -> None: async def close_mcp(self) -> None:
"""Close MCP connections.""" """Close MCP connections."""
@@ -359,7 +388,7 @@ class AgentLoop:
content="New session started.") content="New session started.")
if cmd == "/help": if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands")
unconsolidated = len(session.messages) - session.last_consolidated unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and session.key not in self._consolidating): if (unconsolidated >= self.memory_window and session.key not in self._consolidating):

View File

@@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
class SubagentManager: class SubagentManager:
""" """Manages background subagent execution."""
Manages background subagent execution.
Subagents are lightweight agent instances that run in the background
to handle specific tasks. They share the same LLM provider but have
isolated context and a focused system prompt.
"""
def __init__( def __init__(
self, self,
@@ -49,6 +43,7 @@ class SubagentManager:
self.exec_config = exec_config or ExecToolConfig() self.exec_config = exec_config or ExecToolConfig()
self.restrict_to_workspace = restrict_to_workspace self.restrict_to_workspace = restrict_to_workspace
self._running_tasks: dict[str, asyncio.Task[None]] = {} self._running_tasks: dict[str, asyncio.Task[None]] = {}
self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...}
async def spawn( async def spawn(
self, self,
@@ -56,35 +51,28 @@ class SubagentManager:
label: str | None = None, label: str | None = None,
origin_channel: str = "cli", origin_channel: str = "cli",
origin_chat_id: str = "direct", origin_chat_id: str = "direct",
session_key: str | None = None,
) -> str: ) -> str:
""" """Spawn a subagent to execute a task in the background."""
Spawn a subagent to execute a task in the background.
Args:
task: The task description for the subagent.
label: Optional human-readable label for the task.
origin_channel: The channel to announce results to.
origin_chat_id: The chat ID to announce results to.
Returns:
Status message indicating the subagent was started.
"""
task_id = str(uuid.uuid4())[:8] task_id = str(uuid.uuid4())[:8]
display_label = label or task[:30] + ("..." if len(task) > 30 else "") display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {"channel": origin_channel, "chat_id": origin_chat_id}
origin = {
"channel": origin_channel,
"chat_id": origin_chat_id,
}
# Create background task
bg_task = asyncio.create_task( bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin) self._run_subagent(task_id, task, display_label, origin)
) )
self._running_tasks[task_id] = bg_task self._running_tasks[task_id] = bg_task
if session_key:
self._session_tasks.setdefault(session_key, set()).add(task_id)
# Cleanup when done def _cleanup(_: asyncio.Task) -> None:
bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None)) self._running_tasks.pop(task_id, None)
if session_key and (ids := self._session_tasks.get(session_key)):
ids.discard(task_id)
if not ids:
del self._session_tasks[session_key]
bg_task.add_done_callback(_cleanup)
logger.info("Spawned subagent [{}]: {}", task_id, display_label) logger.info("Spawned subagent [{}]: {}", task_id, display_label)
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
@@ -253,6 +241,16 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed
When you have completed the task, provide a clear summary of your findings or actions.""" When you have completed the task, provide a clear summary of your findings or actions."""
async def cancel_by_session(self, session_key: str) -> int:
"""Cancel all subagents for the given session. Returns count cancelled."""
tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, [])
if tid in self._running_tasks and not self._running_tasks[tid].done()]
for t in tasks:
t.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
return len(tasks)
def get_running_count(self) -> int: def get_running_count(self) -> int:
"""Return the number of currently running subagents.""" """Return the number of currently running subagents."""
return len(self._running_tasks) return len(self._running_tasks)

View File

@@ -15,11 +15,13 @@ class SpawnTool(Tool):
self._manager = manager self._manager = manager
self._origin_channel = "cli" self._origin_channel = "cli"
self._origin_chat_id = "direct" self._origin_chat_id = "direct"
self._session_key = "cli:direct"
def set_context(self, channel: str, chat_id: str) -> None: def set_context(self, channel: str, chat_id: str) -> None:
"""Set the origin context for subagent announcements.""" """Set the origin context for subagent announcements."""
self._origin_channel = channel self._origin_channel = channel
self._origin_chat_id = chat_id self._origin_chat_id = chat_id
self._session_key = f"{channel}:{chat_id}"
@property @property
def name(self) -> str: def name(self) -> str:
@@ -57,4 +59,5 @@ class SpawnTool(Tool):
label=label, label=label,
origin_channel=self._origin_channel, origin_channel=self._origin_channel,
origin_chat_id=self._origin_chat_id, origin_chat_id=self._origin_chat_id,
session_key=self._session_key,
) )

View File

@@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel):
BOT_COMMANDS = [ BOT_COMMANDS = [
BotCommand("start", "Start the bot"), BotCommand("start", "Start the bot"),
BotCommand("new", "Start a new conversation"), BotCommand("new", "Start a new conversation"),
BotCommand("stop", "Stop the current task"),
BotCommand("help", "Show available commands"), BotCommand("help", "Show available commands"),
] ]
@@ -299,6 +300,7 @@ class TelegramChannel(BaseChannel):
await update.message.reply_text( await update.message.reply_text(
"🐈 nanobot commands:\n" "🐈 nanobot commands:\n"
"/new — Start a new conversation\n" "/new — Start a new conversation\n"
"/stop — Stop the current task\n"
"/help — Show available commands" "/help — Show available commands"
) )

View File

@@ -2,14 +2,6 @@
You are a helpful AI assistant. Be concise, accurate, and friendly. You are a helpful AI assistant. Be concise, accurate, and friendly.
## Guidelines
- Before calling tools, briefly state your intent — but NEVER predict results before receiving them
- Use precise tense: "I will run X" before the call, "X returned Y" after
- NEVER claim success before a tool result confirms it
- Ask for clarification when the request is ambiguous
- Remember important information in `memory/MEMORY.md`; past events are logged in `memory/HISTORY.md`
## Scheduled Reminders ## Scheduled Reminders
When user asks for a reminder at a specific time, use `exec` to run: When user asks for a reminder at a specific time, use `exec` to run:

167
tests/test_task_cancel.py Normal file
View File

@@ -0,0 +1,167 @@
"""Tests for /stop task cancellation."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
def _make_loop():
    """Construct an AgentLoop backed by a real MessageBus but mocked collaborators.

    ContextBuilder, SessionManager and SubagentManager are patched out so the
    loop can be exercised without a workspace or LLM provider; the subagent
    manager's cancel_by_session is an AsyncMock returning 0 by default.
    """
    from nanobot.agent.loop import AgentLoop
    from nanobot.bus.queue import MessageBus

    bus = MessageBus()
    llm = MagicMock()
    llm.get_default_model.return_value = "test-model"
    ws = MagicMock()
    # AgentLoop builds paths via `workspace / "..."`, so __truediv__ must work.
    ws.__truediv__ = MagicMock(return_value=MagicMock())

    with patch("nanobot.agent.loop.ContextBuilder"), \
            patch("nanobot.agent.loop.SessionManager"), \
            patch("nanobot.agent.loop.SubagentManager") as sub_mgr_cls:
        sub_mgr_cls.return_value.cancel_by_session = AsyncMock(return_value=0)
        agent_loop = AgentLoop(bus=bus, provider=llm, workspace=ws)
    return agent_loop, bus
class TestHandleStop:
    """Tests for AgentLoop._handle_stop, the /stop command handler."""

    @pytest.mark.asyncio
    async def test_stop_no_active_task(self):
        from nanobot.bus.events import InboundMessage

        agent_loop, bus = _make_loop()
        stop_msg = InboundMessage(
            channel="test", sender_id="u1", chat_id="c1", content="/stop"
        )
        await agent_loop._handle_stop(stop_msg)
        reply = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        # With nothing running, the handler reports that there was no task.
        assert "No active task" in reply.content

    @pytest.mark.asyncio
    async def test_stop_cancels_active_task(self):
        from nanobot.bus.events import InboundMessage

        agent_loop, bus = _make_loop()
        was_cancelled = asyncio.Event()

        async def never_finishes():
            try:
                await asyncio.sleep(60)
            except asyncio.CancelledError:
                was_cancelled.set()
                raise

        running = asyncio.create_task(never_finishes())
        await asyncio.sleep(0)  # let the task start before registering it
        agent_loop._active_tasks["test:c1"] = [running]

        stop_msg = InboundMessage(
            channel="test", sender_id="u1", chat_id="c1", content="/stop"
        )
        await agent_loop._handle_stop(stop_msg)

        assert was_cancelled.is_set()
        reply = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        assert "stopped" in reply.content.lower()

    @pytest.mark.asyncio
    async def test_stop_cancels_multiple_tasks(self):
        from nanobot.bus.events import InboundMessage

        agent_loop, bus = _make_loop()
        flags = [asyncio.Event(), asyncio.Event()]

        def make_worker(flag):
            async def worker():
                try:
                    await asyncio.sleep(60)
                except asyncio.CancelledError:
                    flag.set()
                    raise
            return worker

        workers = [asyncio.create_task(make_worker(flag)()) for flag in flags]
        await asyncio.sleep(0)  # let both tasks start
        agent_loop._active_tasks["test:c1"] = workers

        stop_msg = InboundMessage(
            channel="test", sender_id="u1", chat_id="c1", content="/stop"
        )
        await agent_loop._handle_stop(stop_msg)

        assert all(flag.is_set() for flag in flags)
        reply = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        # The reply should report exactly two cancelled tasks.
        assert "2 task" in reply.content
class TestDispatch:
    """Tests for AgentLoop._dispatch, the per-message task body."""

    @pytest.mark.asyncio
    async def test_dispatch_processes_and_publishes(self):
        from nanobot.bus.events import InboundMessage, OutboundMessage

        agent_loop, bus = _make_loop()
        inbound = InboundMessage(
            channel="test", sender_id="u1", chat_id="c1", content="hello"
        )
        agent_loop._process_message = AsyncMock(
            return_value=OutboundMessage(channel="test", chat_id="c1", content="hi")
        )
        await agent_loop._dispatch(inbound)
        reply = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        assert reply.content == "hi"

    @pytest.mark.asyncio
    async def test_processing_lock_serializes(self):
        from nanobot.bus.events import InboundMessage, OutboundMessage

        agent_loop, bus = _make_loop()
        timeline = []

        async def fake_process(message, **kwargs):
            timeline.append(f"start-{message.content}")
            await asyncio.sleep(0.05)
            timeline.append(f"end-{message.content}")
            return OutboundMessage(channel="test", chat_id="c1", content=message.content)

        agent_loop._process_message = fake_process
        first = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a")
        second = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b")

        await asyncio.gather(
            asyncio.create_task(agent_loop._dispatch(first)),
            asyncio.create_task(agent_loop._dispatch(second)),
        )
        # The processing lock must fully serialize the two dispatches:
        # no interleaving of start/end markers.
        assert timeline == ["start-a", "end-a", "start-b", "end-b"]
class TestSubagentCancellation:
    """Tests for SubagentManager.cancel_by_session."""

    @staticmethod
    def _make_manager():
        """Build a SubagentManager with a real bus and a mocked provider."""
        from nanobot.agent.subagent import SubagentManager
        from nanobot.bus.queue import MessageBus

        bus = MessageBus()
        llm = MagicMock()
        llm.get_default_model.return_value = "test-model"
        return SubagentManager(provider=llm, workspace=MagicMock(), bus=bus)

    @pytest.mark.asyncio
    async def test_cancel_by_session(self):
        mgr = self._make_manager()
        was_cancelled = asyncio.Event()

        async def never_finishes():
            try:
                await asyncio.sleep(60)
            except asyncio.CancelledError:
                was_cancelled.set()
                raise

        running = asyncio.create_task(never_finishes())
        await asyncio.sleep(0)  # let the task start before registering it
        mgr._running_tasks["sub-1"] = running
        mgr._session_tasks["test:c1"] = {"sub-1"}

        cancelled_count = await mgr.cancel_by_session("test:c1")

        assert cancelled_count == 1
        assert was_cancelled.is_set()

    @pytest.mark.asyncio
    async def test_cancel_by_session_no_tasks(self):
        mgr = self._make_manager()
        # An unknown session key cancels nothing.
        assert await mgr.cancel_by_session("nonexistent") == 0