feat(qq): serve public media via gateway

This commit is contained in:
Hua
2026-03-19 16:27:29 +08:00
parent fd52973751
commit 395fdc16f9
9 changed files with 758 additions and 19 deletions

View File

@@ -33,6 +33,7 @@ Do not commit real API keys, tokens, chat logs, or workspace data. Keep local se
- `/skill` currently supports `search`, `install`, `uninstall`, `list`, and `update`. Keep subcommand dispatch in `nanobot/agent/loop.py`.
- `/mcp` supports the default `list` behavior (and explicit `/mcp list`) to show configured MCP servers and registered MCP tools.
- Agent runtime config should be hot-reloaded from the active `config.json` for safe in-process fields such as `tools.mcpServers`, `tools.web.*`, `tools.exec.*`, `tools.restrictToWorkspace`, `agents.defaults.model`, `agents.defaults.maxToolIterations`, `agents.defaults.contextWindowTokens`, `agents.defaults.maxTokens`, `agents.defaults.temperature`, `agents.defaults.reasoningEffort`, `channels.sendProgress`, and `channels.sendToolHints`. Channel connection settings and provider credentials still require a restart.
- QQ outbound media uses QQ's URL-based rich-media API. Remote `http(s)` image URLs can be sent directly. Local files are allowed from two controlled locations only: the configured `mediaPublicDir` inside `workspace/public`, and generated image files under `workspace/out`, which the QQ channel may hard-link into `public/` automatically before sending. Do not auto-publish from any other directory.
- `/skill` shells out to `npx clawhub@latest`; it requires Node.js/`npx` at runtime.
- `/skill uninstall` runs in a non-interactive context, so keep passing `--yes` when shelling out to ClawHub.
- Treat empty `/skill search` output as a user-visible "no results" case rather than a silent success. Surface npm/registry failures directly to the user.

View File

@@ -699,12 +699,22 @@ Uses **botpy SDK** with WebSocket — no public IP required. Currently supports
"enabled": true,
"appId": "YOUR_APP_ID",
"secret": "YOUR_APP_SECRET",
"allowFrom": ["YOUR_OPENID"]
"allowFrom": ["YOUR_OPENID"],
"mediaBaseUrl": "https://bot.example.com/public/qq/",
"mediaPublicDir": "public/qq",
"mediaTtlSeconds": 600
}
}
}
```
`mediaBaseUrl` is optional, but it is required if you want nanobot to send local screenshots or
other local image files through QQ. `mediaPublicDir` is resolved against the active startup
workspace and must stay under `workspace/public`, because the built-in gateway HTTP server only
serves that tree at `/public/`. nanobot accepts local QQ media from two places only: files already
under `mediaPublicDir`, and generated image files under `workspace/out`, which nanobot will
hard-link into `mediaPublicDir` automatically before sending.
Multi-bot example:
```json
@@ -739,6 +749,16 @@ nanobot gateway
Now send a message to the bot from QQ — it should respond!
Outbound QQ media always uses the QQ `url`-based rich-media API. Remote `http(s)` image URLs can be
sent directly. Local image files can also be sent when `mediaBaseUrl` points to a public URL and
`mediaPublicDir` matches a directory under `workspace/public`; nanobot maps that local public path
to a URL and then sends that URL through QQ. The built-in gateway route exposes
`workspace/public` as `/public/`, so a common setup is `mediaBaseUrl = https://your-host/public/qq/`.
If you generate screenshots under `workspace/out`, nanobot will automatically create a hard link in
`workspace/public/qq` first, then send that public URL. Files outside `mediaPublicDir` and
`workspace/out` are rejected. Without that publishing config, local files still fall back to a text
notice.
</details>
<details>

View File

