fix(feishu): fix double recursion, English placeholders, top-level Path import

This commit is contained in:
Re-bin
2026-02-21 06:30:26 +00:00
parent b9c3f8a5a3
commit 8125d9b6bc

View File

@@ -6,6 +6,7 @@ import os
import re import re
import threading import threading
from collections import OrderedDict from collections import OrderedDict
from pathlib import Path
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@@ -47,44 +48,22 @@ MSG_TYPE_MAP = {
def _extract_share_card_content(content_json: dict, msg_type: str) -> str: def _extract_share_card_content(content_json: dict, msg_type: str) -> str:
"""Extract content from share cards and interactive messages. """Extract text representation from share cards and interactive messages."""
Handles:
- share_chat: Group share card
- share_user: User share card
- interactive: Interactive card (may contain links from external shares)
- share_calendar_event: Calendar event share
- system: System messages
"""
parts = [] parts = []
if msg_type == "share_chat": if msg_type == "share_chat":
# Group share: {"chat_id": "oc_xxx"} parts.append(f"[shared chat: {content_json.get('chat_id', '')}]")
chat_id = content_json.get("chat_id", "")
parts.append(f"[分享群聊: {chat_id}]")
elif msg_type == "share_user": elif msg_type == "share_user":
# User share: {"user_id": "ou_xxx"} parts.append(f"[shared user: {content_json.get('user_id', '')}]")
user_id = content_json.get("user_id", "")
parts.append(f"[分享用户: {user_id}]")
elif msg_type == "interactive": elif msg_type == "interactive":
# Interactive card - extract text and links recursively
parts.extend(_extract_interactive_content(content_json)) parts.extend(_extract_interactive_content(content_json))
elif msg_type == "share_calendar_event": elif msg_type == "share_calendar_event":
# Calendar event share parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]")
event_key = content_json.get("event_key", "")
parts.append(f"[分享日程: {event_key}]")
elif msg_type == "system": elif msg_type == "system":
# System message parts.append("[system message]")
parts.append("[系统消息]")
elif msg_type == "merge_forward": elif msg_type == "merge_forward":
# Merged forward messages parts.append("[merged forward messages]")
parts.append("[合并转发消息]")
return "\n".join(parts) if parts else f"[{msg_type}]" return "\n".join(parts) if parts else f"[{msg_type}]"
@@ -93,44 +72,37 @@ def _extract_interactive_content(content: dict) -> list[str]:
parts = [] parts = []
if isinstance(content, str): if isinstance(content, str):
# Try to parse as JSON
try: try:
content = json.loads(content) content = json.loads(content)
except (json.JSONDecodeError, TypeError): except (json.JSONDecodeError, TypeError):
return [content] if content.strip() else [] return [content] if content.strip() else []
if not isinstance(content, dict): if not isinstance(content, dict):
return parts return parts
# Extract title
if "title" in content: if "title" in content:
title = content["title"] title = content["title"]
if isinstance(title, dict): if isinstance(title, dict):
title_content = title.get("content", "") or title.get("text", "") title_content = title.get("content", "") or title.get("text", "")
if title_content: if title_content:
parts.append(f"标题: {title_content}") parts.append(f"title: {title_content}")
elif isinstance(title, str): elif isinstance(title, str):
parts.append(f"标题: {title}") parts.append(f"title: {title}")
# Extract from elements array for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []:
elements = content.get("elements", []) parts.extend(_extract_element_content(element))
if isinstance(elements, list):
for element in elements:
parts.extend(_extract_element_content(element))
# Extract from card config
card = content.get("card", {}) card = content.get("card", {})
if card: if card:
parts.extend(_extract_interactive_content(card)) parts.extend(_extract_interactive_content(card))
# Extract header
header = content.get("header", {}) header = content.get("header", {})
if header: if header:
header_title = header.get("title", {}) header_title = header.get("title", {})
if isinstance(header_title, dict): if isinstance(header_title, dict):
header_text = header_title.get("content", "") or header_title.get("text", "") header_text = header_title.get("content", "") or header_title.get("text", "")
if header_text: if header_text:
parts.append(f"标题: {header_text}") parts.append(f"title: {header_text}")
return parts return parts
@@ -144,13 +116,11 @@ def _extract_element_content(element: dict) -> list[str]:
tag = element.get("tag", "") tag = element.get("tag", "")
# Markdown element if tag in ("markdown", "lark_md"):
if tag == "markdown" or tag == "lark_md":
content = element.get("content", "") content = element.get("content", "")
if content: if content:
parts.append(content) parts.append(content)
# Div element
elif tag == "div": elif tag == "div":
text = element.get("text", {}) text = element.get("text", {})
if isinstance(text, dict): if isinstance(text, dict):
@@ -159,64 +129,52 @@ def _extract_element_content(element: dict) -> list[str]:
parts.append(text_content) parts.append(text_content)
elif isinstance(text, str): elif isinstance(text, str):
parts.append(text) parts.append(text)
# Check for extra fields for field in element.get("fields", []):
fields = element.get("fields", [])
for field in fields:
if isinstance(field, dict): if isinstance(field, dict):
field_text = field.get("text", {}) field_text = field.get("text", {})
if isinstance(field_text, dict): if isinstance(field_text, dict):
parts.append(field_text.get("content", "")) c = field_text.get("content", "")
if c:
# Link/URL element parts.append(c)
elif tag == "a": elif tag == "a":
href = element.get("href", "") href = element.get("href", "")
text = element.get("text", "") text = element.get("text", "")
if href: if href:
parts.append(f"链接: {href}") parts.append(f"link: {href}")
if text: if text:
parts.append(text) parts.append(text)
# Button element (may contain URL)
elif tag == "button": elif tag == "button":
text = element.get("text", {}) text = element.get("text", {})
if isinstance(text, dict): if isinstance(text, dict):
parts.append(text.get("content", "")) c = text.get("content", "")
if c:
parts.append(c)
url = element.get("url", "") or element.get("multi_url", {}).get("url", "") url = element.get("url", "") or element.get("multi_url", {}).get("url", "")
if url: if url:
parts.append(f"链接: {url}") parts.append(f"link: {url}")
# Image element
elif tag == "img": elif tag == "img":
alt = element.get("alt", {}) alt = element.get("alt", {})
if isinstance(alt, dict): parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]")
parts.append(alt.get("content", "[图片]"))
else:
parts.append("[图片]")
# Note element
elif tag == "note": elif tag == "note":
note_elements = element.get("elements", []) for ne in element.get("elements", []):
for ne in note_elements:
parts.extend(_extract_element_content(ne)) parts.extend(_extract_element_content(ne))
# Column set
elif tag == "column_set": elif tag == "column_set":
columns = element.get("columns", []) for col in element.get("columns", []):
for col in columns: for ce in col.get("elements", []):
col_elements = col.get("elements", [])
for ce in col_elements:
parts.extend(_extract_element_content(ce)) parts.extend(_extract_element_content(ce))
# Plain text
elif tag == "plain_text": elif tag == "plain_text":
content = element.get("content", "") content = element.get("content", "")
if content: if content:
parts.append(content) parts.append(content)
# Recursively check nested elements else:
nested = element.get("elements", []) for ne in element.get("elements", []):
if isinstance(nested, list):
for ne in nested:
parts.extend(_extract_element_content(ne)) parts.extend(_extract_element_content(ne))
return parts return parts
@@ -571,8 +529,6 @@ class FeishuChannel(BaseChannel):
Returns: Returns:
(file_path, content_text) - file_path is None if download failed (file_path, content_text) - file_path is None if download failed
""" """
from pathlib import Path
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
media_dir = Path.home() / ".nanobot" / "media" media_dir = Path.home() / ".nanobot" / "media"
media_dir.mkdir(parents=True, exist_ok=True) media_dir.mkdir(parents=True, exist_ok=True)