From aa2987be3eed79aa31cb4de9e49d7a751262d440 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Fri, 27 Feb 2026 09:30:01 +0000 Subject: [PATCH] refactor: streamline Telegram media-group buffering --- nanobot/channels/telegram.py | 95 ++++++++++-------------------------- 1 file changed, 27 insertions(+), 68 deletions(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index ed77963..969d853 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -127,7 +127,7 @@ class TelegramChannel(BaseChannel): self._app: Application | None = None self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task - self._media_group_buffers: dict[str, dict[str, object]] = {} + self._media_group_buffers: dict[str, dict] = {} self._media_group_tasks: dict[str, asyncio.Task] = {} async def start(self) -> None: @@ -194,11 +194,9 @@ class TelegramChannel(BaseChannel): for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) - # Cancel buffered media-group flush tasks - for key, task in list(self._media_group_tasks.items()): - if task and not task.done(): - task.cancel() - self._media_group_tasks.pop(key, None) + for task in self._media_group_tasks.values(): + task.cancel() + self._media_group_tasks.clear() self._media_group_buffers.clear() if self._app: @@ -409,43 +407,26 @@ class TelegramChannel(BaseChannel): str_chat_id = str(chat_id) - # Telegram media groups arrive as multiple messages sharing media_group_id. - # Buffer briefly and forward as one aggregated turn. - media_group_id = getattr(message, "media_group_id", None) - if media_group_id: - group_key = f"{str_chat_id}:{media_group_id}" - buffer = self._media_group_buffers.get(group_key) - if not buffer: - buffer = { - "sender_id": sender_id, - "chat_id": str_chat_id, - "contents": [], - "media": [], + # Telegram media groups: buffer briefly, forward as one aggregated turn. + if media_group_id := getattr(message, "media_group_id", None): + key = f"{str_chat_id}:{media_group_id}" + if key not in self._media_group_buffers: + self._media_group_buffers[key] = { + "sender_id": sender_id, "chat_id": str_chat_id, + "contents": [], "media": [], "metadata": { - "message_id": message.message_id, - "user_id": user.id, - "username": user.username, - "first_name": user.first_name, + "message_id": message.message_id, "user_id": user.id, + "username": user.username, "first_name": user.first_name, "is_group": message.chat.type != "private", - "media_group_id": media_group_id, }, } - self._media_group_buffers[group_key] = buffer self._start_typing(str_chat_id) - + buf = self._media_group_buffers[key] if content and content != "[empty message]": - cast_contents = buffer["contents"] - if isinstance(cast_contents, list): - cast_contents.append(content) - cast_media = buffer["media"] - if isinstance(cast_media, list): - cast_media.extend(media_paths) - - # Start one delayed flush task per media group. - if group_key not in self._media_group_tasks: - self._media_group_tasks[group_key] = asyncio.create_task( - self._flush_media_group(group_key) - ) + buf["contents"].append(content) + buf["media"].extend(media_paths) + if key not in self._media_group_tasks: + self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key)) return # Start typing indicator before processing @@ -466,42 +447,20 @@ class TelegramChannel(BaseChannel): } ) - async def _flush_media_group(self, group_key: str, delay_s: float = 0.6) -> None: - """Flush buffered Telegram media-group messages as one aggregated turn.""" + async def _flush_media_group(self, key: str) -> None: + """Wait briefly, then forward buffered media-group as one turn.""" try: - await asyncio.sleep(delay_s) - buffer = self._media_group_buffers.pop(group_key, None) - if not buffer: + await asyncio.sleep(0.6) + if not (buf := self._media_group_buffers.pop(key, None)): return - - sender_id = str(buffer.get("sender_id", "")) - chat_id = str(buffer.get("chat_id", "")) - contents = buffer.get("contents") - media = buffer.get("media") - metadata = buffer.get("metadata") - - content_parts = [c for c in (contents if isinstance(contents, list) else []) if isinstance(c, str) and c] - media_paths = [m for m in (media if isinstance(media, list) else []) if isinstance(m, str) and m] - - # De-duplicate while preserving order - seen = set() - unique_media: list[str] = [] - for m in media_paths: - if m in seen: - continue - seen.add(m) - unique_media.append(m) - - content = "\n".join(content_parts) if content_parts else "[empty message]" + content = "\n".join(buf["contents"]) or "[empty message]" await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=content, - media=unique_media, - metadata=metadata if isinstance(metadata, dict) else {}, + sender_id=buf["sender_id"], chat_id=buf["chat_id"], + content=content, media=list(dict.fromkeys(buf["media"])), + metadata=buf["metadata"], ) finally: - self._media_group_tasks.pop(group_key, None) + self._media_group_tasks.pop(key, None) def _start_typing(self, chat_id: str) -> None: """Start sending 'typing...' indicator for a chat."""