diff --git a/bridge/src/whatsapp.ts b/bridge/src/whatsapp.ts index 04eba0f..a98f3a8 100644 --- a/bridge/src/whatsapp.ts +++ b/bridge/src/whatsapp.ts @@ -29,6 +29,7 @@ export interface InboundMessage { content: string; timestamp: number; isGroup: boolean; + wasMentioned?: boolean; media?: string[]; } @@ -48,6 +49,31 @@ export class WhatsAppClient { this.options = options; } + private normalizeJid(jid: string | undefined | null): string { + return (jid || '').split(':')[0]; + } + + private wasMentioned(msg: any): boolean { + if (!msg?.key?.remoteJid?.endsWith('@g.us')) return false; + + const candidates = [ + msg?.message?.extendedTextMessage?.contextInfo?.mentionedJid, + msg?.message?.imageMessage?.contextInfo?.mentionedJid, + msg?.message?.videoMessage?.contextInfo?.mentionedJid, + msg?.message?.documentMessage?.contextInfo?.mentionedJid, + msg?.message?.audioMessage?.contextInfo?.mentionedJid, + ]; + const mentioned = candidates.flatMap((items) => (Array.isArray(items) ? items : [])); + if (mentioned.length === 0) return false; + + const selfIds = new Set( + [this.sock?.user?.id, this.sock?.user?.lid, this.sock?.user?.jid] + .map((jid) => this.normalizeJid(jid)) + .filter(Boolean), + ); + return mentioned.some((jid: string) => selfIds.has(this.normalizeJid(jid))); + } + async connect(): Promise { const logger = pino({ level: 'silent' }); const { state, saveCreds } = await useMultiFileAuthState(this.options.authDir); @@ -145,6 +171,7 @@ export class WhatsAppClient { if (!finalContent && mediaPaths.length === 0) continue; const isGroup = msg.key.remoteJid?.endsWith('@g.us') || false; + const wasMentioned = this.wasMentioned(msg); this.options.onMessage({ id: msg.key.id || '', @@ -153,6 +180,7 @@ export class WhatsAppClient { content: finalContent, timestamp: msg.messageTimestamp as number, isGroup, + ...(isGroup ? { wasMentioned } : {}), ...(mediaPaths.length > 0 ? { media: mediaPaths } : {}), }); } diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 64d92e7..75bd474 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -938,6 +938,9 @@ class FeishuChannel(BaseChannel): reply_message_id: str | None = None if self.config.reply_to_message and not msg.metadata.get("_progress", False): reply_message_id = msg.metadata.get("message_id") or None + # For topic group messages, always reply to keep context in thread + elif msg.metadata.get("thread_id"): + reply_message_id = msg.metadata.get("root_id") or msg.metadata.get("message_id") or None first_send = True @@ -1095,6 +1098,7 @@ class FeishuChannel(BaseChannel): parent_id = getattr(message, "parent_id", None) or None root_id = getattr(message, "root_id", None) or None + thread_id = getattr(message, "thread_id", None) or None if parent_id and self._client: loop = asyncio.get_running_loop() @@ -1120,6 +1124,7 @@ class FeishuChannel(BaseChannel): "msg_type": msg_type, "parent_id": parent_id, "root_id": root_id, + "thread_id": thread_id, } ) diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 7f0742c..ea01ad3 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -2,11 +2,15 @@ import asyncio import base64 +import os +import re +import time from collections import deque from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from urllib.parse import urlparse +import aiohttp from loguru import logger from nanobot.bus.events import OutboundMessage @@ -16,6 +20,11 @@ from nanobot.config.schema import QQConfig, QQInstanceConfig from nanobot.security.network import validate_url_target from nanobot.utils.delivery import delivery_artifacts_root, is_image_file +try: + from nanobot.config.paths import get_media_dir +except Exception: # pragma: no cover + get_media_dir = None # type: ignore + try: import botpy from botpy.http import Route @@ -34,6 +43,33 @@ if TYPE_CHECKING: from botpy.message import C2CMessage, GroupMessage +_IMAGE_EXTS = { + ".png", + ".jpg", + ".jpeg", + ".gif", + ".bmp", + ".webp", + ".tif", + ".tiff", + ".ico", + ".svg", +} +_SAFE_NAME_RE = re.compile(r"[^\w.\-()\[\]()【】\u4e00-\u9fff]+", re.UNICODE) + + +def _sanitize_filename(name: str) -> str: + """Sanitize filename to avoid traversal and problematic characters.""" + name = Path(name or "").name.strip() + name = _SAFE_NAME_RE.sub("_", name).strip("._ ") + return name + + +def _is_image_name(name: str) -> bool: + """Return whether the file name looks like an image.""" + return Path(name).suffix.lower() in _IMAGE_EXTS + + def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": """Create a botpy Client subclass bound to the given channel.""" intents = botpy.Intents(public_messages=True, direct_message=True) @@ -71,17 +107,21 @@ class QQChannel(BaseChannel): def __init__( self, - config: QQConfig | QQInstanceConfig, + config: QQConfig | QQInstanceConfig | dict, bus: MessageBus, workspace: str | Path | None = None, ): + if isinstance(config, dict): + config = QQConfig.model_validate(config) super().__init__(config, bus) self.config: QQConfig | QQInstanceConfig = config self._client: "botpy.Client | None" = None - self._processed_ids: deque = deque(maxlen=1000) + self._http: aiohttp.ClientSession | None = None + self._processed_ids: deque[str] = deque(maxlen=1000) self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重 self._chat_type_cache: dict[str, str] = {} self._workspace = Path(workspace).expanduser() if workspace is not None else None + self._media_root = self._init_media_root() @staticmethod def _is_remote_media(path: str) -> bool: @@ -153,14 +193,24 @@ class QQChannel(BaseChannel): """Encode a local media file as base64 for QQ rich-media upload.""" return base64.b64encode(path.read_bytes()).decode("ascii") - async def _post_text_message(self, chat_id: str, msg_type: str, content: str, msg_id: str | None) -> None: - """Send a plain-text QQ message.""" - payload = { - "msg_type": 0, - "content": content, + async def _post_text_message( + self, + chat_id: str, + msg_type: str, + content: str, + msg_id: str | None, + ) -> None: + """Send a plain-text or markdown QQ message.""" + use_markdown = self.config.msg_format == "markdown" + payload: dict[str, Any] = { + "msg_type": 2 if use_markdown else 0, "msg_id": msg_id, "msg_seq": self._next_msg_seq(), } + if use_markdown: + payload["markdown"] = {"content": content} + else: + payload["content"] = content if msg_type == "group": await self._client.api.post_group_message(group_openid=chat_id, **payload) else: @@ -248,8 +298,24 @@ class QQChannel(BaseChannel): msg_seq=self._next_msg_seq(), ) + def _init_media_root(self) -> Path: + """Choose a directory for saving inbound attachments.""" + if self.config.media_dir: + root = Path(self.config.media_dir).expanduser() + elif get_media_dir: + try: + root = Path(get_media_dir("qq")) + except Exception: + root = Path.home() / ".nanobot" / "media" / "qq" + else: + root = Path.home() / ".nanobot" / "media" / "qq" + + root.mkdir(parents=True, exist_ok=True) + logger.info("QQ media directory: {}", str(root)) + return root + async def start(self) -> None: - """Start the QQ bot.""" + """Start the QQ bot with auto-reconnect.""" if not QQ_AVAILABLE: logger.error("QQ SDK not installed. Run: pip install qq-botpy") return @@ -259,8 +325,8 @@ class QQChannel(BaseChannel): return self._running = True - bot_class = _make_bot_class(self) - self._client = bot_class() + self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120)) + self._client = _make_bot_class(self)() logger.info("QQ bot started (C2C & Group supported)") await self._run_bot() @@ -276,13 +342,20 @@ class QQChannel(BaseChannel): await asyncio.sleep(5) async def stop(self) -> None: - """Stop the QQ bot.""" + """Stop bot and cleanup resources.""" self._running = False if self._client: try: await self._client.close() except Exception: pass + self._client = None + if self._http: + try: + await self._http.close() + except Exception: + pass + self._http = None logger.info("QQ bot stopped") async def send(self, msg: OutboundMessage) -> None: @@ -297,11 +370,13 @@ class QQChannel(BaseChannel): content_sent = False fallback_lines: list[str] = [] - for media_path in msg.media: + for media_path in msg.media or []: local_media_path: Path | None = None local_file_type: int | None = None if not self._is_remote_media(media_path): - local_media_path, local_file_type, publish_error = self._resolve_local_media(media_path) + local_media_path, local_file_type, publish_error = self._resolve_local_media( + media_path + ) if local_media_path is None: logger.warning( "QQ outbound local media could not be uploaded directly: {} ({})", @@ -353,7 +428,9 @@ class QQChannel(BaseChannel): logger.error("Error sending QQ media {}: {}", media_path, media_error) if local_media_path is not None: fallback_lines.append( - self._failed_media_notice(media_path, "QQ local file_data upload failed") + self._failed_media_notice( + media_path, "QQ local file_data upload failed" + ) ) else: fallback_lines.append(self._failed_media_notice(media_path)) @@ -372,29 +449,161 @@ class QQChannel(BaseChannel): async def _on_message(self, data: "C2CMessage | GroupMessage", is_group: bool = False) -> None: """Handle incoming message from QQ.""" try: - # Dedup by message ID if data.id in self._processed_ids: return self._processed_ids.append(data.id) - content = (data.content or "").strip() - if not content: - return - if is_group: chat_id = data.group_openid user_id = data.author.member_openid self._chat_type_cache[chat_id] = "group" else: - chat_id = str(getattr(data.author, 'id', None) or getattr(data.author, 'user_openid', 'unknown')) + chat_id = str( + getattr(data.author, "id", None) + or getattr(data.author, "user_openid", "unknown") + ) user_id = chat_id self._chat_type_cache[chat_id] = "c2c" + content = (data.content or "").strip() + attachments = getattr(data, "attachments", None) or [] + media_paths, recv_lines, att_meta = await self._handle_attachments(attachments) + if recv_lines: + tag = "[Image]" if any(_is_image_name(Path(p).name) for p in media_paths) else "[File]" + file_block = "Received files:\n" + "\n".join(recv_lines) + content = f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}" + + if not content and not media_paths: + return + await self._handle_message( sender_id=user_id, chat_id=chat_id, content=content, - metadata={"message_id": data.id}, + media=media_paths or None, + metadata={ + "message_id": data.id, + "attachments": att_meta, + }, ) except Exception: logger.exception("Error handling QQ message") + + async def _handle_attachments(self, attachments: list[Any]) -> tuple[list[str], list[str], list[dict[str, Any]]]: + """Extract, download, and format QQ attachments for downstream tools.""" + media_paths: list[str] = [] + recv_lines: list[str] = [] + att_meta: list[dict[str, Any]] = [] + if not attachments: + return media_paths, recv_lines, att_meta + + for att in attachments: + url = getattr(att, "url", None) + filename = getattr(att, "filename", None) + content_type = getattr(att, "content_type", None) + local_path = ( + await self._download_to_media_dir_chunked(url, filename_hint=filename or "") + if url + else None + ) + att_meta.append( + { + "url": url, + "filename": filename, + "content_type": content_type, + "saved_path": local_path, + } + ) + shown_name = filename or url or "file" + if local_path: + media_paths.append(local_path) + recv_lines.append(f"- {shown_name}\n saved: {local_path}") + else: + recv_lines.append(f"- {shown_name}\n saved: [download failed]") + + return media_paths, recv_lines, att_meta + + async def _download_to_media_dir_chunked(self, url: str, filename_hint: str = "") -> str | None: + """Download an inbound attachment using chunked streaming writes.""" + if not self._http: + self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120)) + + safe = _sanitize_filename(filename_hint) + timestamp_ms = int(time.time() * 1000) + tmp_path: Path | None = None + + try: + async with self._http.get( + url, + timeout=aiohttp.ClientTimeout(total=120), + allow_redirects=True, + ) as resp: + if resp.status != 200: + logger.warning("QQ download failed: status={} url={}", resp.status, url) + return None + + content_type = (resp.headers.get("Content-Type") or "").lower() + ext = Path(urlparse(url).path).suffix or Path(filename_hint).suffix + if not ext: + if "png" in content_type: + ext = ".png" + elif "jpeg" in content_type or "jpg" in content_type: + ext = ".jpg" + elif "gif" in content_type: + ext = ".gif" + elif "webp" in content_type: + ext = ".webp" + elif "pdf" in content_type: + ext = ".pdf" + else: + ext = ".bin" + + if safe and not Path(safe).suffix: + safe = safe + ext + filename = safe or f"qq_file_{timestamp_ms}{ext}" + target = self._media_root / filename + if target.exists(): + target = self._media_root / f"{target.stem}_{timestamp_ms}{target.suffix}" + tmp_path = target.with_suffix(target.suffix + ".part") + + chunk_size = max(1024, int(self.config.download_chunk_size or 262144)) + max_bytes = max( + 1024 * 1024, + int(self.config.download_max_bytes or (200 * 1024 * 1024)), + ) + downloaded = 0 + + def _open_tmp() -> Any: + tmp_path.parent.mkdir(parents=True, exist_ok=True) + return open(tmp_path, "wb") # noqa: SIM115 + + f = await asyncio.to_thread(_open_tmp) + try: + async for chunk in resp.content.iter_chunked(chunk_size): + if not chunk: + continue + downloaded += len(chunk) + if downloaded > max_bytes: + logger.warning( + "QQ download exceeded max_bytes={} url={} -> abort", + max_bytes, + url, + ) + return None + await asyncio.to_thread(f.write, chunk) + finally: + await asyncio.to_thread(f.close) + + await asyncio.to_thread(os.replace, tmp_path, target) + tmp_path = None + logger.info("QQ file saved: {}", str(target)) + return str(target) + except Exception as e: + logger.error("QQ download error: {}", e) + return None + finally: + if tmp_path is not None: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 1fb64e7..2081fc2 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -10,7 +10,7 @@ from dataclasses import dataclass from typing import Any from loguru import logger -from telegram import BotCommand, ReplyParameters, Update +from telegram import BotCommand, ReactionTypeEmoji, ReplyParameters, Update from telegram.error import TimedOut from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from telegram.request import HTTPXRequest @@ -800,6 +800,7 @@ class TelegramChannel(BaseChannel): "session_key": session_key, } self._start_typing(str_chat_id) + await self._add_reaction(str_chat_id, message.message_id, self.config.react_emoji) buf = self._media_group_buffers[key] if content and content != "[empty message]": buf["contents"].append(content) @@ -810,6 +811,7 @@ class TelegramChannel(BaseChannel): # Start typing indicator before processing self._start_typing(str_chat_id) + await self._add_reaction(str_chat_id, message.message_id, self.config.react_emoji) # Forward to the message bus await self._handle_message( @@ -849,6 +851,19 @@ class TelegramChannel(BaseChannel): if task and not task.done(): task.cancel() + async def _add_reaction(self, chat_id: str, message_id: int, emoji: str) -> None: + """Add an emoji reaction best-effort without interrupting message handling.""" + if not self._app or not emoji: + return + try: + await self._app.bot.set_message_reaction( + chat_id=int(chat_id), + message_id=message_id, + reaction=[ReactionTypeEmoji(emoji=emoji)], + ) + except Exception as e: + logger.debug("Telegram reaction failed: {}", e) + async def _typing_loop(self, chat_id: str) -> None: """Repeatedly send 'typing' action until cancelled.""" try: diff --git a/nanobot/channels/whatsapp.py b/nanobot/channels/whatsapp.py index 76a98e9..9246e2c 100644 --- a/nanobot/channels/whatsapp.py +++ b/nanobot/channels/whatsapp.py @@ -32,7 +32,9 @@ class WhatsAppChannel(BaseChannel): def default_config(cls) -> dict[str, object]: return WhatsAppConfig().model_dump(by_alias=True) - def __init__(self, config: WhatsAppConfig | WhatsAppInstanceConfig, bus: MessageBus): + def __init__(self, config: WhatsAppConfig | WhatsAppInstanceConfig | dict, bus: MessageBus): + if isinstance(config, dict): + config = WhatsAppConfig.model_validate(config) super().__init__(config, bus) self.config: WhatsAppConfig | WhatsAppInstanceConfig = config self._ws = None @@ -175,6 +177,12 @@ class WhatsAppChannel(BaseChannel): self._processed_message_ids.popitem(last=False) # Extract just the phone number or lid as chat_id + is_group = data.get("isGroup", False) + was_mentioned = data.get("wasMentioned", False) + + if is_group and self.config.group_policy == "mention" and not was_mentioned: + return + user_id = pn if pn else sender sender_id = user_id.split("@")[0] if "@" in user_id else user_id logger.info("Sender {}", sender) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 1d8d5d1..a33f5a0 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -21,6 +21,7 @@ class WhatsAppConfig(Base): bridge_url: str = "ws://localhost:3001" bridge_token: str = "" # Shared token for bridge auth (optional, recommended) allow_from: list[str] = Field(default_factory=list) # Allowed phone numbers + group_policy: Literal["open", "mention"] = "open" class WhatsAppInstanceConfig(WhatsAppConfig): @@ -46,6 +47,7 @@ class TelegramConfig(Base): None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080" ) reply_to_message: bool = False # If true, bot replies quote the original message + react_emoji: str = "👀" group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all connection_pool_size: int = 32 # Outbound Telegram API HTTP pool size pool_timeout: float = 5.0 # Shared HTTP pool timeout for bot sends and getUpdates @@ -319,6 +321,10 @@ class QQConfig(Base): app_id: str = "" # 机器人 ID (AppID) from q.qq.com secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com allow_from: list[str] = Field(default_factory=list) # Allowed user openids + msg_format: Literal["plain", "markdown"] = "plain" + media_dir: str = "" + download_chunk_size: int = 1024 * 256 + download_max_bytes: int = 1024 * 1024 * 200 media_base_url: str = "" # Public base URL used to expose workspace/out QQ media files diff --git a/tests/test_qq_channel.py b/tests/test_qq_channel.py index 8eefdd8..fe8287a 100644 --- a/tests/test_qq_channel.py +++ b/tests/test_qq_channel.py @@ -80,6 +80,7 @@ async def test_on_group_message_routes_to_group_chat_id() -> None: content="hello", group_openid="group123", author=SimpleNamespace(member_openid="user1"), + attachments=[], ) await channel._on_message(data, is_group=True) @@ -142,6 +143,35 @@ async def test_send_c2c_message_uses_plain_text_c2c_api_with_msg_seq() -> None: assert not channel._client.api.group_calls +@pytest.mark.asyncio +async def test_send_group_message_uses_markdown_when_configured() -> None: + channel = QQChannel( + QQConfig(app_id="app", secret="secret", allow_from=["*"], msg_format="markdown"), + MessageBus(), + ) + channel._client = _FakeClient() + channel._chat_type_cache["group123"] = "group" + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="group123", + content="**hello**", + metadata={"message_id": "msg1"}, + ) + ) + + assert len(channel._client.api.group_calls) == 1 + call = channel._client.api.group_calls[0] + assert call == { + "group_openid": "group123", + "msg_type": 2, + "markdown": {"content": "**hello**"}, + "msg_id": "msg1", + "msg_seq": 2, + } + + @pytest.mark.asyncio async def test_send_group_remote_media_url_uses_file_api_then_media_message(monkeypatch) -> None: channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus()) diff --git a/tests/test_whatsapp_channel.py b/tests/test_whatsapp_channel.py index 1413429..dea15d7 100644 --- a/tests/test_whatsapp_channel.py +++ b/tests/test_whatsapp_channel.py @@ -106,3 +106,52 @@ async def test_send_when_disconnected_is_noop(): await ch.send(msg) ch._ws.send.assert_not_called() + + +@pytest.mark.asyncio +async def test_group_policy_mention_skips_unmentioned_group_message(): + ch = WhatsAppChannel({"enabled": True, "groupPolicy": "mention"}, MagicMock()) + ch._handle_message = AsyncMock() + + await ch._handle_bridge_message( + json.dumps( + { + "type": "message", + "id": "m1", + "sender": "12345@g.us", + "pn": "user@s.whatsapp.net", + "content": "hello group", + "timestamp": 1, + "isGroup": True, + "wasMentioned": False, + } + ) + ) + + ch._handle_message.assert_not_called() + + +@pytest.mark.asyncio +async def test_group_policy_mention_accepts_mentioned_group_message(): + ch = WhatsAppChannel({"enabled": True, "groupPolicy": "mention"}, MagicMock()) + ch._handle_message = AsyncMock() + + await ch._handle_bridge_message( + json.dumps( + { + "type": "message", + "id": "m1", + "sender": "12345@g.us", + "pn": "user@s.whatsapp.net", + "content": "hello @bot", + "timestamp": 1, + "isGroup": True, + "wasMentioned": True, + } + ) + ) + + ch._handle_message.assert_awaited_once() + kwargs = ch._handle_message.await_args.kwargs + assert kwargs["chat_id"] == "12345@g.us" + assert kwargs["sender_id"] == "user"