From aa774733ea2a78798ed582c7bc1f72bb59af5487 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:08:48 +0800 Subject: [PATCH 1/3] fix(telegram): aggregate media-group images into a single inbound turn --- nanobot/channels/telegram.py | 218 ++++++++++++++++++++++++----------- 1 file changed, 152 insertions(+), 66 deletions(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 808f50c..bf2da73 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -4,9 +4,10 @@ from __future__ import annotations import asyncio import re + from loguru import logger -from telegram import BotCommand, Update, ReplyParameters -from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes +from telegram import BotCommand, ReplyParameters, Update +from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from telegram.request import HTTPXRequest from nanobot.bus.events import OutboundMessage @@ -21,60 +22,60 @@ def _markdown_to_telegram_html(text: str) -> str: """ if not text: return "" - + # 1. Extract and protect code blocks (preserve content from other processing) code_blocks: list[str] = [] def save_code_block(m: re.Match) -> str: code_blocks.append(m.group(1)) return f"\x00CB{len(code_blocks) - 1}\x00" - + text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text) - + # 2. Extract and protect inline code inline_codes: list[str] = [] def save_inline_code(m: re.Match) -> str: inline_codes.append(m.group(1)) return f"\x00IC{len(inline_codes) - 1}\x00" - + text = re.sub(r'`([^`]+)`', save_inline_code, text) - + # 3. Headers # Title -> just the title text text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE) - + # 4. Blockquotes > text -> just the text (before HTML escaping) text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE) - + # 5. Escape HTML special characters text = text.replace("&", "&").replace("<", "<").replace(">", ">") - + # 6. Links [text](url) - must be before bold/italic to handle nested cases text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text) - + # 7. Bold **text** or __text__ text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) text = re.sub(r'__(.+?)__', r'\1', text) - + # 8. Italic _text_ (avoid matching inside words like some_var_name) text = re.sub(r'(?\1', text) - + # 9. Strikethrough ~~text~~ text = re.sub(r'~~(.+?)~~', r'\1', text) - + # 10. Bullet lists - item -> • item text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE) - + # 11. Restore inline code with HTML tags for i, code in enumerate(inline_codes): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00IC{i}\x00", f"{escaped}") - + # 12. Restore code blocks with HTML tags for i, code in enumerate(code_blocks): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00CB{i}\x00", f"
{escaped}
") - + return text @@ -101,12 +102,12 @@ def _split_message(content: str, max_len: int = 4000) -> list[str]: class TelegramChannel(BaseChannel): """ Telegram channel using long polling. - + Simple and reliable - no webhook/public IP needed. """ - + name = "telegram" - + # Commands registered with Telegram's command menu BOT_COMMANDS = [ BotCommand("start", "Start the bot"), @@ -114,7 +115,7 @@ class TelegramChannel(BaseChannel): BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] - + def __init__( self, config: TelegramConfig, @@ -127,15 +128,17 @@ class TelegramChannel(BaseChannel): self._app: Application | None = None self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task - + self._media_group_buffers: dict[str, dict[str, object]] = {} + self._media_group_tasks: dict[str, asyncio.Task] = {} + async def start(self) -> None: """Start the Telegram bot with long polling.""" if not self.config.token: logger.error("Telegram bot token not configured") return - + self._running = True - + # Build the application with larger connection pool to avoid pool-timeout on long runs req = HTTPXRequest(connection_pool_size=16, pool_timeout=5.0, connect_timeout=30.0, read_timeout=30.0) builder = Application.builder().token(self.config.token).request(req).get_updates_request(req) @@ -143,62 +146,69 @@ class TelegramChannel(BaseChannel): builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy) self._app = builder.build() self._app.add_error_handler(self._on_error) - + # Add command handlers self._app.add_handler(CommandHandler("start", self._on_start)) self._app.add_handler(CommandHandler("new", self._forward_command)) self._app.add_handler(CommandHandler("help", self._on_help)) - + # Add message handler for text, photos, voice, documents self._app.add_handler( MessageHandler( - (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) - & ~filters.COMMAND, + (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) + & ~filters.COMMAND, self._on_message ) ) - + logger.info("Starting Telegram bot (polling mode)...") - + # Initialize and start polling await self._app.initialize() await self._app.start() - + # Get bot info and register command menu bot_info = await self._app.bot.get_me() logger.info("Telegram bot @{} connected", bot_info.username) - + try: await self._app.bot.set_my_commands(self.BOT_COMMANDS) logger.debug("Telegram bot commands registered") except Exception as e: logger.warning("Failed to register bot commands: {}", e) - + # Start polling (this runs until stopped) await self._app.updater.start_polling( allowed_updates=["message"], drop_pending_updates=True # Ignore old messages on startup ) - + # Keep running until stopped while self._running: await asyncio.sleep(1) - + async def stop(self) -> None: """Stop the Telegram bot.""" self._running = False - + # Cancel all typing indicators for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) - + + # Cancel buffered media-group flush tasks + for key, task in list(self._media_group_tasks.items()): + if task and not task.done(): + task.cancel() + self._media_group_tasks.pop(key, None) + self._media_group_buffers.clear() + if self._app: logger.info("Stopping Telegram bot...") await self._app.updater.stop() await self._app.stop() await self._app.shutdown() self._app = None - + @staticmethod def _get_media_type(path: str) -> str: """Guess media type from file extension.""" @@ -246,7 +256,7 @@ class TelegramChannel(BaseChannel): param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document" with open(media_path, 'rb') as f: await sender( - chat_id=chat_id, + chat_id=chat_id, **{param: f}, reply_parameters=reply_params ) @@ -265,8 +275,8 @@ class TelegramChannel(BaseChannel): try: html = _markdown_to_telegram_html(chunk) await self._app.bot.send_message( - chat_id=chat_id, - text=html, + chat_id=chat_id, + text=html, parse_mode="HTML", reply_parameters=reply_params ) @@ -274,13 +284,13 @@ class TelegramChannel(BaseChannel): logger.warning("HTML parse failed, falling back to plain text: {}", e) try: await self._app.bot.send_message( - chat_id=chat_id, + chat_id=chat_id, text=chunk, reply_parameters=reply_params ) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) - + async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" if not update.message or not update.effective_user: @@ -319,34 +329,34 @@ class TelegramChannel(BaseChannel): chat_id=str(update.message.chat_id), content=update.message.text, ) - + async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming messages (text, photos, voice, documents).""" if not update.message or not update.effective_user: return - + message = update.message user = update.effective_user chat_id = message.chat_id sender_id = self._sender_id(user) - + # Store chat_id for replies self._chat_ids[sender_id] = chat_id - + # Build content from text and/or media content_parts = [] media_paths = [] - + # Text content if message.text: content_parts.append(message.text) if message.caption: content_parts.append(message.caption) - + # Handle media files media_file = None media_type = None - + if message.photo: media_file = message.photo[-1] # Largest photo media_type = "image" @@ -359,23 +369,23 @@ class TelegramChannel(BaseChannel): elif message.document: media_file = message.document media_type = "file" - + # Download media if present if media_file and self._app: try: file = await self._app.bot.get_file(media_file.file_id) ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None)) - + # Save to workspace/media/ from pathlib import Path media_dir = Path.home() / ".nanobot" / "media" media_dir.mkdir(parents=True, exist_ok=True) - + file_path = media_dir / f"{media_file.file_id[:16]}{ext}" await file.download_to_drive(str(file_path)) - + media_paths.append(str(file_path)) - + # Handle voice transcription if media_type == "voice" or media_type == "audio": from nanobot.providers.transcription import GroqTranscriptionProvider @@ -388,21 +398,60 @@ class TelegramChannel(BaseChannel): content_parts.append(f"[{media_type}: {file_path}]") else: content_parts.append(f"[{media_type}: {file_path}]") - + logger.debug("Downloaded {} to {}", media_type, file_path) except Exception as e: logger.error("Failed to download media: {}", e) content_parts.append(f"[{media_type}: download failed]") - + content = "\n".join(content_parts) if content_parts else "[empty message]" - + logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) - + str_chat_id = str(chat_id) - + + # Telegram media groups arrive as multiple messages sharing media_group_id. + # Buffer briefly and forward as one aggregated turn. + media_group_id = getattr(message, "media_group_id", None) + if media_group_id: + group_key = f"{str_chat_id}:{media_group_id}" + buffer = self._media_group_buffers.get(group_key) + if not buffer: + buffer = { + "sender_id": sender_id, + "chat_id": str_chat_id, + "contents": [], + "media": [], + "metadata": { + "message_id": message.message_id, + "user_id": user.id, + "username": user.username, + "first_name": user.first_name, + "is_group": message.chat.type != "private", + "media_group_id": media_group_id, + }, + } + self._media_group_buffers[group_key] = buffer + self._start_typing(str_chat_id) + + if content and content != "[empty message]": + cast_contents = buffer["contents"] + if isinstance(cast_contents, list): + cast_contents.append(content) + cast_media = buffer["media"] + if isinstance(cast_media, list): + cast_media.extend(media_paths) + + # Start one delayed flush task per media group. + if group_key not in self._media_group_tasks: + self._media_group_tasks[group_key] = asyncio.create_task( + self._flush_media_group(group_key) + ) + return + # Start typing indicator before processing self._start_typing(str_chat_id) - + # Forward to the message bus await self._handle_message( sender_id=sender_id, @@ -417,19 +466,56 @@ class TelegramChannel(BaseChannel): "is_group": message.chat.type != "private" } ) - + + async def _flush_media_group(self, group_key: str, delay_s: float = 0.6) -> None: + """Flush buffered Telegram media-group messages as one aggregated turn.""" + try: + await asyncio.sleep(delay_s) + buffer = self._media_group_buffers.pop(group_key, None) + if not buffer: + return + + sender_id = str(buffer.get("sender_id", "")) + chat_id = str(buffer.get("chat_id", "")) + contents = buffer.get("contents") + media = buffer.get("media") + metadata = buffer.get("metadata") + + content_parts = [c for c in (contents if isinstance(contents, list) else []) if isinstance(c, str) and c] + media_paths = [m for m in (media if isinstance(media, list) else []) if isinstance(m, str) and m] + + # De-duplicate while preserving order + seen = set() + unique_media: list[str] = [] + for m in media_paths: + if m in seen: + continue + seen.add(m) + unique_media.append(m) + + content = "\n".join(content_parts) if content_parts else "[empty message]" + await self._handle_message( + sender_id=sender_id, + chat_id=chat_id, + content=content, + media=unique_media, + metadata=metadata if isinstance(metadata, dict) else {}, + ) + finally: + self._media_group_tasks.pop(group_key, None) + def _start_typing(self, chat_id: str) -> None: """Start sending 'typing...' indicator for a chat.""" # Cancel any existing typing task for this chat self._stop_typing(chat_id) self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id)) - + def _stop_typing(self, chat_id: str) -> None: """Stop the typing indicator for a chat.""" task = self._typing_tasks.pop(chat_id, None) if task and not task.done(): task.cancel() - + async def _typing_loop(self, chat_id: str) -> None: """Repeatedly send 'typing' action until cancelled.""" try: @@ -440,7 +526,7 @@ class TelegramChannel(BaseChannel): pass except Exception as e: logger.debug("Typing indicator stopped for {}: {}", chat_id, e) - + async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: """Log polling / handler errors instead of silently swallowing them.""" logger.error("Telegram error: {}", context.error) @@ -454,6 +540,6 @@ class TelegramChannel(BaseChannel): } if mime_type in ext_map: return ext_map[mime_type] - + type_map = {"image": ".jpg", "voice": ".ogg", "audio": ".mp3", "file": ""} return type_map.get(media_type, "") From a3e0543eae66e566b9d5cb1c0e398bfc33b6e7d9 Mon Sep 17 00:00:00 2001 From: Kim <150593189+KimGLee@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:16:51 +0800 Subject: [PATCH 2/3] chore(telegram): keep media-group fix without unrelated formatting changes --- nanobot/channels/telegram.py | 133 +++++++++++++++++------------------ 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index bf2da73..ed77963 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -4,10 +4,9 @@ from __future__ import annotations import asyncio import re - from loguru import logger -from telegram import BotCommand, ReplyParameters, Update -from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters +from telegram import BotCommand, Update, ReplyParameters +from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes from telegram.request import HTTPXRequest from nanobot.bus.events import OutboundMessage @@ -22,60 +21,60 @@ def _markdown_to_telegram_html(text: str) -> str: """ if not text: return "" - + # 1. Extract and protect code blocks (preserve content from other processing) code_blocks: list[str] = [] def save_code_block(m: re.Match) -> str: code_blocks.append(m.group(1)) return f"\x00CB{len(code_blocks) - 1}\x00" - + text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text) - + # 2. Extract and protect inline code inline_codes: list[str] = [] def save_inline_code(m: re.Match) -> str: inline_codes.append(m.group(1)) return f"\x00IC{len(inline_codes) - 1}\x00" - + text = re.sub(r'`([^`]+)`', save_inline_code, text) - + # 3. Headers # Title -> just the title text text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE) - + # 4. Blockquotes > text -> just the text (before HTML escaping) text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE) - + # 5. Escape HTML special characters text = text.replace("&", "&").replace("<", "<").replace(">", ">") - + # 6. Links [text](url) - must be before bold/italic to handle nested cases text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text) - + # 7. Bold **text** or __text__ text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) text = re.sub(r'__(.+?)__', r'\1', text) - + # 8. Italic _text_ (avoid matching inside words like some_var_name) text = re.sub(r'(?\1', text) - + # 9. Strikethrough ~~text~~ text = re.sub(r'~~(.+?)~~', r'\1', text) - + # 10. Bullet lists - item -> • item text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE) - + # 11. Restore inline code with HTML tags for i, code in enumerate(inline_codes): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00IC{i}\x00", f"{escaped}") - + # 12. Restore code blocks with HTML tags for i, code in enumerate(code_blocks): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00CB{i}\x00", f"
{escaped}
") - + return text @@ -102,12 +101,12 @@ def _split_message(content: str, max_len: int = 4000) -> list[str]: class TelegramChannel(BaseChannel): """ Telegram channel using long polling. - + Simple and reliable - no webhook/public IP needed. """ - + name = "telegram" - + # Commands registered with Telegram's command menu BOT_COMMANDS = [ BotCommand("start", "Start the bot"), @@ -115,7 +114,7 @@ class TelegramChannel(BaseChannel): BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] - + def __init__( self, config: TelegramConfig, @@ -130,15 +129,15 @@ class TelegramChannel(BaseChannel): self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task self._media_group_buffers: dict[str, dict[str, object]] = {} self._media_group_tasks: dict[str, asyncio.Task] = {} - + async def start(self) -> None: """Start the Telegram bot with long polling.""" if not self.config.token: logger.error("Telegram bot token not configured") return - + self._running = True - + # Build the application with larger connection pool to avoid pool-timeout on long runs req = HTTPXRequest(connection_pool_size=16, pool_timeout=5.0, connect_timeout=30.0, read_timeout=30.0) builder = Application.builder().token(self.config.token).request(req).get_updates_request(req) @@ -146,51 +145,51 @@ class TelegramChannel(BaseChannel): builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy) self._app = builder.build() self._app.add_error_handler(self._on_error) - + # Add command handlers self._app.add_handler(CommandHandler("start", self._on_start)) self._app.add_handler(CommandHandler("new", self._forward_command)) self._app.add_handler(CommandHandler("help", self._on_help)) - + # Add message handler for text, photos, voice, documents self._app.add_handler( MessageHandler( - (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) - & ~filters.COMMAND, + (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) + & ~filters.COMMAND, self._on_message ) ) - + logger.info("Starting Telegram bot (polling mode)...") - + # Initialize and start polling await self._app.initialize() await self._app.start() - + # Get bot info and register command menu bot_info = await self._app.bot.get_me() logger.info("Telegram bot @{} connected", bot_info.username) - + try: await self._app.bot.set_my_commands(self.BOT_COMMANDS) logger.debug("Telegram bot commands registered") except Exception as e: logger.warning("Failed to register bot commands: {}", e) - + # Start polling (this runs until stopped) await self._app.updater.start_polling( allowed_updates=["message"], drop_pending_updates=True # Ignore old messages on startup ) - + # Keep running until stopped while self._running: await asyncio.sleep(1) - + async def stop(self) -> None: """Stop the Telegram bot.""" self._running = False - + # Cancel all typing indicators for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) @@ -201,14 +200,14 @@ class TelegramChannel(BaseChannel): task.cancel() self._media_group_tasks.pop(key, None) self._media_group_buffers.clear() - + if self._app: logger.info("Stopping Telegram bot...") await self._app.updater.stop() await self._app.stop() await self._app.shutdown() self._app = None - + @staticmethod def _get_media_type(path: str) -> str: """Guess media type from file extension.""" @@ -256,7 +255,7 @@ class TelegramChannel(BaseChannel): param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document" with open(media_path, 'rb') as f: await sender( - chat_id=chat_id, + chat_id=chat_id, **{param: f}, reply_parameters=reply_params ) @@ -275,8 +274,8 @@ class TelegramChannel(BaseChannel): try: html = _markdown_to_telegram_html(chunk) await self._app.bot.send_message( - chat_id=chat_id, - text=html, + chat_id=chat_id, + text=html, parse_mode="HTML", reply_parameters=reply_params ) @@ -284,13 +283,13 @@ class TelegramChannel(BaseChannel): logger.warning("HTML parse failed, falling back to plain text: {}", e) try: await self._app.bot.send_message( - chat_id=chat_id, + chat_id=chat_id, text=chunk, reply_parameters=reply_params ) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) - + async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" if not update.message or not update.effective_user: @@ -329,34 +328,34 @@ class TelegramChannel(BaseChannel): chat_id=str(update.message.chat_id), content=update.message.text, ) - + async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming messages (text, photos, voice, documents).""" if not update.message or not update.effective_user: return - + message = update.message user = update.effective_user chat_id = message.chat_id sender_id = self._sender_id(user) - + # Store chat_id for replies self._chat_ids[sender_id] = chat_id - + # Build content from text and/or media content_parts = [] media_paths = [] - + # Text content if message.text: content_parts.append(message.text) if message.caption: content_parts.append(message.caption) - + # Handle media files media_file = None media_type = None - + if message.photo: media_file = message.photo[-1] # Largest photo media_type = "image" @@ -369,23 +368,23 @@ class TelegramChannel(BaseChannel): elif message.document: media_file = message.document media_type = "file" - + # Download media if present if media_file and self._app: try: file = await self._app.bot.get_file(media_file.file_id) ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None)) - + # Save to workspace/media/ from pathlib import Path media_dir = Path.home() / ".nanobot" / "media" media_dir.mkdir(parents=True, exist_ok=True) - + file_path = media_dir / f"{media_file.file_id[:16]}{ext}" await file.download_to_drive(str(file_path)) - + media_paths.append(str(file_path)) - + # Handle voice transcription if media_type == "voice" or media_type == "audio": from nanobot.providers.transcription import GroqTranscriptionProvider @@ -398,16 +397,16 @@ class TelegramChannel(BaseChannel): content_parts.append(f"[{media_type}: {file_path}]") else: content_parts.append(f"[{media_type}: {file_path}]") - + logger.debug("Downloaded {} to {}", media_type, file_path) except Exception as e: logger.error("Failed to download media: {}", e) content_parts.append(f"[{media_type}: download failed]") - + content = "\n".join(content_parts) if content_parts else "[empty message]" - + logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) - + str_chat_id = str(chat_id) # Telegram media groups arrive as multiple messages sharing media_group_id. @@ -448,10 +447,10 @@ class TelegramChannel(BaseChannel): self._flush_media_group(group_key) ) return - + # Start typing indicator before processing self._start_typing(str_chat_id) - + # Forward to the message bus await self._handle_message( sender_id=sender_id, @@ -466,7 +465,7 @@ class TelegramChannel(BaseChannel): "is_group": message.chat.type != "private" } ) - + async def _flush_media_group(self, group_key: str, delay_s: float = 0.6) -> None: """Flush buffered Telegram media-group messages as one aggregated turn.""" try: @@ -509,13 +508,13 @@ class TelegramChannel(BaseChannel): # Cancel any existing typing task for this chat self._stop_typing(chat_id) self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id)) - + def _stop_typing(self, chat_id: str) -> None: """Stop the typing indicator for a chat.""" task = self._typing_tasks.pop(chat_id, None) if task and not task.done(): task.cancel() - + async def _typing_loop(self, chat_id: str) -> None: """Repeatedly send 'typing' action until cancelled.""" try: @@ -526,7 +525,7 @@ class TelegramChannel(BaseChannel): pass except Exception as e: logger.debug("Typing indicator stopped for {}: {}", chat_id, e) - + async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: """Log polling / handler errors instead of silently swallowing them.""" logger.error("Telegram error: {}", context.error) @@ -540,6 +539,6 @@ class TelegramChannel(BaseChannel): } if mime_type in ext_map: return ext_map[mime_type] - + type_map = {"image": ".jpg", "voice": ".ogg", "audio": ".mp3", "file": ""} return type_map.get(media_type, "") From aa2987be3eed79aa31cb4de9e49d7a751262d440 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Fri, 27 Feb 2026 09:30:01 +0000 Subject: [PATCH 3/3] refactor: streamline Telegram media-group buffering --- nanobot/channels/telegram.py | 95 ++++++++++-------------------------- 1 file changed, 27 insertions(+), 68 deletions(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index ed77963..969d853 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -127,7 +127,7 @@ class TelegramChannel(BaseChannel): self._app: Application | None = None self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task - self._media_group_buffers: dict[str, dict[str, object]] = {} + self._media_group_buffers: dict[str, dict] = {} self._media_group_tasks: dict[str, asyncio.Task] = {} async def start(self) -> None: @@ -194,11 +194,9 @@ class TelegramChannel(BaseChannel): for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) - # Cancel buffered media-group flush tasks - for key, task in list(self._media_group_tasks.items()): - if task and not task.done(): - task.cancel() - self._media_group_tasks.pop(key, None) + for task in self._media_group_tasks.values(): + task.cancel() + self._media_group_tasks.clear() self._media_group_buffers.clear() if self._app: @@ -409,43 +407,26 @@ class TelegramChannel(BaseChannel): str_chat_id = str(chat_id) - # Telegram media groups arrive as multiple messages sharing media_group_id. - # Buffer briefly and forward as one aggregated turn. - media_group_id = getattr(message, "media_group_id", None) - if media_group_id: - group_key = f"{str_chat_id}:{media_group_id}" - buffer = self._media_group_buffers.get(group_key) - if not buffer: - buffer = { - "sender_id": sender_id, - "chat_id": str_chat_id, - "contents": [], - "media": [], + # Telegram media groups: buffer briefly, forward as one aggregated turn. + if media_group_id := getattr(message, "media_group_id", None): + key = f"{str_chat_id}:{media_group_id}" + if key not in self._media_group_buffers: + self._media_group_buffers[key] = { + "sender_id": sender_id, "chat_id": str_chat_id, + "contents": [], "media": [], "metadata": { - "message_id": message.message_id, - "user_id": user.id, - "username": user.username, - "first_name": user.first_name, + "message_id": message.message_id, "user_id": user.id, + "username": user.username, "first_name": user.first_name, "is_group": message.chat.type != "private", - "media_group_id": media_group_id, }, } - self._media_group_buffers[group_key] = buffer self._start_typing(str_chat_id) - + buf = self._media_group_buffers[key] if content and content != "[empty message]": - cast_contents = buffer["contents"] - if isinstance(cast_contents, list): - cast_contents.append(content) - cast_media = buffer["media"] - if isinstance(cast_media, list): - cast_media.extend(media_paths) - - # Start one delayed flush task per media group. - if group_key not in self._media_group_tasks: - self._media_group_tasks[group_key] = asyncio.create_task( - self._flush_media_group(group_key) - ) + buf["contents"].append(content) + buf["media"].extend(media_paths) + if key not in self._media_group_tasks: + self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key)) return # Start typing indicator before processing @@ -466,42 +447,20 @@ class TelegramChannel(BaseChannel): } ) - async def _flush_media_group(self, group_key: str, delay_s: float = 0.6) -> None: - """Flush buffered Telegram media-group messages as one aggregated turn.""" + async def _flush_media_group(self, key: str) -> None: + """Wait briefly, then forward buffered media-group as one turn.""" try: - await asyncio.sleep(delay_s) - buffer = self._media_group_buffers.pop(group_key, None) - if not buffer: + await asyncio.sleep(0.6) + if not (buf := self._media_group_buffers.pop(key, None)): return - - sender_id = str(buffer.get("sender_id", "")) - chat_id = str(buffer.get("chat_id", "")) - contents = buffer.get("contents") - media = buffer.get("media") - metadata = buffer.get("metadata") - - content_parts = [c for c in (contents if isinstance(contents, list) else []) if isinstance(c, str) and c] - media_paths = [m for m in (media if isinstance(media, list) else []) if isinstance(m, str) and m] - - # De-duplicate while preserving order - seen = set() - unique_media: list[str] = [] - for m in media_paths: - if m in seen: - continue - seen.add(m) - unique_media.append(m) - - content = "\n".join(content_parts) if content_parts else "[empty message]" + content = "\n".join(buf["contents"]) or "[empty message]" await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=content, - media=unique_media, - metadata=metadata if isinstance(metadata, dict) else {}, + sender_id=buf["sender_id"], chat_id=buf["chat_id"], + content=content, media=list(dict.fromkeys(buf["media"])), + metadata=buf["metadata"], ) finally: - self._media_group_tasks.pop(group_key, None) + self._media_group_tasks.pop(key, None) def _start_typing(self, chat_id: str) -> None: """Start sending 'typing...' indicator for a chat."""