From 8125d9b6bcf39a2d92f13833aa53be45ed3a9330 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sat, 21 Feb 2026 06:30:26 +0000 Subject: [PATCH] fix(feishu): fix double recursion, English placeholders, top-level Path import --- nanobot/channels/feishu.py | 130 ++++++++++++------------------------- 1 file changed, 43 insertions(+), 87 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 7e1d50a..815d853 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -6,6 +6,7 @@ import os import re import threading from collections import OrderedDict +from pathlib import Path from typing import Any from loguru import logger @@ -47,44 +48,22 @@ MSG_TYPE_MAP = { def _extract_share_card_content(content_json: dict, msg_type: str) -> str: - """Extract content from share cards and interactive messages. - - Handles: - - share_chat: Group share card - - share_user: User share card - - interactive: Interactive card (may contain links from external shares) - - share_calendar_event: Calendar event share - - system: System messages - """ + """Extract text representation from share cards and interactive messages.""" parts = [] - + if msg_type == "share_chat": - # Group share: {"chat_id": "oc_xxx"} - chat_id = content_json.get("chat_id", "") - parts.append(f"[分享群聊: {chat_id}]") - + parts.append(f"[shared chat: {content_json.get('chat_id', '')}]") elif msg_type == "share_user": - # User share: {"user_id": "ou_xxx"} - user_id = content_json.get("user_id", "") - parts.append(f"[分享用户: {user_id}]") - + parts.append(f"[shared user: {content_json.get('user_id', '')}]") elif msg_type == "interactive": - # Interactive card - extract text and links recursively parts.extend(_extract_interactive_content(content_json)) - elif msg_type == "share_calendar_event": - # Calendar event share - event_key = content_json.get("event_key", "") - parts.append(f"[分享日程: {event_key}]") - + parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]") elif msg_type == "system": - # System message - parts.append("[系统消息]") - + parts.append("[system message]") elif msg_type == "merge_forward": - # Merged forward messages - parts.append("[合并转发消息]") - + parts.append("[merged forward messages]") + return "\n".join(parts) if parts else f"[{msg_type}]" @@ -93,44 +72,37 @@ def _extract_interactive_content(content: dict) -> list[str]: parts = [] if isinstance(content, str): - # Try to parse as JSON try: content = json.loads(content) except (json.JSONDecodeError, TypeError): return [content] if content.strip() else [] - + if not isinstance(content, dict): return parts - - # Extract title + if "title" in content: title = content["title"] if isinstance(title, dict): title_content = title.get("content", "") or title.get("text", "") if title_content: - parts.append(f"标题: {title_content}") + parts.append(f"title: {title_content}") elif isinstance(title, str): - parts.append(f"标题: {title}") - - # Extract from elements array - elements = content.get("elements", []) - if isinstance(elements, list): - for element in elements: - parts.extend(_extract_element_content(element)) - - # Extract from card config + parts.append(f"title: {title}") + + for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []: + parts.extend(_extract_element_content(element)) + card = content.get("card", {}) if card: parts.extend(_extract_interactive_content(card)) - - # Extract header + header = content.get("header", {}) if header: header_title = header.get("title", {}) if isinstance(header_title, dict): header_text = header_title.get("content", "") or header_title.get("text", "") if header_text: - parts.append(f"标题: {header_text}") + parts.append(f"title: {header_text}") return parts @@ -144,13 +116,11 @@ def _extract_element_content(element: dict) -> list[str]: tag = element.get("tag", "") - # Markdown element - if tag == "markdown" or tag == "lark_md": + if tag in ("markdown", "lark_md"): content = element.get("content", "") if content: parts.append(content) - - # Div element + elif tag == "div": text = element.get("text", {}) if isinstance(text, dict): @@ -159,64 +129,52 @@ def _extract_element_content(element: dict) -> list[str]: parts.append(text_content) elif isinstance(text, str): parts.append(text) - # Check for extra fields - fields = element.get("fields", []) - for field in fields: + for field in element.get("fields", []): if isinstance(field, dict): field_text = field.get("text", {}) if isinstance(field_text, dict): - parts.append(field_text.get("content", "")) - - # Link/URL element + c = field_text.get("content", "") + if c: + parts.append(c) + elif tag == "a": href = element.get("href", "") text = element.get("text", "") if href: - parts.append(f"链接: {href}") + parts.append(f"link: {href}") if text: parts.append(text) - - # Button element (may contain URL) + elif tag == "button": text = element.get("text", {}) if isinstance(text, dict): - parts.append(text.get("content", "")) + c = text.get("content", "") + if c: + parts.append(c) url = element.get("url", "") or element.get("multi_url", {}).get("url", "") if url: - parts.append(f"链接: {url}") - - # Image element + parts.append(f"link: {url}") + elif tag == "img": alt = element.get("alt", {}) - if isinstance(alt, dict): - parts.append(alt.get("content", "[图片]")) - else: - parts.append("[图片]") - - # Note element + parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]") + elif tag == "note": - note_elements = element.get("elements", []) - for ne in note_elements: + for ne in element.get("elements", []): parts.extend(_extract_element_content(ne)) - - # Column set + elif tag == "column_set": - columns = element.get("columns", []) - for col in columns: - col_elements = col.get("elements", []) - for ce in col_elements: + for col in element.get("columns", []): + for ce in col.get("elements", []): parts.extend(_extract_element_content(ce)) - - # Plain text + elif tag == "plain_text": content = element.get("content", "") if content: parts.append(content) - - # Recursively check nested elements - nested = element.get("elements", []) - if isinstance(nested, list): - for ne in nested: + + else: + for ne in element.get("elements", []): parts.extend(_extract_element_content(ne)) return parts @@ -571,8 +529,6 @@ class FeishuChannel(BaseChannel): Returns: (file_path, content_text) - file_path is None if download failed """ - from pathlib import Path - loop = asyncio.get_running_loop() media_dir = Path.home() / ".nanobot" / "media" media_dir.mkdir(parents=True, exist_ok=True)