Merge pull request #908 to route CLI interactive mode through message bus
refactor: route CLI interactive mode through message bus
This commit is contained in:
@@ -277,8 +277,9 @@ class AgentLoop:
|
|||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
response = await self._process_message(msg)
|
response = await self._process_message(msg)
|
||||||
if response:
|
await self.bus.publish_outbound(response or OutboundMessage(
|
||||||
await self.bus.publish_outbound(response)
|
channel=msg.channel, chat_id=msg.chat_id, content="",
|
||||||
|
))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error processing message: {}", e)
|
logger.error("Error processing message: {}", e)
|
||||||
await self.bus.publish_outbound(OutboundMessage(
|
await self.bus.publish_outbound(OutboundMessage(
|
||||||
@@ -376,9 +377,11 @@ class AgentLoop:
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def _bus_progress(content: str) -> None:
|
async def _bus_progress(content: str) -> None:
|
||||||
|
meta = dict(msg.metadata or {})
|
||||||
|
meta["_progress"] = True
|
||||||
await self.bus.publish_outbound(OutboundMessage(
|
await self.bus.publish_outbound(OutboundMessage(
|
||||||
channel=msg.channel, chat_id=msg.chat_id, content=content,
|
channel=msg.channel, chat_id=msg.chat_id, content=content,
|
||||||
metadata=msg.metadata or {},
|
metadata=meta,
|
||||||
))
|
))
|
||||||
|
|
||||||
final_content, tools_used = await self._run_agent_loop(
|
final_content, tools_used = await self._run_agent_loop(
|
||||||
|
|||||||
@@ -498,27 +498,58 @@ def agent(
|
|||||||
console.print(f" [dim]↳ {content}[/dim]")
|
console.print(f" [dim]↳ {content}[/dim]")
|
||||||
|
|
||||||
if message:
|
if message:
|
||||||
# Single message mode
|
# Single message mode — direct call, no bus needed
|
||||||
async def run_once():
|
async def run_once():
|
||||||
with _thinking_ctx():
|
with _thinking_ctx():
|
||||||
response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
|
response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
|
||||||
_print_agent_response(response, render_markdown=markdown)
|
_print_agent_response(response, render_markdown=markdown)
|
||||||
await agent_loop.close_mcp()
|
await agent_loop.close_mcp()
|
||||||
|
|
||||||
asyncio.run(run_once())
|
asyncio.run(run_once())
|
||||||
else:
|
else:
|
||||||
# Interactive mode
|
# Interactive mode — route through bus like other channels
|
||||||
|
from nanobot.bus.events import InboundMessage
|
||||||
_init_prompt_session()
|
_init_prompt_session()
|
||||||
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
|
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
|
||||||
|
|
||||||
|
if ":" in session_id:
|
||||||
|
cli_channel, cli_chat_id = session_id.split(":", 1)
|
||||||
|
else:
|
||||||
|
cli_channel, cli_chat_id = "cli", session_id
|
||||||
|
|
||||||
def _exit_on_sigint(signum, frame):
|
def _exit_on_sigint(signum, frame):
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, _exit_on_sigint)
|
signal.signal(signal.SIGINT, _exit_on_sigint)
|
||||||
|
|
||||||
async def run_interactive():
|
async def run_interactive():
|
||||||
|
bus_task = asyncio.create_task(agent_loop.run())
|
||||||
|
turn_done = asyncio.Event()
|
||||||
|
turn_done.set()
|
||||||
|
turn_response: list[str] = []
|
||||||
|
|
||||||
|
async def _consume_outbound():
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
|
||||||
|
if msg.metadata.get("_progress"):
|
||||||
|
console.print(f" [dim]↳ {msg.content}[/dim]")
|
||||||
|
elif not turn_done.is_set():
|
||||||
|
if msg.content:
|
||||||
|
turn_response.append(msg.content)
|
||||||
|
turn_done.set()
|
||||||
|
elif msg.content:
|
||||||
|
console.print()
|
||||||
|
_print_agent_response(msg.content, render_markdown=markdown)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
|
||||||
|
outbound_task = asyncio.create_task(_consume_outbound())
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -532,10 +563,22 @@ def agent(
|
|||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
break
|
break
|
||||||
|
|
||||||
|
turn_done.clear()
|
||||||
|
turn_response.clear()
|
||||||
|
|
||||||
|
await bus.publish_inbound(InboundMessage(
|
||||||
|
channel=cli_channel,
|
||||||
|
sender_id="user",
|
||||||
|
chat_id=cli_chat_id,
|
||||||
|
content=user_input,
|
||||||
|
))
|
||||||
|
|
||||||
with _thinking_ctx():
|
with _thinking_ctx():
|
||||||
response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress)
|
await turn_done.wait()
|
||||||
_print_agent_response(response, render_markdown=markdown)
|
|
||||||
|
if turn_response:
|
||||||
|
_print_agent_response(turn_response[0], render_markdown=markdown)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
@@ -545,8 +588,11 @@ def agent(
|
|||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
|
agent_loop.stop()
|
||||||
|
outbound_task.cancel()
|
||||||
|
await asyncio.gather(bus_task, outbound_task, return_exceptions=True)
|
||||||
await agent_loop.close_mcp()
|
await agent_loop.close_mcp()
|
||||||
|
|
||||||
asyncio.run(run_interactive())
|
asyncio.run(run_interactive())
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user