From 7279ff0167fbbdcd06f380b0c52e69371f95b73b Mon Sep 17 00:00:00 2001 From: Re-bin Date: Fri, 20 Feb 2026 16:45:21 +0000 Subject: [PATCH] refactor: route CLI interactive mode through message bus for subagent support --- nanobot/agent/loop.py | 9 ++++-- nanobot/cli/commands.py | 62 +++++++++++++++++++++++++++++++++++------ 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index e5ed6e4..9f1e265 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -277,8 +277,9 @@ class AgentLoop: ) try: response = await self._process_message(msg) - if response: - await self.bus.publish_outbound(response) + await self.bus.publish_outbound(response or OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, content="", + )) except Exception as e: logger.error("Error processing message: {}", e) await self.bus.publish_outbound(OutboundMessage( @@ -376,9 +377,11 @@ class AgentLoop: ) async def _bus_progress(content: str) -> None: + meta = dict(msg.metadata or {}) + meta["_progress"] = True await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=msg.metadata or {}, + metadata=meta, )) final_content, tools_used = await self._run_agent_loop( diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index a135349..6155463 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -498,27 +498,58 @@ def agent( console.print(f" [dim]↳ {content}[/dim]") if message: - # Single message mode + # Single message mode — direct call, no bus needed async def run_once(): with _thinking_ctx(): response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress) _print_agent_response(response, render_markdown=markdown) await agent_loop.close_mcp() - + asyncio.run(run_once()) else: - # Interactive mode + # Interactive mode — route through bus like other channels + from nanobot.bus.events import InboundMessage _init_prompt_session() console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") + if ":" in session_id: + cli_channel, cli_chat_id = session_id.split(":", 1) + else: + cli_channel, cli_chat_id = "cli", session_id + def _exit_on_sigint(signum, frame): _restore_terminal() console.print("\nGoodbye!") os._exit(0) signal.signal(signal.SIGINT, _exit_on_sigint) - + async def run_interactive(): + bus_task = asyncio.create_task(agent_loop.run()) + turn_done = asyncio.Event() + turn_done.set() + turn_response: list[str] = [] + + async def _consume_outbound(): + while True: + try: + msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + if msg.metadata.get("_progress"): + console.print(f" [dim]↳ {msg.content}[/dim]") + elif not turn_done.is_set(): + if msg.content: + turn_response.append(msg.content) + turn_done.set() + elif msg.content: + console.print() + _print_agent_response(msg.content, render_markdown=markdown) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + + outbound_task = asyncio.create_task(_consume_outbound()) + try: while True: try: @@ -532,10 +563,22 @@ def agent( _restore_terminal() console.print("\nGoodbye!") break - + + turn_done.clear() + turn_response.clear() + + await bus.publish_inbound(InboundMessage( + channel=cli_channel, + sender_id="user", + chat_id=cli_chat_id, + content=user_input, + )) + with _thinking_ctx(): - response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress) - _print_agent_response(response, render_markdown=markdown) + await turn_done.wait() + + if turn_response: + _print_agent_response(turn_response[0], render_markdown=markdown) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") @@ -545,8 +588,11 @@ def agent( console.print("\nGoodbye!") break finally: + agent_loop.stop() + outbound_task.cancel() + await asyncio.gather(bus_task, outbound_task, return_exceptions=True) await agent_loop.close_mcp() - + asyncio.run(run_interactive())