From 4768b9a09d043aef75f31a8b337cd07ebcab167b Mon Sep 17 00:00:00 2001 From: coldxiangyu Date: Wed, 25 Feb 2026 18:21:46 +0800 Subject: [PATCH] fix: parallel subagent cancellation + register task before lock - cancel_by_session: use asyncio.gather for parallel cancellation instead of sequential await per task - _dispatch: register in _active_tasks before acquiring lock so /stop can find queued tasks (synced from #1179) --- nanobot/agent/loop.py | 49 +++++++++++++++++++++------------------ nanobot/agent/subagent.py | 12 ++++------ 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 95ccc19..5e0b056 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -304,30 +304,35 @@ class AgentLoop: )) async def _dispatch(self, msg: InboundMessage) -> None: - """Dispatch a message for processing under the global lock.""" - async with self._processing_lock: - self._active_tasks[msg.session_key] = asyncio.current_task() # type: ignore[arg-type] - try: - response = await self._process_message(msg) - if response is not None: - await self.bus.publish_outbound(response) - elif msg.channel == "cli": + """Dispatch a message for processing under the global lock. + + The task is registered in _active_tasks *before* acquiring the lock + so that /stop can find (and cancel) tasks that are still queued. + """ + self._active_tasks[msg.session_key] = asyncio.current_task() # type: ignore[arg-type] + try: + async with self._processing_lock: + try: + response = await self._process_message(msg) + if response is not None: + await self.bus.publish_outbound(response) + elif msg.channel == "cli": + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="", metadata=msg.metadata or {}, + )) + except asyncio.CancelledError: + logger.info("Task cancelled for session {}", msg.session_key) + # Response already sent by _handle_immediate_command + except Exception as e: + logger.error("Error processing message: {}", e) await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, - content="", metadata=msg.metadata or {}, + channel=msg.channel, + chat_id=msg.chat_id, + content=f"Sorry, I encountered an error: {str(e)}" )) - except asyncio.CancelledError: - logger.info("Task cancelled for session {}", msg.session_key) - # Response already sent by _handle_immediate_command - except Exception as e: - logger.error("Error processing message: {}", e) - await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"Sorry, I encountered an error: {str(e)}" - )) - finally: - self._active_tasks.pop(msg.session_key, None) + finally: + self._active_tasks.pop(msg.session_key, None) async def close_mcp(self) -> None: """Close MCP connections.""" diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 78d972e..c28412b 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -268,17 +268,15 @@ When you have completed the task, provide a clear summary of your findings or ac async def cancel_by_session(self, session_key: str) -> int: """Cancel all subagents spawned under the given session. Returns count cancelled.""" task_ids = list(self._session_tasks.get(session_key, [])) - cancelled = 0 + to_cancel: list[asyncio.Task] = [] for tid in task_ids: t = self._running_tasks.get(tid) if t and not t.done(): t.cancel() - try: - await t - except (asyncio.CancelledError, Exception): - pass - cancelled += 1 - return cancelled + to_cancel.append(t) + if to_cancel: + await asyncio.gather(*to_cancel, return_exceptions=True) + return len(to_cancel) def get_running_count(self) -> int: """Return the number of currently running subagents."""