@@ -1,8 +1,12 @@
"""QQ channel implementation using botpy SDK."""
import asyncio
import os
import secrets
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import quote, urljoin
from loguru import logger
@@ -10,6 +14,8 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import QQConfig, QQInstanceConfig
from nanobot.security.network import validate_url_target
from nanobot.utils.helpers import detect_image_mime, ensure_dir
try:
import botpy
@@ -60,13 +66,225 @@ class QQChannel(BaseChannel):
def default_config(cls) -> dict[str, object]:
return QQConfig().model_dump(by_alias=True)
def __init__(self, config: QQConfig | QQInstanceConfig, bus: MessageBus):
def __init__(
self,
config: QQConfig | QQInstanceConfig,
bus: MessageBus,
workspace: str | Path | None = None,
):
super().__init__(config, bus)
self.config: QQConfig | QQInstanceConfig = config
self._client: "botpy.Client | None" = None
self._processed_ids: deque = deque(maxlen=1000)
self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重
self._chat_type_cache: dict[str, str] = {}
self._workspace = Path(workspace).expanduser() if workspace is not None else None
self._cleanup_tasks: set[asyncio.Task[None]] = set()
@staticmethod
def _is_remote_media(path: str) -> bool:
"""Return True when the outbound media reference is a remote URL."""
return path.startswith(("http://", "https://"))
@staticmethod
def _failed_media_notice(path: str, reason: str | None = None) -> str:
"""Render a user-visible fallback notice for unsent QQ media."""
name = Path(path).name or path
return f"[Failed to send: {name}{f' - {reason}' if reason else ''}]"
def _workspace_root(self) -> Path:
"""Return the active workspace root used by QQ publishing."""
return (self._workspace or Path.cwd()).resolve(strict=False)
def _public_root(self) -> Path:
"""Return the fixed public tree served by the gateway HTTP route."""
return ensure_dir(self._workspace_root() / "public")
def _out_root(self) -> Path:
"""Return the default workspace out directory used for generated artifacts."""
return self._workspace_root() / "out"
def _resolve_media_public_dir(self) -> tuple[Path | None, str | None]:
"""Resolve the local publish directory for QQ media under workspace/public."""
configured = Path(self.config.media_public_dir).expanduser()
if configured.is_absolute():
resolved = configured.resolve(strict=False)
else:
resolved = (self._workspace_root() / configured).resolve(strict=False)
public_root = self._public_root()
try:
resolved.relative_to(public_root)
except ValueError:
return None, f"QQ mediaPublicDir must stay under {public_root}"
return ensure_dir(resolved), None
@staticmethod
def _guess_image_suffix(path: Path, mime_type: str | None) -> str:
"""Pick a reasonable output suffix for published QQ images."""
if path.suffix:
return path.suffix.lower()
return {
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/webp": ".webp",
}.get(mime_type or "", ".bin")
@staticmethod
def _is_image_file(path: Path) -> bool:
"""Validate that a local file looks like an image supported by QQ rich media."""
try:
with path.open("rb") as f:
header = f.read(16)
except OSError:
return False
return detect_image_mime(header) is not None
@staticmethod
def _detect_image_mime(path: Path) -> str | None:
"""Detect image mime type from the leading bytes of a file."""
try:
with path.open("rb") as f:
return detect_image_mime(f.read(16))
except OSError:
return None
async def _delete_published_media_later(self, path: Path, delay_seconds: int) -> None:
"""Delete an auto-published QQ media file after a grace period."""
try:
await asyncio.sleep(delay_seconds)
path.unlink(missing_ok=True)
except Exception as e:
logger.debug("Failed to delete published QQ media {}: {}", path, e)
def _schedule_media_cleanup(self, path: Path) -> None:
"""Best-effort cleanup for auto-published local QQ media."""
if self.config.media_ttl_seconds <= 0:
return
task = asyncio.create_task(
self._delete_published_media_later(path, self.config.media_ttl_seconds)
)
self._cleanup_tasks.add(task)
task.add_done_callback(self._cleanup_tasks.discard)
def _try_link_out_media_into_public(
self,
source: Path,
public_dir: Path,
) -> tuple[Path | None, str | None]:
"""Hard-link a generated workspace/out media file into public/qq."""
out_root = self._out_root().resolve(strict=False)
try:
source.relative_to(out_root)
except ValueError:
return None, f"QQ local media must stay under {public_dir} or {out_root}"
if not self._is_image_file(source):
return None, "QQ local media must be an image"
mime_type = self._detect_image_mime(source)
suffix = self._guess_image_suffix(source, mime_type)
published = public_dir / f"{source.stem}-{secrets.token_urlsafe(6)}{suffix}"
try:
os.link(source, published)
except OSError as e:
logger.warning("Failed to hard-link QQ media {} -> {}: {}", source, published, e)
return None, "failed to publish local file"
self._schedule_media_cleanup(published)
return published, None
async def _publish_local_media(self, media_path: str) -> tuple[str | None, str | None]:
"""Map a local public QQ media file, or a generated out file, to its served URL."""
if not self.config.media_base_url:
return None, "QQ local media publishing is not configured"
source = Path(media_path).expanduser()
try:
resolved = source.resolve(strict=True)
except FileNotFoundError:
return None, "local file not found"
except OSError as e:
logger.warning("Failed to resolve QQ media path {}: {}", media_path, e)
return None, "local file unavailable"
if not resolved.is_file():
return None, "local file not found"
public_dir, dir_error = self._resolve_media_public_dir()
if public_dir is None:
return None, dir_error
try:
relative_path = resolved.relative_to(public_dir)
except ValueError:
published, publish_error = self._try_link_out_media_into_public(resolved, public_dir)
if published is None:
return None, publish_error
relative_path = published.relative_to(public_dir)
media_url = urljoin(
f"{self.config.media_base_url.rstrip('/')}/",
quote(relative_path.as_posix(), safe="/"),
)
return media_url, None
def _next_msg_seq(self) -> int:
"""Return the next QQ message sequence number."""
self._msg_seq += 1
return self._msg_seq
async def _post_text_message(self, chat_id: str, msg_type: str, content: str, msg_id: str | None) -> None:
"""Send a plain-text QQ message."""
payload = {
"msg_type": 0,
"content": content,
"msg_id": msg_id,
"msg_seq": self._next_msg_seq(),
}
if msg_type == "group":
await self._client.api.post_group_message(group_openid=chat_id, **payload)
else:
await self._client.api.post_c2c_message(openid=chat_id, **payload)
async def _post_remote_media_message(
self,
chat_id: str,
msg_type: str,
media_url: str,
content: str | None,
msg_id: str | None,
) -> None:
"""Send one QQ remote image URL as a rich-media message."""
if msg_type == "group":
media = await self._client.api.post_group_file(
group_openid=chat_id,
file_type=1,
url=media_url,
srv_send_msg=False,
)
await self._client.api.post_group_message(
group_openid=chat_id,
msg_type=7,
content=content,
media=media,
msg_id=msg_id,
msg_seq=self._next_msg_seq(),
)
else:
media = await self._client.api.post_c2c_file(
openid=chat_id,
file_type=1,
url=media_url,
srv_send_msg=False,
)
await self._client.api.post_c2c_message(
openid=chat_id,
msg_type=7,
content=content,
media=media,
msg_id=msg_id,
msg_seq=self._next_msg_seq(),
)
async def start(self) -> None:
"""Start the QQ bot."""
@@ -98,6 +316,9 @@ class QQChannel(BaseChannel):
async def stop(self) -> None:
"""Stop the QQ bot."""
self._running = False
for task in list(self._cleanup_tasks):
task.cancel()
self._cleanup_tasks.clear()
if self._client:
try:
await self._client.close()
@@ -113,24 +334,53 @@ class QQChannel(BaseChannel):
try:
msg_id = msg.metadata.get("message_id")
self._msg_seq += 1
msg_type = self._chat_type_cache.get(msg.chat_id, "c2c")
if msg_type == "group":
await self._client.api.post_group_message(
group_openid=msg.chat_id,
msg_type=0,
content=msg.content,
msg_id=msg_id,
msg_seq=self._msg_seq,
)
else:
await self._client.api.post_c2c_message(
openid=msg.chat_id,
msg_type=0,
content=msg.content,
msg_id=msg_id,
msg_seq=self._msg_seq,
)
content_sent = False
fallback_lines: list[str] = []
for media_path in msg.media:
resolved_media = media_path
if not self._is_remote_media(media_path):
resolved_media, publish_error = await self._publish_local_media(media_path)
if not resolved_media:
logger.warning(
"QQ outbound local media could not be published: {} ({})",
media_path,
publish_error,
)
fallback_lines.append(
self._failed_media_notice(media_path, publish_error)
)
continue
ok, error = validate_url_target(resolved_media)
if not ok:
logger.warning("QQ outbound media blocked by URL validation: {}", error)
fallback_lines.append(self._failed_media_notice(media_path, error))
continue
try:
await self._post_remote_media_message(
msg.chat_id,
msg_type,
resolved_media,
msg.content if msg.content and not content_sent else None,
msg_id,
)
if msg.content and not content_sent:
content_sent = True
except Exception as media_error:
logger.error("Error sending QQ media {}: {}", resolved_media, media_error)
fallback_lines.append(self._failed_media_notice(media_path))
text_parts: list[str] = []
if msg.content and not content_sent:
text_parts.append(msg.content)
if fallback_lines:
text_parts.extend(fallback_lines)
if text_parts:
await self._post_text_message(msg.chat_id, msg_type, "\n".join(text_parts), msg_id)
except Exception as e:
logger.error("Error sending QQ message: {}", e)

