diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index a8ca1fa..815d853 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -6,6 +6,7 @@ import os import re import threading from collections import OrderedDict +from pathlib import Path from typing import Any from loguru import logger @@ -27,6 +28,8 @@ try: CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji, + GetFileRequest, + GetMessageResourceRequest, P2ImMessageReceiveV1, ) FEISHU_AVAILABLE = True @@ -44,6 +47,139 @@ MSG_TYPE_MAP = { } +def _extract_share_card_content(content_json: dict, msg_type: str) -> str: + """Extract text representation from share cards and interactive messages.""" + parts = [] + + if msg_type == "share_chat": + parts.append(f"[shared chat: {content_json.get('chat_id', '')}]") + elif msg_type == "share_user": + parts.append(f"[shared user: {content_json.get('user_id', '')}]") + elif msg_type == "interactive": + parts.extend(_extract_interactive_content(content_json)) + elif msg_type == "share_calendar_event": + parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]") + elif msg_type == "system": + parts.append("[system message]") + elif msg_type == "merge_forward": + parts.append("[merged forward messages]") + + return "\n".join(parts) if parts else f"[{msg_type}]" + + +def _extract_interactive_content(content: dict) -> list[str]: + """Recursively extract text and links from interactive card content.""" + parts = [] + + if isinstance(content, str): + try: + content = json.loads(content) + except (json.JSONDecodeError, TypeError): + return [content] if content.strip() else [] + + if not isinstance(content, dict): + return parts + + if "title" in content: + title = content["title"] + if isinstance(title, dict): + title_content = title.get("content", "") or title.get("text", "") + if title_content: + parts.append(f"title: {title_content}") + elif isinstance(title, str): + parts.append(f"title: {title}") + + for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []: + parts.extend(_extract_element_content(element)) + + card = content.get("card", {}) + if card: + parts.extend(_extract_interactive_content(card)) + + header = content.get("header", {}) + if header: + header_title = header.get("title", {}) + if isinstance(header_title, dict): + header_text = header_title.get("content", "") or header_title.get("text", "") + if header_text: + parts.append(f"title: {header_text}") + + return parts + + +def _extract_element_content(element: dict) -> list[str]: + """Extract content from a single card element.""" + parts = [] + + if not isinstance(element, dict): + return parts + + tag = element.get("tag", "") + + if tag in ("markdown", "lark_md"): + content = element.get("content", "") + if content: + parts.append(content) + + elif tag == "div": + text = element.get("text", {}) + if isinstance(text, dict): + text_content = text.get("content", "") or text.get("text", "") + if text_content: + parts.append(text_content) + elif isinstance(text, str): + parts.append(text) + for field in element.get("fields", []): + if isinstance(field, dict): + field_text = field.get("text", {}) + if isinstance(field_text, dict): + c = field_text.get("content", "") + if c: + parts.append(c) + + elif tag == "a": + href = element.get("href", "") + text = element.get("text", "") + if href: + parts.append(f"link: {href}") + if text: + parts.append(text) + + elif tag == "button": + text = element.get("text", {}) + if isinstance(text, dict): + c = text.get("content", "") + if c: + parts.append(c) + url = element.get("url", "") or element.get("multi_url", {}).get("url", "") + if url: + parts.append(f"link: {url}") + + elif tag == "img": + alt = element.get("alt", {}) + parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]") + + elif tag == "note": + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + elif tag == "column_set": + for col in element.get("columns", []): + for ce in col.get("elements", []): + parts.extend(_extract_element_content(ce)) + + elif tag == "plain_text": + content = element.get("content", "") + if content: + parts.append(content) + + else: + for ne in element.get("elements", []): + parts.extend(_extract_element_content(ne)) + + return parts + + def _extract_post_text(content_json: dict) -> str: """Extract plain text from Feishu post (rich text) message content. @@ -345,6 +481,87 @@ class FeishuChannel(BaseChannel): logger.error("Error uploading file {}: {}", file_path, e) return None + def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]: + """Download an image from Feishu message by message_id and image_key.""" + try: + request = GetMessageResourceRequest.builder() \ + .message_id(message_id) \ + .file_key(image_key) \ + .type("image") \ + .build() + response = self._client.im.v1.message_resource.get(request) + if response.success(): + file_data = response.file + # GetMessageResourceRequest returns BytesIO, need to read bytes + if hasattr(file_data, 'read'): + file_data = file_data.read() + return file_data, response.file_name + else: + logger.error("Failed to download image: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading image {}: {}", image_key, e) + return None, None + + def _download_file_sync(self, file_key: str) -> tuple[bytes | None, str | None]: + """Download a file from Feishu by file_key.""" + try: + request = GetFileRequest.builder().file_key(file_key).build() + response = self._client.im.v1.file.get(request) + if response.success(): + return response.file, response.file_name + else: + logger.error("Failed to download file: code={}, msg={}", response.code, response.msg) + return None, None + except Exception as e: + logger.error("Error downloading file {}: {}", file_key, e) + return None, None + + async def _download_and_save_media( + self, + msg_type: str, + content_json: dict, + message_id: str | None = None + ) -> tuple[str | None, str]: + """ + Download media from Feishu and save to local disk. + + Returns: + (file_path, content_text) - file_path is None if download failed + """ + loop = asyncio.get_running_loop() + media_dir = Path.home() / ".nanobot" / "media" + media_dir.mkdir(parents=True, exist_ok=True) + + data, filename = None, None + + if msg_type == "image": + image_key = content_json.get("image_key") + if image_key and message_id: + data, filename = await loop.run_in_executor( + None, self._download_image_sync, message_id, image_key + ) + if not filename: + filename = f"{image_key[:16]}.jpg" + + elif msg_type in ("audio", "file"): + file_key = content_json.get("file_key") + if file_key: + data, filename = await loop.run_in_executor( + None, self._download_file_sync, file_key + ) + if not filename: + ext = ".opus" if msg_type == "audio" else "" + filename = f"{file_key[:16]}{ext}" + + if data and filename: + file_path = media_dir / filename + file_path.write_bytes(data) + logger.debug("Downloaded {} to {}", msg_type, file_path) + return str(file_path), f"[{msg_type}: {filename}]" + + return None, f"[{msg_type}: download failed]" + def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: """Send a single message (text/image/file/interactive) synchronously.""" try: @@ -425,60 +642,81 @@ class FeishuChannel(BaseChannel): event = data.event message = event.message sender = event.sender - + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: return self._processed_message_ids[message_id] = None - - # Trim cache: keep most recent 500 when exceeds 1000 + + # Trim cache while len(self._processed_message_ids) > 1000: self._processed_message_ids.popitem(last=False) - + # Skip bot messages - sender_type = sender.sender_type - if sender_type == "bot": + if sender.sender_type == "bot": return - + sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" chat_id = message.chat_id - chat_type = message.chat_type # "p2p" or "group" + chat_type = message.chat_type msg_type = message.message_type - - # Add reaction to indicate "seen" + + # Add reaction await self._add_reaction(message_id, "THUMBSUP") - - # Parse message content + + # Parse content + content_parts = [] + media_paths = [] + + try: + content_json = json.loads(message.content) if message.content else {} + except json.JSONDecodeError: + content_json = {} + if msg_type == "text": - try: - content = json.loads(message.content).get("text", "") - except json.JSONDecodeError: - content = message.content or "" + text = content_json.get("text", "") + if text: + content_parts.append(text) + elif msg_type == "post": - try: - content_json = json.loads(message.content) - content = _extract_post_text(content_json) - except (json.JSONDecodeError, TypeError): - content = message.content or "" + text = _extract_post_text(content_json) + if text: + content_parts.append(text) + + elif msg_type in ("image", "audio", "file"): + file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id) + if file_path: + media_paths.append(file_path) + content_parts.append(content_text) + + elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"): + # Handle share cards and interactive messages + text = _extract_share_card_content(content_json, msg_type) + if text: + content_parts.append(text) + else: - content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") - - if not content: + content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")) + + content = "\n".join(content_parts) if content_parts else "" + + if not content and not media_paths: return - + # Forward to message bus reply_to = chat_id if chat_type == "group" else sender_id await self._handle_message( sender_id=sender_id, chat_id=reply_to, content=content, + media=media_paths, metadata={ "message_id": message_id, "chat_type": chat_type, "msg_type": msg_type, } ) - + except Exception as e: logger.error("Error processing Feishu message: {}", e)