import asyncio from unittest.mock import AsyncMock, MagicMock, call, patch import pytest from prompt_toolkit.formatted_text import HTML from nanobot.cli import commands @pytest.fixture def mock_prompt_session(): """Mock the global prompt session.""" mock_session = MagicMock() mock_session.prompt_async = AsyncMock() with patch("nanobot.cli.commands._PROMPT_SESSION", mock_session), \ patch("nanobot.cli.commands.patch_stdout"): yield mock_session @pytest.mark.asyncio async def test_read_interactive_input_async_returns_input(mock_prompt_session): """Test that _read_interactive_input_async returns the user input from prompt_session.""" mock_prompt_session.prompt_async.return_value = "hello world" result = await commands._read_interactive_input_async() assert result == "hello world" mock_prompt_session.prompt_async.assert_called_once() args, _ = mock_prompt_session.prompt_async.call_args assert isinstance(args[0], HTML) # Verify HTML prompt is used @pytest.mark.asyncio async def test_read_interactive_input_async_handles_eof(mock_prompt_session): """Test that EOFError converts to KeyboardInterrupt.""" mock_prompt_session.prompt_async.side_effect = EOFError() with pytest.raises(KeyboardInterrupt): await commands._read_interactive_input_async() def test_init_prompt_session_creates_session(): """Test that _init_prompt_session initializes the global session.""" # Ensure global is None before test commands._PROMPT_SESSION = None with patch("nanobot.cli.commands.PromptSession") as MockSession, \ patch("nanobot.cli.commands.FileHistory") as MockHistory, \ patch("pathlib.Path.home") as mock_home: mock_home.return_value = MagicMock() commands._init_prompt_session() assert commands._PROMPT_SESSION is not None MockSession.assert_called_once() _, kwargs = MockSession.call_args assert kwargs["multiline"] is False assert kwargs["enable_open_in_editor"] is False def test_thinking_spinner_pause_stops_and_restarts(): """Pause should stop the active spinner and restart it afterward.""" spinner = MagicMock() with patch.object(commands.console, "status", return_value=spinner): thinking = commands._ThinkingSpinner(enabled=True) with thinking: with thinking.pause(): pass assert spinner.method_calls == [ call.start(), call.stop(), call.start(), call.stop(), ] def test_print_cli_progress_line_pauses_spinner_before_printing(): """CLI progress output should pause spinner to avoid garbled lines.""" order: list[str] = [] spinner = MagicMock() spinner.start.side_effect = lambda: order.append("start") spinner.stop.side_effect = lambda: order.append("stop") with patch.object(commands.console, "status", return_value=spinner), \ patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")): thinking = commands._ThinkingSpinner(enabled=True) with thinking: commands._print_cli_progress_line("tool running", thinking) assert order == ["start", "stop", "print", "start", "stop"] @pytest.mark.asyncio async def test_print_interactive_progress_line_pauses_spinner_before_printing(): """Interactive progress output should also pause spinner cleanly.""" order: list[str] = [] spinner = MagicMock() spinner.start.side_effect = lambda: order.append("start") spinner.stop.side_effect = lambda: order.append("stop") async def fake_print(_text: str) -> None: order.append("print") with patch.object(commands.console, "status", return_value=spinner), \ patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print): thinking = commands._ThinkingSpinner(enabled=True) with thinking: await commands._print_interactive_progress_line("tool running", thinking) assert order == ["start", "stop", "print", "start", "stop"]