diff --git a/docs/CHANNEL_PLUGIN_GUIDE.md b/docs/CHANNEL_PLUGIN_GUIDE.md
index a23ea07..575cad6 100644
--- a/docs/CHANNEL_PLUGIN_GUIDE.md
+++ b/docs/CHANNEL_PLUGIN_GUIDE.md
@@ -182,12 +182,19 @@ The agent receives the message and processes it. Replies arrive in your `send()`
| Method / Property | Description |
|-------------------|-------------|
-| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. |
+| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. Automatically sets `_wants_stream` if `supports_streaming` is true. |
| `is_allowed(sender_id)` | Checks against `config["allowFrom"]`; `"*"` allows all, `[]` denies all. |
| `default_config()` (classmethod) | Returns default config dict for `nanobot onboard`. Override to declare your fields. |
| `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). |
+| `supports_streaming` (property) | `True` when config has `"streaming": true` **and** subclass overrides `send_delta()`. |
| `is_running` | Returns `self._running`. |
+### Optional (streaming)
+
+| Method | Description |
+|--------|-------------|
+| `async send_delta(chat_id, delta, metadata?)` | Override to receive streaming chunks. See [Streaming Support](#streaming-support) for details. |
+
### Message Types
```python
@@ -201,6 +208,97 @@ class OutboundMessage:
# "message_id" for reply threading
```
+## Streaming Support
+
+Channels can opt into real-time streaming — the agent sends content token-by-token instead of one final message. This is entirely optional; channels work fine without it.
+
+### How It Works
+
+When **both** conditions are met, the agent streams content through your channel:
+
+1. Config has `"streaming": true`
+2. Your subclass overrides `send_delta()`
+
+If either is missing, the agent falls back to the normal one-shot `send()` path.
+
+### Implementing `send_delta`
+
+Override `send_delta` to handle two types of calls:
+
+```python
+async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
+ meta = metadata or {}
+
+ if meta.get("_stream_end"):
+ # Streaming finished — do final formatting, cleanup, etc.
+ return
+
+ # Regular delta — append text, update the message on screen
+ # delta contains a small chunk of text (a few tokens)
+```
+
+**Metadata flags:**
+
+| Flag | Meaning |
+|------|---------|
+| `_stream_delta: True` | A content chunk (delta contains the new text) |
+| `_stream_end: True` | Streaming finished (delta is empty) |
+| `_resuming: True` | More streaming rounds coming (e.g. tool call then another response) |
+
+### Example: Webhook with Streaming
+
+```python
+class WebhookChannel(BaseChannel):
+ name = "webhook"
+ display_name = "Webhook"
+
+ def __init__(self, config, bus):
+ super().__init__(config, bus)
+ self._buffers: dict[str, str] = {}
+
+ async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
+ meta = metadata or {}
+ if meta.get("_stream_end"):
+ text = self._buffers.pop(chat_id, "")
+ # Final delivery — format and send the complete message
+ await self._deliver(chat_id, text, final=True)
+ return
+
+ self._buffers.setdefault(chat_id, "")
+ self._buffers[chat_id] += delta
+ # Incremental update — push partial text to the client
+ await self._deliver(chat_id, self._buffers[chat_id], final=False)
+
+ async def send(self, msg: OutboundMessage) -> None:
+ # Non-streaming path — unchanged
+ await self._deliver(msg.chat_id, msg.content, final=True)
+```
+
+### Config
+
+Enable streaming per channel:
+
+```json
+{
+ "channels": {
+ "webhook": {
+ "enabled": true,
+ "streaming": true,
+ "allowFrom": ["*"]
+ }
+ }
+}
+```
+
+When `streaming` is `false` (default) or omitted, only `send()` is called — no streaming overhead.
+
+### BaseChannel Streaming API
+
+| Method / Property | Description |
+|-------------------|-------------|
+| `async send_delta(chat_id, delta, metadata?)` | Override to handle streaming chunks. No-op by default. |
+| `supports_streaming` (property) | Returns `True` when config has `streaming: true` **and** subclass overrides `send_delta`. |
+
## Config
Your channel receives config as a plain `dict`. Access fields with `.get()`:
diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index 1bbb7cf..6cf2ec3 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -173,7 +173,8 @@ class AgentLoop:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
- return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None
+ from nanobot.utils.helpers import strip_think
+ return strip_think(text) or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
@@ -227,6 +228,21 @@ class AgentLoop:
final_content = None
tools_used: list[str] = []
+ # Wrap on_stream with stateful think-tag filter so downstream
+ # consumers (CLI, channels) never see <think> blocks.
+ _raw_stream = on_stream
+ _stream_buf = ""
+
+ async def _filtered_stream(delta: str) -> None:
+ nonlocal _stream_buf
+ from nanobot.utils.helpers import strip_think
+ prev_clean = strip_think(_stream_buf)
+ _stream_buf += delta
+ new_clean = strip_think(_stream_buf)
+ incremental = new_clean[len(prev_clean):]
+ if incremental and _raw_stream:
+ await _raw_stream(incremental)
+
while iteration < self.max_iterations:
iteration += 1
@@ -237,7 +253,7 @@ class AgentLoop:
messages=messages,
tools=tool_defs,
model=self.model,
- on_content_delta=on_stream,
+ on_content_delta=_filtered_stream,
)
else:
response = await self.provider.chat_with_retry(
@@ -255,6 +271,7 @@ class AgentLoop:
if response.has_tool_calls:
if on_stream and on_stream_end:
await on_stream_end(resuming=True)
+ _stream_buf = ""
if on_progress:
if not on_stream:
@@ -286,6 +303,7 @@ class AgentLoop:
else:
if on_stream and on_stream_end:
await on_stream_end(resuming=False)
+ _stream_buf = ""
clean = self._strip_think(response.content)
if response.finish_reason == "error":
diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py
index fc2e47d..850e09c 100644
--- a/nanobot/channels/telegram.py
+++ b/nanobot/channels/telegram.py
@@ -6,6 +6,7 @@ import asyncio
import re
import time
import unicodedata
+from dataclasses import dataclass, field
from typing import Any, Literal
from loguru import logger
@@ -156,6 +157,14 @@ _SEND_MAX_RETRIES = 3
_SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry
+@dataclass
+class _StreamBuf:
+ """Per-chat streaming accumulator for progressive message editing."""
+ text: str = ""
+ message_id: int | None = None
+ last_edit: float = 0.0
+
+
class TelegramConfig(Base):
"""Telegram channel configuration."""
@@ -167,6 +176,7 @@ class TelegramConfig(Base):
group_policy: Literal["open", "mention"] = "mention"
connection_pool_size: int = 32
pool_timeout: float = 5.0
+ streaming: bool = True
class TelegramChannel(BaseChannel):
@@ -193,6 +203,8 @@ class TelegramChannel(BaseChannel):
def default_config(cls) -> dict[str, Any]:
return TelegramConfig().model_dump(by_alias=True)
+ _STREAM_EDIT_INTERVAL = 0.6 # min seconds between edit_message_text calls
+
def __init__(self, config: Any, bus: MessageBus):
if isinstance(config, dict):
config = TelegramConfig.model_validate(config)
@@ -206,6 +218,7 @@ class TelegramChannel(BaseChannel):
self._message_threads: dict[tuple[str, int], int] = {}
self._bot_user_id: int | None = None
self._bot_username: str | None = None
+ self._stream_bufs: dict[str, _StreamBuf] = {} # chat_id -> streaming state
def is_allowed(self, sender_id: str) -> bool:
"""Preserve Telegram's legacy id|username allowlist matching."""
@@ -416,14 +429,8 @@ class TelegramChannel(BaseChannel):
# Send text content
if msg.content and msg.content != "[empty message]":
- is_progress = msg.metadata.get("_progress", False)
-
for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
- # Final response: simulate streaming via draft, then persist.
- if not is_progress:
- await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs)
- else:
- await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
+ await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
async def _call_with_retry(self, fn, *args, **kwargs):
"""Call an async Telegram API function with retry on pool/network timeout."""
@@ -469,29 +476,67 @@ class TelegramChannel(BaseChannel):
except Exception as e2:
logger.error("Error sending Telegram message: {}", e2)
- async def _send_with_streaming(
- self,
- chat_id: int,
- text: str,
- reply_params=None,
- thread_kwargs: dict | None = None,
- ) -> None:
- """Simulate streaming via send_message_draft, then persist with send_message."""
- draft_id = int(time.time() * 1000) % (2**31)
- try:
- step = max(len(text) // 8, 40)
- for i in range(step, len(text), step):
- await self._app.bot.send_message_draft(
- chat_id=chat_id, draft_id=draft_id, text=text[:i],
+ async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
+ """Progressive message editing: send on first delta, edit on subsequent ones."""
+ if not self._app:
+ return
+ meta = metadata or {}
+ int_chat_id = int(chat_id)
+
+ if meta.get("_stream_end"):
+ buf = self._stream_bufs.pop(chat_id, None)
+ if not buf or not buf.message_id or not buf.text:
+ return
+ self._stop_typing(chat_id)
+ try:
+ html = _markdown_to_telegram_html(buf.text)
+ await self._call_with_retry(
+ self._app.bot.edit_message_text,
+ chat_id=int_chat_id, message_id=buf.message_id,
+ text=html, parse_mode="HTML",
)
- await asyncio.sleep(0.04)
- await self._app.bot.send_message_draft(
- chat_id=chat_id, draft_id=draft_id, text=text,
- )
- await asyncio.sleep(0.15)
- except Exception:
- pass
- await self._send_text(chat_id, text, reply_params, thread_kwargs)
+ except Exception as e:
+ logger.debug("Final stream edit failed (HTML), trying plain: {}", e)
+ try:
+ await self._call_with_retry(
+ self._app.bot.edit_message_text,
+ chat_id=int_chat_id, message_id=buf.message_id,
+ text=buf.text,
+ )
+ except Exception:
+ pass
+ return
+
+ buf = self._stream_bufs.get(chat_id)
+ if buf is None:
+ buf = _StreamBuf()
+ self._stream_bufs[chat_id] = buf
+ buf.text += delta
+
+ if not buf.text.strip():
+ return
+
+ now = time.monotonic()
+ if buf.message_id is None:
+ try:
+ sent = await self._call_with_retry(
+ self._app.bot.send_message,
+ chat_id=int_chat_id, text=buf.text,
+ )
+ buf.message_id = sent.message_id
+ buf.last_edit = now
+ except Exception as e:
+ logger.warning("Stream initial send failed: {}", e)
+ elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
+ try:
+ await self._call_with_retry(
+ self._app.bot.edit_message_text,
+ chat_id=int_chat_id, message_id=buf.message_id,
+ text=buf.text,
+ )
+ buf.last_edit = now
+ except Exception:
+ pass
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command."""
diff --git a/nanobot/cli/stream.py b/nanobot/cli/stream.py
index 3ee28fe..161d530 100644
--- a/nanobot/cli/stream.py
+++ b/nanobot/cli/stream.py
@@ -6,10 +6,8 @@ markdown rendering during streaming. Ellipsis mode handles overflow.
from __future__ import annotations
-import re
import sys
import time
-from typing import Any
from rich.console import Console
from rich.live import Live
@@ -61,6 +59,8 @@ class ThinkingSpinner:
class StreamRenderer:
"""Rich Live streaming with markdown. auto_refresh=False avoids render races.
+ Deltas arrive pre-filtered (no <think> tags) from the agent loop.
+
Flow per round:
spinner -> first visible delta -> header + Live renders ->
on_end -> Live stops (content stays on screen)
@@ -76,15 +76,8 @@ class StreamRenderer:
self._spinner: ThinkingSpinner | None = None
self._start_spinner()
- @staticmethod
- def _clean(text: str) -> str:
- text = re.sub(r"<think>[\s\S]*?</think>", "", text)
- text = re.sub(r"<think>[\s\S]*$", "", text)
- return text.strip()
-
def _render(self):
- clean = self._clean(self._buf)
- return Markdown(clean) if self._md and clean else Text(clean or "")
+ return Markdown(self._buf) if self._md and self._buf else Text(self._buf or "")
def _start_spinner(self) -> None:
if self._show_spinner:
@@ -100,7 +93,7 @@ class StreamRenderer:
self.streamed = True
self._buf += delta
if self._live is None:
- if not self._clean(self._buf):
+ if not self._buf.strip():
return
self._stop_spinner()
c = _make_console()
diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py
index f89b956..f265870 100644
--- a/nanobot/utils/helpers.py
+++ b/nanobot/utils/helpers.py
@@ -11,6 +11,13 @@ from typing import Any
import tiktoken
+def strip_think(text: str) -> str:
+ """Remove <think>…</think> blocks and any unclosed trailing <think> tag."""
+ text = re.sub(r"<think>[\s\S]*?</think>", "", text)
+ text = re.sub(r"<think>[\s\S]*$", "", text)
+ return text.strip()
+
+
def detect_image_mime(data: bytes) -> str | None:
"""Detect image MIME type from magic bytes, ignoring file extension."""
if data[:8] == b"\x89PNG\r\n\x1a\n":