diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 5dae01b..7442e10 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -74,6 +74,7 @@ _IMAGE_EXTS = { ".tif", ".tiff", ".ico", + ".svg", } # Replace unsafe characters with "_", keep Chinese and common safe punctuation. @@ -367,8 +368,27 @@ class QQChannel(BaseChannel): if not media_ref: return None, None - ok, err = validate_url_target(media_ref) + # Local file: plain path or file:// URI + if not media_ref.startswith("http://") and not media_ref.startswith("https://"): + try: + if media_ref.startswith("file://"): + parsed = urlparse(media_ref) + local_path = Path(unquote(parsed.path)) + else: + local_path = Path(os.path.expanduser(media_ref)) + if not local_path.is_file(): + logger.warning("QQ outbound media file not found: {}", str(local_path)) + return None, None + + data = await asyncio.to_thread(local_path.read_bytes) + return data, local_path.name + except Exception as e: + logger.warning("QQ outbound media read error ref={} err={}", media_ref, e) + return None, None + + # Remote URL + ok, err = validate_url_target(media_ref) if not ok: logger.warning("QQ outbound media URL validation failed url={} err={}", media_ref, err) return None, None @@ -393,24 +413,6 @@ class QQChannel(BaseChannel): logger.warning("QQ outbound media download error url={} err={}", media_ref, e) return None, None - # Local file - try: - if media_ref.startswith("file://"): - parsed = urlparse(media_ref) - local_path = Path(unquote(parsed.path)) - else: - local_path = Path(os.path.expanduser(media_ref)) - - if not local_path.is_file(): - logger.warning("QQ outbound media file not found: {}", str(local_path)) - return None, None - - data = await asyncio.to_thread(local_path.read_bytes) - return data, local_path.name - except Exception as e: - logger.warning("QQ outbound media read error ref={} err={}", media_ref, e) - return None, None - # https://github.com/tencent-connect/botpy/issues/198 # https://bot.q.qq.com/wiki/develop/api-v2/server-inter/message/send-receive/rich-media.html async def _post_base64file( @@ -459,8 +461,7 @@ class QQChannel(BaseChannel): self._chat_type_cache[chat_id] = "group" else: chat_id = str( - getattr(data.author, "id", None) - or getattr(data.author, "user_openid", "unknown") + getattr(data.author, "id", None) or getattr(data.author, "user_openid", "unknown") ) user_id = chat_id self._chat_type_cache[chat_id] = "c2c" @@ -474,15 +475,9 @@ class QQChannel(BaseChannel): # Compose content that always contains actionable saved paths if recv_lines: - tag = ( - "[Image]" - if any(_is_image_name(Path(p).name) for p in media_paths) - else "[File]" - ) + tag = "[Image]" if any(_is_image_name(Path(p).name) for p in media_paths) else "[File]" file_block = "Received files:\n" + "\n".join(recv_lines) - content = ( - f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}" - ) + content = f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}" if not content and not media_paths: return diff --git a/tests/test_qq_channel.py b/tests/test_qq_channel.py index ab09ff3..ab9afcb 100644 --- a/tests/test_qq_channel.py +++ b/tests/test_qq_channel.py @@ -1,11 +1,12 @@ +import tempfile +from pathlib import Path from types import SimpleNamespace import pytest from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus -from nanobot.channels.qq import QQChannel -from nanobot.channels.qq import QQConfig +from nanobot.channels.qq import QQChannel, QQConfig class _FakeApi: @@ -124,3 +125,38 @@ async def test_send_group_message_uses_markdown_when_configured() -> None: "msg_id": "msg1", "msg_seq": 2, } + + +@pytest.mark.asyncio +async def test_read_media_bytes_local_path() -> None: + channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus()) + + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: + f.write(b"\x89PNG\r\n") + tmp_path = f.name + + data, filename = await channel._read_media_bytes(tmp_path) + assert data == b"\x89PNG\r\n" + assert filename == Path(tmp_path).name + + +@pytest.mark.asyncio +async def test_read_media_bytes_file_uri() -> None: + channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus()) + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"JFIF") + tmp_path = f.name + + data, filename = await channel._read_media_bytes(f"file://{tmp_path}") + assert data == b"JFIF" + assert filename == Path(tmp_path).name + + +@pytest.mark.asyncio +async def test_read_media_bytes_missing_file() -> None: + channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus()) + + data, filename = await channel._read_media_bytes("/nonexistent/path/image.png") + assert data is None + assert filename is None