114 lines
4.0 KiB
Python
114 lines
4.0 KiB
Python
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, call, patch
|
|
|
|
import pytest
|
|
from prompt_toolkit.formatted_text import HTML
|
|
|
|
from nanobot.cli import commands
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_prompt_session():
|
|
"""Mock the global prompt session."""
|
|
mock_session = MagicMock()
|
|
mock_session.prompt_async = AsyncMock()
|
|
with patch("nanobot.cli.commands._PROMPT_SESSION", mock_session), \
|
|
patch("nanobot.cli.commands.patch_stdout"):
|
|
yield mock_session
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_interactive_input_async_returns_input(mock_prompt_session):
|
|
"""Test that _read_interactive_input_async returns the user input from prompt_session."""
|
|
mock_prompt_session.prompt_async.return_value = "hello world"
|
|
|
|
result = await commands._read_interactive_input_async()
|
|
|
|
assert result == "hello world"
|
|
mock_prompt_session.prompt_async.assert_called_once()
|
|
args, _ = mock_prompt_session.prompt_async.call_args
|
|
assert isinstance(args[0], HTML) # Verify HTML prompt is used
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_interactive_input_async_handles_eof(mock_prompt_session):
|
|
"""Test that EOFError converts to KeyboardInterrupt."""
|
|
mock_prompt_session.prompt_async.side_effect = EOFError()
|
|
|
|
with pytest.raises(KeyboardInterrupt):
|
|
await commands._read_interactive_input_async()
|
|
|
|
|
|
def test_init_prompt_session_creates_session():
|
|
"""Test that _init_prompt_session initializes the global session."""
|
|
# Ensure global is None before test
|
|
commands._PROMPT_SESSION = None
|
|
|
|
with patch("nanobot.cli.commands.PromptSession") as MockSession, \
|
|
patch("nanobot.cli.commands.FileHistory") as MockHistory, \
|
|
patch("pathlib.Path.home") as mock_home:
|
|
|
|
mock_home.return_value = MagicMock()
|
|
|
|
commands._init_prompt_session()
|
|
|
|
assert commands._PROMPT_SESSION is not None
|
|
MockSession.assert_called_once()
|
|
_, kwargs = MockSession.call_args
|
|
assert kwargs["multiline"] is False
|
|
assert kwargs["enable_open_in_editor"] is False
|
|
|
|
|
|
def test_thinking_spinner_pause_stops_and_restarts():
|
|
"""Pause should stop the active spinner and restart it afterward."""
|
|
spinner = MagicMock()
|
|
|
|
with patch.object(commands.console, "status", return_value=spinner):
|
|
thinking = commands._ThinkingSpinner(enabled=True)
|
|
with thinking:
|
|
with thinking.pause():
|
|
pass
|
|
|
|
assert spinner.method_calls == [
|
|
call.start(),
|
|
call.stop(),
|
|
call.start(),
|
|
call.stop(),
|
|
]
|
|
|
|
|
|
def test_print_cli_progress_line_pauses_spinner_before_printing():
|
|
"""CLI progress output should pause spinner to avoid garbled lines."""
|
|
order: list[str] = []
|
|
spinner = MagicMock()
|
|
spinner.start.side_effect = lambda: order.append("start")
|
|
spinner.stop.side_effect = lambda: order.append("stop")
|
|
|
|
with patch.object(commands.console, "status", return_value=spinner), \
|
|
patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")):
|
|
thinking = commands._ThinkingSpinner(enabled=True)
|
|
with thinking:
|
|
commands._print_cli_progress_line("tool running", thinking)
|
|
|
|
assert order == ["start", "stop", "print", "start", "stop"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_print_interactive_progress_line_pauses_spinner_before_printing():
|
|
"""Interactive progress output should also pause spinner cleanly."""
|
|
order: list[str] = []
|
|
spinner = MagicMock()
|
|
spinner.start.side_effect = lambda: order.append("start")
|
|
spinner.stop.side_effect = lambda: order.append("stop")
|
|
|
|
async def fake_print(_text: str) -> None:
|
|
order.append("print")
|
|
|
|
with patch.object(commands.console, "status", return_value=spinner), \
|
|
patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print):
|
|
thinking = commands._ThinkingSpinner(enabled=True)
|
|
with thinking:
|
|
await commands._print_interactive_progress_line("tool running", thinking)
|
|
|
|
assert order == ["start", "stop", "print", "start", "stop"]
|