feat(qq): bot can send and receive images and files (#1667)

Implement file upload and sending for QQ C2C messages

Reference: https://github.com/tencent-connect/botpy/blob/master/examples/demo_c2c_reply_file.py

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: chengyongru <chengyongru.ai@gmail.com>
This commit is contained in:
Chen Junda
2026-03-20 11:27:40 +08:00
committed by chengyongru
parent 80ee2729ac
commit d7373db419
2 changed files with 522 additions and 62 deletions

View File

@@ -1,33 +1,107 @@
"""QQ channel implementation using botpy SDK.""" """QQ channel implementation using botpy SDK.
Inbound:
- Parse QQ botpy messages (C2C / Group)
- Download attachments to media dir using chunked streaming write (memory-safe)
- Publish to Nanobot bus via BaseChannel._handle_message()
- Content includes a clear, actionable "Received files:" list with local paths
Outbound:
- Send attachments (msg.media) first via QQ rich media API (base64 upload + msg_type=7)
- Then send text (plain or markdown)
- msg.media supports local paths, file:// paths, and http(s) URLs
Notes:
- QQ restricts many audio/video formats. We conservatively classify as image vs file.
- Attachment structures differ across botpy versions; we try multiple field candidates.
"""
from __future__ import annotations
import asyncio import asyncio
import base64
import mimetypes
import os
import re
import time
from collections import deque from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
from urllib.parse import unquote, urlparse
import aiohttp
from loguru import logger from loguru import logger
from pydantic import Field
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import Base from nanobot.config.schema import Base
from pydantic import Field from nanobot.security.network import validate_url_target
try:
from nanobot.config.paths import get_media_dir
except Exception: # pragma: no cover
get_media_dir = None # type: ignore
try: try:
import botpy import botpy
from botpy.message import C2CMessage, GroupMessage from botpy.http import Route
QQ_AVAILABLE = True QQ_AVAILABLE = True
except ImportError: except ImportError: # pragma: no cover
QQ_AVAILABLE = False QQ_AVAILABLE = False
botpy = None botpy = None
C2CMessage = None Route = None
GroupMessage = None
if TYPE_CHECKING: if TYPE_CHECKING:
from botpy.message import C2CMessage, GroupMessage from botpy.message import BaseMessage, C2CMessage, GroupMessage
from botpy.types.message import Media
def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": # QQ rich media file_type: 1=image, 4=file
# (2=voice, 3=video are restricted; we only use image vs file)
QQ_FILE_TYPE_IMAGE = 1
QQ_FILE_TYPE_FILE = 4
_IMAGE_EXTS = {
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".webp",
".tif",
".tiff",
".ico",
}
# Replace unsafe characters with "_", keep Chinese and common safe punctuation.
_SAFE_NAME_RE = re.compile(r"[^\w.\-()\[\]()【】\u4e00-\u9fff]+", re.UNICODE)
def _sanitize_filename(name: str) -> str:
"""Sanitize filename to avoid traversal and problematic chars."""
name = (name or "").strip()
name = Path(name).name
name = _SAFE_NAME_RE.sub("_", name).strip("._ ")
return name
def _is_image_name(name: str) -> bool:
return Path(name).suffix.lower() in _IMAGE_EXTS
def _guess_send_file_type(filename: str) -> int:
"""Conservative send type: images -> 1, else -> 4."""
ext = Path(filename).suffix.lower()
mime, _ = mimetypes.guess_type(filename)
if ext in _IMAGE_EXTS or (mime and mime.startswith("image/")):
return QQ_FILE_TYPE_IMAGE
return QQ_FILE_TYPE_FILE
def _make_bot_class(channel: QQChannel) -> type[botpy.Client]:
"""Create a botpy Client subclass bound to the given channel.""" """Create a botpy Client subclass bound to the given channel."""
intents = botpy.Intents(public_messages=True, direct_message=True) intents = botpy.Intents(public_messages=True, direct_message=True)
@@ -39,10 +113,10 @@ def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]":
async def on_ready(self): async def on_ready(self):
logger.info("QQ bot ready: {}", self.robot.name) logger.info("QQ bot ready: {}", self.robot.name)
async def on_c2c_message_create(self, message: "C2CMessage"): async def on_c2c_message_create(self, message: C2CMessage):
await channel._on_message(message, is_group=False) await channel._on_message(message, is_group=False)
async def on_group_at_message_create(self, message: "GroupMessage"): async def on_group_at_message_create(self, message: GroupMessage):
await channel._on_message(message, is_group=True) await channel._on_message(message, is_group=True)
async def on_direct_message_create(self, message): async def on_direct_message_create(self, message):
@@ -60,6 +134,13 @@ class QQConfig(Base):
allow_from: list[str] = Field(default_factory=list) allow_from: list[str] = Field(default_factory=list)
msg_format: Literal["plain", "markdown"] = "plain" msg_format: Literal["plain", "markdown"] = "plain"
# Optional: directory to save inbound attachments. If empty, use nanobot get_media_dir("qq").
media_dir: str = ""
# Download tuning
download_chunk_size: int = 1024 * 256 # 256KB
download_max_bytes: int = 1024 * 1024 * 200 # 200MB safety limit
class QQChannel(BaseChannel): class QQChannel(BaseChannel):
"""QQ channel using botpy SDK with WebSocket connection.""" """QQ channel using botpy SDK with WebSocket connection."""
@@ -76,13 +157,38 @@ class QQChannel(BaseChannel):
config = QQConfig.model_validate(config) config = QQConfig.model_validate(config)
super().__init__(config, bus) super().__init__(config, bus)
self.config: QQConfig = config self.config: QQConfig = config
self._client: "botpy.Client | None" = None
self._processed_ids: deque = deque(maxlen=1000) self._client: botpy.Client | None = None
self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重 self._http: aiohttp.ClientSession | None = None
self._processed_ids: deque[str] = deque(maxlen=1000)
self._msg_seq: int = 1 # used to avoid QQ API dedup
self._chat_type_cache: dict[str, str] = {} self._chat_type_cache: dict[str, str] = {}
self._media_root: Path = self._init_media_root()
# ---------------------------
# Lifecycle
# ---------------------------
def _init_media_root(self) -> Path:
"""Choose a directory for saving inbound attachments."""
if self.config.media_dir:
root = Path(self.config.media_dir).expanduser()
elif get_media_dir:
try:
root = Path(get_media_dir("qq"))
except Exception:
root = Path.home() / ".nanobot" / "media" / "qq"
else:
root = Path.home() / ".nanobot" / "media" / "qq"
root.mkdir(parents=True, exist_ok=True)
logger.info("QQ media directory: {}", str(root))
return root
async def start(self) -> None: async def start(self) -> None:
"""Start the QQ bot.""" """Start the QQ bot with auto-reconnect loop."""
if not QQ_AVAILABLE: if not QQ_AVAILABLE:
logger.error("QQ SDK not installed. Run: pip install qq-botpy") logger.error("QQ SDK not installed. Run: pip install qq-botpy")
return return
@@ -92,8 +198,9 @@ class QQChannel(BaseChannel):
return return
self._running = True self._running = True
BotClass = _make_bot_class(self) self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))
self._client = BotClass()
self._client = _make_bot_class(self)()
logger.info("QQ bot started (C2C & Group supported)") logger.info("QQ bot started (C2C & Group supported)")
await self._run_bot() await self._run_bot()
@@ -109,23 +216,79 @@ class QQChannel(BaseChannel):
await asyncio.sleep(5) await asyncio.sleep(5)
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the QQ bot.""" """Stop bot and cleanup resources."""
self._running = False self._running = False
if self._client: if self._client:
try: try:
await self._client.close() await self._client.close()
except Exception: except Exception:
pass pass
self._client = None
if self._http:
try:
await self._http.close()
except Exception:
pass
self._http = None
logger.info("QQ bot stopped") logger.info("QQ bot stopped")
# ---------------------------
# Outbound (send)
# ---------------------------
async def send(self, msg: OutboundMessage) -> None: async def send(self, msg: OutboundMessage) -> None:
"""Send a message through QQ.""" """Send attachments first, then text."""
if not self._client: if not self._client:
logger.warning("QQ client not initialized") logger.warning("QQ client not initialized")
return return
try:
msg_id = msg.metadata.get("message_id") msg_id = msg.metadata.get("message_id")
chat_type = self._chat_type_cache.get(msg.chat_id, "c2c")
is_group = chat_type == "group"
# 1) Send media
for media_ref in msg.media or []:
ok = await self._send_media(
chat_id=msg.chat_id,
media_ref=media_ref,
msg_id=msg_id,
is_group=is_group,
)
if not ok:
filename = (
os.path.basename(urlparse(media_ref).path)
or os.path.basename(media_ref)
or "file"
)
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=f"[Attachment send failed: {filename}]",
)
# 2) Send text
if msg.content and msg.content.strip():
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=msg.content.strip(),
)
async def _send_text_only(
self,
chat_id: str,
is_group: bool,
msg_id: str | None,
content: str,
) -> None:
"""Send a plain/markdown text message."""
if not self._client:
return
self._msg_seq += 1 self._msg_seq += 1
use_markdown = self.config.msg_format == "markdown" use_markdown = self.config.msg_format == "markdown"
payload: dict[str, Any] = { payload: dict[str, Any] = {
@@ -134,50 +297,346 @@ class QQChannel(BaseChannel):
"msg_seq": self._msg_seq, "msg_seq": self._msg_seq,
} }
if use_markdown: if use_markdown:
payload["markdown"] = {"content": msg.content} payload["markdown"] = {"content": content}
else: else:
payload["content"] = msg.content payload["content"] = content
chat_type = self._chat_type_cache.get(msg.chat_id, "c2c") if is_group:
if chat_type == "group": await self._client.api.post_group_message(group_openid=chat_id, **payload)
else:
await self._client.api.post_c2c_message(openid=chat_id, **payload)
async def _send_media(
self,
chat_id: str,
media_ref: str,
msg_id: str | None,
is_group: bool,
) -> bool:
"""Read bytes -> base64 upload -> msg_type=7 send."""
if not self._client:
return False
data, filename = await self._read_media_bytes(media_ref)
if not data or not filename:
return False
try:
file_type = _guess_send_file_type(filename)
file_data_b64 = base64.b64encode(data).decode()
media_obj = await self._post_base64file(
chat_id=chat_id,
is_group=is_group,
file_type=file_type,
file_data=file_data_b64,
file_name=filename,
srv_send_msg=False,
)
if not media_obj:
logger.error("QQ media upload failed: empty response")
return False
self._msg_seq += 1
if is_group:
await self._client.api.post_group_message( await self._client.api.post_group_message(
group_openid=msg.chat_id, group_openid=chat_id,
**payload, msg_type=7,
msg_id=msg_id,
msg_seq=self._msg_seq,
media=media_obj,
) )
else: else:
await self._client.api.post_c2c_message( await self._client.api.post_c2c_message(
openid=msg.chat_id, openid=chat_id,
**payload, msg_type=7,
msg_id=msg_id,
msg_seq=self._msg_seq,
media=media_obj,
) )
except Exception as e:
logger.error("Error sending QQ message: {}", e)
async def _on_message(self, data: "C2CMessage | GroupMessage", is_group: bool = False) -> None: logger.info("QQ media sent: {}", filename)
"""Handle incoming message from QQ.""" return True
except Exception as e:
logger.error("QQ send media failed filename={} err={}", filename, e)
return False
async def _read_media_bytes(self, media_ref: str) -> tuple[bytes | None, str | None]:
"""Read bytes from http(s) or local file path; return (data, filename)."""
media_ref = (media_ref or "").strip()
if not media_ref:
return None, None
ok, err = validate_url_target(media_ref)
if not ok:
logger.warning("QQ outbound media URL validation failed url={} err={}", media_ref, err)
return None, None
if not self._http:
self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))
try: try:
# Dedup by message ID async with self._http.get(media_ref, allow_redirects=True) as resp:
if resp.status >= 400:
logger.warning(
"QQ outbound media download failed status={} url={}",
resp.status,
media_ref,
)
return None, None
data = await resp.read()
if not data:
return None, None
filename = os.path.basename(urlparse(media_ref).path) or "file.bin"
return data, filename
except Exception as e:
logger.warning("QQ outbound media download error url={} err={}", media_ref, e)
return None, None
# Local file
try:
if media_ref.startswith("file://"):
parsed = urlparse(media_ref)
local_path = Path(unquote(parsed.path))
else:
local_path = Path(os.path.expanduser(media_ref))
if not local_path.is_file():
logger.warning("QQ outbound media file not found: {}", str(local_path))
return None, None
data = await asyncio.to_thread(local_path.read_bytes)
return data, local_path.name
except Exception as e:
logger.warning("QQ outbound media read error ref={} err={}", media_ref, e)
return None, None
# https://github.com/tencent-connect/botpy/issues/198
# https://bot.q.qq.com/wiki/develop/api-v2/server-inter/message/send-receive/rich-media.html
async def _post_base64file(
self,
chat_id: str,
is_group: bool,
file_type: int,
file_data: str,
file_name: str | None = None,
srv_send_msg: bool = False,
) -> Media:
"""Upload base64-encoded file and return Media object."""
if not self._client:
raise RuntimeError("QQ client not initialized")
if is_group:
endpoint = "/v2/groups/{group_openid}/files"
id_key = "group_openid"
else:
endpoint = "/v2/users/{openid}/files"
id_key = "openid"
payload = {
id_key: chat_id,
"file_type": file_type,
"file_data": file_data,
"file_name": file_name,
"srv_send_msg": srv_send_msg,
}
route = Route("POST", endpoint, **{id_key: chat_id})
return await self._client.api._http.request(route, json=payload)
# ---------------------------
# Inbound (receive)
# ---------------------------
async def _on_message(self, data: C2CMessage | GroupMessage, is_group: bool = False) -> None:
"""Parse inbound message, download attachments, and publish to the bus."""
if data.id in self._processed_ids: if data.id in self._processed_ids:
return return
self._processed_ids.append(data.id) self._processed_ids.append(data.id)
content = (data.content or "").strip()
if not content:
return
if is_group: if is_group:
chat_id = data.group_openid chat_id = data.group_openid
user_id = data.author.member_openid user_id = data.author.member_openid
self._chat_type_cache[chat_id] = "group" self._chat_type_cache[chat_id] = "group"
else: else:
chat_id = str(getattr(data.author, 'id', None) or getattr(data.author, 'user_openid', 'unknown')) chat_id = str(
getattr(data.author, "id", None)
or getattr(data.author, "user_openid", "unknown")
)
user_id = chat_id user_id = chat_id
self._chat_type_cache[chat_id] = "c2c" self._chat_type_cache[chat_id] = "c2c"
content = (data.content or "").strip()
# the data used by tests don't contain attachments property
# so we use getattr with a default of [] to avoid AttributeError in tests
attachments = getattr(data, "attachments", None) or []
media_paths, recv_lines, att_meta = await self._handle_attachments(attachments)
# Compose content that always contains actionable saved paths
if recv_lines:
tag = (
"[Image]"
if any(_is_image_name(Path(p).name) for p in media_paths)
else "[File]"
)
file_block = "Received files:\n" + "\n".join(recv_lines)
content = (
f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}"
)
if not content and not media_paths:
return
await self._handle_message( await self._handle_message(
sender_id=user_id, sender_id=user_id,
chat_id=chat_id, chat_id=chat_id,
content=content, content=content,
metadata={"message_id": data.id}, media=media_paths if media_paths else None,
metadata={
"message_id": data.id,
"attachments": att_meta,
},
) )
async def _handle_attachments(
self,
attachments: list[BaseMessage._Attachments],
) -> tuple[list[str], list[str], list[dict[str, Any]]]:
"""Extract, download (chunked), and format attachments for agent consumption."""
media_paths: list[str] = []
recv_lines: list[str] = []
att_meta: list[dict[str, Any]] = []
if not attachments:
return media_paths, recv_lines, att_meta
for att in attachments:
url, filename, ctype = att.url, att.filename, att.content_type
logger.info("Downloading file from QQ: {}", filename or url)
local_path = await self._download_to_media_dir_chunked(url, filename_hint=filename)
att_meta.append(
{
"url": url,
"filename": filename,
"content_type": ctype,
"saved_path": local_path,
}
)
if local_path:
media_paths.append(local_path)
shown_name = filename or os.path.basename(local_path)
recv_lines.append(f"- {shown_name}\n saved: {local_path}")
else:
shown_name = filename or url
recv_lines.append(f"- {shown_name}\n saved: [download failed]")
return media_paths, recv_lines, att_meta
async def _download_to_media_dir_chunked(
self,
url: str,
filename_hint: str = "",
) -> str | None:
"""Download an inbound attachment using streaming chunk write.
Uses chunked streaming to avoid loading large files into memory.
Enforces a max download size and writes to a .part temp file
that is atomically renamed on success.
"""
if not self._http:
self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))
safe = _sanitize_filename(filename_hint)
ts = int(time.time() * 1000)
tmp_path: Path | None = None
try:
async with self._http.get(
url,
timeout=aiohttp.ClientTimeout(total=120),
allow_redirects=True,
) as resp:
if resp.status != 200:
logger.warning("QQ download failed: status={} url={}", resp.status, url)
return None
ctype = (resp.headers.get("Content-Type") or "").lower()
# Infer extension: url -> filename_hint -> content-type -> fallback
ext = Path(urlparse(url).path).suffix
if not ext:
ext = Path(filename_hint).suffix
if not ext:
if "png" in ctype:
ext = ".png"
elif "jpeg" in ctype or "jpg" in ctype:
ext = ".jpg"
elif "gif" in ctype:
ext = ".gif"
elif "webp" in ctype:
ext = ".webp"
elif "pdf" in ctype:
ext = ".pdf"
else:
ext = ".bin"
if safe:
if not Path(safe).suffix:
safe = safe + ext
filename = safe
else:
filename = f"qq_file_{ts}{ext}"
target = self._media_root / filename
if target.exists():
target = self._media_root / f"{target.stem}_{ts}{target.suffix}"
tmp_path = target.with_suffix(target.suffix + ".part")
# Stream write
downloaded = 0
chunk_size = max(1024, int(self.config.download_chunk_size or 262144))
max_bytes = max(
1024 * 1024, int(self.config.download_max_bytes or (200 * 1024 * 1024))
)
def _open_tmp():
tmp_path.parent.mkdir(parents=True, exist_ok=True)
return open(tmp_path, "wb") # noqa: SIM115
f = await asyncio.to_thread(_open_tmp)
try:
async for chunk in resp.content.iter_chunked(chunk_size):
if not chunk:
continue
downloaded += len(chunk)
if downloaded > max_bytes:
logger.warning(
"QQ download exceeded max_bytes={} url={} -> abort",
max_bytes,
url,
)
return None
await asyncio.to_thread(f.write, chunk)
finally:
await asyncio.to_thread(f.close)
# Atomic rename
await asyncio.to_thread(os.replace, tmp_path, target)
tmp_path = None # mark as moved
logger.info("QQ file saved: {}", str(target))
return str(target)
except Exception as e:
logger.error("QQ download error: {}", e)
return None
finally:
# Cleanup partial file
if tmp_path is not None:
try:
tmp_path.unlink(missing_ok=True)
except Exception: except Exception:
logger.exception("Error handling QQ message") pass

View File

@@ -34,6 +34,7 @@ async def test_on_group_message_routes_to_group_chat_id() -> None:
content="hello", content="hello",
group_openid="group123", group_openid="group123",
author=SimpleNamespace(member_openid="user1"), author=SimpleNamespace(member_openid="user1"),
attachments=[],
) )
await channel._on_message(data, is_group=True) await channel._on_message(data, is_group=True)