feat: stream intermediate progress to user during tool execution

This commit is contained in:
Re-bin
2026-02-18 14:23:51 +00:00
parent ce4f00529e
commit 715b2db24b
4 changed files with 57 additions and 11 deletions

View File

@@ -5,7 +5,8 @@ from contextlib import AsyncExitStack
import json
import json_repair
from pathlib import Path
from typing import Any
import re
from typing import Any, Awaitable, Callable
from loguru import logger
@@ -146,12 +147,34 @@ class AgentLoop:
if isinstance(cron_tool, CronTool):
cron_tool.set_context(channel, chat_id)
async def _run_agent_loop(self, initial_messages: list[dict]) -> tuple[str | None, list[str]]:
@staticmethod
def _strip_think(text: str | None) -> str | None:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc):
val = next(iter(tc.arguments.values()), None) if tc.arguments else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
async def _run_agent_loop(
self,
initial_messages: list[dict],
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> tuple[str | None, list[str]]:
"""
Run the agent iteration loop.
Args:
initial_messages: Starting messages for the LLM conversation.
on_progress: Optional callback to push intermediate content to the user.
Returns:
Tuple of (final_content, list_of_tools_used).
@@ -173,6 +196,10 @@ class AgentLoop:
)
if response.has_tool_calls:
if on_progress:
clean = self._strip_think(response.content)
await on_progress(clean or self._tool_hint(response.tool_calls))
tool_call_dicts = [
{
"id": tc.id,
@@ -197,9 +224,8 @@ class AgentLoop:
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
else:
final_content = response.content
final_content = self._strip_think(response.content)
break
return final_content, tools_used
@@ -244,13 +270,19 @@ class AgentLoop:
self._running = False
logger.info("Agent loop stopping")
async def _process_message(self, msg: InboundMessage, session_key: str | None = None) -> OutboundMessage | None:
async def _process_message(
self,
msg: InboundMessage,
session_key: str | None = None,
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None:
"""
Process a single inbound message.
Args:
msg: The inbound message to process.
session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns:
The response message, or None if no response needed.
@@ -297,7 +329,16 @@ class AgentLoop:
channel=msg.channel,
chat_id=msg.chat_id,
)
final_content, tools_used = await self._run_agent_loop(initial_messages)
async def _bus_progress(content: str) -> None:
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
metadata=msg.metadata or {},
))
final_content, tools_used = await self._run_agent_loop(
initial_messages, on_progress=on_progress or _bus_progress,
)
if final_content is None:
final_content = "I've completed processing but have no response to give."
@@ -451,6 +492,7 @@ Respond with ONLY valid JSON, no markdown fences."""
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
"""
Process a message directly (for CLI or cron usage).
@@ -460,6 +502,7 @@ Respond with ONLY valid JSON, no markdown fences."""
session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output.
Returns:
The agent's response.
@@ -472,5 +515,5 @@ Respond with ONLY valid JSON, no markdown fences."""
content=content
)
response = await self._process_message(msg, session_key=session_key)
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else ""