refactor: streamline Telegram media-group buffering
This commit is contained in:
@@ -127,7 +127,7 @@ class TelegramChannel(BaseChannel):
|
||||
self._app: Application | None = None
|
||||
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
|
||||
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
|
||||
self._media_group_buffers: dict[str, dict[str, object]] = {}
|
||||
self._media_group_buffers: dict[str, dict] = {}
|
||||
self._media_group_tasks: dict[str, asyncio.Task] = {}
|
||||
|
||||
async def start(self) -> None:
|
||||
@@ -194,11 +194,9 @@ class TelegramChannel(BaseChannel):
|
||||
for chat_id in list(self._typing_tasks):
|
||||
self._stop_typing(chat_id)
|
||||
|
||||
# Cancel buffered media-group flush tasks
|
||||
for key, task in list(self._media_group_tasks.items()):
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
self._media_group_tasks.pop(key, None)
|
||||
for task in self._media_group_tasks.values():
|
||||
task.cancel()
|
||||
self._media_group_tasks.clear()
|
||||
self._media_group_buffers.clear()
|
||||
|
||||
if self._app:
|
||||
@@ -409,43 +407,26 @@ class TelegramChannel(BaseChannel):
|
||||
|
||||
str_chat_id = str(chat_id)
|
||||
|
||||
# Telegram media groups arrive as multiple messages sharing media_group_id.
|
||||
# Buffer briefly and forward as one aggregated turn.
|
||||
media_group_id = getattr(message, "media_group_id", None)
|
||||
if media_group_id:
|
||||
group_key = f"{str_chat_id}:{media_group_id}"
|
||||
buffer = self._media_group_buffers.get(group_key)
|
||||
if not buffer:
|
||||
buffer = {
|
||||
"sender_id": sender_id,
|
||||
"chat_id": str_chat_id,
|
||||
"contents": [],
|
||||
"media": [],
|
||||
# Telegram media groups: buffer briefly, forward as one aggregated turn.
|
||||
if media_group_id := getattr(message, "media_group_id", None):
|
||||
key = f"{str_chat_id}:{media_group_id}"
|
||||
if key not in self._media_group_buffers:
|
||||
self._media_group_buffers[key] = {
|
||||
"sender_id": sender_id, "chat_id": str_chat_id,
|
||||
"contents": [], "media": [],
|
||||
"metadata": {
|
||||
"message_id": message.message_id,
|
||||
"user_id": user.id,
|
||||
"username": user.username,
|
||||
"first_name": user.first_name,
|
||||
"message_id": message.message_id, "user_id": user.id,
|
||||
"username": user.username, "first_name": user.first_name,
|
||||
"is_group": message.chat.type != "private",
|
||||
"media_group_id": media_group_id,
|
||||
},
|
||||
}
|
||||
self._media_group_buffers[group_key] = buffer
|
||||
self._start_typing(str_chat_id)
|
||||
|
||||
buf = self._media_group_buffers[key]
|
||||
if content and content != "[empty message]":
|
||||
cast_contents = buffer["contents"]
|
||||
if isinstance(cast_contents, list):
|
||||
cast_contents.append(content)
|
||||
cast_media = buffer["media"]
|
||||
if isinstance(cast_media, list):
|
||||
cast_media.extend(media_paths)
|
||||
|
||||
# Start one delayed flush task per media group.
|
||||
if group_key not in self._media_group_tasks:
|
||||
self._media_group_tasks[group_key] = asyncio.create_task(
|
||||
self._flush_media_group(group_key)
|
||||
)
|
||||
buf["contents"].append(content)
|
||||
buf["media"].extend(media_paths)
|
||||
if key not in self._media_group_tasks:
|
||||
self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key))
|
||||
return
|
||||
|
||||
# Start typing indicator before processing
|
||||
@@ -466,42 +447,20 @@ class TelegramChannel(BaseChannel):
|
||||
}
|
||||
)
|
||||
|
||||
async def _flush_media_group(self, group_key: str, delay_s: float = 0.6) -> None:
|
||||
"""Flush buffered Telegram media-group messages as one aggregated turn."""
|
||||
async def _flush_media_group(self, key: str) -> None:
|
||||
"""Wait briefly, then forward buffered media-group as one turn."""
|
||||
try:
|
||||
await asyncio.sleep(delay_s)
|
||||
buffer = self._media_group_buffers.pop(group_key, None)
|
||||
if not buffer:
|
||||
await asyncio.sleep(0.6)
|
||||
if not (buf := self._media_group_buffers.pop(key, None)):
|
||||
return
|
||||
|
||||
sender_id = str(buffer.get("sender_id", ""))
|
||||
chat_id = str(buffer.get("chat_id", ""))
|
||||
contents = buffer.get("contents")
|
||||
media = buffer.get("media")
|
||||
metadata = buffer.get("metadata")
|
||||
|
||||
content_parts = [c for c in (contents if isinstance(contents, list) else []) if isinstance(c, str) and c]
|
||||
media_paths = [m for m in (media if isinstance(media, list) else []) if isinstance(m, str) and m]
|
||||
|
||||
# De-duplicate while preserving order
|
||||
seen = set()
|
||||
unique_media: list[str] = []
|
||||
for m in media_paths:
|
||||
if m in seen:
|
||||
continue
|
||||
seen.add(m)
|
||||
unique_media.append(m)
|
||||
|
||||
content = "\n".join(content_parts) if content_parts else "[empty message]"
|
||||
content = "\n".join(buf["contents"]) or "[empty message]"
|
||||
await self._handle_message(
|
||||
sender_id=sender_id,
|
||||
chat_id=chat_id,
|
||||
content=content,
|
||||
media=unique_media,
|
||||
metadata=metadata if isinstance(metadata, dict) else {},
|
||||
sender_id=buf["sender_id"], chat_id=buf["chat_id"],
|
||||
content=content, media=list(dict.fromkeys(buf["media"])),
|
||||
metadata=buf["metadata"],
|
||||
)
|
||||
finally:
|
||||
self._media_group_tasks.pop(group_key, None)
|
||||
self._media_group_tasks.pop(key, None)
|
||||
|
||||
def _start_typing(self, chat_id: str) -> None:
|
||||
"""Start sending 'typing...' indicator for a chat."""
|
||||
|
||||
Reference in New Issue
Block a user