diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index bf2da73..ed77963 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -4,10 +4,9 @@ from __future__ import annotations import asyncio import re - from loguru import logger -from telegram import BotCommand, ReplyParameters, Update -from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters +from telegram import BotCommand, Update, ReplyParameters +from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes from telegram.request import HTTPXRequest from nanobot.bus.events import OutboundMessage @@ -22,60 +21,60 @@ def _markdown_to_telegram_html(text: str) -> str: """ if not text: return "" - + # 1. Extract and protect code blocks (preserve content from other processing) code_blocks: list[str] = [] def save_code_block(m: re.Match) -> str: code_blocks.append(m.group(1)) return f"\x00CB{len(code_blocks) - 1}\x00" - + text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text) - + # 2. Extract and protect inline code inline_codes: list[str] = [] def save_inline_code(m: re.Match) -> str: inline_codes.append(m.group(1)) return f"\x00IC{len(inline_codes) - 1}\x00" - + text = re.sub(r'`([^`]+)`', save_inline_code, text) - + # 3. Headers # Title -> just the title text text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE) - + # 4. Blockquotes > text -> just the text (before HTML escaping) text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE) - + # 5. Escape HTML special characters text = text.replace("&", "&").replace("<", "<").replace(">", ">") - + # 6. Links [text](url) - must be before bold/italic to handle nested cases text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text) - + # 7. Bold **text** or __text__ text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) text = re.sub(r'__(.+?)__', r'\1', text) - + # 8. Italic _text_ (avoid matching inside words like some_var_name) text = re.sub(r'(?\1', text) - + # 9. Strikethrough ~~text~~ text = re.sub(r'~~(.+?)~~', r'\1', text) - + # 10. Bullet lists - item -> • item text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE) - + # 11. Restore inline code with HTML tags for i, code in enumerate(inline_codes): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00IC{i}\x00", f"{escaped}") - + # 12. Restore code blocks with HTML tags for i, code in enumerate(code_blocks): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00CB{i}\x00", f"
{escaped}
") - + return text @@ -102,12 +101,12 @@ def _split_message(content: str, max_len: int = 4000) -> list[str]: class TelegramChannel(BaseChannel): """ Telegram channel using long polling. - + Simple and reliable - no webhook/public IP needed. """ - + name = "telegram" - + # Commands registered with Telegram's command menu BOT_COMMANDS = [ BotCommand("start", "Start the bot"), @@ -115,7 +114,7 @@ class TelegramChannel(BaseChannel): BotCommand("stop", "Stop the current task"), BotCommand("help", "Show available commands"), ] - + def __init__( self, config: TelegramConfig, @@ -130,15 +129,15 @@ class TelegramChannel(BaseChannel): self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task self._media_group_buffers: dict[str, dict[str, object]] = {} self._media_group_tasks: dict[str, asyncio.Task] = {} - + async def start(self) -> None: """Start the Telegram bot with long polling.""" if not self.config.token: logger.error("Telegram bot token not configured") return - + self._running = True - + # Build the application with larger connection pool to avoid pool-timeout on long runs req = HTTPXRequest(connection_pool_size=16, pool_timeout=5.0, connect_timeout=30.0, read_timeout=30.0) builder = Application.builder().token(self.config.token).request(req).get_updates_request(req) @@ -146,51 +145,51 @@ class TelegramChannel(BaseChannel): builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy) self._app = builder.build() self._app.add_error_handler(self._on_error) - + # Add command handlers self._app.add_handler(CommandHandler("start", self._on_start)) self._app.add_handler(CommandHandler("new", self._forward_command)) self._app.add_handler(CommandHandler("help", self._on_help)) - + # Add message handler for text, photos, voice, documents self._app.add_handler( MessageHandler( - (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) - & ~filters.COMMAND, + (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) + & ~filters.COMMAND, self._on_message ) ) - + logger.info("Starting Telegram bot (polling mode)...") - + # Initialize and start polling await self._app.initialize() await self._app.start() - + # Get bot info and register command menu bot_info = await self._app.bot.get_me() logger.info("Telegram bot @{} connected", bot_info.username) - + try: await self._app.bot.set_my_commands(self.BOT_COMMANDS) logger.debug("Telegram bot commands registered") except Exception as e: logger.warning("Failed to register bot commands: {}", e) - + # Start polling (this runs until stopped) await self._app.updater.start_polling( allowed_updates=["message"], drop_pending_updates=True # Ignore old messages on startup ) - + # Keep running until stopped while self._running: await asyncio.sleep(1) - + async def stop(self) -> None: """Stop the Telegram bot.""" self._running = False - + # Cancel all typing indicators for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) @@ -201,14 +200,14 @@ class TelegramChannel(BaseChannel): task.cancel() self._media_group_tasks.pop(key, None) self._media_group_buffers.clear() - + if self._app: logger.info("Stopping Telegram bot...") await self._app.updater.stop() await self._app.stop() await self._app.shutdown() self._app = None - + @staticmethod def _get_media_type(path: str) -> str: """Guess media type from file extension.""" @@ -256,7 +255,7 @@ class TelegramChannel(BaseChannel): param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document" with open(media_path, 'rb') as f: await sender( - chat_id=chat_id, + chat_id=chat_id, **{param: f}, reply_parameters=reply_params ) @@ -275,8 +274,8 @@ class TelegramChannel(BaseChannel): try: html = _markdown_to_telegram_html(chunk) await self._app.bot.send_message( - chat_id=chat_id, - text=html, + chat_id=chat_id, + text=html, parse_mode="HTML", reply_parameters=reply_params ) @@ -284,13 +283,13 @@ class TelegramChannel(BaseChannel): logger.warning("HTML parse failed, falling back to plain text: {}", e) try: await self._app.bot.send_message( - chat_id=chat_id, + chat_id=chat_id, text=chunk, reply_parameters=reply_params ) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) - + async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" if not update.message or not update.effective_user: @@ -329,34 +328,34 @@ class TelegramChannel(BaseChannel): chat_id=str(update.message.chat_id), content=update.message.text, ) - + async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming messages (text, photos, voice, documents).""" if not update.message or not update.effective_user: return - + message = update.message user = update.effective_user chat_id = message.chat_id sender_id = self._sender_id(user) - + # Store chat_id for replies self._chat_ids[sender_id] = chat_id - + # Build content from text and/or media content_parts = [] media_paths = [] - + # Text content if message.text: content_parts.append(message.text) if message.caption: content_parts.append(message.caption) - + # Handle media files media_file = None media_type = None - + if message.photo: media_file = message.photo[-1] # Largest photo media_type = "image" @@ -369,23 +368,23 @@ class TelegramChannel(BaseChannel): elif message.document: media_file = message.document media_type = "file" - + # Download media if present if media_file and self._app: try: file = await self._app.bot.get_file(media_file.file_id) ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None)) - + # Save to workspace/media/ from pathlib import Path media_dir = Path.home() / ".nanobot" / "media" media_dir.mkdir(parents=True, exist_ok=True) - + file_path = media_dir / f"{media_file.file_id[:16]}{ext}" await file.download_to_drive(str(file_path)) - + media_paths.append(str(file_path)) - + # Handle voice transcription if media_type == "voice" or media_type == "audio": from nanobot.providers.transcription import GroqTranscriptionProvider @@ -398,16 +397,16 @@ class TelegramChannel(BaseChannel): content_parts.append(f"[{media_type}: {file_path}]") else: content_parts.append(f"[{media_type}: {file_path}]") - + logger.debug("Downloaded {} to {}", media_type, file_path) except Exception as e: logger.error("Failed to download media: {}", e) content_parts.append(f"[{media_type}: download failed]") - + content = "\n".join(content_parts) if content_parts else "[empty message]" - + logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) - + str_chat_id = str(chat_id) # Telegram media groups arrive as multiple messages sharing media_group_id. @@ -448,10 +447,10 @@ class TelegramChannel(BaseChannel): self._flush_media_group(group_key) ) return - + # Start typing indicator before processing self._start_typing(str_chat_id) - + # Forward to the message bus await self._handle_message( sender_id=sender_id, @@ -466,7 +465,7 @@ class TelegramChannel(BaseChannel): "is_group": message.chat.type != "private" } ) - + async def _flush_media_group(self, group_key: str, delay_s: float = 0.6) -> None: """Flush buffered Telegram media-group messages as one aggregated turn.""" try: @@ -509,13 +508,13 @@ class TelegramChannel(BaseChannel): # Cancel any existing typing task for this chat self._stop_typing(chat_id) self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id)) - + def _stop_typing(self, chat_id: str) -> None: """Stop the typing indicator for a chat.""" task = self._typing_tasks.pop(chat_id, None) if task and not task.done(): task.cancel() - + async def _typing_loop(self, chat_id: str) -> None: """Repeatedly send 'typing' action until cancelled.""" try: @@ -526,7 +525,7 @@ class TelegramChannel(BaseChannel): pass except Exception as e: logger.debug("Typing indicator stopped for {}: {}", chat_id, e) - + async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: """Log polling / handler errors instead of silently swallowing them.""" logger.error("Telegram error: {}", context.error) @@ -540,6 +539,6 @@ class TelegramChannel(BaseChannel): } if mime_type in ext_map: return ext_map[mime_type] - + type_map = {"image": ".jpg", "voice": ".ogg", "audio": ".mp3", "file": ""} return type_map.get(media_type, "")