fix(agent): make status command responsive and accurate

Handle /status at the run-loop level so it can return immediately while the agent is busy, and reset last-usage stats when providers omit usage data. Also keep Telegram help/menu coverage for /status, and restore draft-streaming for final Telegram responses (progress messages still use plain sends).

Made-with: Cursor
This commit is contained in:
Xubin Ren
2026-03-21 15:21:32 +00:00
parent 570ca47483
commit 4d1897609d
4 changed files with 125 additions and 44 deletions

View File

@@ -185,6 +185,47 @@ class AgentLoop:
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")' return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls) return ", ".join(_fmt(tc) for tc in tool_calls)
def _build_status_content(self, session: Session) -> str:
    """Render a human-readable snapshot of the agent's runtime state."""
    message_total = len(session.get_history(max_messages=0))
    running_subagents = self.subagents.get_running_count()

    # Uptime: h/m granularity above one hour, m/s below it.
    elapsed = int(time.time() - self._start_time)
    if elapsed >= 3600:
        uptime = f"{elapsed // 3600}h {(elapsed % 3600) // 60}m"
    else:
        uptime = f"{elapsed // 60}m {elapsed % 60}s"

    tokens_in = self._last_usage.get("prompt_tokens", 0)
    tokens_out = self._last_usage.get("completion_tokens", 0)

    # Context usage: prompt tokens of the last LLM call vs. the configured window.
    # NOTE(review): "used" is rendered in decimal k (// 1000) while the window
    # total uses binary k (// 1024) — existing display convention, kept as-is.
    window = max(self.context_window_tokens, 0)
    pct = int((tokens_in / window) * 100) if window > 0 else 0
    used_label = f"{tokens_in // 1000}k" if tokens_in >= 1000 else str(tokens_in)
    window_label = f"{window // 1024}k" if window > 0 else "n/a"

    return "\n".join([
        f"🐈 nanobot v{__version__}",
        f"🧠 Model: {self.model}",
        f"📊 Tokens: {tokens_in} in / {tokens_out} out",
        f"📚 Context: {used_label}/{window_label} ({pct}%)",
        f"💬 Session: {message_total} messages",
        f"👾 Subagents: {running_subagents} active",
        f"🪢 Queue: {self.bus.inbound.qsize()} pending",
        f"⏱ Uptime: {uptime}",
    ])
def _status_response(self, msg: InboundMessage, session: Session) -> OutboundMessage:
    """Wrap the runtime status snapshot for *session* in an outbound message."""
    content = self._build_status_content(session)
    return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
async def _run_agent_loop( async def _run_agent_loop(
self, self,
initial_messages: list[dict], initial_messages: list[dict],
@@ -206,11 +247,11 @@ class AgentLoop:
tools=tool_defs, tools=tool_defs,
model=self.model, model=self.model,
) )
if response.usage: usage = response.usage or {}
self._last_usage = { self._last_usage = {
"prompt_tokens": int(response.usage.get("prompt_tokens", 0) or 0), "prompt_tokens": int(usage.get("prompt_tokens", 0) or 0),
"completion_tokens": int(response.usage.get("completion_tokens", 0) or 0), "completion_tokens": int(usage.get("completion_tokens", 0) or 0),
} }
if response.has_tool_calls: if response.has_tool_calls:
if on_progress: if on_progress:
@@ -289,6 +330,9 @@ class AgentLoop:
await self._handle_stop(msg) await self._handle_stop(msg)
elif cmd == "/restart": elif cmd == "/restart":
await self._handle_restart(msg) await self._handle_restart(msg)
elif cmd == "/status":
session = self.sessions.get_or_create(msg.session_key)
await self.bus.publish_outbound(self._status_response(msg, session))
else: else:
task = asyncio.create_task(self._dispatch(msg)) task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task) self._active_tasks.setdefault(msg.session_key, []).append(task)
@@ -420,41 +464,7 @@ class AgentLoop:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="New session started.") content="New session started.")
if cmd == "/status": if cmd == "/status":
history = session.get_history(max_messages=0) return self._status_response(msg, session)
msg_count = len(history)
active_subs = self.subagents.get_running_count()
uptime_s = int(time.time() - self._start_time)
uptime = (
f"{uptime_s // 3600}h {(uptime_s % 3600) // 60}m"
if uptime_s >= 3600
else f"{uptime_s // 60}m {uptime_s % 60}s"
)
last_in = self._last_usage.get("prompt_tokens", 0)
last_out = self._last_usage.get("completion_tokens", 0)
ctx_used = last_in
ctx_total_tokens = max(self.context_window_tokens, 0)
ctx_pct = int((ctx_used / ctx_total_tokens) * 100) if ctx_total_tokens > 0 else 0
ctx_used_str = f"{ctx_used // 1000}k" if ctx_used >= 1000 else str(ctx_used)
ctx_total_str = f"{ctx_total_tokens // 1024}k" if ctx_total_tokens > 0 else "n/a"
lines = [
f"🐈 nanobot v{__version__}",
f"🧠 Model: {self.model}",
f"📊 Tokens: {last_in} in / {last_out} out",
f"📚 Context: {ctx_used_str}/{ctx_total_str} ({ctx_pct}%)",
f"💬 Session: {msg_count} messages",
f"👾 Subagents: {active_subs} active",
f"🪢 Queue: {self.bus.inbound.qsize()} pending",
f"⏱ Uptime: {uptime}",
]
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="\n".join(lines),
)
if cmd == "/help": if cmd == "/help":
lines = [ lines = [
"🐈 nanobot commands:", "🐈 nanobot commands:",

View File

@@ -419,8 +419,11 @@ class TelegramChannel(BaseChannel):
is_progress = msg.metadata.get("_progress", False) is_progress = msg.metadata.get("_progress", False)
for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN): for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
# Use plain send for final responses too; draft streaming can create duplicates. # Final response: simulate streaming via draft, then persist.
await self._send_text(chat_id, chunk, reply_params, thread_kwargs) if not is_progress:
await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs)
else:
await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
async def _call_with_retry(self, fn, *args, **kwargs): async def _call_with_retry(self, fn, *args, **kwargs):
"""Call an async Telegram API function with retry on pool/network timeout.""" """Call an async Telegram API function with retry on pool/network timeout."""

