feat(streaming): centralize think-tag filtering and add Telegram streaming

- Add strip_think() to helpers.py as single source of truth
- Filter deltas in agent loop before dispatching to consumers
- Implement send_delta in TelegramChannel with progressive edit_message_text
- Remove duplicate think filtering from CLI stream.py and telegram.py
- Remove legacy fake streaming (send_message_draft) from Telegram
- Default Telegram streaming to true
- Update CHANNEL_PLUGIN_GUIDE.md with streaming documentation

Made-with: Cursor
This commit is contained in:
Xubin Ren
2026-03-22 17:33:09 +00:00
committed by Xubin Ren
parent f2e1cb3662
commit 9d5e511a6e
5 changed files with 204 additions and 43 deletions

View File

@@ -182,12 +182,19 @@ The agent receives the message and processes it. Replies arrive in your `send()`
| Method / Property | Description | | Method / Property | Description |
|-------------------|-------------| |-------------------|-------------|
| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. | | `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. Automatically sets `_wants_stream` if `supports_streaming` is true. |
| `is_allowed(sender_id)` | Checks against `config["allowFrom"]`; `"*"` allows all, `[]` denies all. | | `is_allowed(sender_id)` | Checks against `config["allowFrom"]`; `"*"` allows all, `[]` denies all. |
| `default_config()` (classmethod) | Returns default config dict for `nanobot onboard`. Override to declare your fields. | | `default_config()` (classmethod) | Returns default config dict for `nanobot onboard`. Override to declare your fields. |
| `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). | | `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). |
| `supports_streaming` (property) | `True` when config has `"streaming": true` **and** subclass overrides `send_delta()`. |
| `is_running` | Returns `self._running`. | | `is_running` | Returns `self._running`. |
### Optional (streaming)
| Method | Description |
|--------|-------------|
| `async send_delta(chat_id, delta, metadata?)` | Override to receive streaming chunks. See [Streaming Support](#streaming-support) for details. |
### Message Types ### Message Types
```python ```python
@@ -201,6 +208,97 @@ class OutboundMessage:
# "message_id" for reply threading # "message_id" for reply threading
``` ```
## Streaming Support
Channels can opt into real-time streaming — the agent delivers content incrementally, token-by-token, instead of as a single final message. This is entirely optional; channels work fine without it.
### How It Works
When **both** conditions are met, the agent streams content through your channel:
1. Config has `"streaming": true`
2. Your subclass overrides `send_delta()`
If either condition is not met, the agent falls back to the normal one-shot `send()` path.
### Implementing `send_delta`
Override `send_delta` to handle two types of calls:
```python
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
meta = metadata or {}
if meta.get("_stream_end"):
# Streaming finished — do final formatting, cleanup, etc.
return
# Regular delta — append text, update the message on screen
# delta contains a small chunk of text (a few tokens)
```
**Metadata flags:**
| Flag | Meaning |
|------|---------|
| `_stream_delta: True` | A content chunk (delta contains the new text) |
| `_stream_end: True` | Streaming finished (delta is empty) |
| `_resuming: True` | More streaming rounds coming (e.g. tool call then another response) |
### Example: Webhook with Streaming
```python
class WebhookChannel(BaseChannel):
name = "webhook"
display_name = "Webhook"
def __init__(self, config, bus):
super().__init__(config, bus)
self._buffers: dict[str, str] = {}
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
meta = metadata or {}
if meta.get("_stream_end"):
text = self._buffers.pop(chat_id, "")
# Final delivery — format and send the complete message
await self._deliver(chat_id, text, final=True)
return
self._buffers.setdefault(chat_id, "")
self._buffers[chat_id] += delta
# Incremental update — push partial text to the client
await self._deliver(chat_id, self._buffers[chat_id], final=False)
async def send(self, msg: OutboundMessage) -> None:
# Non-streaming path — unchanged
await self._deliver(msg.chat_id, msg.content, final=True)
```
### Config
Enable streaming per channel:
```json
{
"channels": {
"webhook": {
"enabled": true,
"streaming": true,
"allowFrom": ["*"]
}
}
}
```
When `streaming` is `false` or omitted, only `send()` is called — no streaming overhead. Note that `false` is the BaseChannel-wide default; individual channels may override it (Telegram, for example, defaults to `true`).
### BaseChannel Streaming API
| Method / Property | Description |
|-------------------|-------------|
| `async send_delta(chat_id, delta, metadata?)` | Override to handle streaming chunks. No-op by default. |
| `supports_streaming` (property) | Returns `True` when config has `streaming: true` **and** subclass overrides `send_delta`. |
## Config ## Config
Your channel receives config as a plain `dict`. Access fields with `.get()`: Your channel receives config as a plain `dict`. Access fields with `.get()`:

View File

@@ -173,7 +173,8 @@ class AgentLoop:
"""Remove <think>…</think> blocks that some models embed in content.""" """Remove <think>…</think> blocks that some models embed in content."""
if not text: if not text:
return None return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None from nanobot.utils.helpers import strip_think
return strip_think(text) or None
@staticmethod @staticmethod
def _tool_hint(tool_calls: list) -> str: def _tool_hint(tool_calls: list) -> str:
@@ -227,6 +228,21 @@ class AgentLoop:
final_content = None final_content = None
tools_used: list[str] = [] tools_used: list[str] = []
# Wrap on_stream with stateful think-tag filter so downstream
# consumers (CLI, channels) never see <think> blocks.
_raw_stream = on_stream
_stream_buf = ""
async def _filtered_stream(delta: str) -> None:
nonlocal _stream_buf
from nanobot.utils.helpers import strip_think
prev_clean = strip_think(_stream_buf)
_stream_buf += delta
new_clean = strip_think(_stream_buf)
incremental = new_clean[len(prev_clean):]
if incremental and _raw_stream:
await _raw_stream(incremental)
while iteration < self.max_iterations: while iteration < self.max_iterations:
iteration += 1 iteration += 1
@@ -237,7 +253,7 @@ class AgentLoop:
messages=messages, messages=messages,
tools=tool_defs, tools=tool_defs,
model=self.model, model=self.model,
on_content_delta=on_stream, on_content_delta=_filtered_stream,
) )
else: else:
response = await self.provider.chat_with_retry( response = await self.provider.chat_with_retry(
@@ -255,6 +271,7 @@ class AgentLoop:
if response.has_tool_calls: if response.has_tool_calls:
if on_stream and on_stream_end: if on_stream and on_stream_end:
await on_stream_end(resuming=True) await on_stream_end(resuming=True)
_stream_buf = ""
if on_progress: if on_progress:
if not on_stream: if not on_stream:
@@ -286,6 +303,7 @@ class AgentLoop:
else: else:
if on_stream and on_stream_end: if on_stream and on_stream_end:
await on_stream_end(resuming=False) await on_stream_end(resuming=False)
_stream_buf = ""
clean = self._strip_think(response.content) clean = self._strip_think(response.content)
if response.finish_reason == "error": if response.finish_reason == "error":

View File

@@ -6,6 +6,7 @@ import asyncio
import re import re
import time import time
import unicodedata import unicodedata
from dataclasses import dataclass, field
from typing import Any, Literal from typing import Any, Literal
from loguru import logger from loguru import logger
@@ -156,6 +157,14 @@ _SEND_MAX_RETRIES = 3
_SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry _SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry
@dataclass
class _StreamBuf:
"""Per-chat streaming accumulator for progressive message editing."""
text: str = ""
message_id: int | None = None
last_edit: float = 0.0
class TelegramConfig(Base): class TelegramConfig(Base):
"""Telegram channel configuration.""" """Telegram channel configuration."""
@@ -167,6 +176,7 @@ class TelegramConfig(Base):
group_policy: Literal["open", "mention"] = "mention" group_policy: Literal["open", "mention"] = "mention"
connection_pool_size: int = 32 connection_pool_size: int = 32
pool_timeout: float = 5.0 pool_timeout: float = 5.0
streaming: bool = True
class TelegramChannel(BaseChannel): class TelegramChannel(BaseChannel):
@@ -193,6 +203,8 @@ class TelegramChannel(BaseChannel):
def default_config(cls) -> dict[str, Any]: def default_config(cls) -> dict[str, Any]:
return TelegramConfig().model_dump(by_alias=True) return TelegramConfig().model_dump(by_alias=True)
_STREAM_EDIT_INTERVAL = 0.6 # min seconds between edit_message_text calls
def __init__(self, config: Any, bus: MessageBus): def __init__(self, config: Any, bus: MessageBus):
if isinstance(config, dict): if isinstance(config, dict):
config = TelegramConfig.model_validate(config) config = TelegramConfig.model_validate(config)
@@ -206,6 +218,7 @@ class TelegramChannel(BaseChannel):
self._message_threads: dict[tuple[str, int], int] = {} self._message_threads: dict[tuple[str, int], int] = {}
self._bot_user_id: int | None = None self._bot_user_id: int | None = None
self._bot_username: str | None = None self._bot_username: str | None = None
self._stream_bufs: dict[str, _StreamBuf] = {} # chat_id -> streaming state
def is_allowed(self, sender_id: str) -> bool: def is_allowed(self, sender_id: str) -> bool:
"""Preserve Telegram's legacy id|username allowlist matching.""" """Preserve Telegram's legacy id|username allowlist matching."""
@@ -416,13 +429,7 @@ class TelegramChannel(BaseChannel):
# Send text content # Send text content
if msg.content and msg.content != "[empty message]": if msg.content and msg.content != "[empty message]":
is_progress = msg.metadata.get("_progress", False)
for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN): for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
# Final response: simulate streaming via draft, then persist.
if not is_progress:
await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs)
else:
await self._send_text(chat_id, chunk, reply_params, thread_kwargs) await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
async def _call_with_retry(self, fn, *args, **kwargs): async def _call_with_retry(self, fn, *args, **kwargs):
@@ -469,29 +476,67 @@ class TelegramChannel(BaseChannel):
except Exception as e2: except Exception as e2:
logger.error("Error sending Telegram message: {}", e2) logger.error("Error sending Telegram message: {}", e2)
async def _send_with_streaming( async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
self, """Progressive message editing: send on first delta, edit on subsequent ones."""
chat_id: int, if not self._app:
text: str, return
reply_params=None, meta = metadata or {}
thread_kwargs: dict | None = None, int_chat_id = int(chat_id)
) -> None:
"""Simulate streaming via send_message_draft, then persist with send_message.""" if meta.get("_stream_end"):
draft_id = int(time.time() * 1000) % (2**31) buf = self._stream_bufs.pop(chat_id, None)
if not buf or not buf.message_id or not buf.text:
return
self._stop_typing(chat_id)
try: try:
step = max(len(text) // 8, 40) html = _markdown_to_telegram_html(buf.text)
for i in range(step, len(text), step): await self._call_with_retry(
await self._app.bot.send_message_draft( self._app.bot.edit_message_text,
chat_id=chat_id, draft_id=draft_id, text=text[:i], chat_id=int_chat_id, message_id=buf.message_id,
text=html, parse_mode="HTML",
) )
await asyncio.sleep(0.04) except Exception as e:
await self._app.bot.send_message_draft( logger.debug("Final stream edit failed (HTML), trying plain: {}", e)
chat_id=chat_id, draft_id=draft_id, text=text, try:
await self._call_with_retry(
self._app.bot.edit_message_text,
chat_id=int_chat_id, message_id=buf.message_id,
text=buf.text,
) )
await asyncio.sleep(0.15)
except Exception: except Exception:
pass pass
await self._send_text(chat_id, text, reply_params, thread_kwargs) return
buf = self._stream_bufs.get(chat_id)
if buf is None:
buf = _StreamBuf()
self._stream_bufs[chat_id] = buf
buf.text += delta
if not buf.text.strip():
return
now = time.monotonic()
if buf.message_id is None:
try:
sent = await self._call_with_retry(
self._app.bot.send_message,
chat_id=int_chat_id, text=buf.text,
)
buf.message_id = sent.message_id
buf.last_edit = now
except Exception as e:
logger.warning("Stream initial send failed: {}", e)
elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
try:
await self._call_with_retry(
self._app.bot.edit_message_text,
chat_id=int_chat_id, message_id=buf.message_id,
text=buf.text,
)
buf.last_edit = now
except Exception:
pass
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command.""" """Handle /start command."""

View File

@@ -6,10 +6,8 @@ markdown rendering during streaming. Ellipsis mode handles overflow.
from __future__ import annotations from __future__ import annotations
import re
import sys import sys
import time import time
from typing import Any
from rich.console import Console from rich.console import Console
from rich.live import Live from rich.live import Live
@@ -61,6 +59,8 @@ class ThinkingSpinner:
class StreamRenderer: class StreamRenderer:
"""Rich Live streaming with markdown. auto_refresh=False avoids render races. """Rich Live streaming with markdown. auto_refresh=False avoids render races.
Deltas arrive pre-filtered (no <think> tags) from the agent loop.
Flow per round: Flow per round:
spinner -> first visible delta -> header + Live renders -> spinner -> first visible delta -> header + Live renders ->
on_end -> Live stops (content stays on screen) on_end -> Live stops (content stays on screen)
@@ -76,15 +76,8 @@ class StreamRenderer:
self._spinner: ThinkingSpinner | None = None self._spinner: ThinkingSpinner | None = None
self._start_spinner() self._start_spinner()
@staticmethod
def _clean(text: str) -> str:
text = re.sub(r"<think>[\s\S]*?</think>", "", text)
text = re.sub(r"<think>[\s\S]*$", "", text)
return text.strip()
def _render(self): def _render(self):
clean = self._clean(self._buf) return Markdown(self._buf) if self._md and self._buf else Text(self._buf or "")
return Markdown(clean) if self._md and clean else Text(clean or "")
def _start_spinner(self) -> None: def _start_spinner(self) -> None:
if self._show_spinner: if self._show_spinner:
@@ -100,7 +93,7 @@ class StreamRenderer:
self.streamed = True self.streamed = True
self._buf += delta self._buf += delta
if self._live is None: if self._live is None:
if not self._clean(self._buf): if not self._buf.strip():
return return
self._stop_spinner() self._stop_spinner()
c = _make_console() c = _make_console()

View File

@@ -11,6 +11,13 @@ from typing import Any
import tiktoken import tiktoken
def strip_think(text: str) -> str:
"""Remove <think>…</think> blocks and any unclosed trailing <think> tag."""
text = re.sub(r"<think>[\s\S]*?</think>", "", text)
text = re.sub(r"<think>[\s\S]*$", "", text)
return text.strip()
def detect_image_mime(data: bytes) -> str | None: def detect_image_mime(data: bytes) -> str | None:
"""Detect image MIME type from magic bytes, ignoring file extension.""" """Detect image MIME type from magic bytes, ignoring file extension."""
if data[:8] == b"\x89PNG\r\n\x1a\n": if data[:8] == b"\x89PNG\r\n\x1a\n":