From 9d5e511a6e69a2735f65a7959350c991f2d5bd4b Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 22 Mar 2026 17:33:09 +0000 Subject: [PATCH] feat(streaming): centralize think-tag filtering and add Telegram streaming - Add strip_think() to helpers.py as single source of truth - Filter deltas in agent loop before dispatching to consumers - Implement send_delta in TelegramChannel with progressive edit_message_text - Remove duplicate think filtering from CLI stream.py and telegram.py - Remove legacy fake streaming (send_message_draft) from Telegram - Default Telegram streaming to true - Update CHANNEL_PLUGIN_GUIDE.md with streaming documentation Made-with: Cursor --- docs/CHANNEL_PLUGIN_GUIDE.md | 100 +++++++++++++++++++++++++++++++++- nanobot/agent/loop.py | 22 +++++++- nanobot/channels/telegram.py | 103 +++++++++++++++++++++++++---------- nanobot/cli/stream.py | 15 ++--- nanobot/utils/helpers.py | 7 +++ 5 files changed, 204 insertions(+), 43 deletions(-) diff --git a/docs/CHANNEL_PLUGIN_GUIDE.md b/docs/CHANNEL_PLUGIN_GUIDE.md index a23ea07..575cad6 100644 --- a/docs/CHANNEL_PLUGIN_GUIDE.md +++ b/docs/CHANNEL_PLUGIN_GUIDE.md @@ -182,12 +182,19 @@ The agent receives the message and processes it. Replies arrive in your `send()` | Method / Property | Description | |-------------------|-------------| -| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. | +| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. Automatically sets `_wants_stream` if `supports_streaming` is true. | | `is_allowed(sender_id)` | Checks against `config["allowFrom"]`; `"*"` allows all, `[]` denies all. | | `default_config()` (classmethod) | Returns default config dict for `nanobot onboard`. Override to declare your fields. 
| | `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). | +| `supports_streaming` (property) | `True` when config has `"streaming": true` **and** subclass overrides `send_delta()`. | | `is_running` | Returns `self._running`. | +### Optional (streaming) + +| Method | Description | +|--------|-------------| +| `async send_delta(chat_id, delta, metadata?)` | Override to receive streaming chunks. See [Streaming Support](#streaming-support) for details. | + ### Message Types ```python @@ -201,6 +208,97 @@ class OutboundMessage: # "message_id" for reply threading ``` +## Streaming Support + +Channels can opt into real-time streaming — the agent sends content token-by-token instead of one final message. This is entirely optional; channels work fine without it. + +### How It Works + +When **both** conditions are met, the agent streams content through your channel: + +1. Config has `"streaming": true` +2. Your subclass overrides `send_delta()` + +If either is missing, the agent falls back to the normal one-shot `send()` path. + +### Implementing `send_delta` + +Override `send_delta` to handle two types of calls: + +```python +async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + meta = metadata or {} + + if meta.get("_stream_end"): + # Streaming finished — do final formatting, cleanup, etc. + return + + # Regular delta — append text, update the message on screen + # delta contains a small chunk of text (a few tokens) +``` + +**Metadata flags:** + +| Flag | Meaning | +|------|---------| +| `_stream_delta: True` | A content chunk (delta contains the new text) | +| `_stream_end: True` | Streaming finished (delta is empty) | +| `_resuming: True` | More streaming rounds coming (e.g. 
tool call then another response) | + +### Example: Webhook with Streaming + +```python +class WebhookChannel(BaseChannel): + name = "webhook" + display_name = "Webhook" + + def __init__(self, config, bus): + super().__init__(config, bus) + self._buffers: dict[str, str] = {} + + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + meta = metadata or {} + if meta.get("_stream_end"): + text = self._buffers.pop(chat_id, "") + # Final delivery — format and send the complete message + await self._deliver(chat_id, text, final=True) + return + + self._buffers.setdefault(chat_id, "") + self._buffers[chat_id] += delta + # Incremental update — push partial text to the client + await self._deliver(chat_id, self._buffers[chat_id], final=False) + + async def send(self, msg: OutboundMessage) -> None: + # Non-streaming path — unchanged + await self._deliver(msg.chat_id, msg.content, final=True) +``` + +### Config + +Enable streaming per channel: + +```json +{ + "channels": { + "webhook": { + "enabled": true, + "streaming": true, + "allowFrom": ["*"] + } + } +} +``` + +When `streaming` is `false` (default) or omitted, only `send()` is called — no streaming overhead. + +### BaseChannel Streaming API + +| Method / Property | Description | +|-------------------|-------------| +| `async send_delta(chat_id, delta, metadata?)` | Override to handle streaming chunks. No-op by default. | +| `supports_streaming` (property) | Returns `True` when config has `streaming: true` **and** subclass overrides `send_delta`. | + ## Config Your channel receives config as a plain `dict`. 
Access fields with `.get()`: diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 1bbb7cf..6cf2ec3 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -173,7 +173,8 @@ class AgentLoop: """Remove <think> blocks that some models embed in content.""" if not text: return None - return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None + from nanobot.utils.helpers import strip_think + return strip_think(text) or None @@ -227,6 +228,21 @@ class AgentLoop: final_content = None tools_used: list[str] = [] + # Wrap on_stream with stateful think-tag filter so downstream + # consumers (CLI, channels) never see <think> blocks. + _raw_stream = on_stream + _stream_buf = "" + + async def _filtered_stream(delta: str) -> None: + nonlocal _stream_buf + from nanobot.utils.helpers import strip_think + prev_clean = strip_think(_stream_buf) + _stream_buf += delta + new_clean = strip_think(_stream_buf) + incremental = new_clean[len(prev_clean):] + if incremental and _raw_stream: + await _raw_stream(incremental) + while iteration < self.max_iterations: iteration += 1 @@ -237,7 +253,7 @@ messages=messages, tools=tool_defs, model=self.model, - on_content_delta=on_stream, + on_content_delta=_filtered_stream, ) else: response = await self.provider.chat_with_retry( @@ -255,6 +271,7 @@ if response.has_tool_calls: if on_stream and on_stream_end: await on_stream_end(resuming=True) + _stream_buf = "" if on_progress: if not on_stream: @@ -286,6 +303,7 @@ else: if on_stream and on_stream_end: await on_stream_end(resuming=False) + _stream_buf = "" clean = self._strip_think(response.content) if response.finish_reason == "error": diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index fc2e47d..850e09c 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -6,6 +6,7 @@ import asyncio import re import time import unicodedata +from dataclasses import 
dataclass, field from typing import Any, Literal from loguru import logger @@ -156,6 +157,14 @@ _SEND_MAX_RETRIES = 3 _SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry +@dataclass +class _StreamBuf: + """Per-chat streaming accumulator for progressive message editing.""" + text: str = "" + message_id: int | None = None + last_edit: float = 0.0 + + class TelegramConfig(Base): """Telegram channel configuration.""" @@ -167,6 +176,7 @@ class TelegramConfig(Base): group_policy: Literal["open", "mention"] = "mention" connection_pool_size: int = 32 pool_timeout: float = 5.0 + streaming: bool = True class TelegramChannel(BaseChannel): @@ -193,6 +203,8 @@ class TelegramChannel(BaseChannel): def default_config(cls) -> dict[str, Any]: return TelegramConfig().model_dump(by_alias=True) + _STREAM_EDIT_INTERVAL = 0.6 # min seconds between edit_message_text calls + def __init__(self, config: Any, bus: MessageBus): if isinstance(config, dict): config = TelegramConfig.model_validate(config) @@ -206,6 +218,7 @@ class TelegramChannel(BaseChannel): self._message_threads: dict[tuple[str, int], int] = {} self._bot_user_id: int | None = None self._bot_username: str | None = None + self._stream_bufs: dict[str, _StreamBuf] = {} # chat_id -> streaming state def is_allowed(self, sender_id: str) -> bool: """Preserve Telegram's legacy id|username allowlist matching.""" @@ -416,14 +429,8 @@ class TelegramChannel(BaseChannel): # Send text content if msg.content and msg.content != "[empty message]": - is_progress = msg.metadata.get("_progress", False) - for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN): - # Final response: simulate streaming via draft, then persist. 
- if not is_progress: - await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs) - else: - await self._send_text(chat_id, chunk, reply_params, thread_kwargs) + await self._send_text(chat_id, chunk, reply_params, thread_kwargs) async def _call_with_retry(self, fn, *args, **kwargs): """Call an async Telegram API function with retry on pool/network timeout.""" @@ -469,29 +476,67 @@ class TelegramChannel(BaseChannel): except Exception as e2: logger.error("Error sending Telegram message: {}", e2) - async def _send_with_streaming( - self, - chat_id: int, - text: str, - reply_params=None, - thread_kwargs: dict | None = None, - ) -> None: - """Simulate streaming via send_message_draft, then persist with send_message.""" - draft_id = int(time.time() * 1000) % (2**31) - try: - step = max(len(text) // 8, 40) - for i in range(step, len(text), step): - await self._app.bot.send_message_draft( - chat_id=chat_id, draft_id=draft_id, text=text[:i], + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + """Progressive message editing: send on first delta, edit on subsequent ones.""" + if not self._app: + return + meta = metadata or {} + int_chat_id = int(chat_id) + + if meta.get("_stream_end"): + buf = self._stream_bufs.pop(chat_id, None) + if not buf or not buf.message_id or not buf.text: + return + self._stop_typing(chat_id) + try: + html = _markdown_to_telegram_html(buf.text) + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=html, parse_mode="HTML", ) - await asyncio.sleep(0.04) - await self._app.bot.send_message_draft( - chat_id=chat_id, draft_id=draft_id, text=text, - ) - await asyncio.sleep(0.15) - except Exception: - pass - await self._send_text(chat_id, text, reply_params, thread_kwargs) + except Exception as e: + logger.debug("Final stream edit failed (HTML), trying plain: {}", e) + try: + await self._call_with_retry( + 
self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=buf.text, + ) + except Exception: + pass + return + + buf = self._stream_bufs.get(chat_id) + if buf is None: + buf = _StreamBuf() + self._stream_bufs[chat_id] = buf + buf.text += delta + + if not buf.text.strip(): + return + + now = time.monotonic() + if buf.message_id is None: + try: + sent = await self._call_with_retry( + self._app.bot.send_message, + chat_id=int_chat_id, text=buf.text, + ) + buf.message_id = sent.message_id + buf.last_edit = now + except Exception as e: + logger.warning("Stream initial send failed: {}", e) + elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: + try: + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=buf.text, + ) + buf.last_edit = now + except Exception: + pass async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" diff --git a/nanobot/cli/stream.py b/nanobot/cli/stream.py index 3ee28fe..161d530 100644 --- a/nanobot/cli/stream.py +++ b/nanobot/cli/stream.py @@ -6,10 +6,8 @@ markdown rendering during streaming. Ellipsis mode handles overflow. from __future__ import annotations -import re import sys import time -from typing import Any from rich.console import Console from rich.live import Live @@ -61,6 +59,8 @@ class ThinkingSpinner: class StreamRenderer: """Rich Live streaming with markdown. auto_refresh=False avoids render races. + Deltas arrive pre-filtered (no <think> tags) from the agent loop. 
+ + Flow per round: spinner -> first visible delta -> header + Live renders -> on_end -> Live stops (content stays on screen) @@ -76,15 +76,8 @@ class StreamRenderer: self._spinner: ThinkingSpinner | None = None self._start_spinner() - @staticmethod - def _clean(text: str) -> str: - text = re.sub(r"<think>[\s\S]*?</think>", "", text) - text = re.sub(r"<think>[\s\S]*$", "", text) - return text.strip() - def _render(self): - clean = self._clean(self._buf) - return Markdown(clean) if self._md and clean else Text(clean or "") + return Markdown(self._buf) if self._md and self._buf else Text(self._buf or "") def _start_spinner(self) -> None: if self._show_spinner: @@ -100,7 +93,7 @@ self.streamed = True self._buf += delta if self._live is None: - if not self._clean(self._buf): + if not self._buf.strip(): return self._stop_spinner() c = _make_console() diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index f89b956..f265870 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -11,6 +11,13 @@ from typing import Any import tiktoken +def strip_think(text: str) -> str: + """Remove <think>...</think> blocks and any unclosed trailing <think> tag.""" + text = re.sub(r"<think>[\s\S]*?</think>", "", text) + text = re.sub(r"<think>[\s\S]*$", "", text) + return text.strip() + + def detect_image_mime(data: bytes) -> str | None: """Detect image MIME type from magic bytes, ignoring file extension.""" if data[:8] == b"\x89PNG\r\n\x1a\n":