View File

@@ -3,11 +3,13 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from unittest.mock import MagicMock, patch import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.providers.base import LLMResponse
def _make_loop(): def _make_loop():
@@ -65,6 +67,32 @@ class TestRestartCommand:
mock_handle.assert_called_once() mock_handle.assert_called_once()
@pytest.mark.asyncio
async def test_status_intercepted_in_run_loop(self):
    """Verify /status is answered at the run-loop level, bypassing dispatch."""
    loop, bus = _make_loop()
    inbound = InboundMessage(
        channel="telegram", sender_id="u1", chat_id="c1", content="/status"
    )
    with patch.object(loop, "_status_response") as status_stub:
        status_stub.return_value = OutboundMessage(
            channel="telegram", chat_id="c1", content="status ok"
        )
        await bus.publish_inbound(inbound)
        loop._running = True
        runner = asyncio.create_task(loop.run())
        # Give the run loop a beat to consume the inbound message.
        await asyncio.sleep(0.1)
        loop._running = False
        runner.cancel()
        try:
            await runner
        except asyncio.CancelledError:
            pass
        status_stub.assert_called_once()
        reply = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        assert reply.content == "status ok"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_propagates_external_cancellation(self): async def test_run_propagates_external_cancellation(self):
"""External task cancellation should not be swallowed by the inbound wait loop.""" """External task cancellation should not be swallowed by the inbound wait loop."""
@@ -86,3 +114,41 @@ class TestRestartCommand:
assert response is not None assert response is not None
assert "/restart" in response.content assert "/restart" in response.content
assert "/status" in response.content
@pytest.mark.asyncio
async def test_status_reports_runtime_info(self):
    """The /status reply surfaces model, token, context, queue and uptime info."""
    loop, _bus = _make_loop()
    fake_session = MagicMock()
    fake_session.get_history.return_value = [{"role": "user"}] * 3
    loop.sessions.get_or_create.return_value = fake_session
    loop.subagents.get_running_count.return_value = 2
    loop._start_time = time.time() - 125  # started 2m 5s ago
    loop._last_usage = {"prompt_tokens": 1200, "completion_tokens": 34}
    inbound = InboundMessage(
        channel="telegram", sender_id="u1", chat_id="c1", content="/status"
    )
    response = await loop._process_message(inbound)
    assert response is not None
    for fragment in (
        "Model: test-model",
        "Tokens: 1200 in / 34 out",
        "Context: 1k/64k (1%)",
        "Session: 3 messages",
        "Subagents: 2 active",
        "Queue: 0 pending",
        "Uptime: 2m 5s",
    ):
        assert fragment in response.content
@pytest.mark.asyncio
async def test_run_agent_loop_resets_usage_when_provider_omits_it(self):
    """_last_usage zeroes out when the provider response carries no usage data."""
    loop, _bus = _make_loop()
    canned_responses = [
        LLMResponse(content="first", usage={"prompt_tokens": 9, "completion_tokens": 4}),
        LLMResponse(content="second", usage={}),
    ]
    loop.provider.chat_with_retry = AsyncMock(side_effect=canned_responses)

    await loop._run_agent_loop([])
    assert loop._last_usage == {"prompt_tokens": 9, "completion_tokens": 4}

    # Second call omits usage entirely — stats must reset, not go stale.
    await loop._run_agent_loop([])
    assert loop._last_usage == {"prompt_tokens": 0, "completion_tokens": 0}

View File

@@ -177,6 +177,7 @@ async def test_start_creates_separate_pools_with_proxy(monkeypatch) -> None:
assert poll_req.kwargs["connection_pool_size"] == 4 assert poll_req.kwargs["connection_pool_size"] == 4
assert builder.request_value is api_req assert builder.request_value is api_req
assert builder.get_updates_request_value is poll_req assert builder.get_updates_request_value is poll_req
assert any(cmd.command == "status" for cmd in app.bot.commands)
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -836,3 +837,4 @@ async def test_on_help_includes_restart_command() -> None:
update.message.reply_text.assert_awaited_once() update.message.reply_text.assert_awaited_once()
help_text = update.message.reply_text.await_args.args[0] help_text = update.message.reply_text.await_args.args[0]
assert "/restart" in help_text assert "/restart" in help_text
assert "/status" in help_text