View File

@@ -495,6 +495,7 @@ def gateway(
from nanobot.config.paths import get_cron_dir
from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob
from nanobot.gateway.http import GatewayHttpServer
from nanobot.heartbeat.service import HeartbeatService
from nanobot.session.manager import SessionManager
@@ -581,6 +582,7 @@ def gateway(
# Create channel manager
channels = ChannelManager(config, bus)
http_server = GatewayHttpServer(config.workspace_path, config.gateway.host, port)
def _pick_heartbeat_target() -> tuple[str, str]:
"""Pick a routable channel/chat target for heartbeat-triggered messages."""
@@ -638,6 +640,10 @@ def gateway(
else:
console.print("[yellow]Warning: No channels enabled[/yellow]")
console.print(
f"[green]✓[/green] Public files: {http_server.public_dir} -> /public/"
)
cron_status = cron.status()
if cron_status["jobs"] > 0:
console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
@@ -648,6 +654,7 @@ def gateway(
try:
await cron.start()
await heartbeat.start()
await http_server.start()
await asyncio.gather(
agent.run(),
channels.start_all(),
@@ -659,6 +666,7 @@ def gateway(
heartbeat.stop()
cron.stop()
agent.stop()
await http_server.stop()
await channels.stop_all()
asyncio.run(run())

View File

@@ -315,6 +315,9 @@ class QQConfig(Base):
app_id: str = "" # 机器人 ID (AppID) from q.qq.com
secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com
allow_from: list[str] = Field(default_factory=list) # Allowed user openids
media_base_url: str = "" # Public base URL used to expose local QQ media files
media_public_dir: str = "public/qq" # Must stay under the active workspace/public tree
media_ttl_seconds: int = 600 # Delete published local QQ media after N seconds; <=0 keeps files
class QQInstanceConfig(QQConfig):

View File

@@ -0,0 +1 @@
"""Gateway HTTP helpers."""

65
nanobot/gateway/http.py Normal file
View File

@@ -0,0 +1,65 @@
"""Minimal HTTP server for workspace-scoped public files."""
from __future__ import annotations
from pathlib import Path
from aiohttp import web
from loguru import logger
from nanobot.utils.helpers import ensure_dir
def get_public_dir(workspace: Path) -> Path:
"""Return the fixed public directory served by the gateway."""
return ensure_dir(workspace / "public")
def create_http_app(workspace: Path) -> web.Application:
"""Create the gateway HTTP app serving workspace/public."""
public_dir = get_public_dir(workspace)
app = web.Application()
async def health(_request: web.Request) -> web.Response:
return web.json_response({"ok": True})
app.router.add_get("/healthz", health)
app.router.add_static("/public/", path=str(public_dir), follow_symlinks=False, show_index=False)
return app
class GatewayHttpServer:
"""Small aiohttp server exposing only workspace/public."""
def __init__(self, workspace: Path, host: str, port: int):
self.workspace = workspace
self.host = host
self.port = port
self._app = create_http_app(workspace)
self._runner: web.AppRunner | None = None
self._site: web.TCPSite | None = None
@property
def public_dir(self) -> Path:
"""Return the served public directory."""
return get_public_dir(self.workspace)
async def start(self) -> None:
"""Start serving the HTTP routes."""
self._runner = web.AppRunner(self._app, access_log=None)
await self._runner.setup()
self._site = web.TCPSite(self._runner, host=self.host, port=self.port)
await self._site.start()
logger.info(
"Gateway HTTP server listening on {}:{} (public dir: {})",
self.host,
self.port,
self.public_dir,
)
async def stop(self) -> None:
"""Stop the HTTP server."""
if self._runner:
await self._runner.cleanup()
self._runner = None
self._site = None

View File

@@ -0,0 +1,44 @@
import os
from pathlib import Path
import pytest
from aiohttp.test_utils import make_mocked_request
from nanobot.gateway.http import create_http_app, get_public_dir
@pytest.mark.asyncio
async def test_gateway_public_route_maps_requests_into_workspace_public(tmp_path) -> None:
public_dir = get_public_dir(tmp_path)
file_path = public_dir / "hello.txt"
file_path.write_text("hello", encoding="utf-8")
app = create_http_app(tmp_path)
request = make_mocked_request("GET", "/public/hello.txt", app=app)
match = await app.router.resolve(request)
assert match.route.resource.canonical == "/public"
assert match["filename"] == "hello.txt"
assert Path(getattr(match.route.resource, "_directory")) == public_dir
@pytest.mark.asyncio
async def test_gateway_public_route_disables_symlink_following_and_allows_hard_links(tmp_path) -> None:
out_dir = tmp_path / "out"
out_dir.mkdir()
source = out_dir / "shot.png"
source.write_bytes(b"png")
public_dir = get_public_dir(tmp_path) / "qq"
public_dir.mkdir()
published = public_dir / "shot.png"
os.link(source, published)
app = create_http_app(tmp_path)
request = make_mocked_request("GET", "/public/qq/shot.png", app=app)
match = await app.router.resolve(request)
assert os.stat(source).st_ino == os.stat(published).st_ino
assert match.route.resource.canonical == "/public"
assert match["filename"] == "qq/shot.png"
assert getattr(match.route.resource, "_follow_symlinks") is False

View File

@@ -1,3 +1,4 @@
import os
from types import SimpleNamespace
import pytest
@@ -12,6 +13,8 @@ class _FakeApi:
def __init__(self) -> None:
self.c2c_calls: list[dict] = []
self.group_calls: list[dict] = []
self.c2c_file_calls: list[dict] = []
self.group_file_calls: list[dict] = []
async def post_c2c_message(self, **kwargs) -> None:
self.c2c_calls.append(kwargs)
@@ -19,6 +22,14 @@ class _FakeApi:
async def post_group_message(self, **kwargs) -> None:
self.group_calls.append(kwargs)
async def post_c2c_file(self, **kwargs) -> dict:
self.c2c_file_calls.append(kwargs)
return {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60}
async def post_group_file(self, **kwargs) -> dict:
self.group_file_calls.append(kwargs)
return {"file_info": "group-file-info", "file_uuid": "group-file", "ttl": 60}
class _FakeClient:
def __init__(self) -> None:
@@ -94,3 +105,339 @@ async def test_send_c2c_message_uses_plain_text_c2c_api_with_msg_seq() -> None:
"msg_seq": 2,
}
assert not channel._client.api.group_calls
@pytest.mark.asyncio
async def test_send_group_remote_media_url_uses_file_api_then_media_message(monkeypatch) -> None:
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
channel._chat_type_cache["group123"] = "group"
monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="qq",
chat_id="group123",
content="look",
media=["https://example.com/cat.jpg"],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.group_file_calls == [
{
"group_openid": "group123",
"file_type": 1,
"url": "https://example.com/cat.jpg",
"srv_send_msg": False,
}
]
assert channel._client.api.group_calls == [
{
"group_openid": "group123",
"msg_type": 7,
"content": "look",
"media": {"file_info": "group-file-info", "file_uuid": "group-file", "ttl": 60},
"msg_id": "msg1",
"msg_seq": 2,
}
]
assert channel._client.api.c2c_calls == []
@pytest.mark.asyncio
async def test_send_local_media_falls_back_to_text_notice_when_publishing_not_configured() -> None:
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=["/tmp/demo.png"],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.c2c_file_calls == []
assert channel._client.api.group_file_calls == []
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 0,
"content": "hello\n[Failed to send: demo.png - QQ local media publishing is not configured]",
"msg_id": "msg1",
"msg_seq": 2,
}
]
@pytest.mark.asyncio
async def test_send_local_media_under_public_dir_uses_c2c_file_api(
monkeypatch,
tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
public_dir = workspace / "public" / "qq"
public_dir.mkdir(parents=True)
source = public_dir / "demo.png"
source.write_bytes(b"fake-png")
channel = QQChannel(
QQConfig(
app_id="app",
secret="secret",
allow_from=["*"],
media_base_url="https://files.example.com/public/qq",
media_public_dir="public/qq",
media_ttl_seconds=0,
),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient()
monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=[str(source)],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.c2c_file_calls == [
{
"openid": "user123",
"file_type": 1,
"url": "https://files.example.com/public/qq/demo.png",
"srv_send_msg": False,
}
]
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 7,
"content": "hello",
"media": {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60},
"msg_id": "msg1",
"msg_seq": 2,
}
]
@pytest.mark.asyncio
async def test_send_local_media_from_out_auto_links_into_public_then_uses_c2c_file_api(
monkeypatch,
tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
out_dir = workspace / "out"
out_dir.mkdir()
source = out_dir / "outside.png"
source.write_bytes(b"\x89PNG\r\n\x1a\nfake-png")
channel = QQChannel(
QQConfig(
app_id="app",
secret="secret",
allow_from=["*"],
media_base_url="https://files.example.com/public/qq",
media_public_dir="public/qq",
media_ttl_seconds=0,
),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient()
monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=[str(source)],
metadata={"message_id": "msg1"},
)
)
published = list((workspace / "public" / "qq").iterdir())
assert len(published) == 1
assert published[0].name.startswith("outside-")
assert published[0].suffix == ".png"
assert os.stat(source).st_ino == os.stat(published[0]).st_ino
assert channel._client.api.c2c_file_calls == [
{
"openid": "user123",
"file_type": 1,
"url": f"https://files.example.com/public/qq/{published[0].name}",
"srv_send_msg": False,
}
]
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 7,
"content": "hello",
"media": {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60},
"msg_id": "msg1",
"msg_seq": 2,
}
]
@pytest.mark.asyncio
async def test_send_local_media_outside_public_and_out_falls_back_to_text_notice(
monkeypatch,
tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
docs_dir = workspace / "docs"
docs_dir.mkdir()
source = docs_dir / "outside.png"
source.write_bytes(b"fake-png")
channel = QQChannel(
QQConfig(
app_id="app",
secret="secret",
allow_from=["*"],
media_base_url="https://files.example.com/public/qq",
media_public_dir="public/qq",
media_ttl_seconds=0,
),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient()
monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=[str(source)],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.c2c_file_calls == []
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 0,
"content": "hello\n[Failed to send: outside.png - QQ local media must stay under "
f"{workspace / 'public' / 'qq'} or {workspace / 'out'}]",
"msg_id": "msg1",
"msg_seq": 2,
}
]
@pytest.mark.asyncio
async def test_send_local_media_symlink_to_outside_public_dir_is_rejected(
monkeypatch,
tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
public_dir = workspace / "public" / "qq"
public_dir.mkdir(parents=True)
outside = tmp_path / "secret.png"
outside.write_bytes(b"secret")
source = public_dir / "linked.png"
source.symlink_to(outside)
channel = QQChannel(
QQConfig(
app_id="app",
secret="secret",
allow_from=["*"],
media_base_url="https://files.example.com/public/qq",
media_public_dir="public/qq",
media_ttl_seconds=0,
),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient()
monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=[str(source)],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.c2c_file_calls == []
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 0,
"content": "hello\n[Failed to send: linked.png - QQ local media must stay under "
f"{workspace / 'public' / 'qq'} or {workspace / 'out'}]",
"msg_id": "msg1",
"msg_seq": 2,
}
]
@pytest.mark.asyncio
async def test_send_non_image_media_from_out_falls_back_to_text_notice(
monkeypatch,
tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
out_dir = workspace / "out"
out_dir.mkdir()
source = out_dir / "note.txt"
source.write_text("not an image", encoding="utf-8")
channel = QQChannel(
QQConfig(
app_id="app",
secret="secret",
allow_from=["*"],
media_base_url="https://files.example.com/public/qq",
media_public_dir="public/qq",
media_ttl_seconds=0,
),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient()
monkeypatch.setattr("nanobot.channels.qq.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=[str(source)],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.c2c_file_calls == []
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 0,
"content": "hello\n[Failed to send: note.txt - QQ local media must be an image]",
"msg_id": "msg1",
"msg_seq": 2,
}
]