Merge process_direct() and process_direct_outbound() into a single interface returning OutboundMessage | None. This eliminates the dual-path detection logic in CLI single-message mode that relied on inspect.iscoroutinefunction to distinguish between the two APIs. Extract status rendering into a pure function build_status_content() in utils/helpers.py, decoupling it from AgentLoop internals. Made-with: Cursor
189 lines
7.0 KiB
Python
189 lines
7.0 KiB
Python
"""Tests for /restart slash command."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
|
from nanobot.providers.base import LLMResponse
|
|
|
|
|
|
def _make_loop():
    """Build an AgentLoop around a real MessageBus and mocked collaborators.

    The provider and workspace are ``MagicMock`` objects, and
    ``ContextBuilder``, ``SessionManager`` and ``SubagentManager`` are
    patched in ``nanobot.agent.loop`` while the loop is being constructed.

    Returns:
        A ``(loop, bus)`` tuple.
    """
    from nanobot.agent.loop import AgentLoop
    from nanobot.bus.queue import MessageBus

    message_bus = MessageBus()

    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"

    fake_workspace = MagicMock()
    # Any ``workspace / "name"`` expression yields another mock.
    fake_workspace.__truediv__ = MagicMock(return_value=MagicMock())

    with patch("nanobot.agent.loop.ContextBuilder"):
        with patch("nanobot.agent.loop.SessionManager"):
            with patch("nanobot.agent.loop.SubagentManager"):
                agent_loop = AgentLoop(
                    bus=message_bus,
                    provider=fake_provider,
                    workspace=fake_workspace,
                )

    return agent_loop, message_bus
|
|
|
|
|
|
class TestRestartCommand:
    """Tests for the /restart, /status and /help slash commands on AgentLoop."""

    @staticmethod
    async def _spin_briefly(agent_loop):
        """Run ``agent_loop.run()`` for about 0.1s, then stop and cancel it.

        The ``CancelledError`` raised by awaiting the cancelled task is
        deliberately swallowed so the calling test can continue.
        """
        agent_loop._running = True
        runner = asyncio.create_task(agent_loop.run())
        await asyncio.sleep(0.1)
        agent_loop._running = False
        runner.cancel()
        try:
            await runner
        except asyncio.CancelledError:
            pass

    @pytest.mark.asyncio
    async def test_restart_sends_message_and_calls_execv(self):
        """_handle_restart publishes a "Restarting" reply and calls os.execv once."""
        agent_loop, message_bus = _make_loop()
        inbound = InboundMessage(
            channel="cli", sender_id="user", chat_id="direct", content="/restart"
        )

        with patch("nanobot.agent.loop.os.execv") as execv_mock:
            await agent_loop._handle_restart(inbound)
            reply = await asyncio.wait_for(message_bus.consume_outbound(), timeout=1.0)
            assert "Restarting" in reply.content

            # NOTE(review): presumably execv is scheduled after a short delay
            # inside _handle_restart; confirm against the implementation.
            await asyncio.sleep(1.5)
            execv_mock.assert_called_once()

    @pytest.mark.asyncio
    async def test_restart_intercepted_in_run_loop(self):
        """Verify /restart is handled at the run-loop level, not inside _dispatch."""
        agent_loop, message_bus = _make_loop()
        inbound = InboundMessage(
            channel="telegram", sender_id="u1", chat_id="c1", content="/restart"
        )

        with patch.object(agent_loop, "_handle_restart") as restart_mock:
            restart_mock.return_value = None
            await message_bus.publish_inbound(inbound)
            await self._spin_briefly(agent_loop)

        restart_mock.assert_called_once()

    @pytest.mark.asyncio
    async def test_status_intercepted_in_run_loop(self):
        """Verify /status is handled at the run-loop level for immediate replies."""
        agent_loop, message_bus = _make_loop()
        inbound = InboundMessage(
            channel="telegram", sender_id="u1", chat_id="c1", content="/status"
        )
        canned_reply = OutboundMessage(
            channel="telegram", chat_id="c1", content="status ok"
        )

        with patch.object(agent_loop, "_status_response") as status_mock:
            status_mock.return_value = canned_reply
            await message_bus.publish_inbound(inbound)
            await self._spin_briefly(agent_loop)

        status_mock.assert_called_once()
        published = await asyncio.wait_for(message_bus.consume_outbound(), timeout=1.0)
        assert published.content == "status ok"

    @pytest.mark.asyncio
    async def test_run_propagates_external_cancellation(self):
        """External task cancellation should not be swallowed by the inbound wait loop."""
        agent_loop, _unused_bus = _make_loop()

        runner = asyncio.create_task(agent_loop.run())
        await asyncio.sleep(0.1)
        runner.cancel()

        with pytest.raises(asyncio.CancelledError):
            await asyncio.wait_for(runner, timeout=1.0)

    @pytest.mark.asyncio
    async def test_help_includes_restart(self):
        """The /help reply lists /restart and /status and is rendered as text."""
        agent_loop, _unused_bus = _make_loop()
        inbound = InboundMessage(
            channel="telegram", sender_id="u1", chat_id="c1", content="/help"
        )

        reply = await agent_loop._process_message(inbound)

        assert reply is not None
        for command in ("/restart", "/status"):
            assert command in reply.content
        assert reply.metadata == {"render_as": "text"}

    @pytest.mark.asyncio
    async def test_status_reports_runtime_info(self):
        """The /status reply shows model, token usage, context, session size and uptime."""
        agent_loop, _unused_bus = _make_loop()

        fake_session = MagicMock()
        fake_session.get_history.return_value = [{"role": "user"}] * 3
        agent_loop.sessions.get_or_create.return_value = fake_session

        # Started 125 seconds ago, which the reply should show as "2m 5s".
        agent_loop._start_time = time.time() - 125
        agent_loop._last_usage = {"prompt_tokens": 0, "completion_tokens": 0}
        agent_loop.memory_consolidator.estimate_session_prompt_tokens = MagicMock(
            return_value=(20500, "tiktoken")
        )

        inbound = InboundMessage(
            channel="telegram", sender_id="u1", chat_id="c1", content="/status"
        )
        reply = await agent_loop._process_message(inbound)

        assert reply is not None
        expected_fragments = (
            "Model: test-model",
            "Tokens: 0 in / 0 out",
            "Context: 20k/64k (31%)",
            "Session: 3 messages",
            "Uptime: 2m 5s",
        )
        for fragment in expected_fragments:
            assert fragment in reply.content
        assert reply.metadata == {"render_as": "text"}

    @pytest.mark.asyncio
    async def test_run_agent_loop_resets_usage_when_provider_omits_it(self):
        """_last_usage falls back to zeros when a response carries no usage data."""
        agent_loop, _unused_bus = _make_loop()

        with_usage = LLMResponse(
            content="first", usage={"prompt_tokens": 9, "completion_tokens": 4}
        )
        without_usage = LLMResponse(content="second", usage={})
        agent_loop.provider.chat_with_retry = AsyncMock(
            side_effect=[with_usage, without_usage]
        )

        await agent_loop._run_agent_loop([])
        assert agent_loop._last_usage == {"prompt_tokens": 9, "completion_tokens": 4}

        # The second call must not keep the counters from the first call.
        await agent_loop._run_agent_loop([])
        assert agent_loop._last_usage == {"prompt_tokens": 0, "completion_tokens": 0}

    @pytest.mark.asyncio
    async def test_status_falls_back_to_last_usage_when_context_estimate_missing(self):
        """With a zero context estimate, /status reports context from _last_usage."""
        agent_loop, _unused_bus = _make_loop()

        fake_session = MagicMock()
        fake_session.get_history.return_value = [{"role": "user"}]
        agent_loop.sessions.get_or_create.return_value = fake_session

        agent_loop._last_usage = {"prompt_tokens": 1200, "completion_tokens": 34}
        agent_loop.memory_consolidator.estimate_session_prompt_tokens = MagicMock(
            return_value=(0, "none")
        )

        inbound = InboundMessage(
            channel="telegram", sender_id="u1", chat_id="c1", content="/status"
        )
        reply = await agent_loop._process_message(inbound)

        assert reply is not None
        assert "Tokens: 1200 in / 34 out" in reply.content
        assert "Context: 1k/64k (1%)" in reply.content

    @pytest.mark.asyncio
    async def test_process_direct_preserves_render_metadata(self):
        """process_direct("/status") returns a reply that keeps its render_as metadata."""
        agent_loop, _unused_bus = _make_loop()

        fake_session = MagicMock()
        fake_session.get_history.return_value = []
        agent_loop.sessions.get_or_create.return_value = fake_session
        agent_loop.subagents.get_running_count.return_value = 0

        reply = await agent_loop.process_direct("/status", session_key="cli:test")

        assert reply is not None
        assert reply.metadata == {"render_as": "text"}
|