"""Telegram channel implementation using python-telegram-bot.""" from __future__ import annotations import asyncio import re from loguru import logger from telegram import BotCommand, Update from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes from telegram.request import HTTPXRequest from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.schema import TelegramConfig def _markdown_to_telegram_html(text: str) -> str: """ Convert markdown to Telegram-safe HTML. """ if not text: return "" # 1. Extract and protect code blocks (preserve content from other processing) code_blocks: list[str] = [] def save_code_block(m: re.Match) -> str: code_blocks.append(m.group(1)) return f"\x00CB{len(code_blocks) - 1}\x00" text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text) # 2. Extract and protect inline code inline_codes: list[str] = [] def save_inline_code(m: re.Match) -> str: inline_codes.append(m.group(1)) return f"\x00IC{len(inline_codes) - 1}\x00" text = re.sub(r'`([^`]+)`', save_inline_code, text) # 3. Headers # Title -> just the title text text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE) # 4. Blockquotes > text -> just the text (before HTML escaping) text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE) # 5. Escape HTML special characters text = text.replace("&", "&").replace("<", "<").replace(">", ">") # 6. Links [text](url) - must be before bold/italic to handle nested cases text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text) # 7. Bold **text** or __text__ text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) text = re.sub(r'__(.+?)__', r'\1', text) # 8. Italic _text_ (avoid matching inside words like some_var_name) text = re.sub(r'(?\1', text) # 9. Strikethrough ~~text~~ text = re.sub(r'~~(.+?)~~', r'\1', text) # 10. Bullet lists - item -> • item text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE) # 11. Restore inline code with HTML tags for i, code in enumerate(inline_codes): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00IC{i}\x00", f"{escaped}") # 12. Restore code blocks with HTML tags for i, code in enumerate(code_blocks): # Escape HTML in code content escaped = code.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(f"\x00CB{i}\x00", f"
{escaped}
") return text def _split_message(content: str, max_len: int = 4000) -> list[str]: """Split content into chunks within max_len, preferring line breaks.""" if len(content) <= max_len: return [content] chunks: list[str] = [] while content: if len(content) <= max_len: chunks.append(content) break cut = content[:max_len] pos = cut.rfind('\n') if pos == -1: pos = cut.rfind(' ') if pos == -1: pos = max_len chunks.append(content[:pos]) content = content[pos:].lstrip() return chunks class TelegramChannel(BaseChannel): """ Telegram channel using long polling. Simple and reliable - no webhook/public IP needed. """ name = "telegram" # Commands registered with Telegram's command menu BOT_COMMANDS = [ BotCommand("start", "Start the bot"), BotCommand("new", "Start a new conversation"), BotCommand("help", "Show available commands"), ] def __init__( self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = "", ): super().__init__(config, bus) self.config: TelegramConfig = config self.groq_api_key = groq_api_key self._app: Application | None = None self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task async def start(self) -> None: """Start the Telegram bot with long polling.""" if not self.config.token: logger.error("Telegram bot token not configured") return self._running = True # Build the application with larger connection pool to avoid pool-timeout on long runs req = HTTPXRequest(connection_pool_size=16, pool_timeout=5.0, connect_timeout=30.0, read_timeout=30.0) builder = Application.builder().token(self.config.token).request(req).get_updates_request(req) if self.config.proxy: builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy) self._app = builder.build() self._app.add_error_handler(self._on_error) # Add command handlers self._app.add_handler(CommandHandler("start", self._on_start)) self._app.add_handler(CommandHandler("new", self._forward_command)) self._app.add_handler(CommandHandler("help", self._forward_command)) # Add message handler for text, photos, voice, documents self._app.add_handler( MessageHandler( (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL) & ~filters.COMMAND, self._on_message ) ) logger.info("Starting Telegram bot (polling mode)...") # Initialize and start polling await self._app.initialize() await self._app.start() # Get bot info and register command menu bot_info = await self._app.bot.get_me() logger.info("Telegram bot @{} connected", bot_info.username) try: await self._app.bot.set_my_commands(self.BOT_COMMANDS) logger.debug("Telegram bot commands registered") except Exception as e: logger.warning("Failed to register bot commands: {}", e) # Start polling (this runs until stopped) await self._app.updater.start_polling( allowed_updates=["message"], drop_pending_updates=True # Ignore old messages on startup ) # Keep running until stopped while self._running: await asyncio.sleep(1) async def stop(self) -> None: """Stop the Telegram bot.""" self._running = False # Cancel all typing indicators for chat_id in list(self._typing_tasks): self._stop_typing(chat_id) if self._app: logger.info("Stopping Telegram bot...") await self._app.updater.stop() await self._app.stop() await self._app.shutdown() self._app = None @staticmethod def _get_media_type(path: str) -> str: """Guess media type from file extension.""" ext = path.rsplit(".", 1)[-1].lower() if "." in path else "" if ext in ("jpg", "jpeg", "png", "gif", "webp"): return "photo" if ext == "ogg": return "voice" if ext in ("mp3", "m4a", "wav", "aac"): return "audio" return "document" async def send(self, msg: OutboundMessage) -> None: """Send a message through Telegram.""" if not self._app: logger.warning("Telegram bot not running") return self._stop_typing(msg.chat_id) try: chat_id = int(msg.chat_id) except ValueError: logger.error("Invalid chat_id: {}", msg.chat_id) return # Send media files for media_path in (msg.media or []): try: media_type = self._get_media_type(media_path) sender = { "photo": self._app.bot.send_photo, "voice": self._app.bot.send_voice, "audio": self._app.bot.send_audio, }.get(media_type, self._app.bot.send_document) param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document" with open(media_path, 'rb') as f: await sender(chat_id=chat_id, **{param: f}) except Exception as e: filename = media_path.rsplit("/", 1)[-1] logger.error("Failed to send media {}: {}", media_path, e) await self._app.bot.send_message(chat_id=chat_id, text=f"[Failed to send: {filename}]") # Send text content if msg.content and msg.content != "[empty message]": for chunk in _split_message(msg.content): try: html = _markdown_to_telegram_html(chunk) await self._app.bot.send_message(chat_id=chat_id, text=html, parse_mode="HTML") except Exception as e: logger.warning("HTML parse failed, falling back to plain text: {}", e) try: await self._app.bot.send_message(chat_id=chat_id, text=chunk) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" if not update.message or not update.effective_user: return user = update.effective_user await update.message.reply_text( f"👋 Hi {user.first_name}! I'm nanobot.\n\n" "Send me a message and I'll respond!\n" "Type /help to see available commands." ) @staticmethod def _sender_id(user) -> str: """Build sender_id with username for allowlist matching.""" sid = str(user.id) return f"{sid}|{user.username}" if user.username else sid async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Forward slash commands to the bus for unified handling in AgentLoop.""" if not update.message or not update.effective_user: return await self._handle_message( sender_id=self._sender_id(update.effective_user), chat_id=str(update.message.chat_id), content=update.message.text, ) async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming messages (text, photos, voice, documents).""" if not update.message or not update.effective_user: return message = update.message user = update.effective_user chat_id = message.chat_id sender_id = self._sender_id(user) # Store chat_id for replies self._chat_ids[sender_id] = chat_id # Build content from text and/or media content_parts = [] media_paths = [] # Text content if message.text: content_parts.append(message.text) if message.caption: content_parts.append(message.caption) # Handle media files media_file = None media_type = None if message.photo: media_file = message.photo[-1] # Largest photo media_type = "image" elif message.voice: media_file = message.voice media_type = "voice" elif message.audio: media_file = message.audio media_type = "audio" elif message.document: media_file = message.document media_type = "file" # Download media if present if media_file and self._app: try: file = await self._app.bot.get_file(media_file.file_id) ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None)) # Save to workspace/media/ from pathlib import Path media_dir = Path.home() / ".nanobot" / "media" media_dir.mkdir(parents=True, exist_ok=True) file_path = media_dir / f"{media_file.file_id[:16]}{ext}" await file.download_to_drive(str(file_path)) media_paths.append(str(file_path)) # Handle voice transcription if media_type == "voice" or media_type == "audio": from nanobot.providers.transcription import GroqTranscriptionProvider transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key) transcription = await transcriber.transcribe(file_path) if transcription: logger.info("Transcribed {}: {}...", media_type, transcription[:50]) content_parts.append(f"[transcription: {transcription}]") else: content_parts.append(f"[{media_type}: {file_path}]") else: content_parts.append(f"[{media_type}: {file_path}]") logger.debug("Downloaded {} to {}", media_type, file_path) except Exception as e: logger.error("Failed to download media: {}", e) content_parts.append(f"[{media_type}: download failed]") content = "\n".join(content_parts) if content_parts else "[empty message]" logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) str_chat_id = str(chat_id) # Start typing indicator before processing self._start_typing(str_chat_id) # Forward to the message bus await self._handle_message( sender_id=sender_id, chat_id=str_chat_id, content=content, media=media_paths, metadata={ "message_id": message.message_id, "user_id": user.id, "username": user.username, "first_name": user.first_name, "is_group": message.chat.type != "private" } ) def _start_typing(self, chat_id: str) -> None: """Start sending 'typing...' indicator for a chat.""" # Cancel any existing typing task for this chat self._stop_typing(chat_id) self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id)) def _stop_typing(self, chat_id: str) -> None: """Stop the typing indicator for a chat.""" task = self._typing_tasks.pop(chat_id, None) if task and not task.done(): task.cancel() async def _typing_loop(self, chat_id: str) -> None: """Repeatedly send 'typing' action until cancelled.""" try: while self._app: await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing") await asyncio.sleep(4) except asyncio.CancelledError: pass except Exception as e: logger.debug("Typing indicator stopped for {}: {}", chat_id, e) async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: """Log polling / handler errors instead of silently swallowing them.""" logger.error("Telegram error: {}", context.error) def _get_extension(self, media_type: str, mime_type: str | None) -> str: """Get file extension based on media type.""" if mime_type: ext_map = { "image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif", "audio/ogg": ".ogg", "audio/mpeg": ".mp3", "audio/mp4": ".m4a", } if mime_type in ext_map: return ext_map[mime_type] type_map = {"image": ".jpg", "voice": ".ogg", "audio": ".mp3", "file": ""} return type_map.get(media_type, "")