feat(cli): extract streaming renderer to stream.py with Rich Live
Move ThinkingSpinner and StreamRenderer into a dedicated module to keep commands.py focused on orchestration. Uses Rich Live with manual refresh (auto_refresh=False) and ellipsis overflow for stable streaming output. Made-with: Cursor
This commit is contained in:
@@ -33,6 +33,7 @@ from rich.table import Table
|
|||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
|
|
||||||
from nanobot import __logo__, __version__
|
from nanobot import __logo__, __version__
|
||||||
|
from nanobot.cli.stream import StreamRenderer, ThinkingSpinner
|
||||||
from nanobot.config.paths import get_workspace_path
|
from nanobot.config.paths import get_workspace_path
|
||||||
from nanobot.config.schema import Config
|
from nanobot.config.schema import Config
|
||||||
from nanobot.utils.helpers import sync_workspace_templates
|
from nanobot.utils.helpers import sync_workspace_templates
|
||||||
@@ -188,46 +189,13 @@ async def _print_interactive_response(
|
|||||||
await run_in_terminal(_write)
|
await run_in_terminal(_write)
|
||||||
|
|
||||||
|
|
||||||
class _ThinkingSpinner:
|
def _print_cli_progress_line(text: str, thinking: ThinkingSpinner | None) -> None:
|
||||||
"""Spinner wrapper with pause support for clean progress output."""
|
|
||||||
|
|
||||||
def __init__(self, enabled: bool):
|
|
||||||
self._spinner = console.status(
|
|
||||||
"[dim]nanobot is thinking...[/dim]", spinner="dots"
|
|
||||||
) if enabled else None
|
|
||||||
self._active = False
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
if self._spinner:
|
|
||||||
self._spinner.start()
|
|
||||||
self._active = True
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, *exc):
|
|
||||||
self._active = False
|
|
||||||
if self._spinner:
|
|
||||||
self._spinner.stop()
|
|
||||||
return False
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def pause(self):
|
|
||||||
"""Temporarily stop spinner while printing progress."""
|
|
||||||
if self._spinner and self._active:
|
|
||||||
self._spinner.stop()
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
if self._spinner and self._active:
|
|
||||||
self._spinner.start()
|
|
||||||
|
|
||||||
|
|
||||||
def _print_cli_progress_line(text: str, thinking: _ThinkingSpinner | None) -> None:
|
|
||||||
"""Print a CLI progress line, pausing the spinner if needed."""
|
"""Print a CLI progress line, pausing the spinner if needed."""
|
||||||
with thinking.pause() if thinking else nullcontext():
|
with thinking.pause() if thinking else nullcontext():
|
||||||
console.print(f" [dim]↳ {text}[/dim]")
|
console.print(f" [dim]↳ {text}[/dim]")
|
||||||
|
|
||||||
|
|
||||||
async def _print_interactive_progress_line(text: str, thinking: _ThinkingSpinner | None) -> None:
|
async def _print_interactive_progress_line(text: str, thinking: ThinkingSpinner | None) -> None:
|
||||||
"""Print an interactive progress line, pausing the spinner if needed."""
|
"""Print an interactive progress line, pausing the spinner if needed."""
|
||||||
with thinking.pause() if thinking else nullcontext():
|
with thinking.pause() if thinking else nullcontext():
|
||||||
await _print_interactive_line(text)
|
await _print_interactive_line(text)
|
||||||
@@ -755,7 +723,7 @@ def agent(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Shared reference for progress callbacks
|
# Shared reference for progress callbacks
|
||||||
_thinking: _ThinkingSpinner | None = None
|
_thinking: ThinkingSpinner | None = None
|
||||||
|
|
||||||
async def _cli_progress(content: str, *, tool_hint: bool = False) -> None:
|
async def _cli_progress(content: str, *, tool_hint: bool = False) -> None:
|
||||||
ch = agent_loop.channels_config
|
ch = agent_loop.channels_config
|
||||||
@@ -768,18 +736,19 @@ def agent(
|
|||||||
if message:
|
if message:
|
||||||
# Single message mode — direct call, no bus needed
|
# Single message mode — direct call, no bus needed
|
||||||
async def run_once():
|
async def run_once():
|
||||||
nonlocal _thinking
|
renderer = StreamRenderer(render_markdown=markdown)
|
||||||
_thinking = _ThinkingSpinner(enabled=not logs)
|
response = await agent_loop.process_direct(
|
||||||
with _thinking:
|
message, session_id,
|
||||||
response = await agent_loop.process_direct(
|
on_progress=_cli_progress,
|
||||||
message, session_id, on_progress=_cli_progress,
|
on_stream=renderer.on_delta,
|
||||||
)
|
on_stream_end=renderer.on_end,
|
||||||
_thinking = None
|
|
||||||
_print_agent_response(
|
|
||||||
response.content if response else "",
|
|
||||||
render_markdown=markdown,
|
|
||||||
metadata=response.metadata if response else None,
|
|
||||||
)
|
)
|
||||||
|
if not renderer.streamed:
|
||||||
|
_print_agent_response(
|
||||||
|
response.content if response else "",
|
||||||
|
render_markdown=markdown,
|
||||||
|
metadata=response.metadata if response else None,
|
||||||
|
)
|
||||||
await agent_loop.close_mcp()
|
await agent_loop.close_mcp()
|
||||||
|
|
||||||
asyncio.run(run_once())
|
asyncio.run(run_once())
|
||||||
@@ -815,11 +784,27 @@ def agent(
|
|||||||
turn_done = asyncio.Event()
|
turn_done = asyncio.Event()
|
||||||
turn_done.set()
|
turn_done.set()
|
||||||
turn_response: list[tuple[str, dict]] = []
|
turn_response: list[tuple[str, dict]] = []
|
||||||
|
renderer: StreamRenderer | None = None
|
||||||
|
|
||||||
async def _consume_outbound():
|
async def _consume_outbound():
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
|
msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
|
||||||
|
|
||||||
|
if msg.metadata.get("_stream_delta"):
|
||||||
|
if renderer:
|
||||||
|
await renderer.on_delta(msg.content)
|
||||||
|
continue
|
||||||
|
if msg.metadata.get("_stream_end"):
|
||||||
|
if renderer:
|
||||||
|
await renderer.on_end(
|
||||||
|
resuming=msg.metadata.get("_resuming", False),
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
if msg.metadata.get("_streamed"):
|
||||||
|
turn_done.set()
|
||||||
|
continue
|
||||||
|
|
||||||
if msg.metadata.get("_progress"):
|
if msg.metadata.get("_progress"):
|
||||||
is_tool_hint = msg.metadata.get("_tool_hint", False)
|
is_tool_hint = msg.metadata.get("_tool_hint", False)
|
||||||
ch = agent_loop.channels_config
|
ch = agent_loop.channels_config
|
||||||
@@ -829,8 +814,9 @@ def agent(
|
|||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
await _print_interactive_progress_line(msg.content, _thinking)
|
await _print_interactive_progress_line(msg.content, _thinking)
|
||||||
|
continue
|
||||||
|
|
||||||
elif not turn_done.is_set():
|
if not turn_done.is_set():
|
||||||
if msg.content:
|
if msg.content:
|
||||||
turn_response.append((msg.content, dict(msg.metadata or {})))
|
turn_response.append((msg.content, dict(msg.metadata or {})))
|
||||||
turn_done.set()
|
turn_done.set()
|
||||||
@@ -864,23 +850,24 @@ def agent(
|
|||||||
|
|
||||||
turn_done.clear()
|
turn_done.clear()
|
||||||
turn_response.clear()
|
turn_response.clear()
|
||||||
|
renderer = StreamRenderer(render_markdown=markdown)
|
||||||
|
|
||||||
await bus.publish_inbound(InboundMessage(
|
await bus.publish_inbound(InboundMessage(
|
||||||
channel=cli_channel,
|
channel=cli_channel,
|
||||||
sender_id="user",
|
sender_id="user",
|
||||||
chat_id=cli_chat_id,
|
chat_id=cli_chat_id,
|
||||||
content=user_input,
|
content=user_input,
|
||||||
|
metadata={"_wants_stream": True},
|
||||||
))
|
))
|
||||||
|
|
||||||
nonlocal _thinking
|
await turn_done.wait()
|
||||||
_thinking = _ThinkingSpinner(enabled=not logs)
|
|
||||||
with _thinking:
|
|
||||||
await turn_done.wait()
|
|
||||||
_thinking = None
|
|
||||||
|
|
||||||
if turn_response:
|
if turn_response:
|
||||||
content, meta = turn_response[0]
|
content, meta = turn_response[0]
|
||||||
_print_agent_response(content, render_markdown=markdown, metadata=meta)
|
if content and not meta.get("_streamed"):
|
||||||
|
_print_agent_response(
|
||||||
|
content, render_markdown=markdown, metadata=meta,
|
||||||
|
)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
|
|||||||
128
nanobot/cli/stream.py
Normal file
128
nanobot/cli/stream.py
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
"""Streaming renderer for CLI output.
|
||||||
|
|
||||||
|
Uses Rich Live with auto_refresh=False for stable, flicker-free
|
||||||
|
markdown rendering during streaming. Ellipsis mode handles overflow.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.live import Live
|
||||||
|
from rich.markdown import Markdown
|
||||||
|
from rich.text import Text
|
||||||
|
|
||||||
|
from nanobot import __logo__
|
||||||
|
|
||||||
|
|
||||||
|
def _make_console() -> Console:
|
||||||
|
return Console(file=sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
class ThinkingSpinner:
|
||||||
|
"""Spinner that shows 'nanobot is thinking...' with pause support."""
|
||||||
|
|
||||||
|
def __init__(self, console: Console | None = None):
|
||||||
|
c = console or _make_console()
|
||||||
|
self._spinner = c.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
|
||||||
|
self._active = False
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self._spinner.start()
|
||||||
|
self._active = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *exc):
|
||||||
|
self._active = False
|
||||||
|
self._spinner.stop()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def pause(self):
|
||||||
|
"""Context manager: temporarily stop spinner for clean output."""
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _ctx():
|
||||||
|
if self._spinner and self._active:
|
||||||
|
self._spinner.stop()
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
if self._spinner and self._active:
|
||||||
|
self._spinner.start()
|
||||||
|
|
||||||
|
return _ctx()
|
||||||
|
|
||||||
|
|
||||||
|
class StreamRenderer:
|
||||||
|
"""Rich Live streaming with markdown. auto_refresh=False avoids render races.
|
||||||
|
|
||||||
|
Flow per round:
|
||||||
|
spinner -> first visible delta -> header + Live renders ->
|
||||||
|
on_end -> Live stops (content stays on screen)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, render_markdown: bool = True, show_spinner: bool = True):
|
||||||
|
self._md = render_markdown
|
||||||
|
self._show_spinner = show_spinner
|
||||||
|
self._buf = ""
|
||||||
|
self._live: Live | None = None
|
||||||
|
self._t = 0.0
|
||||||
|
self.streamed = False
|
||||||
|
self._spinner: ThinkingSpinner | None = None
|
||||||
|
self._start_spinner()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _clean(text: str) -> str:
|
||||||
|
text = re.sub(r"<think>[\s\S]*?</think>", "", text)
|
||||||
|
text = re.sub(r"<think>[\s\S]*$", "", text)
|
||||||
|
return text.strip()
|
||||||
|
|
||||||
|
def _render(self):
|
||||||
|
clean = self._clean(self._buf)
|
||||||
|
return Markdown(clean) if self._md and clean else Text(clean or "")
|
||||||
|
|
||||||
|
def _start_spinner(self) -> None:
|
||||||
|
if self._show_spinner:
|
||||||
|
self._spinner = ThinkingSpinner()
|
||||||
|
self._spinner.__enter__()
|
||||||
|
|
||||||
|
def _stop_spinner(self) -> None:
|
||||||
|
if self._spinner:
|
||||||
|
self._spinner.__exit__(None, None, None)
|
||||||
|
self._spinner = None
|
||||||
|
|
||||||
|
async def on_delta(self, delta: str) -> None:
|
||||||
|
self.streamed = True
|
||||||
|
self._buf += delta
|
||||||
|
if self._live is None:
|
||||||
|
if not self._clean(self._buf):
|
||||||
|
return
|
||||||
|
self._stop_spinner()
|
||||||
|
c = _make_console()
|
||||||
|
c.print()
|
||||||
|
c.print(f"[cyan]{__logo__} nanobot[/cyan]")
|
||||||
|
self._live = Live(self._render(), console=c, auto_refresh=False)
|
||||||
|
self._live.start()
|
||||||
|
now = time.monotonic()
|
||||||
|
if "\n" in delta or (now - self._t) > 0.05:
|
||||||
|
self._live.update(self._render())
|
||||||
|
self._live.refresh()
|
||||||
|
self._t = now
|
||||||
|
|
||||||
|
async def on_end(self, *, resuming: bool = False) -> None:
|
||||||
|
if self._live:
|
||||||
|
self._live.update(self._render())
|
||||||
|
self._live.refresh()
|
||||||
|
self._live.stop()
|
||||||
|
self._live = None
|
||||||
|
self._stop_spinner()
|
||||||
|
if resuming:
|
||||||
|
self._buf = ""
|
||||||
|
self._start_spinner()
|
||||||
|
else:
|
||||||
|
_make_console().print()
|
||||||
@@ -5,6 +5,7 @@ import pytest
|
|||||||
from prompt_toolkit.formatted_text import HTML
|
from prompt_toolkit.formatted_text import HTML
|
||||||
|
|
||||||
from nanobot.cli import commands
|
from nanobot.cli import commands
|
||||||
|
from nanobot.cli import stream as stream_mod
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@@ -62,12 +63,13 @@ def test_init_prompt_session_creates_session():
|
|||||||
def test_thinking_spinner_pause_stops_and_restarts():
|
def test_thinking_spinner_pause_stops_and_restarts():
|
||||||
"""Pause should stop the active spinner and restart it afterward."""
|
"""Pause should stop the active spinner and restart it afterward."""
|
||||||
spinner = MagicMock()
|
spinner = MagicMock()
|
||||||
|
mock_console = MagicMock()
|
||||||
|
mock_console.status.return_value = spinner
|
||||||
|
|
||||||
with patch.object(commands.console, "status", return_value=spinner):
|
thinking = stream_mod.ThinkingSpinner(console=mock_console)
|
||||||
thinking = commands._ThinkingSpinner(enabled=True)
|
with thinking:
|
||||||
with thinking:
|
with thinking.pause():
|
||||||
with thinking.pause():
|
pass
|
||||||
pass
|
|
||||||
|
|
||||||
assert spinner.method_calls == [
|
assert spinner.method_calls == [
|
||||||
call.start(),
|
call.start(),
|
||||||
@@ -83,10 +85,11 @@ def test_print_cli_progress_line_pauses_spinner_before_printing():
|
|||||||
spinner = MagicMock()
|
spinner = MagicMock()
|
||||||
spinner.start.side_effect = lambda: order.append("start")
|
spinner.start.side_effect = lambda: order.append("start")
|
||||||
spinner.stop.side_effect = lambda: order.append("stop")
|
spinner.stop.side_effect = lambda: order.append("stop")
|
||||||
|
mock_console = MagicMock()
|
||||||
|
mock_console.status.return_value = spinner
|
||||||
|
|
||||||
with patch.object(commands.console, "status", return_value=spinner), \
|
with patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")):
|
||||||
patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")):
|
thinking = stream_mod.ThinkingSpinner(console=mock_console)
|
||||||
thinking = commands._ThinkingSpinner(enabled=True)
|
|
||||||
with thinking:
|
with thinking:
|
||||||
commands._print_cli_progress_line("tool running", thinking)
|
commands._print_cli_progress_line("tool running", thinking)
|
||||||
|
|
||||||
@@ -100,13 +103,14 @@ async def test_print_interactive_progress_line_pauses_spinner_before_printing():
|
|||||||
spinner = MagicMock()
|
spinner = MagicMock()
|
||||||
spinner.start.side_effect = lambda: order.append("start")
|
spinner.start.side_effect = lambda: order.append("start")
|
||||||
spinner.stop.side_effect = lambda: order.append("stop")
|
spinner.stop.side_effect = lambda: order.append("stop")
|
||||||
|
mock_console = MagicMock()
|
||||||
|
mock_console.status.return_value = spinner
|
||||||
|
|
||||||
async def fake_print(_text: str) -> None:
|
async def fake_print(_text: str) -> None:
|
||||||
order.append("print")
|
order.append("print")
|
||||||
|
|
||||||
with patch.object(commands.console, "status", return_value=spinner), \
|
with patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print):
|
||||||
patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print):
|
thinking = stream_mod.ThinkingSpinner(console=mock_console)
|
||||||
thinking = commands._ThinkingSpinner(enabled=True)
|
|
||||||
with thinking:
|
with thinking:
|
||||||
await commands._print_interactive_progress_line("tool running", thinking)
|
await commands._print_interactive_progress_line("tool running", thinking)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user