798 lines
30 KiB
Python
798 lines
30 KiB
Python
"""Telegram channel implementation using python-telegram-bot."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import re
|
|
import time
|
|
import unicodedata
|
|
from typing import Any, Literal
|
|
|
|
from loguru import logger
|
|
from pydantic import Field
|
|
from telegram import BotCommand, ReplyParameters, Update
|
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
|
|
from telegram.request import HTTPXRequest
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.base import BaseChannel
|
|
from nanobot.config.paths import get_media_dir
|
|
from nanobot.config.schema import Base
|
|
from nanobot.utils.helpers import split_message
|
|
|
|
TELEGRAM_MAX_MESSAGE_LEN = 4000 # Telegram message character limit
|
|
TELEGRAM_REPLY_CONTEXT_MAX_LEN = TELEGRAM_MAX_MESSAGE_LEN # Max length for reply context in user message
|
|
|
|
|
|
def _strip_md(s: str) -> str:
|
|
"""Strip markdown inline formatting from text."""
|
|
s = re.sub(r'\*\*(.+?)\*\*', r'\1', s)
|
|
s = re.sub(r'__(.+?)__', r'\1', s)
|
|
s = re.sub(r'~~(.+?)~~', r'\1', s)
|
|
s = re.sub(r'`([^`]+)`', r'\1', s)
|
|
return s.strip()
|
|
|
|
|
|
def _render_table_box(table_lines: list[str]) -> str:
|
|
"""Convert markdown pipe-table to compact aligned text for <pre> display."""
|
|
|
|
def dw(s: str) -> int:
|
|
return sum(2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1 for c in s)
|
|
|
|
rows: list[list[str]] = []
|
|
has_sep = False
|
|
for line in table_lines:
|
|
cells = [_strip_md(c) for c in line.strip().strip('|').split('|')]
|
|
if all(re.match(r'^:?-+:?$', c) for c in cells if c):
|
|
has_sep = True
|
|
continue
|
|
rows.append(cells)
|
|
if not rows or not has_sep:
|
|
return '\n'.join(table_lines)
|
|
|
|
ncols = max(len(r) for r in rows)
|
|
for r in rows:
|
|
r.extend([''] * (ncols - len(r)))
|
|
widths = [max(dw(r[c]) for r in rows) for c in range(ncols)]
|
|
|
|
def dr(cells: list[str]) -> str:
|
|
return ' '.join(f'{c}{" " * (w - dw(c))}' for c, w in zip(cells, widths))
|
|
|
|
out = [dr(rows[0])]
|
|
out.append(' '.join('─' * w for w in widths))
|
|
for row in rows[1:]:
|
|
out.append(dr(row))
|
|
return '\n'.join(out)
|
|
|
|
|
|
def _markdown_to_telegram_html(text: str) -> str:
|
|
"""
|
|
Convert markdown to Telegram-safe HTML.
|
|
"""
|
|
if not text:
|
|
return ""
|
|
|
|
# 1. Extract and protect code blocks (preserve content from other processing)
|
|
code_blocks: list[str] = []
|
|
def save_code_block(m: re.Match) -> str:
|
|
code_blocks.append(m.group(1))
|
|
return f"\x00CB{len(code_blocks) - 1}\x00"
|
|
|
|
text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
|
|
|
|
# 1.5. Convert markdown tables to box-drawing (reuse code_block placeholders)
|
|
lines = text.split('\n')
|
|
rebuilt: list[str] = []
|
|
li = 0
|
|
while li < len(lines):
|
|
if re.match(r'^\s*\|.+\|', lines[li]):
|
|
tbl: list[str] = []
|
|
while li < len(lines) and re.match(r'^\s*\|.+\|', lines[li]):
|
|
tbl.append(lines[li])
|
|
li += 1
|
|
box = _render_table_box(tbl)
|
|
if box != '\n'.join(tbl):
|
|
code_blocks.append(box)
|
|
rebuilt.append(f"\x00CB{len(code_blocks) - 1}\x00")
|
|
else:
|
|
rebuilt.extend(tbl)
|
|
else:
|
|
rebuilt.append(lines[li])
|
|
li += 1
|
|
text = '\n'.join(rebuilt)
|
|
|
|
# 2. Extract and protect inline code
|
|
inline_codes: list[str] = []
|
|
def save_inline_code(m: re.Match) -> str:
|
|
inline_codes.append(m.group(1))
|
|
return f"\x00IC{len(inline_codes) - 1}\x00"
|
|
|
|
text = re.sub(r'`([^`]+)`', save_inline_code, text)
|
|
|
|
# 3. Headers # Title -> just the title text
|
|
text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE)
|
|
|
|
# 4. Blockquotes > text -> just the text (before HTML escaping)
|
|
text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE)
|
|
|
|
# 5. Escape HTML special characters
|
|
text = text.replace("&", "&").replace("<", "<").replace(">", ">")
|
|
|
|
# 6. Links [text](url) - must be before bold/italic to handle nested cases
|
|
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', text)
|
|
|
|
# 7. Bold **text** or __text__
|
|
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
|
|
text = re.sub(r'__(.+?)__', r'<b>\1</b>', text)
|
|
|
|
# 8. Italic _text_ (avoid matching inside words like some_var_name)
|
|
text = re.sub(r'(?<![a-zA-Z0-9])_([^_]+)_(?![a-zA-Z0-9])', r'<i>\1</i>', text)
|
|
|
|
# 9. Strikethrough ~~text~~
|
|
text = re.sub(r'~~(.+?)~~', r'<s>\1</s>', text)
|
|
|
|
# 10. Bullet lists - item -> • item
|
|
text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE)
|
|
|
|
# 11. Restore inline code with HTML tags
|
|
for i, code in enumerate(inline_codes):
|
|
# Escape HTML in code content
|
|
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
|
|
text = text.replace(f"\x00IC{i}\x00", f"<code>{escaped}</code>")
|
|
|
|
# 12. Restore code blocks with HTML tags
|
|
for i, code in enumerate(code_blocks):
|
|
# Escape HTML in code content
|
|
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
|
|
text = text.replace(f"\x00CB{i}\x00", f"<pre><code>{escaped}</code></pre>")
|
|
|
|
return text
|
|
|
|
|
|
class TelegramConfig(Base):
|
|
"""Telegram channel configuration."""
|
|
|
|
enabled: bool = False
|
|
token: str = ""
|
|
allow_from: list[str] = Field(default_factory=list)
|
|
proxy: str | None = None
|
|
reply_to_message: bool = False
|
|
group_policy: Literal["open", "mention"] = "mention"
|
|
|
|
|
|
class TelegramChannel(BaseChannel):
|
|
"""
|
|
Telegram channel using long polling.
|
|
|
|
Simple and reliable - no webhook/public IP needed.
|
|
"""
|
|
|
|
name = "telegram"
|
|
display_name = "Telegram"
|
|
|
|
# Commands registered with Telegram's command menu
|
|
BOT_COMMANDS = [
|
|
BotCommand("start", "Start the bot"),
|
|
BotCommand("new", "Start a new conversation"),
|
|
BotCommand("stop", "Stop the current task"),
|
|
BotCommand("help", "Show available commands"),
|
|
BotCommand("restart", "Restart the bot"),
|
|
]
|
|
|
|
@classmethod
|
|
def default_config(cls) -> dict[str, Any]:
|
|
return TelegramConfig().model_dump(by_alias=True)
|
|
|
|
def __init__(self, config: Any, bus: MessageBus):
|
|
if isinstance(config, dict):
|
|
config = TelegramConfig.model_validate(config)
|
|
super().__init__(config, bus)
|
|
self.config: TelegramConfig = config
|
|
self._app: Application | None = None
|
|
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
|
|
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
|
|
self._media_group_buffers: dict[str, dict] = {}
|
|
self._media_group_tasks: dict[str, asyncio.Task] = {}
|
|
self._message_threads: dict[tuple[str, int], int] = {}
|
|
self._bot_user_id: int | None = None
|
|
self._bot_username: str | None = None
|
|
|
|
def is_allowed(self, sender_id: str) -> bool:
|
|
"""Preserve Telegram's legacy id|username allowlist matching."""
|
|
if super().is_allowed(sender_id):
|
|
return True
|
|
|
|
allow_list = getattr(self.config, "allow_from", [])
|
|
if not allow_list or "*" in allow_list:
|
|
return False
|
|
|
|
sender_str = str(sender_id)
|
|
if sender_str.count("|") != 1:
|
|
return False
|
|
|
|
sid, username = sender_str.split("|", 1)
|
|
if not sid.isdigit() or not username:
|
|
return False
|
|
|
|
return sid in allow_list or username in allow_list
|
|
|
|
async def start(self) -> None:
|
|
"""Start the Telegram bot with long polling."""
|
|
if not self.config.token:
|
|
logger.error("Telegram bot token not configured")
|
|
return
|
|
|
|
self._running = True
|
|
|
|
# Build the application with larger connection pool to avoid pool-timeout on long runs
|
|
req = HTTPXRequest(
|
|
connection_pool_size=16,
|
|
pool_timeout=5.0,
|
|
connect_timeout=30.0,
|
|
read_timeout=30.0,
|
|
proxy=self.config.proxy if self.config.proxy else None,
|
|
)
|
|
builder = Application.builder().token(self.config.token).request(req).get_updates_request(req)
|
|
self._app = builder.build()
|
|
self._app.add_error_handler(self._on_error)
|
|
|
|
# Add command handlers
|
|
self._app.add_handler(CommandHandler("start", self._on_start))
|
|
self._app.add_handler(CommandHandler("new", self._forward_command))
|
|
self._app.add_handler(CommandHandler("stop", self._forward_command))
|
|
self._app.add_handler(CommandHandler("restart", self._forward_command))
|
|
self._app.add_handler(CommandHandler("help", self._on_help))
|
|
|
|
# Add message handler for text, photos, voice, documents
|
|
self._app.add_handler(
|
|
MessageHandler(
|
|
(filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL)
|
|
& ~filters.COMMAND,
|
|
self._on_message
|
|
)
|
|
)
|
|
|
|
logger.info("Starting Telegram bot (polling mode)...")
|
|
|
|
# Initialize and start polling
|
|
await self._app.initialize()
|
|
await self._app.start()
|
|
|
|
# Get bot info and register command menu
|
|
bot_info = await self._app.bot.get_me()
|
|
self._bot_user_id = getattr(bot_info, "id", None)
|
|
self._bot_username = getattr(bot_info, "username", None)
|
|
logger.info("Telegram bot @{} connected", bot_info.username)
|
|
|
|
try:
|
|
await self._app.bot.set_my_commands(self.BOT_COMMANDS)
|
|
logger.debug("Telegram bot commands registered")
|
|
except Exception as e:
|
|
logger.warning("Failed to register bot commands: {}", e)
|
|
|
|
# Start polling (this runs until stopped)
|
|
await self._app.updater.start_polling(
|
|
allowed_updates=["message"],
|
|
drop_pending_updates=True # Ignore old messages on startup
|
|
)
|
|
|
|
# Keep running until stopped
|
|
while self._running:
|
|
await asyncio.sleep(1)
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the Telegram bot."""
|
|
self._running = False
|
|
|
|
# Cancel all typing indicators
|
|
for chat_id in list(self._typing_tasks):
|
|
self._stop_typing(chat_id)
|
|
|
|
for task in self._media_group_tasks.values():
|
|
task.cancel()
|
|
self._media_group_tasks.clear()
|
|
self._media_group_buffers.clear()
|
|
|
|
if self._app:
|
|
logger.info("Stopping Telegram bot...")
|
|
await self._app.updater.stop()
|
|
await self._app.stop()
|
|
await self._app.shutdown()
|
|
self._app = None
|
|
|
|
@staticmethod
|
|
def _get_media_type(path: str) -> str:
|
|
"""Guess media type from file extension."""
|
|
ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
|
|
if ext in ("jpg", "jpeg", "png", "gif", "webp"):
|
|
return "photo"
|
|
if ext == "ogg":
|
|
return "voice"
|
|
if ext in ("mp3", "m4a", "wav", "aac"):
|
|
return "audio"
|
|
return "document"
|
|
|
|
async def send(self, msg: OutboundMessage) -> None:
|
|
"""Send a message through Telegram."""
|
|
if not self._app:
|
|
logger.warning("Telegram bot not running")
|
|
return
|
|
|
|
# Only stop typing indicator for final responses
|
|
if not msg.metadata.get("_progress", False):
|
|
self._stop_typing(msg.chat_id)
|
|
|
|
try:
|
|
chat_id = int(msg.chat_id)
|
|
except ValueError:
|
|
logger.error("Invalid chat_id: {}", msg.chat_id)
|
|
return
|
|
reply_to_message_id = msg.metadata.get("message_id")
|
|
message_thread_id = msg.metadata.get("message_thread_id")
|
|
if message_thread_id is None and reply_to_message_id is not None:
|
|
message_thread_id = self._message_threads.get((msg.chat_id, reply_to_message_id))
|
|
thread_kwargs = {}
|
|
if message_thread_id is not None:
|
|
thread_kwargs["message_thread_id"] = message_thread_id
|
|
|
|
reply_params = None
|
|
if self.config.reply_to_message:
|
|
if reply_to_message_id:
|
|
reply_params = ReplyParameters(
|
|
message_id=reply_to_message_id,
|
|
allow_sending_without_reply=True
|
|
)
|
|
|
|
# Send media files
|
|
for media_path in (msg.media or []):
|
|
try:
|
|
media_type = self._get_media_type(media_path)
|
|
sender = {
|
|
"photo": self._app.bot.send_photo,
|
|
"voice": self._app.bot.send_voice,
|
|
"audio": self._app.bot.send_audio,
|
|
}.get(media_type, self._app.bot.send_document)
|
|
param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document"
|
|
with open(media_path, 'rb') as f:
|
|
await sender(
|
|
chat_id=chat_id,
|
|
**{param: f},
|
|
reply_parameters=reply_params,
|
|
**thread_kwargs,
|
|
)
|
|
except Exception as e:
|
|
filename = media_path.rsplit("/", 1)[-1]
|
|
logger.error("Failed to send media {}: {}", media_path, e)
|
|
await self._app.bot.send_message(
|
|
chat_id=chat_id,
|
|
text=f"[Failed to send: {filename}]",
|
|
reply_parameters=reply_params,
|
|
**thread_kwargs,
|
|
)
|
|
|
|
# Send text content
|
|
if msg.content and msg.content != "[empty message]":
|
|
is_progress = msg.metadata.get("_progress", False)
|
|
|
|
for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
|
|
# Final response: simulate streaming via draft, then persist
|
|
if not is_progress:
|
|
await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs)
|
|
else:
|
|
await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
|
|
|
|
async def _send_text(
|
|
self,
|
|
chat_id: int,
|
|
text: str,
|
|
reply_params=None,
|
|
thread_kwargs: dict | None = None,
|
|
) -> None:
|
|
"""Send a plain text message with HTML fallback."""
|
|
try:
|
|
html = _markdown_to_telegram_html(text)
|
|
await self._app.bot.send_message(
|
|
chat_id=chat_id, text=html, parse_mode="HTML",
|
|
reply_parameters=reply_params,
|
|
**(thread_kwargs or {}),
|
|
)
|
|
except Exception as e:
|
|
logger.warning("HTML parse failed, falling back to plain text: {}", e)
|
|
try:
|
|
await self._app.bot.send_message(
|
|
chat_id=chat_id,
|
|
text=text,
|
|
reply_parameters=reply_params,
|
|
**(thread_kwargs or {}),
|
|
)
|
|
except Exception as e2:
|
|
logger.error("Error sending Telegram message: {}", e2)
|
|
|
|
async def _send_with_streaming(
|
|
self,
|
|
chat_id: int,
|
|
text: str,
|
|
reply_params=None,
|
|
thread_kwargs: dict | None = None,
|
|
) -> None:
|
|
"""Simulate streaming via send_message_draft, then persist with send_message."""
|
|
draft_id = int(time.time() * 1000) % (2**31)
|
|
try:
|
|
step = max(len(text) // 8, 40)
|
|
for i in range(step, len(text), step):
|
|
await self._app.bot.send_message_draft(
|
|
chat_id=chat_id, draft_id=draft_id, text=text[:i],
|
|
)
|
|
await asyncio.sleep(0.04)
|
|
await self._app.bot.send_message_draft(
|
|
chat_id=chat_id, draft_id=draft_id, text=text,
|
|
)
|
|
await asyncio.sleep(0.15)
|
|
except Exception:
|
|
pass
|
|
await self._send_text(chat_id, text, reply_params, thread_kwargs)
|
|
|
|
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /start command."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
|
|
user = update.effective_user
|
|
await update.message.reply_text(
|
|
f"👋 Hi {user.first_name}! I'm nanobot.\n\n"
|
|
"Send me a message and I'll respond!\n"
|
|
"Type /help to see available commands."
|
|
)
|
|
|
|
async def _on_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /help command, bypassing ACL so all users can access it."""
|
|
if not update.message:
|
|
return
|
|
await update.message.reply_text(
|
|
"🐈 nanobot commands:\n"
|
|
"/new — Start a new conversation\n"
|
|
"/stop — Stop the current task\n"
|
|
"/restart — Restart the bot\n"
|
|
"/help — Show available commands"
|
|
)
|
|
|
|
@staticmethod
|
|
def _sender_id(user) -> str:
|
|
"""Build sender_id with username for allowlist matching."""
|
|
sid = str(user.id)
|
|
return f"{sid}|{user.username}" if user.username else sid
|
|
|
|
@staticmethod
|
|
def _derive_topic_session_key(message) -> str | None:
|
|
"""Derive topic-scoped session key for non-private Telegram chats."""
|
|
message_thread_id = getattr(message, "message_thread_id", None)
|
|
if message.chat.type == "private" or message_thread_id is None:
|
|
return None
|
|
return f"telegram:{message.chat_id}:topic:{message_thread_id}"
|
|
|
|
@staticmethod
|
|
def _build_message_metadata(message, user) -> dict:
|
|
"""Build common Telegram inbound metadata payload."""
|
|
reply_to = getattr(message, "reply_to_message", None)
|
|
return {
|
|
"message_id": message.message_id,
|
|
"user_id": user.id,
|
|
"username": user.username,
|
|
"first_name": user.first_name,
|
|
"is_group": message.chat.type != "private",
|
|
"message_thread_id": getattr(message, "message_thread_id", None),
|
|
"is_forum": bool(getattr(message.chat, "is_forum", False)),
|
|
"reply_to_message_id": getattr(reply_to, "message_id", None) if reply_to else None,
|
|
}
|
|
|
|
@staticmethod
|
|
def _extract_reply_context(message) -> str | None:
|
|
"""Extract text from the message being replied to, if any."""
|
|
reply = getattr(message, "reply_to_message", None)
|
|
if not reply:
|
|
return None
|
|
text = getattr(reply, "text", None) or getattr(reply, "caption", None) or ""
|
|
if len(text) > TELEGRAM_REPLY_CONTEXT_MAX_LEN:
|
|
text = text[:TELEGRAM_REPLY_CONTEXT_MAX_LEN] + "..."
|
|
return f"[Reply to: {text}]" if text else None
|
|
|
|
async def _download_message_media(
|
|
self, msg, *, add_failure_content: bool = False
|
|
) -> tuple[list[str], list[str]]:
|
|
"""Download media from a message (current or reply). Returns (media_paths, content_parts)."""
|
|
media_file = None
|
|
media_type = None
|
|
if getattr(msg, "photo", None):
|
|
media_file = msg.photo[-1]
|
|
media_type = "image"
|
|
elif getattr(msg, "voice", None):
|
|
media_file = msg.voice
|
|
media_type = "voice"
|
|
elif getattr(msg, "audio", None):
|
|
media_file = msg.audio
|
|
media_type = "audio"
|
|
elif getattr(msg, "document", None):
|
|
media_file = msg.document
|
|
media_type = "file"
|
|
elif getattr(msg, "video", None):
|
|
media_file = msg.video
|
|
media_type = "video"
|
|
elif getattr(msg, "video_note", None):
|
|
media_file = msg.video_note
|
|
media_type = "video"
|
|
elif getattr(msg, "animation", None):
|
|
media_file = msg.animation
|
|
media_type = "animation"
|
|
if not media_file or not self._app:
|
|
return [], []
|
|
try:
|
|
file = await self._app.bot.get_file(media_file.file_id)
|
|
ext = self._get_extension(
|
|
media_type,
|
|
getattr(media_file, "mime_type", None),
|
|
getattr(media_file, "file_name", None),
|
|
)
|
|
media_dir = get_media_dir("telegram")
|
|
unique_id = getattr(media_file, "file_unique_id", media_file.file_id)
|
|
file_path = media_dir / f"{unique_id}{ext}"
|
|
await file.download_to_drive(str(file_path))
|
|
path_str = str(file_path)
|
|
if media_type in ("voice", "audio"):
|
|
transcription = await self.transcribe_audio(file_path)
|
|
if transcription:
|
|
logger.info("Transcribed {}: {}...", media_type, transcription[:50])
|
|
return [path_str], [f"[transcription: {transcription}]"]
|
|
return [path_str], [f"[{media_type}: {path_str}]"]
|
|
return [path_str], [f"[{media_type}: {path_str}]"]
|
|
except Exception as e:
|
|
logger.warning("Failed to download message media: {}", e)
|
|
if add_failure_content:
|
|
return [], [f"[{media_type}: download failed]"]
|
|
return [], []
|
|
|
|
async def _ensure_bot_identity(self) -> tuple[int | None, str | None]:
|
|
"""Load bot identity once and reuse it for mention/reply checks."""
|
|
if self._bot_user_id is not None or self._bot_username is not None:
|
|
return self._bot_user_id, self._bot_username
|
|
if not self._app:
|
|
return None, None
|
|
bot_info = await self._app.bot.get_me()
|
|
self._bot_user_id = getattr(bot_info, "id", None)
|
|
self._bot_username = getattr(bot_info, "username", None)
|
|
return self._bot_user_id, self._bot_username
|
|
|
|
@staticmethod
|
|
def _has_mention_entity(
|
|
text: str,
|
|
entities,
|
|
bot_username: str,
|
|
bot_id: int | None,
|
|
) -> bool:
|
|
"""Check Telegram mention entities against the bot username."""
|
|
handle = f"@{bot_username}".lower()
|
|
for entity in entities or []:
|
|
entity_type = getattr(entity, "type", None)
|
|
if entity_type == "text_mention":
|
|
user = getattr(entity, "user", None)
|
|
if user is not None and bot_id is not None and getattr(user, "id", None) == bot_id:
|
|
return True
|
|
continue
|
|
if entity_type != "mention":
|
|
continue
|
|
offset = getattr(entity, "offset", None)
|
|
length = getattr(entity, "length", None)
|
|
if offset is None or length is None:
|
|
continue
|
|
if text[offset : offset + length].lower() == handle:
|
|
return True
|
|
return handle in text.lower()
|
|
|
|
async def _is_group_message_for_bot(self, message) -> bool:
|
|
"""Allow group messages when policy is open, @mentioned, or replying to the bot."""
|
|
if message.chat.type == "private" or self.config.group_policy == "open":
|
|
return True
|
|
|
|
bot_id, bot_username = await self._ensure_bot_identity()
|
|
if bot_username:
|
|
text = message.text or ""
|
|
caption = message.caption or ""
|
|
if self._has_mention_entity(
|
|
text,
|
|
getattr(message, "entities", None),
|
|
bot_username,
|
|
bot_id,
|
|
):
|
|
return True
|
|
if self._has_mention_entity(
|
|
caption,
|
|
getattr(message, "caption_entities", None),
|
|
bot_username,
|
|
bot_id,
|
|
):
|
|
return True
|
|
|
|
reply_user = getattr(getattr(message, "reply_to_message", None), "from_user", None)
|
|
return bool(bot_id and reply_user and reply_user.id == bot_id)
|
|
|
|
def _remember_thread_context(self, message) -> None:
|
|
"""Cache topic thread id by chat/message id for follow-up replies."""
|
|
message_thread_id = getattr(message, "message_thread_id", None)
|
|
if message_thread_id is None:
|
|
return
|
|
key = (str(message.chat_id), message.message_id)
|
|
self._message_threads[key] = message_thread_id
|
|
if len(self._message_threads) > 1000:
|
|
self._message_threads.pop(next(iter(self._message_threads)))
|
|
|
|
async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Forward slash commands to the bus for unified handling in AgentLoop."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
message = update.message
|
|
user = update.effective_user
|
|
self._remember_thread_context(message)
|
|
await self._handle_message(
|
|
sender_id=self._sender_id(user),
|
|
chat_id=str(message.chat_id),
|
|
content=message.text or "",
|
|
metadata=self._build_message_metadata(message, user),
|
|
session_key=self._derive_topic_session_key(message),
|
|
)
|
|
|
|
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle incoming messages (text, photos, voice, documents)."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
|
|
message = update.message
|
|
user = update.effective_user
|
|
chat_id = message.chat_id
|
|
sender_id = self._sender_id(user)
|
|
self._remember_thread_context(message)
|
|
|
|
# Store chat_id for replies
|
|
self._chat_ids[sender_id] = chat_id
|
|
|
|
if not await self._is_group_message_for_bot(message):
|
|
return
|
|
|
|
# Build content from text and/or media
|
|
content_parts = []
|
|
media_paths = []
|
|
|
|
# Text content
|
|
if message.text:
|
|
content_parts.append(message.text)
|
|
if message.caption:
|
|
content_parts.append(message.caption)
|
|
|
|
# Download current message media
|
|
current_media_paths, current_media_parts = await self._download_message_media(
|
|
message, add_failure_content=True
|
|
)
|
|
media_paths.extend(current_media_paths)
|
|
content_parts.extend(current_media_parts)
|
|
if current_media_paths:
|
|
logger.debug("Downloaded message media to {}", current_media_paths[0])
|
|
|
|
# Reply context: text and/or media from the replied-to message
|
|
reply = getattr(message, "reply_to_message", None)
|
|
if reply is not None:
|
|
reply_ctx = self._extract_reply_context(message)
|
|
reply_media, reply_media_parts = await self._download_message_media(reply)
|
|
if reply_media:
|
|
media_paths = reply_media + media_paths
|
|
logger.debug("Attached replied-to media: {}", reply_media[0])
|
|
tag = reply_ctx or (f"[Reply to: {reply_media_parts[0]}]" if reply_media_parts else None)
|
|
if tag:
|
|
content_parts.insert(0, tag)
|
|
content = "\n".join(content_parts) if content_parts else "[empty message]"
|
|
|
|
logger.debug("Telegram message from {}: {}...", sender_id, content[:50])
|
|
|
|
str_chat_id = str(chat_id)
|
|
metadata = self._build_message_metadata(message, user)
|
|
session_key = self._derive_topic_session_key(message)
|
|
|
|
# Telegram media groups: buffer briefly, forward as one aggregated turn.
|
|
if media_group_id := getattr(message, "media_group_id", None):
|
|
key = f"{str_chat_id}:{media_group_id}"
|
|
if key not in self._media_group_buffers:
|
|
self._media_group_buffers[key] = {
|
|
"sender_id": sender_id, "chat_id": str_chat_id,
|
|
"contents": [], "media": [],
|
|
"metadata": metadata,
|
|
"session_key": session_key,
|
|
}
|
|
self._start_typing(str_chat_id)
|
|
buf = self._media_group_buffers[key]
|
|
if content and content != "[empty message]":
|
|
buf["contents"].append(content)
|
|
buf["media"].extend(media_paths)
|
|
if key not in self._media_group_tasks:
|
|
self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key))
|
|
return
|
|
|
|
# Start typing indicator before processing
|
|
self._start_typing(str_chat_id)
|
|
|
|
# Forward to the message bus
|
|
await self._handle_message(
|
|
sender_id=sender_id,
|
|
chat_id=str_chat_id,
|
|
content=content,
|
|
media=media_paths,
|
|
metadata=metadata,
|
|
session_key=session_key,
|
|
)
|
|
|
|
async def _flush_media_group(self, key: str) -> None:
|
|
"""Wait briefly, then forward buffered media-group as one turn."""
|
|
try:
|
|
await asyncio.sleep(0.6)
|
|
if not (buf := self._media_group_buffers.pop(key, None)):
|
|
return
|
|
content = "\n".join(buf["contents"]) or "[empty message]"
|
|
await self._handle_message(
|
|
sender_id=buf["sender_id"], chat_id=buf["chat_id"],
|
|
content=content, media=list(dict.fromkeys(buf["media"])),
|
|
metadata=buf["metadata"],
|
|
session_key=buf.get("session_key"),
|
|
)
|
|
finally:
|
|
self._media_group_tasks.pop(key, None)
|
|
|
|
def _start_typing(self, chat_id: str) -> None:
|
|
"""Start sending 'typing...' indicator for a chat."""
|
|
# Cancel any existing typing task for this chat
|
|
self._stop_typing(chat_id)
|
|
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
|
|
|
|
def _stop_typing(self, chat_id: str) -> None:
|
|
"""Stop the typing indicator for a chat."""
|
|
task = self._typing_tasks.pop(chat_id, None)
|
|
if task and not task.done():
|
|
task.cancel()
|
|
|
|
async def _typing_loop(self, chat_id: str) -> None:
|
|
"""Repeatedly send 'typing' action until cancelled."""
|
|
try:
|
|
while self._app:
|
|
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
|
|
await asyncio.sleep(4)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as e:
|
|
logger.debug("Typing indicator stopped for {}: {}", chat_id, e)
|
|
|
|
async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Log polling / handler errors instead of silently swallowing them."""
|
|
logger.error("Telegram error: {}", context.error)
|
|
|
|
def _get_extension(
|
|
self,
|
|
media_type: str,
|
|
mime_type: str | None,
|
|
filename: str | None = None,
|
|
) -> str:
|
|
"""Get file extension based on media type or original filename."""
|
|
if mime_type:
|
|
ext_map = {
|
|
"image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif",
|
|
"audio/ogg": ".ogg", "audio/mpeg": ".mp3", "audio/mp4": ".m4a",
|
|
}
|
|
if mime_type in ext_map:
|
|
return ext_map[mime_type]
|
|
|
|
type_map = {"image": ".jpg", "voice": ".ogg", "audio": ".mp3", "file": ""}
|
|
if ext := type_map.get(media_type, ""):
|
|
return ext
|
|
|
|
if filename:
|
|
from pathlib import Path
|
|
|
|
return "".join(Path(filename).suffixes)
|
|
|
|
return ""
|