Merge PR #1985: feat: add /status command to show runtime info

feat: add /status command to show runtime info
This commit is contained in:
Xubin Ren
2026-03-22 00:11:34 +08:00
committed by GitHub
6 changed files with 273 additions and 17 deletions

View File

@@ -7,12 +7,14 @@ import json
import os
import re
import sys
import time
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
from nanobot import __version__
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryConsolidator
from nanobot.agent.subagent import SubagentManager
@@ -79,6 +81,8 @@ class AgentLoop:
self.exec_config = exec_config or ExecToolConfig()
self.cron_service = cron_service
self.restrict_to_workspace = restrict_to_workspace
self._start_time = time.time()
self._last_usage: dict[str, int] = {}
self.context = ContextBuilder(workspace)
self.sessions = session_manager or SessionManager(workspace)
@@ -181,6 +185,51 @@ class AgentLoop:
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
def _build_status_content(self, session: Session) -> str:
"""Build a human-readable runtime status snapshot."""
history = session.get_history(max_messages=0)
msg_count = len(history)
uptime_s = int(time.time() - self._start_time)
uptime = (
f"{uptime_s // 3600}h {(uptime_s % 3600) // 60}m"
if uptime_s >= 3600
else f"{uptime_s // 60}m {uptime_s % 60}s"
)
last_in = self._last_usage.get("prompt_tokens", 0)
last_out = self._last_usage.get("completion_tokens", 0)
ctx_used = 0
try:
ctx_used, _ = self.memory_consolidator.estimate_session_prompt_tokens(session)
except Exception:
ctx_used = 0
if ctx_used <= 0:
ctx_used = last_in
ctx_total_tokens = max(self.context_window_tokens, 0)
ctx_pct = int((ctx_used / ctx_total_tokens) * 100) if ctx_total_tokens > 0 else 0
ctx_used_str = f"{ctx_used // 1000}k" if ctx_used >= 1000 else str(ctx_used)
ctx_total_str = f"{ctx_total_tokens // 1024}k" if ctx_total_tokens > 0 else "n/a"
return "\n".join([
f"🐈 nanobot v{__version__}",
f"🧠 Model: {self.model}",
f"📊 Tokens: {last_in} in / {last_out} out",
f"📚 Context: {ctx_used_str}/{ctx_total_str} ({ctx_pct}%)",
f"💬 Session: {msg_count} messages",
f"⏱ Uptime: {uptime}",
])
def _status_response(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Build an outbound status message for a session."""
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=self._build_status_content(session),
metadata={"render_as": "text"},
)
async def _run_agent_loop(
self,
initial_messages: list[dict],
@@ -202,6 +251,11 @@ class AgentLoop:
tools=tool_defs,
model=self.model,
)
usage = response.usage or {}
self._last_usage = {
"prompt_tokens": int(usage.get("prompt_tokens", 0) or 0),
"completion_tokens": int(usage.get("completion_tokens", 0) or 0),
}
if response.has_tool_calls:
if on_progress:
@@ -280,6 +334,9 @@ class AgentLoop:
await self._handle_stop(msg)
elif cmd == "/restart":
await self._handle_restart(msg)
elif cmd == "/status":
session = self.sessions.get_or_create(msg.session_key)
await self.bus.publish_outbound(self._status_response(msg, session))
else:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
@@ -410,16 +467,22 @@ class AgentLoop:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="New session started.")
if cmd == "/status":
return self._status_response(msg, session)
if cmd == "/help":
lines = [
"🐈 nanobot commands:",
"/new — Start a new conversation",
"/stop — Stop the current task",
"/restart — Restart the bot",
"/status — Show bot status",
"/help — Show available commands",
]
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="\n".join(lines),
channel=msg.channel,
chat_id=msg.chat_id,
content="\n".join(lines),
metadata={"render_as": "text"},
)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
@@ -544,6 +607,19 @@ class AgentLoop:
session.messages.append(entry)
session.updated_at = datetime.now()
async def process_direct_outbound(
self,
content: str,
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None:
"""Process a message directly and return the outbound payload."""
await self._connect_mcp()
msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content)
return await self._process_message(msg, session_key=session_key, on_progress=on_progress)
async def process_direct(
self,
content: str,
@@ -553,7 +629,11 @@ class AgentLoop:
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
"""Process a message directly (for CLI or cron usage)."""
await self._connect_mcp()
msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content)
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
response = await self.process_direct_outbound(
content,
session_key=session_key,
channel=channel,
chat_id=chat_id,
on_progress=on_progress,
)
return response.content if response else ""

View File

@@ -186,6 +186,7 @@ class TelegramChannel(BaseChannel):
BotCommand("stop", "Stop the current task"),
BotCommand("help", "Show available commands"),
BotCommand("restart", "Restart the bot"),
BotCommand("status", "Show bot status"),
]
@classmethod
@@ -264,6 +265,7 @@ class TelegramChannel(BaseChannel):
self._app.add_handler(CommandHandler("new", self._forward_command))
self._app.add_handler(CommandHandler("stop", self._forward_command))
self._app.add_handler(CommandHandler("restart", self._forward_command))
self._app.add_handler(CommandHandler("status", self._forward_command))
self._app.add_handler(CommandHandler("help", self._on_help))
# Add message handler for text, photos, voice, documents
@@ -417,7 +419,7 @@ class TelegramChannel(BaseChannel):
is_progress = msg.metadata.get("_progress", False)
for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
# Final response: simulate streaming via draft, then persist
# Final response: simulate streaming via draft, then persist.
if not is_progress:
await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs)
else:
@@ -512,6 +514,7 @@ class TelegramChannel(BaseChannel):
"/new — Start a new conversation\n"
"/stop — Stop the current task\n"
"/restart — Restart the bot\n"
"/status — Show bot status\n"
"/help — Show available commands"
)

View File

@@ -2,6 +2,7 @@
import asyncio
from contextlib import contextmanager, nullcontext
import inspect
import os
import select
import signal
@@ -131,17 +132,30 @@ def _render_interactive_ansi(render_fn) -> str:
return capture.get()
def _print_agent_response(response: str, render_markdown: bool) -> None:
def _print_agent_response(
response: str,
render_markdown: bool,
metadata: dict | None = None,
) -> None:
"""Render assistant response with consistent terminal styling."""
console = _make_console()
content = response or ""
body = Markdown(content) if render_markdown else Text(content)
body = _response_renderable(content, render_markdown, metadata)
console.print()
console.print(f"[cyan]{__logo__} nanobot[/cyan]")
console.print(body)
console.print()
def _response_renderable(content: str, render_markdown: bool, metadata: dict | None = None):
"""Render plain-text command output without markdown collapsing newlines."""
if not render_markdown:
return Text(content)
if (metadata or {}).get("render_as") == "text":
return Text(content)
return Markdown(content)
async def _print_interactive_line(text: str) -> None:
"""Print async interactive updates with prompt_toolkit-safe Rich styling."""
def _write() -> None:
@@ -153,7 +167,11 @@ async def _print_interactive_line(text: str) -> None:
await run_in_terminal(_write)
async def _print_interactive_response(response: str, render_markdown: bool) -> None:
async def _print_interactive_response(
response: str,
render_markdown: bool,
metadata: dict | None = None,
) -> None:
"""Print async interactive replies with prompt_toolkit-safe Rich styling."""
def _write() -> None:
content = response or ""
@@ -161,7 +179,7 @@ async def _print_interactive_response(response: str, render_markdown: bool) -> N
lambda c: (
c.print(),
c.print(f"[cyan]{__logo__} nanobot[/cyan]"),
c.print(Markdown(content) if render_markdown else Text(content)),
c.print(_response_renderable(content, render_markdown, metadata)),
c.print(),
)
)
@@ -750,9 +768,27 @@ def agent(
nonlocal _thinking
_thinking = _ThinkingSpinner(enabled=not logs)
with _thinking:
response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
direct_outbound = getattr(agent_loop, "process_direct_outbound", None)
if inspect.iscoroutinefunction(direct_outbound):
response = await agent_loop.process_direct_outbound(
message,
session_id,
on_progress=_cli_progress,
)
response_content = response.content if response else ""
response_meta = response.metadata if response else None
else:
response_content = await agent_loop.process_direct(
message,
session_id,
on_progress=_cli_progress,
)
response_meta = None
_thinking = None
_print_agent_response(response, render_markdown=markdown)
kwargs = {"render_markdown": markdown}
if response_meta is not None:
kwargs["metadata"] = response_meta
_print_agent_response(response_content, **kwargs)
await agent_loop.close_mcp()
asyncio.run(run_once())
@@ -787,7 +823,7 @@ def agent(
bus_task = asyncio.create_task(agent_loop.run())
turn_done = asyncio.Event()
turn_done.set()
turn_response: list[str] = []
turn_response: list[tuple[str, dict]] = []
async def _consume_outbound():
while True:
@@ -805,10 +841,14 @@ def agent(
elif not turn_done.is_set():
if msg.content:
turn_response.append(msg.content)
turn_response.append((msg.content, dict(msg.metadata or {})))
turn_done.set()
elif msg.content:
await _print_interactive_response(msg.content, render_markdown=markdown)
await _print_interactive_response(
msg.content,
render_markdown=markdown,
metadata=msg.metadata,
)
except asyncio.TimeoutError:
continue
@@ -848,7 +888,8 @@ def agent(
_thinking = None
if turn_response:
_print_agent_response(turn_response[0], render_markdown=markdown)
content, meta = turn_response[0]
_print_agent_response(content, render_markdown=markdown, metadata=meta)
except KeyboardInterrupt:
_restore_terminal()
console.print("\nGoodbye!")

View File

@@ -111,3 +111,33 @@ async def test_print_interactive_progress_line_pauses_spinner_before_printing():
await commands._print_interactive_progress_line("tool running", thinking)
assert order == ["start", "stop", "print", "start", "stop"]
def test_response_renderable_uses_text_for_explicit_plain_rendering():
status = (
"🐈 nanobot v0.1.4.post5\n"
"🧠 Model: MiniMax-M2.7\n"
"📊 Tokens: 20639 in / 29 out"
)
renderable = commands._response_renderable(
status,
render_markdown=True,
metadata={"render_as": "text"},
)
assert renderable.__class__.__name__ == "Text"
def test_response_renderable_preserves_normal_markdown_rendering():
renderable = commands._response_renderable("**bold**", render_markdown=True)
assert renderable.__class__.__name__ == "Markdown"
def test_response_renderable_without_metadata_keeps_markdown_path():
help_text = "🐈 nanobot commands:\n/status — Show bot status\n/help — Show available commands"
renderable = commands._response_renderable(help_text, render_markdown=True)
assert renderable.__class__.__name__ == "Markdown"

View File

@@ -3,11 +3,13 @@
from __future__ import annotations
import asyncio
from unittest.mock import MagicMock, patch
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nanobot.bus.events import InboundMessage
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.providers.base import LLMResponse
def _make_loop():
@@ -65,6 +67,32 @@ class TestRestartCommand:
mock_handle.assert_called_once()
@pytest.mark.asyncio
async def test_status_intercepted_in_run_loop(self):
"""Verify /status is handled at the run-loop level for immediate replies."""
loop, bus = _make_loop()
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
with patch.object(loop, "_status_response") as mock_status:
mock_status.return_value = OutboundMessage(
channel="telegram", chat_id="c1", content="status ok"
)
await bus.publish_inbound(msg)
loop._running = True
run_task = asyncio.create_task(loop.run())
await asyncio.sleep(0.1)
loop._running = False
run_task.cancel()
try:
await run_task
except asyncio.CancelledError:
pass
mock_status.assert_called_once()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert out.content == "status ok"
@pytest.mark.asyncio
async def test_run_propagates_external_cancellation(self):
"""External task cancellation should not be swallowed by the inbound wait loop."""
@@ -86,3 +114,75 @@ class TestRestartCommand:
assert response is not None
assert "/restart" in response.content
assert "/status" in response.content
assert response.metadata == {"render_as": "text"}
@pytest.mark.asyncio
async def test_status_reports_runtime_info(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = [{"role": "user"}] * 3
loop.sessions.get_or_create.return_value = session
loop._start_time = time.time() - 125
loop._last_usage = {"prompt_tokens": 0, "completion_tokens": 0}
loop.memory_consolidator.estimate_session_prompt_tokens = MagicMock(
return_value=(20500, "tiktoken")
)
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
response = await loop._process_message(msg)
assert response is not None
assert "Model: test-model" in response.content
assert "Tokens: 0 in / 0 out" in response.content
assert "Context: 20k/64k (31%)" in response.content
assert "Session: 3 messages" in response.content
assert "Uptime: 2m 5s" in response.content
assert response.metadata == {"render_as": "text"}
@pytest.mark.asyncio
async def test_run_agent_loop_resets_usage_when_provider_omits_it(self):
loop, _bus = _make_loop()
loop.provider.chat_with_retry = AsyncMock(side_effect=[
LLMResponse(content="first", usage={"prompt_tokens": 9, "completion_tokens": 4}),
LLMResponse(content="second", usage={}),
])
await loop._run_agent_loop([])
assert loop._last_usage == {"prompt_tokens": 9, "completion_tokens": 4}
await loop._run_agent_loop([])
assert loop._last_usage == {"prompt_tokens": 0, "completion_tokens": 0}
@pytest.mark.asyncio
async def test_status_falls_back_to_last_usage_when_context_estimate_missing(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = [{"role": "user"}]
loop.sessions.get_or_create.return_value = session
loop._last_usage = {"prompt_tokens": 1200, "completion_tokens": 34}
loop.memory_consolidator.estimate_session_prompt_tokens = MagicMock(
return_value=(0, "none")
)
response = await loop._process_message(
InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
)
assert response is not None
assert "Tokens: 1200 in / 34 out" in response.content
assert "Context: 1k/64k (1%)" in response.content
@pytest.mark.asyncio
async def test_process_direct_outbound_preserves_render_metadata(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = []
loop.sessions.get_or_create.return_value = session
loop.subagents.get_running_count.return_value = 0
response = await loop.process_direct_outbound("/status", session_key="cli:test")
assert response is not None
assert response.metadata == {"render_as": "text"}

View File

@@ -177,6 +177,7 @@ async def test_start_creates_separate_pools_with_proxy(monkeypatch) -> None:
assert poll_req.kwargs["connection_pool_size"] == 4
assert builder.request_value is api_req
assert builder.get_updates_request_value is poll_req
assert any(cmd.command == "status" for cmd in app.bot.commands)
@pytest.mark.asyncio
@@ -836,3 +837,4 @@ async def test_on_help_includes_restart_command() -> None:
update.message.reply_text.assert_awaited_once()
help_text = update.message.reply_text.await_args.args[0]
assert "/restart" in help_text
assert "/status" in help_text