diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index ea06acb..b915ce9 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -33,6 +33,7 @@ from rich.table import Table from rich.text import Text from nanobot import __logo__, __version__ +from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.config.paths import get_workspace_path from nanobot.config.schema import Config from nanobot.utils.helpers import sync_workspace_templates @@ -188,46 +189,13 @@ async def _print_interactive_response( await run_in_terminal(_write) -class _ThinkingSpinner: - """Spinner wrapper with pause support for clean progress output.""" - - def __init__(self, enabled: bool): - self._spinner = console.status( - "[dim]nanobot is thinking...[/dim]", spinner="dots" - ) if enabled else None - self._active = False - - def __enter__(self): - if self._spinner: - self._spinner.start() - self._active = True - return self - - def __exit__(self, *exc): - self._active = False - if self._spinner: - self._spinner.stop() - return False - - @contextmanager - def pause(self): - """Temporarily stop spinner while printing progress.""" - if self._spinner and self._active: - self._spinner.stop() - try: - yield - finally: - if self._spinner and self._active: - self._spinner.start() - - -def _print_cli_progress_line(text: str, thinking: _ThinkingSpinner | None) -> None: +def _print_cli_progress_line(text: str, thinking: ThinkingSpinner | None) -> None: """Print a CLI progress line, pausing the spinner if needed.""" with thinking.pause() if thinking else nullcontext(): console.print(f" [dim]↳ {text}[/dim]") -async def _print_interactive_progress_line(text: str, thinking: _ThinkingSpinner | None) -> None: +async def _print_interactive_progress_line(text: str, thinking: ThinkingSpinner | None) -> None: """Print an interactive progress line, pausing the spinner if needed.""" with thinking.pause() if thinking else nullcontext(): await _print_interactive_line(text) @@ -755,7 +723,7 @@ def agent( ) # Shared reference for progress callbacks - _thinking: _ThinkingSpinner | None = None + _thinking: ThinkingSpinner | None = None async def _cli_progress(content: str, *, tool_hint: bool = False) -> None: ch = agent_loop.channels_config @@ -768,18 +736,19 @@ def agent( if message: # Single message mode — direct call, no bus needed async def run_once(): - nonlocal _thinking - _thinking = _ThinkingSpinner(enabled=not logs) - with _thinking: - response = await agent_loop.process_direct( - message, session_id, on_progress=_cli_progress, - ) - _thinking = None - _print_agent_response( - response.content if response else "", - render_markdown=markdown, - metadata=response.metadata if response else None, + renderer = StreamRenderer(render_markdown=markdown) + response = await agent_loop.process_direct( + message, session_id, + on_progress=_cli_progress, + on_stream=renderer.on_delta, + on_stream_end=renderer.on_end, ) + if not renderer.streamed: + _print_agent_response( + response.content if response else "", + render_markdown=markdown, + metadata=response.metadata if response else None, + ) await agent_loop.close_mcp() asyncio.run(run_once()) @@ -815,11 +784,27 @@ def agent( turn_done = asyncio.Event() turn_done.set() turn_response: list[tuple[str, dict]] = [] + renderer: StreamRenderer | None = None async def _consume_outbound(): while True: try: msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + + if msg.metadata.get("_stream_delta"): + if renderer: + await renderer.on_delta(msg.content) + continue + if msg.metadata.get("_stream_end"): + if renderer: + await renderer.on_end( + resuming=msg.metadata.get("_resuming", False), + ) + continue + if msg.metadata.get("_streamed"): + turn_done.set() + continue + if msg.metadata.get("_progress"): is_tool_hint = msg.metadata.get("_tool_hint", False) ch = agent_loop.channels_config @@ -829,8 +814,9 @@ def agent( pass else: await _print_interactive_progress_line(msg.content, _thinking) + continue - elif not turn_done.is_set(): + if not turn_done.is_set(): if msg.content: turn_response.append((msg.content, dict(msg.metadata or {}))) turn_done.set() @@ -864,23 +850,24 @@ def agent( turn_done.clear() turn_response.clear() + renderer = StreamRenderer(render_markdown=markdown) await bus.publish_inbound(InboundMessage( channel=cli_channel, sender_id="user", chat_id=cli_chat_id, content=user_input, + metadata={"_wants_stream": True}, )) - nonlocal _thinking - _thinking = _ThinkingSpinner(enabled=not logs) - with _thinking: - await turn_done.wait() - _thinking = None + await turn_done.wait() if turn_response: content, meta = turn_response[0] - _print_agent_response(content, render_markdown=markdown, metadata=meta) + if content and not meta.get("_streamed"): + _print_agent_response( + content, render_markdown=markdown, metadata=meta, + ) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") diff --git a/nanobot/cli/stream.py b/nanobot/cli/stream.py new file mode 100644 index 0000000..3ee28fe --- /dev/null +++ b/nanobot/cli/stream.py @@ -0,0 +1,128 @@ +"""Streaming renderer for CLI output. + +Uses Rich Live with auto_refresh=False for stable, flicker-free +markdown rendering during streaming. Ellipsis mode handles overflow. +""" + +from __future__ import annotations + +import re +import sys +import time +from typing import Any + +from rich.console import Console +from rich.live import Live +from rich.markdown import Markdown +from rich.text import Text + +from nanobot import __logo__ + + +def _make_console() -> Console: + return Console(file=sys.stdout) + + +class ThinkingSpinner: + """Spinner that shows 'nanobot is thinking...' with pause support.""" + + def __init__(self, console: Console | None = None): + c = console or _make_console() + self._spinner = c.status("[dim]nanobot is thinking...[/dim]", spinner="dots") + self._active = False + + def __enter__(self): + self._spinner.start() + self._active = True + return self + + def __exit__(self, *exc): + self._active = False + self._spinner.stop() + return False + + def pause(self): + """Context manager: temporarily stop spinner for clean output.""" + from contextlib import contextmanager + + @contextmanager + def _ctx(): + if self._spinner and self._active: + self._spinner.stop() + try: + yield + finally: + if self._spinner and self._active: + self._spinner.start() + + return _ctx() + + +class StreamRenderer: + """Rich Live streaming with markdown. auto_refresh=False avoids render races. + + Flow per round: + spinner -> first visible delta -> header + Live renders -> + on_end -> Live stops (content stays on screen) + """ + + def __init__(self, render_markdown: bool = True, show_spinner: bool = True): + self._md = render_markdown + self._show_spinner = show_spinner + self._buf = "" + self._live: Live | None = None + self._t = 0.0 + self.streamed = False + self._spinner: ThinkingSpinner | None = None + self._start_spinner() + + @staticmethod + def _clean(text: str) -> str: + text = re.sub(r"[\s\S]*?", "", text) + text = re.sub(r"[\s\S]*$", "", text) + return text.strip() + + def _render(self): + clean = self._clean(self._buf) + return Markdown(clean) if self._md and clean else Text(clean or "") + + def _start_spinner(self) -> None: + if self._show_spinner: + self._spinner = ThinkingSpinner() + self._spinner.__enter__() + + def _stop_spinner(self) -> None: + if self._spinner: + self._spinner.__exit__(None, None, None) + self._spinner = None + + async def on_delta(self, delta: str) -> None: + self.streamed = True + self._buf += delta + if self._live is None: + if not self._clean(self._buf): + return + self._stop_spinner() + c = _make_console() + c.print() + c.print(f"[cyan]{__logo__} nanobot[/cyan]") + self._live = Live(self._render(), console=c, auto_refresh=False) + self._live.start() + now = time.monotonic() + if "\n" in delta or (now - self._t) > 0.05: + self._live.update(self._render()) + self._live.refresh() + self._t = now + + async def on_end(self, *, resuming: bool = False) -> None: + if self._live: + self._live.update(self._render()) + self._live.refresh() + self._live.stop() + self._live = None + self._stop_spinner() + if resuming: + self._buf = "" + self._start_spinner() + else: + _make_console().print() diff --git a/tests/test_cli_input.py b/tests/test_cli_input.py index 2fc9748..142dc72 100644 --- a/tests/test_cli_input.py +++ b/tests/test_cli_input.py @@ -5,6 +5,7 @@ import pytest from prompt_toolkit.formatted_text import HTML from nanobot.cli import commands +from nanobot.cli import stream as stream_mod @pytest.fixture @@ -62,12 +63,13 @@ def test_init_prompt_session_creates_session(): def test_thinking_spinner_pause_stops_and_restarts(): """Pause should stop the active spinner and restart it afterward.""" spinner = MagicMock() + mock_console = MagicMock() + mock_console.status.return_value = spinner - with patch.object(commands.console, "status", return_value=spinner): - thinking = commands._ThinkingSpinner(enabled=True) - with thinking: - with thinking.pause(): - pass + thinking = stream_mod.ThinkingSpinner(console=mock_console) + with thinking: + with thinking.pause(): + pass assert spinner.method_calls == [ call.start(), @@ -83,10 +85,11 @@ def test_print_cli_progress_line_pauses_spinner_before_printing(): spinner = MagicMock() spinner.start.side_effect = lambda: order.append("start") spinner.stop.side_effect = lambda: order.append("stop") + mock_console = MagicMock() + mock_console.status.return_value = spinner - with patch.object(commands.console, "status", return_value=spinner), \ - patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")): - thinking = commands._ThinkingSpinner(enabled=True) + with patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")): + thinking = stream_mod.ThinkingSpinner(console=mock_console) with thinking: commands._print_cli_progress_line("tool running", thinking) @@ -100,13 +103,14 @@ async def test_print_interactive_progress_line_pauses_spinner_before_printing(): spinner = MagicMock() spinner.start.side_effect = lambda: order.append("start") spinner.stop.side_effect = lambda: order.append("stop") + mock_console = MagicMock() + mock_console.status.return_value = spinner async def fake_print(_text: str) -> None: order.append("print") - with patch.object(commands.console, "status", return_value=spinner), \ - patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print): - thinking = commands._ThinkingSpinner(enabled=True) + with patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print): + thinking = stream_mod.ThinkingSpinner(console=mock_console) with thinking: await commands._print_interactive_progress_line("tool running", thinking)