- Fix _read_media_bytes treating local paths as URLs: local file handling code was dead code placed after an early return inside the HTTP try/except block. Restructure to check for local paths (plain path or file:// URI) before URL validation, so files like /home/.../.nanobot/workspace/generated_image.svg can be read and sent correctly. - Add .svg to _IMAGE_EXTS so SVG files are uploaded as file_type=1 (image) instead of file_type=4 (file). - Add tests for local path, file:// URI, and missing file cases. Fixes: https://github.com/HKUDS/nanobot/pull/1667#issuecomment-4096400955 Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
163 lines
4.6 KiB
Python
163 lines
4.6 KiB
Python
import tempfile
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.qq import QQChannel, QQConfig
|
|
|
|
|
|
class _FakeApi:
|
|
def __init__(self) -> None:
|
|
self.c2c_calls: list[dict] = []
|
|
self.group_calls: list[dict] = []
|
|
|
|
async def post_c2c_message(self, **kwargs) -> None:
|
|
self.c2c_calls.append(kwargs)
|
|
|
|
async def post_group_message(self, **kwargs) -> None:
|
|
self.group_calls.append(kwargs)
|
|
|
|
|
|
class _FakeClient:
|
|
def __init__(self) -> None:
|
|
self.api = _FakeApi()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_on_group_message_routes_to_group_chat_id() -> None:
|
|
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["user1"]), MessageBus())
|
|
|
|
data = SimpleNamespace(
|
|
id="msg1",
|
|
content="hello",
|
|
group_openid="group123",
|
|
author=SimpleNamespace(member_openid="user1"),
|
|
attachments=[],
|
|
)
|
|
|
|
await channel._on_message(data, is_group=True)
|
|
|
|
msg = await channel.bus.consume_inbound()
|
|
assert msg.sender_id == "user1"
|
|
assert msg.chat_id == "group123"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_group_message_uses_plain_text_group_api_with_msg_seq() -> None:
|
|
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
|
|
channel._client = _FakeClient()
|
|
channel._chat_type_cache["group123"] = "group"
|
|
|
|
await channel.send(
|
|
OutboundMessage(
|
|
channel="qq",
|
|
chat_id="group123",
|
|
content="hello",
|
|
metadata={"message_id": "msg1"},
|
|
)
|
|
)
|
|
|
|
assert len(channel._client.api.group_calls) == 1
|
|
call = channel._client.api.group_calls[0]
|
|
assert call == {
|
|
"group_openid": "group123",
|
|
"msg_type": 0,
|
|
"content": "hello",
|
|
"msg_id": "msg1",
|
|
"msg_seq": 2,
|
|
}
|
|
assert not channel._client.api.c2c_calls
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_c2c_message_uses_plain_text_c2c_api_with_msg_seq() -> None:
|
|
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
|
|
channel._client = _FakeClient()
|
|
|
|
await channel.send(
|
|
OutboundMessage(
|
|
channel="qq",
|
|
chat_id="user123",
|
|
content="hello",
|
|
metadata={"message_id": "msg1"},
|
|
)
|
|
)
|
|
|
|
assert len(channel._client.api.c2c_calls) == 1
|
|
call = channel._client.api.c2c_calls[0]
|
|
assert call == {
|
|
"openid": "user123",
|
|
"msg_type": 0,
|
|
"content": "hello",
|
|
"msg_id": "msg1",
|
|
"msg_seq": 2,
|
|
}
|
|
assert not channel._client.api.group_calls
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_group_message_uses_markdown_when_configured() -> None:
|
|
channel = QQChannel(
|
|
QQConfig(app_id="app", secret="secret", allow_from=["*"], msg_format="markdown"),
|
|
MessageBus(),
|
|
)
|
|
channel._client = _FakeClient()
|
|
channel._chat_type_cache["group123"] = "group"
|
|
|
|
await channel.send(
|
|
OutboundMessage(
|
|
channel="qq",
|
|
chat_id="group123",
|
|
content="**hello**",
|
|
metadata={"message_id": "msg1"},
|
|
)
|
|
)
|
|
|
|
assert len(channel._client.api.group_calls) == 1
|
|
call = channel._client.api.group_calls[0]
|
|
assert call == {
|
|
"group_openid": "group123",
|
|
"msg_type": 2,
|
|
"markdown": {"content": "**hello**"},
|
|
"msg_id": "msg1",
|
|
"msg_seq": 2,
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_media_bytes_local_path() -> None:
|
|
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
|
|
f.write(b"\x89PNG\r\n")
|
|
tmp_path = f.name
|
|
|
|
data, filename = await channel._read_media_bytes(tmp_path)
|
|
assert data == b"\x89PNG\r\n"
|
|
assert filename == Path(tmp_path).name
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_media_bytes_file_uri() -> None:
|
|
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
|
|
f.write(b"JFIF")
|
|
tmp_path = f.name
|
|
|
|
data, filename = await channel._read_media_bytes(f"file://{tmp_path}")
|
|
assert data == b"JFIF"
|
|
assert filename == Path(tmp_path).name
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_media_bytes_missing_file() -> None:
|
|
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
|
|
|
|
data, filename = await channel._read_media_bytes("/nonexistent/path/image.png")
|
|
assert data is None
|
|
assert filename is None
|