From 0f1cc40b22997fd712c719a6dbc3a970491360e2 Mon Sep 17 00:00:00 2001 From: WufeiHalf Date: Tue, 3 Mar 2026 22:08:01 +0800 Subject: [PATCH] feat(telegram): add Telegram group topic support --- nanobot/channels/telegram.py | 66 +++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index c290535..344ed0c 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -232,6 +232,10 @@ class TelegramChannel(BaseChannel): except ValueError: logger.error("Invalid chat_id: {}", msg.chat_id) return + message_thread_id = msg.metadata.get("message_thread_id") + thread_kwargs = {} + if message_thread_id is not None: + thread_kwargs["message_thread_id"] = message_thread_id reply_params = None if self.config.reply_to_message: @@ -256,7 +260,8 @@ class TelegramChannel(BaseChannel): await sender( chat_id=chat_id, **{param: f}, - reply_parameters=reply_params + reply_parameters=reply_params, + **thread_kwargs, ) except Exception as e: filename = media_path.rsplit("/", 1)[-1] @@ -264,7 +269,8 @@ class TelegramChannel(BaseChannel): await self._app.bot.send_message( chat_id=chat_id, text=f"[Failed to send: {filename}]", - reply_parameters=reply_params + reply_parameters=reply_params, + **thread_kwargs, ) # Send text content @@ -276,7 +282,8 @@ class TelegramChannel(BaseChannel): chat_id=chat_id, text=html, parse_mode="HTML", - reply_parameters=reply_params + reply_parameters=reply_params, + **thread_kwargs, ) except Exception as e: logger.warning("HTML parse failed, falling back to plain text: {}", e) @@ -284,7 +291,8 @@ class TelegramChannel(BaseChannel): await self._app.bot.send_message( chat_id=chat_id, text=chunk, - reply_parameters=reply_params + reply_parameters=reply_params, + **thread_kwargs, ) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) @@ -318,14 +326,39 @@ class TelegramChannel(BaseChannel): sid = str(user.id) return f"{sid}|{user.username}" if user.username else sid + @staticmethod + def _derive_topic_session_key(message) -> str | None: + """Derive topic-scoped session key for non-private Telegram chats.""" + message_thread_id = getattr(message, "message_thread_id", None) + if message.chat.type == "private" or message_thread_id is None: + return None + return f"telegram:{message.chat_id}:topic:{message_thread_id}" + + @staticmethod + def _build_message_metadata(message, user) -> dict: + """Build common Telegram inbound metadata payload.""" + return { + "message_id": message.message_id, + "user_id": user.id, + "username": user.username, + "first_name": user.first_name, + "is_group": message.chat.type != "private", + "message_thread_id": getattr(message, "message_thread_id", None), + "is_forum": bool(getattr(message.chat, "is_forum", False)), + } + async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Forward slash commands to the bus for unified handling in AgentLoop.""" if not update.message or not update.effective_user: return + message = update.message + user = update.effective_user await self._handle_message( - sender_id=self._sender_id(update.effective_user), - chat_id=str(update.message.chat_id), - content=update.message.text, + sender_id=self._sender_id(user), + chat_id=str(message.chat_id), + content=message.text, + metadata=self._build_message_metadata(message, user), + session_key=self._derive_topic_session_key(message), ) async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -407,6 +440,8 @@ class TelegramChannel(BaseChannel): logger.debug("Telegram message from {}: {}...", sender_id, content[:50]) str_chat_id = str(chat_id) + metadata = self._build_message_metadata(message, user) + session_key = self._derive_topic_session_key(message) # Telegram media groups: buffer briefly, forward as one aggregated turn. if media_group_id := getattr(message, "media_group_id", None): @@ -415,11 +450,8 @@ class TelegramChannel(BaseChannel): self._media_group_buffers[key] = { "sender_id": sender_id, "chat_id": str_chat_id, "contents": [], "media": [], - "metadata": { - "message_id": message.message_id, "user_id": user.id, - "username": user.username, "first_name": user.first_name, - "is_group": message.chat.type != "private", - }, + "metadata": metadata, + "session_key": session_key, } self._start_typing(str_chat_id) buf = self._media_group_buffers[key] @@ -439,13 +471,8 @@ class TelegramChannel(BaseChannel): chat_id=str_chat_id, content=content, media=media_paths, - metadata={ - "message_id": message.message_id, - "user_id": user.id, - "username": user.username, - "first_name": user.first_name, - "is_group": message.chat.type != "private" - } + metadata=metadata, + session_key=session_key, ) async def _flush_media_group(self, key: str) -> None: @@ -459,6 +486,7 @@ class TelegramChannel(BaseChannel): sender_id=buf["sender_id"], chat_id=buf["chat_id"], content=content, media=list(dict.fromkeys(buf["media"])), metadata=buf["metadata"], + session_key=buf.get("session_key"), ) finally: self._media_group_tasks.pop(key, None)