From 395fdc16f97d98f268c2126ae2d542a841646b9e Mon Sep 17 00:00:00 2001 From: Hua Date: Thu, 19 Mar 2026 16:27:29 +0800 Subject: [PATCH] feat(qq): serve public media via gateway --- AGENTS.md | 1 + README.md | 22 ++- nanobot/channels/qq.py | 286 +++++++++++++++++++++++++++-- nanobot/cli/commands.py | 8 + nanobot/config/schema.py | 3 + nanobot/gateway/__init__.py | 1 + nanobot/gateway/http.py | 65 +++++++ tests/test_gateway_http.py | 44 +++++ tests/test_qq_channel.py | 347 ++++++++++++++++++++++++++++++++++++ 9 files changed, 758 insertions(+), 19 deletions(-) create mode 100644 nanobot/gateway/__init__.py create mode 100644 nanobot/gateway/http.py create mode 100644 tests/test_gateway_http.py diff --git a/AGENTS.md b/AGENTS.md index 1216ca3..5f41c88 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -33,6 +33,7 @@ Do not commit real API keys, tokens, chat logs, or workspace data. Keep local se - `/skill` currently supports `search`, `install`, `uninstall`, `list`, and `update`. Keep subcommand dispatch in `nanobot/agent/loop.py`. - `/mcp` supports the default `list` behavior (and explicit `/mcp list`) to show configured MCP servers and registered MCP tools. - Agent runtime config should be hot-reloaded from the active `config.json` for safe in-process fields such as `tools.mcpServers`, `tools.web.*`, `tools.exec.*`, `tools.restrictToWorkspace`, `agents.defaults.model`, `agents.defaults.maxToolIterations`, `agents.defaults.contextWindowTokens`, `agents.defaults.maxTokens`, `agents.defaults.temperature`, `agents.defaults.reasoningEffort`, `channels.sendProgress`, and `channels.sendToolHints`. Channel connection settings and provider credentials still require a restart. +- QQ outbound media uses QQ's URL-based rich-media API. Remote `http(s)` image URLs can be sent directly. Local files are allowed from two controlled locations only: the configured `mediaPublicDir` inside `workspace/public`, and generated image files under `workspace/out`, which the QQ channel may hard-link into `public/` automatically before sending. Do not auto-publish from any other directory. - `/skill` shells out to `npx clawhub@latest`; it requires Node.js/`npx` at runtime. - `/skill uninstall` runs in a non-interactive context, so keep passing `--yes` when shelling out to ClawHub. - Treat empty `/skill search` output as a user-visible "no results" case rather than a silent success. Surface npm/registry failures directly to the user. diff --git a/README.md b/README.md index efb4190..73dd0ee 100644 --- a/README.md +++ b/README.md @@ -699,12 +699,22 @@ Uses **botpy SDK** with WebSocket — no public IP required. Currently supports "enabled": true, "appId": "YOUR_APP_ID", "secret": "YOUR_APP_SECRET", - "allowFrom": ["YOUR_OPENID"] + "allowFrom": ["YOUR_OPENID"], + "mediaBaseUrl": "https://bot.example.com/public/qq/", + "mediaPublicDir": "public/qq", + "mediaTtlSeconds": 600 } } } ``` +`mediaBaseUrl` is optional, but it is required if you want nanobot to send local screenshots or +other local image files through QQ. `mediaPublicDir` is resolved against the active startup +workspace and must stay under `workspace/public`, because the built-in gateway HTTP server only +serves that tree at `/public/`. nanobot accepts local QQ media from two places only: files already +under `mediaPublicDir`, and generated image files under `workspace/out`, which nanobot will +hard-link into `mediaPublicDir` automatically before sending. + Multi-bot example: ```json @@ -739,6 +749,16 @@ nanobot gateway Now send a message to the bot from QQ — it should respond! +Outbound QQ media always uses the QQ `url`-based rich-media API. Remote `http(s)` image URLs can be +sent directly. Local image files can also be sent when `mediaBaseUrl` points to a public URL and +`mediaPublicDir` matches a directory under `workspace/public`; nanobot maps that local public path +to a URL and then sends that URL through QQ. The built-in gateway route exposes +`workspace/public` as `/public/`, so a common setup is `mediaBaseUrl = https://your-host/public/qq/`. +If you generate screenshots under `workspace/out`, nanobot will automatically create a hard link in +`workspace/public/qq` first, then send that public URL. Files outside `mediaPublicDir` and +`workspace/out` are rejected. Without that publishing config, local files still fall back to a text +notice. +
diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 3d7c8e4..8f44dc8 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -1,8 +1,12 @@ """QQ channel implementation using botpy SDK.""" import asyncio +import os +import secrets from collections import deque +from pathlib import Path from typing import TYPE_CHECKING +from urllib.parse import quote, urljoin from loguru import logger @@ -10,6 +14,8 @@ from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.schema import QQConfig, QQInstanceConfig +from nanobot.security.network import validate_url_target +from nanobot.utils.helpers import detect_image_mime, ensure_dir try: import botpy @@ -60,13 +66,225 @@ class QQChannel(BaseChannel): def default_config(cls) -> dict[str, object]: return QQConfig().model_dump(by_alias=True) - def __init__(self, config: QQConfig | QQInstanceConfig, bus: MessageBus): + def __init__( + self, + config: QQConfig | QQInstanceConfig, + bus: MessageBus, + workspace: str | Path | None = None, + ): super().__init__(config, bus) self.config: QQConfig | QQInstanceConfig = config self._client: "botpy.Client | None" = None self._processed_ids: deque = deque(maxlen=1000) self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重 self._chat_type_cache: dict[str, str] = {} + self._workspace = Path(workspace).expanduser() if workspace is not None else None + self._cleanup_tasks: set[asyncio.Task[None]] = set() + + @staticmethod + def _is_remote_media(path: str) -> bool: + """Return True when the outbound media reference is a remote URL.""" + return path.startswith(("http://", "https://")) + + @staticmethod + def _failed_media_notice(path: str, reason: str | None = None) -> str: + """Render a user-visible fallback notice for unsent QQ media.""" + name = Path(path).name or path + return f"[Failed to send: {name}{f' - {reason}' if reason else ''}]" + + def _workspace_root(self) -> Path: + """Return the active workspace root used by QQ publishing.""" + return (self._workspace or Path.cwd()).resolve(strict=False) + + def _public_root(self) -> Path: + """Return the fixed public tree served by the gateway HTTP route.""" + return ensure_dir(self._workspace_root() / "public") + + def _out_root(self) -> Path: + """Return the default workspace out directory used for generated artifacts.""" + return self._workspace_root() / "out" + + def _resolve_media_public_dir(self) -> tuple[Path | None, str | None]: + """Resolve the local publish directory for QQ media under workspace/public.""" + configured = Path(self.config.media_public_dir).expanduser() + if configured.is_absolute(): + resolved = configured.resolve(strict=False) + else: + resolved = (self._workspace_root() / configured).resolve(strict=False) + public_root = self._public_root() + try: + resolved.relative_to(public_root) + except ValueError: + return None, f"QQ mediaPublicDir must stay under {public_root}" + return ensure_dir(resolved), None + + @staticmethod + def _guess_image_suffix(path: Path, mime_type: str | None) -> str: + """Pick a reasonable output suffix for published QQ images.""" + if path.suffix: + return path.suffix.lower() + return { + "image/png": ".png", + "image/jpeg": ".jpg", + "image/gif": ".gif", + "image/webp": ".webp", + }.get(mime_type or "", ".bin") + + @staticmethod + def _is_image_file(path: Path) -> bool: + """Validate that a local file looks like an image supported by QQ rich media.""" + try: + with path.open("rb") as f: + header = f.read(16) + except OSError: + return False + return detect_image_mime(header) is not None + + @staticmethod + def _detect_image_mime(path: Path) -> str | None: + """Detect image mime type from the leading bytes of a file.""" + try: + with path.open("rb") as f: + return detect_image_mime(f.read(16)) + except OSError: + return None + + async def _delete_published_media_later(self, path: Path, delay_seconds: int) -> None: + """Delete an auto-published QQ media file after a grace period.""" + try: + await asyncio.sleep(delay_seconds) + path.unlink(missing_ok=True) + except Exception as e: + logger.debug("Failed to delete published QQ media {}: {}", path, e) + + def _schedule_media_cleanup(self, path: Path) -> None: + """Best-effort cleanup for auto-published local QQ media.""" + if self.config.media_ttl_seconds <= 0: + return + task = asyncio.create_task( + self._delete_published_media_later(path, self.config.media_ttl_seconds) + ) + self._cleanup_tasks.add(task) + task.add_done_callback(self._cleanup_tasks.discard) + + def _try_link_out_media_into_public( + self, + source: Path, + public_dir: Path, + ) -> tuple[Path | None, str | None]: + """Hard-link a generated workspace/out media file into public/qq.""" + out_root = self._out_root().resolve(strict=False) + try: + source.relative_to(out_root) + except ValueError: + return None, f"QQ local media must stay under {public_dir} or {out_root}" + + if not self._is_image_file(source): + return None, "QQ local media must be an image" + + mime_type = self._detect_image_mime(source) + suffix = self._guess_image_suffix(source, mime_type) + published = public_dir / f"{source.stem}-{secrets.token_urlsafe(6)}{suffix}" + try: + os.link(source, published) + except OSError as e: + logger.warning("Failed to hard-link QQ media {} -> {}: {}", source, published, e) + return None, "failed to publish local file" + self._schedule_media_cleanup(published) + return published, None + + async def _publish_local_media(self, media_path: str) -> tuple[str | None, str | None]: + """Map a local public QQ media file, or a generated out file, to its served URL.""" + if not self.config.media_base_url: + return None, "QQ local media publishing is not configured" + + source = Path(media_path).expanduser() + try: + resolved = source.resolve(strict=True) + except FileNotFoundError: + return None, "local file not found" + except OSError as e: + logger.warning("Failed to resolve QQ media path {}: {}", media_path, e) + return None, "local file unavailable" + + if not resolved.is_file(): + return None, "local file not found" + + public_dir, dir_error = self._resolve_media_public_dir() + if public_dir is None: + return None, dir_error + + try: + relative_path = resolved.relative_to(public_dir) + except ValueError: + published, publish_error = self._try_link_out_media_into_public(resolved, public_dir) + if published is None: + return None, publish_error + relative_path = published.relative_to(public_dir) + + media_url = urljoin( + f"{self.config.media_base_url.rstrip('/')}/", + quote(relative_path.as_posix(), safe="/"), + ) + return media_url, None + + def _next_msg_seq(self) -> int: + """Return the next QQ message sequence number.""" + self._msg_seq += 1 + return self._msg_seq + + async def _post_text_message(self, chat_id: str, msg_type: str, content: str, msg_id: str | None) -> None: + """Send a plain-text QQ message.""" + payload = { + "msg_type": 0, + "content": content, + "msg_id": msg_id, + "msg_seq": self._next_msg_seq(), + } + if msg_type == "group": + await self._client.api.post_group_message(group_openid=chat_id, **payload) + else: + await self._client.api.post_c2c_message(openid=chat_id, **payload) + + async def _post_remote_media_message( + self, + chat_id: str, + msg_type: str, + media_url: str, + content: str | None, + msg_id: str | None, + ) -> None: + """Send one QQ remote image URL as a rich-media message.""" + if msg_type == "group": + media = await self._client.api.post_group_file( + group_openid=chat_id, + file_type=1, + url=media_url, + srv_send_msg=False, + ) + await self._client.api.post_group_message( + group_openid=chat_id, + msg_type=7, + content=content, + media=media, + msg_id=msg_id, + msg_seq=self._next_msg_seq(), + ) + else: + media = await self._client.api.post_c2c_file( + openid=chat_id, + file_type=1, + url=media_url, + srv_send_msg=False, + ) + await self._client.api.post_c2c_message( + openid=chat_id, + msg_type=7, + content=content, + media=media, + msg_id=msg_id, + msg_seq=self._next_msg_seq(), + ) async def start(self) -> None: """Start the QQ bot.""" @@ -98,6 +316,9 @@ class QQChannel(BaseChannel): async def stop(self) -> None: """Stop the QQ bot.""" self._running = False + for task in list(self._cleanup_tasks): + task.cancel() + self._cleanup_tasks.clear() if self._client: try: await self._client.close() @@ -113,24 +334,53 @@ class QQChannel(BaseChannel): try: msg_id = msg.metadata.get("message_id") - self._msg_seq += 1 msg_type = self._chat_type_cache.get(msg.chat_id, "c2c") - if msg_type == "group": - await self._client.api.post_group_message( - group_openid=msg.chat_id, - msg_type=0, - content=msg.content, - msg_id=msg_id, - msg_seq=self._msg_seq, - ) - else: - await self._client.api.post_c2c_message( - openid=msg.chat_id, - msg_type=0, - content=msg.content, - msg_id=msg_id, - msg_seq=self._msg_seq, - ) + content_sent = False + fallback_lines: list[str] = [] + + for media_path in msg.media: + resolved_media = media_path + if not self._is_remote_media(media_path): + resolved_media, publish_error = await self._publish_local_media(media_path) + if not resolved_media: + logger.warning( + "QQ outbound local media could not be published: {} ({})", + media_path, + publish_error, + ) + fallback_lines.append( + self._failed_media_notice(media_path, publish_error) + ) + continue + + ok, error = validate_url_target(resolved_media) + if not ok: + logger.warning("QQ outbound media blocked by URL validation: {}", error) + fallback_lines.append(self._failed_media_notice(media_path, error)) + continue + + try: + await self._post_remote_media_message( + msg.chat_id, + msg_type, + resolved_media, + msg.content if msg.content and not content_sent else None, + msg_id, + ) + if msg.content and not content_sent: + content_sent = True + except Exception as media_error: + logger.error("Error sending QQ media {}: {}", resolved_media, media_error) + fallback_lines.append(self._failed_media_notice(media_path)) + + text_parts: list[str] = [] + if msg.content and not content_sent: + text_parts.append(msg.content) + if fallback_lines: + text_parts.extend(fallback_lines) + + if text_parts: + await self._post_text_message(msg.chat_id, msg_type, "\n".join(text_parts), msg_id) except Exception as e: logger.error("Error sending QQ message: {}", e) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 725cc30..78e6033 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -495,6 +495,7 @@ def gateway( from nanobot.config.paths import get_cron_dir from nanobot.cron.service import CronService from nanobot.cron.types import CronJob + from nanobot.gateway.http import GatewayHttpServer from nanobot.heartbeat.service import HeartbeatService from nanobot.session.manager import SessionManager @@ -581,6 +582,7 @@ def gateway( # Create channel manager channels = ChannelManager(config, bus) + http_server = GatewayHttpServer(config.workspace_path, config.gateway.host, port) def _pick_heartbeat_target() -> tuple[str, str]: """Pick a routable channel/chat target for heartbeat-triggered messages.""" @@ -638,6 +640,10 @@ def gateway( else: console.print("[yellow]Warning: No channels enabled[/yellow]") + console.print( + f"[green]✓[/green] Public files: {http_server.public_dir} -> /public/" + ) + cron_status = cron.status() if cron_status["jobs"] > 0: console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs") @@ -648,6 +654,7 @@ def gateway( try: await cron.start() await heartbeat.start() + await http_server.start() await asyncio.gather( agent.run(), channels.start_all(), @@ -659,6 +666,7 @@ def gateway( heartbeat.stop() cron.stop() agent.stop() + await http_server.stop() await channels.stop_all() asyncio.run(run()) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 116066c..c1f5175 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -315,6 +315,9 @@ class QQConfig(Base): app_id: str = "" # 机器人 ID (AppID) from q.qq.com secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com allow_from: list[str] = Field(default_factory=list) # Allowed user openids + media_base_url: str = "" # Public base URL used to expose local QQ media files + media_public_dir: str = "public/qq" # Must stay under the active workspace/public tree + media_ttl_seconds: int = 600 # Delete published local QQ media after N seconds; <=0 keeps files class QQInstanceConfig(QQConfig): diff --git a/nanobot/gateway/__init__.py b/nanobot/gateway/__init__.py new file mode 100644 index 0000000..74a2aa9 --- /dev/null +++ b/nanobot/gateway/__init__.py @@ -0,0 +1 @@ +"""Gateway HTTP helpers.""" diff --git a/nanobot/gateway/http.py b/nanobot/gateway/http.py new file mode 100644 index 0000000..6991423 --- /dev/null +++ b/nanobot/gateway/http.py @@ -0,0 +1,65 @@ +"""Minimal HTTP server for workspace-scoped public files.""" + +from __future__ import annotations + +from pathlib import Path + +from aiohttp import web +from loguru import logger + +from nanobot.utils.helpers import ensure_dir + + +def get_public_dir(workspace: Path) -> Path: + """Return the fixed public directory served by the gateway.""" + return ensure_dir(workspace / "public") + + +def create_http_app(workspace: Path) -> web.Application: + """Create the gateway HTTP app serving workspace/public.""" + public_dir = get_public_dir(workspace) + app = web.Application() + + async def health(_request: web.Request) -> web.Response: + return web.json_response({"ok": True}) + + app.router.add_get("/healthz", health) + app.router.add_static("/public/", path=str(public_dir), follow_symlinks=False, show_index=False) + return app + + +class GatewayHttpServer: + """Small aiohttp server exposing only workspace/public.""" + + def __init__(self, workspace: Path, host: str, port: int): + self.workspace = workspace + self.host = host + self.port = port + self._app = create_http_app(workspace) + self._runner: web.AppRunner | None = None + self._site: web.TCPSite | None = None + + @property + def public_dir(self) -> Path: + """Return the served public directory.""" + return get_public_dir(self.workspace) + + async def start(self) -> None: + """Start serving the HTTP routes.""" + self._runner = web.AppRunner(self._app, access_log=None) + await self._runner.setup() + self._site = web.TCPSite(self._runner, host=self.host, port=self.port) + await self._site.start() + logger.info( + "Gateway HTTP server listening on {}:{} (public dir: {})", + self.host, + self.port, + self.public_dir, + ) + + async def stop(self) -> None: + """Stop the HTTP server.""" + if self._runner: + await self._runner.cleanup() + self._runner = None + self._site = None diff --git a/tests/test_gateway_http.py b/tests/test_gateway_http.py new file mode 100644 index 0000000..2248c21 --- /dev/null +++ b/tests/test_gateway_http.py @@ -0,0 +1,44 @@ +import os +from pathlib import Path + +import pytest +from aiohttp.test_utils import make_mocked_request + +from nanobot.gateway.http import create_http_app, get_public_dir + + +@pytest.mark.asyncio +async def test_gateway_public_route_maps_requests_into_workspace_public(tmp_path) -> None: + public_dir = get_public_dir(tmp_path) + file_path = public_dir / "hello.txt" + file_path.write_text("hello", encoding="utf-8") + + app = create_http_app(tmp_path) + request = make_mocked_request("GET", "/public/hello.txt", app=app) + match = await app.router.resolve(request) + + assert match.route.resource.canonical == "/public" + assert match["filename"] == "hello.txt" + assert Path(getattr(match.route.resource, "_directory")) == public_dir + + +@pytest.mark.asyncio +async def test_gateway_public_route_disables_symlink_following_and_allows_hard_links(tmp_path) -> None: + out_dir = tmp_path / "out" + out_dir.mkdir() + source = out_dir / "shot.png" + source.write_bytes(b"png") + + public_dir = get_public_dir(tmp_path) / "qq" + public_dir.mkdir() + published = public_dir / "shot.png" + os.link(source, published) + + app = create_http_app(tmp_path) + request = make_mocked_request("GET", "/public/qq/shot.png", app=app) + match = await app.router.resolve(request) + + assert os.stat(source).st_ino == os.stat(published).st_ino + assert match.route.resource.canonical == "/public" + assert match["filename"] == "qq/shot.png" + assert getattr(match.route.resource, "_follow_symlinks") is False diff --git a/tests/test_qq_channel.py b/tests/test_qq_channel.py index db21468..acf635c 100644 --- a/tests/test_qq_channel.py +++ b/tests/test_qq_channel.py @@ -1,3 +1,4 @@ +import os from types import SimpleNamespace import pytest @@ -12,6 +13,8 @@ class _FakeApi: def __init__(self) -> None: self.c2c_calls: list[dict] = [] self.group_calls: list[dict] = [] + self.c2c_file_calls: list[dict] = [] + self.group_file_calls: list[dict] = [] async def post_c2c_message(self, **kwargs) -> None: self.c2c_calls.append(kwargs) @@ -19,6 +22,14 @@ class _FakeApi: async def post_group_message(self, **kwargs) -> None: self.group_calls.append(kwargs) + async def post_c2c_file(self, **kwargs) -> dict: + self.c2c_file_calls.append(kwargs) + return {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60} + + async def post_group_file(self, **kwargs) -> dict: + self.group_file_calls.append(kwargs) + return {"file_info": "group-file-info", "file_uuid": "group-file", "ttl": 60} + class _FakeClient: def __init__(self) -> None: @@ -94,3 +105,339 @@ async def test_send_c2c_message_uses_plain_text_c2c_api_with_msg_seq() -> None: "msg_seq": 2, } assert not channel._client.api.group_calls + + +@pytest.mark.asyncio +async def test_send_group_remote_media_url_uses_file_api_then_media_message(monkeypatch) -> None: + channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus()) + channel._client = _FakeClient() + channel._chat_type_cache["group123"] = "group" + monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, "")) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="group123", + content="look", + media=["https://example.com/cat.jpg"], + metadata={"message_id": "msg1"}, + ) + ) + + assert channel._client.api.group_file_calls == [ + { + "group_openid": "group123", + "file_type": 1, + "url": "https://example.com/cat.jpg", + "srv_send_msg": False, + } + ] + assert channel._client.api.group_calls == [ + { + "group_openid": "group123", + "msg_type": 7, + "content": "look", + "media": {"file_info": "group-file-info", "file_uuid": "group-file", "ttl": 60}, + "msg_id": "msg1", + "msg_seq": 2, + } + ] + assert channel._client.api.c2c_calls == [] + + +@pytest.mark.asyncio +async def test_send_local_media_falls_back_to_text_notice_when_publishing_not_configured() -> None: + channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus()) + channel._client = _FakeClient() + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user123", + content="hello", + media=["/tmp/demo.png"], + metadata={"message_id": "msg1"}, + ) + ) + + assert channel._client.api.c2c_file_calls == [] + assert channel._client.api.group_file_calls == [] + assert channel._client.api.c2c_calls == [ + { + "openid": "user123", + "msg_type": 0, + "content": "hello\n[Failed to send: demo.png - QQ local media publishing is not configured]", + "msg_id": "msg1", + "msg_seq": 2, + } + ] + + +@pytest.mark.asyncio +async def test_send_local_media_under_public_dir_uses_c2c_file_api( + monkeypatch, + tmp_path, +) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir() + public_dir = workspace / "public" / "qq" + public_dir.mkdir(parents=True) + source = public_dir / "demo.png" + source.write_bytes(b"fake-png") + + channel = QQChannel( + QQConfig( + app_id="app", + secret="secret", + allow_from=["*"], + media_base_url="https://files.example.com/public/qq", + media_public_dir="public/qq", + media_ttl_seconds=0, + ), + MessageBus(), + workspace=workspace, + ) + channel._client = _FakeClient() + monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, "")) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user123", + content="hello", + media=[str(source)], + metadata={"message_id": "msg1"}, + ) + ) + + assert channel._client.api.c2c_file_calls == [ + { + "openid": "user123", + "file_type": 1, + "url": "https://files.example.com/public/qq/demo.png", + "srv_send_msg": False, + } + ] + assert channel._client.api.c2c_calls == [ + { + "openid": "user123", + "msg_type": 7, + "content": "hello", + "media": {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60}, + "msg_id": "msg1", + "msg_seq": 2, + } + ] + + +@pytest.mark.asyncio +async def test_send_local_media_from_out_auto_links_into_public_then_uses_c2c_file_api( + monkeypatch, + tmp_path, +) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir() + out_dir = workspace / "out" + out_dir.mkdir() + source = out_dir / "outside.png" + source.write_bytes(b"\x89PNG\r\n\x1a\nfake-png") + + channel = QQChannel( + QQConfig( + app_id="app", + secret="secret", + allow_from=["*"], + media_base_url="https://files.example.com/public/qq", + media_public_dir="public/qq", + media_ttl_seconds=0, + ), + MessageBus(), + workspace=workspace, + ) + channel._client = _FakeClient() + monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, "")) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user123", + content="hello", + media=[str(source)], + metadata={"message_id": "msg1"}, + ) + ) + + published = list((workspace / "public" / "qq").iterdir()) + assert len(published) == 1 + assert published[0].name.startswith("outside-") + assert published[0].suffix == ".png" + assert os.stat(source).st_ino == os.stat(published[0]).st_ino + assert channel._client.api.c2c_file_calls == [ + { + "openid": "user123", + "file_type": 1, + "url": f"https://files.example.com/public/qq/{published[0].name}", + "srv_send_msg": False, + } + ] + assert channel._client.api.c2c_calls == [ + { + "openid": "user123", + "msg_type": 7, + "content": "hello", + "media": {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60}, + "msg_id": "msg1", + "msg_seq": 2, + } + ] + + +@pytest.mark.asyncio +async def test_send_local_media_outside_public_and_out_falls_back_to_text_notice( + monkeypatch, + tmp_path, +) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir() + docs_dir = workspace / "docs" + docs_dir.mkdir() + source = docs_dir / "outside.png" + source.write_bytes(b"fake-png") + + channel = QQChannel( + QQConfig( + app_id="app", + secret="secret", + allow_from=["*"], + media_base_url="https://files.example.com/public/qq", + media_public_dir="public/qq", + media_ttl_seconds=0, + ), + MessageBus(), + workspace=workspace, + ) + channel._client = _FakeClient() + monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, "")) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user123", + content="hello", + media=[str(source)], + metadata={"message_id": "msg1"}, + ) + ) + + assert channel._client.api.c2c_file_calls == [] + assert channel._client.api.c2c_calls == [ + { + "openid": "user123", + "msg_type": 0, + "content": "hello\n[Failed to send: outside.png - QQ local media must stay under " + f"{workspace / 'public' / 'qq'} or {workspace / 'out'}]", + "msg_id": "msg1", + "msg_seq": 2, + } + ] + + +@pytest.mark.asyncio +async def test_send_local_media_symlink_to_outside_public_dir_is_rejected( + monkeypatch, + tmp_path, +) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir() + public_dir = workspace / "public" / "qq" + public_dir.mkdir(parents=True) + outside = tmp_path / "secret.png" + outside.write_bytes(b"secret") + source = public_dir / "linked.png" + source.symlink_to(outside) + + channel = QQChannel( + QQConfig( + app_id="app", + secret="secret", + allow_from=["*"], + media_base_url="https://files.example.com/public/qq", + media_public_dir="public/qq", + media_ttl_seconds=0, + ), + MessageBus(), + workspace=workspace, + ) + channel._client = _FakeClient() + monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, "")) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user123", + content="hello", + media=[str(source)], + metadata={"message_id": "msg1"}, + ) + ) + + assert channel._client.api.c2c_file_calls == [] + assert channel._client.api.c2c_calls == [ + { + "openid": "user123", + "msg_type": 0, + "content": "hello\n[Failed to send: linked.png - QQ local media must stay under " + f"{workspace / 'public' / 'qq'} or {workspace / 'out'}]", + "msg_id": "msg1", + "msg_seq": 2, + } + ] + + +@pytest.mark.asyncio +async def test_send_non_image_media_from_out_falls_back_to_text_notice( + monkeypatch, + tmp_path, +) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir() + out_dir = workspace / "out" + out_dir.mkdir() + source = out_dir / "note.txt" + source.write_text("not an image", encoding="utf-8") + + channel = QQChannel( + QQConfig( + app_id="app", + secret="secret", + allow_from=["*"], + media_base_url="https://files.example.com/public/qq", + media_public_dir="public/qq", + media_ttl_seconds=0, + ), + MessageBus(), + workspace=workspace, + ) + channel._client = _FakeClient() + monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, "")) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user123", + content="hello", + media=[str(source)], + metadata={"message_id": "msg1"}, + ) + ) + + assert channel._client.api.c2c_file_calls == [] + assert channel._client.api.c2c_calls == [ + { + "openid": "user123", + "msg_type": 0, + "content": "hello\n[Failed to send: note.txt - QQ local media must be an image]", + "msg_id": "msg1", + "msg_seq": 2, + } + ]