diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index e556c98..5dae01b 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -1,33 +1,107 @@ -"""QQ channel implementation using botpy SDK.""" +"""QQ channel implementation using botpy SDK. + +Inbound: +- Parse QQ botpy messages (C2C / Group) +- Download attachments to media dir using chunked streaming write (memory-safe) +- Publish to Nanobot bus via BaseChannel._handle_message() +- Content includes a clear, actionable "Received files:" list with local paths + +Outbound: +- Send attachments (msg.media) first via QQ rich media API (base64 upload + msg_type=7) +- Then send text (plain or markdown) +- msg.media supports local paths, file:// paths, and http(s) URLs + +Notes: +- QQ restricts many audio/video formats. We conservatively classify as image vs file. +- Attachment structures differ across botpy versions; we try multiple field candidates. +""" + +from __future__ import annotations import asyncio +import base64 +import mimetypes +import os +import re +import time from collections import deque +from pathlib import Path from typing import TYPE_CHECKING, Any, Literal +from urllib.parse import unquote, urlparse +import aiohttp from loguru import logger +from pydantic import Field from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.schema import Base -from pydantic import Field +from nanobot.security.network import validate_url_target + +try: + from nanobot.config.paths import get_media_dir +except Exception: # pragma: no cover + get_media_dir = None # type: ignore try: import botpy - from botpy.message import C2CMessage, GroupMessage + from botpy.http import Route QQ_AVAILABLE = True -except ImportError: +except ImportError: # pragma: no cover QQ_AVAILABLE = False botpy = None - C2CMessage = None - GroupMessage = None + Route = None if TYPE_CHECKING: - from botpy.message import C2CMessage, GroupMessage + from botpy.message import BaseMessage, C2CMessage, GroupMessage + from botpy.types.message import Media -def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": +# QQ rich media file_type: 1=image, 4=file +# (2=voice, 3=video are restricted; we only use image vs file) +QQ_FILE_TYPE_IMAGE = 1 +QQ_FILE_TYPE_FILE = 4 + +_IMAGE_EXTS = { + ".png", + ".jpg", + ".jpeg", + ".gif", + ".bmp", + ".webp", + ".tif", + ".tiff", + ".ico", +} + +# Replace unsafe characters with "_", keep Chinese and common safe punctuation. +_SAFE_NAME_RE = re.compile(r"[^\w.\-()\[\]()【】\u4e00-\u9fff]+", re.UNICODE) + + +def _sanitize_filename(name: str) -> str: + """Sanitize filename to avoid traversal and problematic chars.""" + name = (name or "").strip() + name = Path(name).name + name = _SAFE_NAME_RE.sub("_", name).strip("._ ") + return name + + +def _is_image_name(name: str) -> bool: + return Path(name).suffix.lower() in _IMAGE_EXTS + + +def _guess_send_file_type(filename: str) -> int: + """Conservative send type: images -> 1, else -> 4.""" + ext = Path(filename).suffix.lower() + mime, _ = mimetypes.guess_type(filename) + if ext in _IMAGE_EXTS or (mime and mime.startswith("image/")): + return QQ_FILE_TYPE_IMAGE + return QQ_FILE_TYPE_FILE + + +def _make_bot_class(channel: QQChannel) -> type[botpy.Client]: """Create a botpy Client subclass bound to the given channel.""" intents = botpy.Intents(public_messages=True, direct_message=True) @@ -39,10 +113,10 @@ def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": async def on_ready(self): logger.info("QQ bot ready: {}", self.robot.name) - async def on_c2c_message_create(self, message: "C2CMessage"): + async def on_c2c_message_create(self, message: C2CMessage): await channel._on_message(message, is_group=False) - async def on_group_at_message_create(self, message: "GroupMessage"): + async def on_group_at_message_create(self, message: GroupMessage): await channel._on_message(message, is_group=True) async def on_direct_message_create(self, message): @@ -60,6 +134,13 @@ class QQConfig(Base): allow_from: list[str] = Field(default_factory=list) msg_format: Literal["plain", "markdown"] = "plain" + # Optional: directory to save inbound attachments. If empty, use nanobot get_media_dir("qq"). + media_dir: str = "" + + # Download tuning + download_chunk_size: int = 1024 * 256 # 256KB + download_max_bytes: int = 1024 * 1024 * 200 # 200MB safety limit + class QQChannel(BaseChannel): """QQ channel using botpy SDK with WebSocket connection.""" @@ -76,13 +157,38 @@ class QQChannel(BaseChannel): config = QQConfig.model_validate(config) super().__init__(config, bus) self.config: QQConfig = config - self._client: "botpy.Client | None" = None - self._processed_ids: deque = deque(maxlen=1000) - self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重 + + self._client: botpy.Client | None = None + self._http: aiohttp.ClientSession | None = None + + self._processed_ids: deque[str] = deque(maxlen=1000) + self._msg_seq: int = 1 # used to avoid QQ API dedup self._chat_type_cache: dict[str, str] = {} + self._media_root: Path = self._init_media_root() + + # --------------------------- + # Lifecycle + # --------------------------- + + def _init_media_root(self) -> Path: + """Choose a directory for saving inbound attachments.""" + if self.config.media_dir: + root = Path(self.config.media_dir).expanduser() + elif get_media_dir: + try: + root = Path(get_media_dir("qq")) + except Exception: + root = Path.home() / ".nanobot" / "media" / "qq" + else: + root = Path.home() / ".nanobot" / "media" / "qq" + + root.mkdir(parents=True, exist_ok=True) + logger.info("QQ media directory: {}", str(root)) + return root + async def start(self) -> None: - """Start the QQ bot.""" + """Start the QQ bot with auto-reconnect loop.""" if not QQ_AVAILABLE: logger.error("QQ SDK not installed. Run: pip install qq-botpy") return @@ -92,8 +198,9 @@ class QQChannel(BaseChannel): return self._running = True - BotClass = _make_bot_class(self) - self._client = BotClass() + self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120)) + + self._client = _make_bot_class(self)() logger.info("QQ bot started (C2C & Group supported)") await self._run_bot() @@ -109,75 +216,427 @@ class QQChannel(BaseChannel): await asyncio.sleep(5) async def stop(self) -> None: - """Stop the QQ bot.""" + """Stop bot and cleanup resources.""" self._running = False if self._client: try: await self._client.close() except Exception: pass + self._client = None + + if self._http: + try: + await self._http.close() + except Exception: + pass + self._http = None + logger.info("QQ bot stopped") + # --------------------------- + # Outbound (send) + # --------------------------- + async def send(self, msg: OutboundMessage) -> None: - """Send a message through QQ.""" + """Send attachments first, then text.""" if not self._client: logger.warning("QQ client not initialized") return - try: - msg_id = msg.metadata.get("message_id") - self._msg_seq += 1 - use_markdown = self.config.msg_format == "markdown" - payload: dict[str, Any] = { - "msg_type": 2 if use_markdown else 0, - "msg_id": msg_id, - "msg_seq": self._msg_seq, - } - if use_markdown: - payload["markdown"] = {"content": msg.content} - else: - payload["content"] = msg.content + msg_id = msg.metadata.get("message_id") + chat_type = self._chat_type_cache.get(msg.chat_id, "c2c") + is_group = chat_type == "group" - chat_type = self._chat_type_cache.get(msg.chat_id, "c2c") - if chat_type == "group": + # 1) Send media + for media_ref in msg.media or []: + ok = await self._send_media( + chat_id=msg.chat_id, + media_ref=media_ref, + msg_id=msg_id, + is_group=is_group, + ) + if not ok: + filename = ( + os.path.basename(urlparse(media_ref).path) + or os.path.basename(media_ref) + or "file" + ) + await self._send_text_only( + chat_id=msg.chat_id, + is_group=is_group, + msg_id=msg_id, + content=f"[Attachment send failed: {filename}]", + ) + + # 2) Send text + if msg.content and msg.content.strip(): + await self._send_text_only( + chat_id=msg.chat_id, + is_group=is_group, + msg_id=msg_id, + content=msg.content.strip(), + ) + + async def _send_text_only( + self, + chat_id: str, + is_group: bool, + msg_id: str | None, + content: str, + ) -> None: + """Send a plain/markdown text message.""" + if not self._client: + return + + self._msg_seq += 1 + use_markdown = self.config.msg_format == "markdown" + payload: dict[str, Any] = { + "msg_type": 2 if use_markdown else 0, + "msg_id": msg_id, + "msg_seq": self._msg_seq, + } + if use_markdown: + payload["markdown"] = {"content": content} + else: + payload["content"] = content + + if is_group: + await self._client.api.post_group_message(group_openid=chat_id, **payload) + else: + await self._client.api.post_c2c_message(openid=chat_id, **payload) + + async def _send_media( + self, + chat_id: str, + media_ref: str, + msg_id: str | None, + is_group: bool, + ) -> bool: + """Read bytes -> base64 upload -> msg_type=7 send.""" + if not self._client: + return False + + data, filename = await self._read_media_bytes(media_ref) + if not data or not filename: + return False + + try: + file_type = _guess_send_file_type(filename) + file_data_b64 = base64.b64encode(data).decode() + + media_obj = await self._post_base64file( + chat_id=chat_id, + is_group=is_group, + file_type=file_type, + file_data=file_data_b64, + file_name=filename, + srv_send_msg=False, + ) + if not media_obj: + logger.error("QQ media upload failed: empty response") + return False + + self._msg_seq += 1 + if is_group: await self._client.api.post_group_message( - group_openid=msg.chat_id, - **payload, + group_openid=chat_id, + msg_type=7, + msg_id=msg_id, + msg_seq=self._msg_seq, + media=media_obj, ) else: await self._client.api.post_c2c_message( - openid=msg.chat_id, - **payload, + openid=chat_id, + msg_type=7, + msg_id=msg_id, + msg_seq=self._msg_seq, + media=media_obj, ) + + logger.info("QQ media sent: {}", filename) + return True except Exception as e: - logger.error("Error sending QQ message: {}", e) + logger.error("QQ send media failed filename={} err={}", filename, e) + return False - async def _on_message(self, data: "C2CMessage | GroupMessage", is_group: bool = False) -> None: - """Handle incoming message from QQ.""" + async def _read_media_bytes(self, media_ref: str) -> tuple[bytes | None, str | None]: + """Read bytes from http(s) or local file path; return (data, filename).""" + media_ref = (media_ref or "").strip() + if not media_ref: + return None, None + + ok, err = validate_url_target(media_ref) + + if not ok: + logger.warning("QQ outbound media URL validation failed url={} err={}", media_ref, err) + return None, None + + if not self._http: + self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120)) try: - # Dedup by message ID - if data.id in self._processed_ids: - return - self._processed_ids.append(data.id) + async with self._http.get(media_ref, allow_redirects=True) as resp: + if resp.status >= 400: + logger.warning( + "QQ outbound media download failed status={} url={}", + resp.status, + media_ref, + ) + return None, None + data = await resp.read() + if not data: + return None, None + filename = os.path.basename(urlparse(media_ref).path) or "file.bin" + return data, filename + except Exception as e: + logger.warning("QQ outbound media download error url={} err={}", media_ref, e) + return None, None - content = (data.content or "").strip() - if not content: - return - - if is_group: - chat_id = data.group_openid - user_id = data.author.member_openid - self._chat_type_cache[chat_id] = "group" + # Local file + try: + if media_ref.startswith("file://"): + parsed = urlparse(media_ref) + local_path = Path(unquote(parsed.path)) else: - chat_id = str(getattr(data.author, 'id', None) or getattr(data.author, 'user_openid', 'unknown')) - user_id = chat_id - self._chat_type_cache[chat_id] = "c2c" + local_path = Path(os.path.expanduser(media_ref)) - await self._handle_message( - sender_id=user_id, - chat_id=chat_id, - content=content, - metadata={"message_id": data.id}, + if not local_path.is_file(): + logger.warning("QQ outbound media file not found: {}", str(local_path)) + return None, None + + data = await asyncio.to_thread(local_path.read_bytes) + return data, local_path.name + except Exception as e: + logger.warning("QQ outbound media read error ref={} err={}", media_ref, e) + return None, None + + # https://github.com/tencent-connect/botpy/issues/198 + # https://bot.q.qq.com/wiki/develop/api-v2/server-inter/message/send-receive/rich-media.html + async def _post_base64file( + self, + chat_id: str, + is_group: bool, + file_type: int, + file_data: str, + file_name: str | None = None, + srv_send_msg: bool = False, + ) -> Media: + """Upload base64-encoded file and return Media object.""" + if not self._client: + raise RuntimeError("QQ client not initialized") + + if is_group: + endpoint = "/v2/groups/{group_openid}/files" + id_key = "group_openid" + else: + endpoint = "/v2/users/{openid}/files" + id_key = "openid" + + payload = { + id_key: chat_id, + "file_type": file_type, + "file_data": file_data, + "file_name": file_name, + "srv_send_msg": srv_send_msg, + } + route = Route("POST", endpoint, **{id_key: chat_id}) + return await self._client.api._http.request(route, json=payload) + + # --------------------------- + # Inbound (receive) + # --------------------------- + + async def _on_message(self, data: C2CMessage | GroupMessage, is_group: bool = False) -> None: + """Parse inbound message, download attachments, and publish to the bus.""" + if data.id in self._processed_ids: + return + self._processed_ids.append(data.id) + + if is_group: + chat_id = data.group_openid + user_id = data.author.member_openid + self._chat_type_cache[chat_id] = "group" + else: + chat_id = str( + getattr(data.author, "id", None) + or getattr(data.author, "user_openid", "unknown") ) - except Exception: - logger.exception("Error handling QQ message") + user_id = chat_id + self._chat_type_cache[chat_id] = "c2c" + + content = (data.content or "").strip() + + # the data used by tests don't contain attachments property + # so we use getattr with a default of [] to avoid AttributeError in tests + attachments = getattr(data, "attachments", None) or [] + media_paths, recv_lines, att_meta = await self._handle_attachments(attachments) + + # Compose content that always contains actionable saved paths + if recv_lines: + tag = ( + "[Image]" + if any(_is_image_name(Path(p).name) for p in media_paths) + else "[File]" + ) + file_block = "Received files:\n" + "\n".join(recv_lines) + content = ( + f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}" + ) + + if not content and not media_paths: + return + + await self._handle_message( + sender_id=user_id, + chat_id=chat_id, + content=content, + media=media_paths if media_paths else None, + metadata={ + "message_id": data.id, + "attachments": att_meta, + }, + ) + + async def _handle_attachments( + self, + attachments: list[BaseMessage._Attachments], + ) -> tuple[list[str], list[str], list[dict[str, Any]]]: + """Extract, download (chunked), and format attachments for agent consumption.""" + media_paths: list[str] = [] + recv_lines: list[str] = [] + att_meta: list[dict[str, Any]] = [] + + if not attachments: + return media_paths, recv_lines, att_meta + + for att in attachments: + url, filename, ctype = att.url, att.filename, att.content_type + + logger.info("Downloading file from QQ: {}", filename or url) + local_path = await self._download_to_media_dir_chunked(url, filename_hint=filename) + + att_meta.append( + { + "url": url, + "filename": filename, + "content_type": ctype, + "saved_path": local_path, + } + ) + + if local_path: + media_paths.append(local_path) + shown_name = filename or os.path.basename(local_path) + recv_lines.append(f"- {shown_name}\n saved: {local_path}") + else: + shown_name = filename or url + recv_lines.append(f"- {shown_name}\n saved: [download failed]") + + return media_paths, recv_lines, att_meta + + async def _download_to_media_dir_chunked( + self, + url: str, + filename_hint: str = "", + ) -> str | None: + """Download an inbound attachment using streaming chunk write. + + Uses chunked streaming to avoid loading large files into memory. + Enforces a max download size and writes to a .part temp file + that is atomically renamed on success. + """ + if not self._http: + self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120)) + + safe = _sanitize_filename(filename_hint) + ts = int(time.time() * 1000) + tmp_path: Path | None = None + + try: + async with self._http.get( + url, + timeout=aiohttp.ClientTimeout(total=120), + allow_redirects=True, + ) as resp: + if resp.status != 200: + logger.warning("QQ download failed: status={} url={}", resp.status, url) + return None + + ctype = (resp.headers.get("Content-Type") or "").lower() + + # Infer extension: url -> filename_hint -> content-type -> fallback + ext = Path(urlparse(url).path).suffix + if not ext: + ext = Path(filename_hint).suffix + if not ext: + if "png" in ctype: + ext = ".png" + elif "jpeg" in ctype or "jpg" in ctype: + ext = ".jpg" + elif "gif" in ctype: + ext = ".gif" + elif "webp" in ctype: + ext = ".webp" + elif "pdf" in ctype: + ext = ".pdf" + else: + ext = ".bin" + + if safe: + if not Path(safe).suffix: + safe = safe + ext + filename = safe + else: + filename = f"qq_file_{ts}{ext}" + + target = self._media_root / filename + if target.exists(): + target = self._media_root / f"{target.stem}_{ts}{target.suffix}" + + tmp_path = target.with_suffix(target.suffix + ".part") + + # Stream write + downloaded = 0 + chunk_size = max(1024, int(self.config.download_chunk_size or 262144)) + max_bytes = max( + 1024 * 1024, int(self.config.download_max_bytes or (200 * 1024 * 1024)) + ) + + def _open_tmp(): + tmp_path.parent.mkdir(parents=True, exist_ok=True) + return open(tmp_path, "wb") # noqa: SIM115 + + f = await asyncio.to_thread(_open_tmp) + try: + async for chunk in resp.content.iter_chunked(chunk_size): + if not chunk: + continue + downloaded += len(chunk) + if downloaded > max_bytes: + logger.warning( + "QQ download exceeded max_bytes={} url={} -> abort", + max_bytes, + url, + ) + return None + await asyncio.to_thread(f.write, chunk) + finally: + await asyncio.to_thread(f.close) + + # Atomic rename + await asyncio.to_thread(os.replace, tmp_path, target) + tmp_path = None # mark as moved + logger.info("QQ file saved: {}", str(target)) + return str(target) + + except Exception as e: + logger.error("QQ download error: {}", e) + return None + finally: + # Cleanup partial file + if tmp_path is not None: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass diff --git a/tests/test_qq_channel.py b/tests/test_qq_channel.py index bd5e891..ab09ff3 100644 --- a/tests/test_qq_channel.py +++ b/tests/test_qq_channel.py @@ -34,6 +34,7 @@ async def test_on_group_message_routes_to_group_chat_id() -> None: content="hello", group_openid="group123", author=SimpleNamespace(member_openid="user1"), + attachments=[], ) await channel._on_message(data, is_group=True)