Add media download functionality
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -43,6 +45,12 @@ class _FakeBot:
|
||||
async def send_chat_action(self, **kwargs) -> None:
|
||||
pass
|
||||
|
||||
async def get_file(self, file_id: str):
|
||||
"""Return a fake file that 'downloads' to a path (for reply-to-media tests)."""
|
||||
async def _fake_download(path) -> None:
|
||||
pass
|
||||
return SimpleNamespace(download_to_drive=_fake_download)
|
||||
|
||||
|
||||
class _FakeApp:
|
||||
def __init__(self, on_start_polling) -> None:
|
||||
@@ -389,14 +397,14 @@ def test_extract_reply_context_no_text_no_media() -> None:
|
||||
|
||||
|
||||
def test_extract_reply_context_reply_to_photo() -> None:
|
||||
"""When reply has photo but no text/caption, return (image) placeholder."""
|
||||
"""When reply has photo but no text/caption, return (image — not attached) placeholder."""
|
||||
reply = SimpleNamespace(
|
||||
text=None,
|
||||
caption=None,
|
||||
photo=[SimpleNamespace(file_id="x")],
|
||||
)
|
||||
message = SimpleNamespace(reply_to_message=reply)
|
||||
assert TelegramChannel._extract_reply_context(message) == "[Reply to: (image)]"
|
||||
assert TelegramChannel._extract_reply_context(message) == "[Reply to: (image — not attached)]"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -420,3 +428,125 @@ async def test_on_message_includes_reply_context() -> None:
|
||||
assert len(handled) == 1
|
||||
assert handled[0]["content"].startswith("[Reply to: Hello]")
|
||||
assert "translate this" in handled[0]["content"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_message_media_returns_path_when_download_succeeds(
|
||||
monkeypatch, tmp_path
|
||||
) -> None:
|
||||
"""_download_message_media returns (paths, content_parts) when bot.get_file and download succeed."""
|
||||
media_dir = tmp_path / "media" / "telegram"
|
||||
media_dir.mkdir(parents=True)
|
||||
monkeypatch.setattr(
|
||||
"nanobot.channels.telegram.get_media_dir",
|
||||
lambda channel=None: media_dir if channel else tmp_path / "media",
|
||||
)
|
||||
|
||||
channel = TelegramChannel(
|
||||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||||
MessageBus(),
|
||||
)
|
||||
channel._app = _FakeApp(lambda: None)
|
||||
channel._app.bot.get_file = AsyncMock(
|
||||
return_value=SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
|
||||
)
|
||||
|
||||
msg = SimpleNamespace(
|
||||
photo=[SimpleNamespace(file_id="fid123", mime_type="image/jpeg")],
|
||||
voice=None,
|
||||
audio=None,
|
||||
document=None,
|
||||
video=None,
|
||||
video_note=None,
|
||||
animation=None,
|
||||
)
|
||||
paths, parts = await channel._download_message_media(msg)
|
||||
assert len(paths) == 1
|
||||
assert len(parts) == 1
|
||||
assert "fid123" in paths[0]
|
||||
assert "[image:" in parts[0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_message_attaches_reply_to_media_when_available(monkeypatch, tmp_path) -> None:
|
||||
"""When user replies to a message with media, that media is downloaded and attached to the turn."""
|
||||
media_dir = tmp_path / "media" / "telegram"
|
||||
media_dir.mkdir(parents=True)
|
||||
monkeypatch.setattr(
|
||||
"nanobot.channels.telegram.get_media_dir",
|
||||
lambda channel=None: media_dir if channel else tmp_path / "media",
|
||||
)
|
||||
|
||||
channel = TelegramChannel(
|
||||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||||
MessageBus(),
|
||||
)
|
||||
app = _FakeApp(lambda: None)
|
||||
app.bot.get_file = AsyncMock(
|
||||
return_value=SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
|
||||
)
|
||||
channel._app = app
|
||||
handled = []
|
||||
async def capture_handle(**kwargs) -> None:
|
||||
handled.append(kwargs)
|
||||
channel._handle_message = capture_handle
|
||||
channel._start_typing = lambda _chat_id: None
|
||||
|
||||
reply_with_photo = SimpleNamespace(
|
||||
text=None,
|
||||
caption=None,
|
||||
photo=[SimpleNamespace(file_id="reply_photo_fid", mime_type="image/jpeg")],
|
||||
document=None,
|
||||
voice=None,
|
||||
audio=None,
|
||||
video=None,
|
||||
video_note=None,
|
||||
animation=None,
|
||||
)
|
||||
update = _make_telegram_update(
|
||||
text="what is the image?",
|
||||
reply_to_message=reply_with_photo,
|
||||
)
|
||||
await channel._on_message(update, None)
|
||||
|
||||
assert len(handled) == 1
|
||||
assert handled[0]["content"].startswith("[Reply to: [image:")
|
||||
assert "what is the image?" in handled[0]["content"]
|
||||
assert len(handled[0]["media"]) == 1
|
||||
assert "reply_photo_fid" in handled[0]["media"][0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_message_reply_to_media_fallback_when_download_fails() -> None:
|
||||
"""When reply has media but download fails, keep placeholder and do not attach."""
|
||||
channel = TelegramChannel(
|
||||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||||
MessageBus(),
|
||||
)
|
||||
channel._app = _FakeApp(lambda: None)
|
||||
# No get_file on bot -> download will fail
|
||||
channel._app.bot.get_file = None
|
||||
handled = []
|
||||
async def capture_handle(**kwargs) -> None:
|
||||
handled.append(kwargs)
|
||||
channel._handle_message = capture_handle
|
||||
channel._start_typing = lambda _chat_id: None
|
||||
|
||||
reply_with_photo = SimpleNamespace(
|
||||
text=None,
|
||||
caption=None,
|
||||
photo=[SimpleNamespace(file_id="x", mime_type="image/jpeg")],
|
||||
document=None,
|
||||
voice=None,
|
||||
audio=None,
|
||||
video=None,
|
||||
video_note=None,
|
||||
animation=None,
|
||||
)
|
||||
update = _make_telegram_update(text="what is this?", reply_to_message=reply_with_photo)
|
||||
await channel._on_message(update, None)
|
||||
|
||||
assert len(handled) == 1
|
||||
assert "[Reply to: (image — not attached)]" in handled[0]["content"]
|
||||
assert "what is this?" in handled[0]["content"]
|
||||
assert handled[0]["media"] == []
|
||||
|
||||
Reference in New Issue
Block a user