feat(telegram): add Telegram group topic support
This commit is contained in:
@@ -232,6 +232,10 @@ class TelegramChannel(BaseChannel):
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
logger.error("Invalid chat_id: {}", msg.chat_id)
|
logger.error("Invalid chat_id: {}", msg.chat_id)
|
||||||
return
|
return
|
||||||
|
message_thread_id = msg.metadata.get("message_thread_id")
|
||||||
|
thread_kwargs = {}
|
||||||
|
if message_thread_id is not None:
|
||||||
|
thread_kwargs["message_thread_id"] = message_thread_id
|
||||||
|
|
||||||
reply_params = None
|
reply_params = None
|
||||||
if self.config.reply_to_message:
|
if self.config.reply_to_message:
|
||||||
@@ -256,7 +260,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
await sender(
|
await sender(
|
||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
**{param: f},
|
**{param: f},
|
||||||
reply_parameters=reply_params
|
reply_parameters=reply_params,
|
||||||
|
**thread_kwargs,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
filename = media_path.rsplit("/", 1)[-1]
|
filename = media_path.rsplit("/", 1)[-1]
|
||||||
@@ -264,7 +269,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
await self._app.bot.send_message(
|
await self._app.bot.send_message(
|
||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
text=f"[Failed to send: {filename}]",
|
text=f"[Failed to send: {filename}]",
|
||||||
reply_parameters=reply_params
|
reply_parameters=reply_params,
|
||||||
|
**thread_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Send text content
|
# Send text content
|
||||||
@@ -276,7 +282,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
text=html,
|
text=html,
|
||||||
parse_mode="HTML",
|
parse_mode="HTML",
|
||||||
reply_parameters=reply_params
|
reply_parameters=reply_params,
|
||||||
|
**thread_kwargs,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("HTML parse failed, falling back to plain text: {}", e)
|
logger.warning("HTML parse failed, falling back to plain text: {}", e)
|
||||||
@@ -284,7 +291,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
await self._app.bot.send_message(
|
await self._app.bot.send_message(
|
||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
text=chunk,
|
text=chunk,
|
||||||
reply_parameters=reply_params
|
reply_parameters=reply_params,
|
||||||
|
**thread_kwargs,
|
||||||
)
|
)
|
||||||
except Exception as e2:
|
except Exception as e2:
|
||||||
logger.error("Error sending Telegram message: {}", e2)
|
logger.error("Error sending Telegram message: {}", e2)
|
||||||
@@ -318,14 +326,39 @@ class TelegramChannel(BaseChannel):
|
|||||||
sid = str(user.id)
|
sid = str(user.id)
|
||||||
return f"{sid}|{user.username}" if user.username else sid
|
return f"{sid}|{user.username}" if user.username else sid
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _derive_topic_session_key(message) -> str | None:
|
||||||
|
"""Derive topic-scoped session key for non-private Telegram chats."""
|
||||||
|
message_thread_id = getattr(message, "message_thread_id", None)
|
||||||
|
if message.chat.type == "private" or message_thread_id is None:
|
||||||
|
return None
|
||||||
|
return f"telegram:{message.chat_id}:topic:{message_thread_id}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _build_message_metadata(message, user) -> dict:
|
||||||
|
"""Build common Telegram inbound metadata payload."""
|
||||||
|
return {
|
||||||
|
"message_id": message.message_id,
|
||||||
|
"user_id": user.id,
|
||||||
|
"username": user.username,
|
||||||
|
"first_name": user.first_name,
|
||||||
|
"is_group": message.chat.type != "private",
|
||||||
|
"message_thread_id": getattr(message, "message_thread_id", None),
|
||||||
|
"is_forum": bool(getattr(message.chat, "is_forum", False)),
|
||||||
|
}
|
||||||
|
|
||||||
async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
"""Forward slash commands to the bus for unified handling in AgentLoop."""
|
"""Forward slash commands to the bus for unified handling in AgentLoop."""
|
||||||
if not update.message or not update.effective_user:
|
if not update.message or not update.effective_user:
|
||||||
return
|
return
|
||||||
|
message = update.message
|
||||||
|
user = update.effective_user
|
||||||
await self._handle_message(
|
await self._handle_message(
|
||||||
sender_id=self._sender_id(update.effective_user),
|
sender_id=self._sender_id(user),
|
||||||
chat_id=str(update.message.chat_id),
|
chat_id=str(message.chat_id),
|
||||||
content=update.message.text,
|
content=message.text,
|
||||||
|
metadata=self._build_message_metadata(message, user),
|
||||||
|
session_key=self._derive_topic_session_key(message),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
@@ -407,6 +440,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
logger.debug("Telegram message from {}: {}...", sender_id, content[:50])
|
logger.debug("Telegram message from {}: {}...", sender_id, content[:50])
|
||||||
|
|
||||||
str_chat_id = str(chat_id)
|
str_chat_id = str(chat_id)
|
||||||
|
metadata = self._build_message_metadata(message, user)
|
||||||
|
session_key = self._derive_topic_session_key(message)
|
||||||
|
|
||||||
# Telegram media groups: buffer briefly, forward as one aggregated turn.
|
# Telegram media groups: buffer briefly, forward as one aggregated turn.
|
||||||
if media_group_id := getattr(message, "media_group_id", None):
|
if media_group_id := getattr(message, "media_group_id", None):
|
||||||
@@ -415,11 +450,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
self._media_group_buffers[key] = {
|
self._media_group_buffers[key] = {
|
||||||
"sender_id": sender_id, "chat_id": str_chat_id,
|
"sender_id": sender_id, "chat_id": str_chat_id,
|
||||||
"contents": [], "media": [],
|
"contents": [], "media": [],
|
||||||
"metadata": {
|
"metadata": metadata,
|
||||||
"message_id": message.message_id, "user_id": user.id,
|
"session_key": session_key,
|
||||||
"username": user.username, "first_name": user.first_name,
|
|
||||||
"is_group": message.chat.type != "private",
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
self._start_typing(str_chat_id)
|
self._start_typing(str_chat_id)
|
||||||
buf = self._media_group_buffers[key]
|
buf = self._media_group_buffers[key]
|
||||||
@@ -439,13 +471,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
chat_id=str_chat_id,
|
chat_id=str_chat_id,
|
||||||
content=content,
|
content=content,
|
||||||
media=media_paths,
|
media=media_paths,
|
||||||
metadata={
|
metadata=metadata,
|
||||||
"message_id": message.message_id,
|
session_key=session_key,
|
||||||
"user_id": user.id,
|
|
||||||
"username": user.username,
|
|
||||||
"first_name": user.first_name,
|
|
||||||
"is_group": message.chat.type != "private"
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _flush_media_group(self, key: str) -> None:
|
async def _flush_media_group(self, key: str) -> None:
|
||||||
@@ -459,6 +486,7 @@ class TelegramChannel(BaseChannel):
|
|||||||
sender_id=buf["sender_id"], chat_id=buf["chat_id"],
|
sender_id=buf["sender_id"], chat_id=buf["chat_id"],
|
||||||
content=content, media=list(dict.fromkeys(buf["media"])),
|
content=content, media=list(dict.fromkeys(buf["media"])),
|
||||||
metadata=buf["metadata"],
|
metadata=buf["metadata"],
|
||||||
|
session_key=buf.get("session_key"),
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
self._media_group_tasks.pop(key, None)
|
self._media_group_tasks.pop(key, None)
|
||||||
|
|||||||
Reference in New Issue
Block a user