Merge pull request #922: feat(feishu): multimedia download and share card parsing

This commit is contained in:
Re-bin
2026-02-21 06:30:31 +00:00

View File

@@ -6,6 +6,7 @@ import os
import re import re
import threading import threading
from collections import OrderedDict from collections import OrderedDict
from pathlib import Path
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@@ -27,6 +28,8 @@ try:
CreateMessageReactionRequest, CreateMessageReactionRequest,
CreateMessageReactionRequestBody, CreateMessageReactionRequestBody,
Emoji, Emoji,
GetFileRequest,
GetMessageResourceRequest,
P2ImMessageReceiveV1, P2ImMessageReceiveV1,
) )
FEISHU_AVAILABLE = True FEISHU_AVAILABLE = True
@@ -44,6 +47,139 @@ MSG_TYPE_MAP = {
} }
def _extract_share_card_content(content_json: dict, msg_type: str) -> str:
"""Extract text representation from share cards and interactive messages."""
parts = []
if msg_type == "share_chat":
parts.append(f"[shared chat: {content_json.get('chat_id', '')}]")
elif msg_type == "share_user":
parts.append(f"[shared user: {content_json.get('user_id', '')}]")
elif msg_type == "interactive":
parts.extend(_extract_interactive_content(content_json))
elif msg_type == "share_calendar_event":
parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]")
elif msg_type == "system":
parts.append("[system message]")
elif msg_type == "merge_forward":
parts.append("[merged forward messages]")
return "\n".join(parts) if parts else f"[{msg_type}]"
def _extract_interactive_content(content: dict) -> list[str]:
"""Recursively extract text and links from interactive card content."""
parts = []
if isinstance(content, str):
try:
content = json.loads(content)
except (json.JSONDecodeError, TypeError):
return [content] if content.strip() else []
if not isinstance(content, dict):
return parts
if "title" in content:
title = content["title"]
if isinstance(title, dict):
title_content = title.get("content", "") or title.get("text", "")
if title_content:
parts.append(f"title: {title_content}")
elif isinstance(title, str):
parts.append(f"title: {title}")
for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []:
parts.extend(_extract_element_content(element))
card = content.get("card", {})
if card:
parts.extend(_extract_interactive_content(card))
header = content.get("header", {})
if header:
header_title = header.get("title", {})
if isinstance(header_title, dict):
header_text = header_title.get("content", "") or header_title.get("text", "")
if header_text:
parts.append(f"title: {header_text}")
return parts
def _extract_element_content(element: dict) -> list[str]:
"""Extract content from a single card element."""
parts = []
if not isinstance(element, dict):
return parts
tag = element.get("tag", "")
if tag in ("markdown", "lark_md"):
content = element.get("content", "")
if content:
parts.append(content)
elif tag == "div":
text = element.get("text", {})
if isinstance(text, dict):
text_content = text.get("content", "") or text.get("text", "")
if text_content:
parts.append(text_content)
elif isinstance(text, str):
parts.append(text)
for field in element.get("fields", []):
if isinstance(field, dict):
field_text = field.get("text", {})
if isinstance(field_text, dict):
c = field_text.get("content", "")
if c:
parts.append(c)
elif tag == "a":
href = element.get("href", "")
text = element.get("text", "")
if href:
parts.append(f"link: {href}")
if text:
parts.append(text)
elif tag == "button":
text = element.get("text", {})
if isinstance(text, dict):
c = text.get("content", "")
if c:
parts.append(c)
url = element.get("url", "") or element.get("multi_url", {}).get("url", "")
if url:
parts.append(f"link: {url}")
elif tag == "img":
alt = element.get("alt", {})
parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]")
elif tag == "note":
for ne in element.get("elements", []):
parts.extend(_extract_element_content(ne))
elif tag == "column_set":
for col in element.get("columns", []):
for ce in col.get("elements", []):
parts.extend(_extract_element_content(ce))
elif tag == "plain_text":
content = element.get("content", "")
if content:
parts.append(content)
else:
for ne in element.get("elements", []):
parts.extend(_extract_element_content(ne))
return parts
def _extract_post_text(content_json: dict) -> str: def _extract_post_text(content_json: dict) -> str:
"""Extract plain text from Feishu post (rich text) message content. """Extract plain text from Feishu post (rich text) message content.
@@ -345,6 +481,87 @@ class FeishuChannel(BaseChannel):
logger.error("Error uploading file {}: {}", file_path, e) logger.error("Error uploading file {}: {}", file_path, e)
return None return None
def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]:
"""Download an image from Feishu message by message_id and image_key."""
try:
request = GetMessageResourceRequest.builder() \
.message_id(message_id) \
.file_key(image_key) \
.type("image") \
.build()
response = self._client.im.v1.message_resource.get(request)
if response.success():
file_data = response.file
# GetMessageResourceRequest returns BytesIO, need to read bytes
if hasattr(file_data, 'read'):
file_data = file_data.read()
return file_data, response.file_name
else:
logger.error("Failed to download image: code={}, msg={}", response.code, response.msg)
return None, None
except Exception as e:
logger.error("Error downloading image {}: {}", image_key, e)
return None, None
def _download_file_sync(self, file_key: str) -> tuple[bytes | None, str | None]:
"""Download a file from Feishu by file_key."""
try:
request = GetFileRequest.builder().file_key(file_key).build()
response = self._client.im.v1.file.get(request)
if response.success():
return response.file, response.file_name
else:
logger.error("Failed to download file: code={}, msg={}", response.code, response.msg)
return None, None
except Exception as e:
logger.error("Error downloading file {}: {}", file_key, e)
return None, None
async def _download_and_save_media(
self,
msg_type: str,
content_json: dict,
message_id: str | None = None
) -> tuple[str | None, str]:
"""
Download media from Feishu and save to local disk.
Returns:
(file_path, content_text) - file_path is None if download failed
"""
loop = asyncio.get_running_loop()
media_dir = Path.home() / ".nanobot" / "media"
media_dir.mkdir(parents=True, exist_ok=True)
data, filename = None, None
if msg_type == "image":
image_key = content_json.get("image_key")
if image_key and message_id:
data, filename = await loop.run_in_executor(
None, self._download_image_sync, message_id, image_key
)
if not filename:
filename = f"{image_key[:16]}.jpg"
elif msg_type in ("audio", "file"):
file_key = content_json.get("file_key")
if file_key:
data, filename = await loop.run_in_executor(
None, self._download_file_sync, file_key
)
if not filename:
ext = ".opus" if msg_type == "audio" else ""
filename = f"{file_key[:16]}{ext}"
if data and filename:
file_path = media_dir / filename
file_path.write_bytes(data)
logger.debug("Downloaded {} to {}", msg_type, file_path)
return str(file_path), f"[{msg_type}: {filename}]"
return None, f"[{msg_type}: download failed]"
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool:
"""Send a single message (text/image/file/interactive) synchronously.""" """Send a single message (text/image/file/interactive) synchronously."""
try: try:
@@ -425,60 +642,81 @@ class FeishuChannel(BaseChannel):
event = data.event event = data.event
message = event.message message = event.message
sender = event.sender sender = event.sender
# Deduplication check # Deduplication check
message_id = message.message_id message_id = message.message_id
if message_id in self._processed_message_ids: if message_id in self._processed_message_ids:
return return
self._processed_message_ids[message_id] = None self._processed_message_ids[message_id] = None
# Trim cache: keep most recent 500 when exceeds 1000 # Trim cache
while len(self._processed_message_ids) > 1000: while len(self._processed_message_ids) > 1000:
self._processed_message_ids.popitem(last=False) self._processed_message_ids.popitem(last=False)
# Skip bot messages # Skip bot messages
sender_type = sender.sender_type if sender.sender_type == "bot":
if sender_type == "bot":
return return
sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
chat_id = message.chat_id chat_id = message.chat_id
chat_type = message.chat_type # "p2p" or "group" chat_type = message.chat_type
msg_type = message.message_type msg_type = message.message_type
# Add reaction to indicate "seen" # Add reaction
await self._add_reaction(message_id, "THUMBSUP") await self._add_reaction(message_id, "THUMBSUP")
# Parse message content # Parse content
content_parts = []
media_paths = []
try:
content_json = json.loads(message.content) if message.content else {}
except json.JSONDecodeError:
content_json = {}
if msg_type == "text": if msg_type == "text":
try: text = content_json.get("text", "")
content = json.loads(message.content).get("text", "") if text:
except json.JSONDecodeError: content_parts.append(text)
content = message.content or ""
elif msg_type == "post": elif msg_type == "post":
try: text = _extract_post_text(content_json)
content_json = json.loads(message.content) if text:
content = _extract_post_text(content_json) content_parts.append(text)
except (json.JSONDecodeError, TypeError):
content = message.content or "" elif msg_type in ("image", "audio", "file"):
file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id)
if file_path:
media_paths.append(file_path)
content_parts.append(content_text)
elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"):
# Handle share cards and interactive messages
text = _extract_share_card_content(content_json, msg_type)
if text:
content_parts.append(text)
else: else:
content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]"))
if not content: content = "\n".join(content_parts) if content_parts else ""
if not content and not media_paths:
return return
# Forward to message bus # Forward to message bus
reply_to = chat_id if chat_type == "group" else sender_id reply_to = chat_id if chat_type == "group" else sender_id
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,
chat_id=reply_to, chat_id=reply_to,
content=content, content=content,
media=media_paths,
metadata={ metadata={
"message_id": message_id, "message_id": message_id,
"chat_type": chat_type, "chat_type": chat_type,
"msg_type": msg_type, "msg_type": msg_type,
} }
) )
except Exception as e: except Exception as e:
logger.error("Error processing Feishu message: {}", e) logger.error("Error processing Feishu message: {}", e)