Merge PR #1258: fix Telegram media-group aggregation
This commit is contained in:
@@ -127,6 +127,8 @@ class TelegramChannel(BaseChannel):
|
||||
self._app: Application | None = None
|
||||
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
|
||||
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
|
||||
self._media_group_buffers: dict[str, dict] = {}
|
||||
self._media_group_tasks: dict[str, asyncio.Task] = {}
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the Telegram bot with long polling."""
|
||||
@@ -191,6 +193,11 @@ class TelegramChannel(BaseChannel):
|
||||
# Cancel all typing indicators
|
||||
for chat_id in list(self._typing_tasks):
|
||||
self._stop_typing(chat_id)
|
||||
|
||||
for task in self._media_group_tasks.values():
|
||||
task.cancel()
|
||||
self._media_group_tasks.clear()
|
||||
self._media_group_buffers.clear()
|
||||
|
||||
if self._app:
|
||||
logger.info("Stopping Telegram bot...")
|
||||
@@ -399,6 +406,28 @@ class TelegramChannel(BaseChannel):
|
||||
logger.debug("Telegram message from {}: {}...", sender_id, content[:50])
|
||||
|
||||
str_chat_id = str(chat_id)
|
||||
|
||||
# Telegram media groups: buffer briefly, forward as one aggregated turn.
|
||||
if media_group_id := getattr(message, "media_group_id", None):
|
||||
key = f"{str_chat_id}:{media_group_id}"
|
||||
if key not in self._media_group_buffers:
|
||||
self._media_group_buffers[key] = {
|
||||
"sender_id": sender_id, "chat_id": str_chat_id,
|
||||
"contents": [], "media": [],
|
||||
"metadata": {
|
||||
"message_id": message.message_id, "user_id": user.id,
|
||||
"username": user.username, "first_name": user.first_name,
|
||||
"is_group": message.chat.type != "private",
|
||||
},
|
||||
}
|
||||
self._start_typing(str_chat_id)
|
||||
buf = self._media_group_buffers[key]
|
||||
if content and content != "[empty message]":
|
||||
buf["contents"].append(content)
|
||||
buf["media"].extend(media_paths)
|
||||
if key not in self._media_group_tasks:
|
||||
self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key))
|
||||
return
|
||||
|
||||
# Start typing indicator before processing
|
||||
self._start_typing(str_chat_id)
|
||||
@@ -418,6 +447,21 @@ class TelegramChannel(BaseChannel):
|
||||
}
|
||||
)
|
||||
|
||||
async def _flush_media_group(self, key: str) -> None:
|
||||
"""Wait briefly, then forward buffered media-group as one turn."""
|
||||
try:
|
||||
await asyncio.sleep(0.6)
|
||||
if not (buf := self._media_group_buffers.pop(key, None)):
|
||||
return
|
||||
content = "\n".join(buf["contents"]) or "[empty message]"
|
||||
await self._handle_message(
|
||||
sender_id=buf["sender_id"], chat_id=buf["chat_id"],
|
||||
content=content, media=list(dict.fromkeys(buf["media"])),
|
||||
metadata=buf["metadata"],
|
||||
)
|
||||
finally:
|
||||
self._media_group_tasks.pop(key, None)
|
||||
|
||||
def _start_typing(self, chat_id: str) -> None:
|
||||
"""Start sending 'typing...' indicator for a chat."""
|
||||
# Cancel any existing typing task for this chat
|
||||
|
||||
Reference in New Issue
Block a user