refactor: route CLI interactive mode through message bus for subagent support

This commit is contained in:
Re-bin
2026-02-20 16:45:21 +00:00
parent f8ffff98a5
commit 7279ff0167
2 changed files with 60 additions and 11 deletions

View File

@@ -277,8 +277,9 @@ class AgentLoop:
)
try:
response = await self._process_message(msg)
if response:
await self.bus.publish_outbound(response)
await self.bus.publish_outbound(response or OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="",
))
except Exception as e:
logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage(
@@ -376,9 +377,11 @@ class AgentLoop:
)
async def _bus_progress(content: str) -> None:
meta = dict(msg.metadata or {})
meta["_progress"] = True
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
metadata=msg.metadata or {},
metadata=meta,
))
final_content, tools_used = await self._run_agent_loop(

View File

@@ -498,27 +498,58 @@ def agent(
console.print(f" [dim]↳ {content}[/dim]")
if message:
# Single message mode
# Single message mode — direct call, no bus needed
async def run_once():
with _thinking_ctx():
response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown)
await agent_loop.close_mcp()
asyncio.run(run_once())
else:
# Interactive mode
# Interactive mode — route through bus like other channels
from nanobot.bus.events import InboundMessage
_init_prompt_session()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
if ":" in session_id:
cli_channel, cli_chat_id = session_id.split(":", 1)
else:
cli_channel, cli_chat_id = "cli", session_id
def _exit_on_sigint(signum, frame):
_restore_terminal()
console.print("\nGoodbye!")
os._exit(0)
signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive():
bus_task = asyncio.create_task(agent_loop.run())
turn_done = asyncio.Event()
turn_done.set()
turn_response: list[str] = []
async def _consume_outbound():
while True:
try:
msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
if msg.metadata.get("_progress"):
console.print(f" [dim]↳ {msg.content}[/dim]")
elif not turn_done.is_set():
if msg.content:
turn_response.append(msg.content)
turn_done.set()
elif msg.content:
console.print()
_print_agent_response(msg.content, render_markdown=markdown)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
outbound_task = asyncio.create_task(_consume_outbound())
try:
while True:
try:
@@ -532,10 +563,22 @@ def agent(
_restore_terminal()
console.print("\nGoodbye!")
break
turn_done.clear()
turn_response.clear()
await bus.publish_inbound(InboundMessage(
channel=cli_channel,
sender_id="user",
chat_id=cli_chat_id,
content=user_input,
))
with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown)
await turn_done.wait()
if turn_response:
_print_agent_response(turn_response[0], render_markdown=markdown)
except KeyboardInterrupt:
_restore_terminal()
console.print("\nGoodbye!")
@@ -545,8 +588,11 @@ def agent(
console.print("\nGoodbye!")
break
finally:
agent_loop.stop()
outbound_task.cancel()
await asyncio.gather(bus_task, outbound_task, return_exceptions=True)
await agent_loop.close_mcp()
asyncio.run(run_interactive())