import asyncio from types import SimpleNamespace import pytest from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel from nanobot.config.schema import TelegramConfig class _FakeHTTPXRequest: instances: list["_FakeHTTPXRequest"] = [] def __init__(self, **kwargs) -> None: self.kwargs = kwargs self.__class__.instances.append(self) class _FakeUpdater: def __init__(self, on_start_polling) -> None: self._on_start_polling = on_start_polling async def start_polling(self, **kwargs) -> None: self._on_start_polling() class _FakeBot: def __init__(self) -> None: self.sent_messages: list[dict] = [] self.get_me_calls = 0 async def get_me(self): self.get_me_calls += 1 return SimpleNamespace(id=999, username="nanobot_test") async def set_my_commands(self, commands) -> None: self.commands = commands async def send_message(self, **kwargs) -> None: self.sent_messages.append(kwargs) async def send_chat_action(self, **kwargs) -> None: pass class _FakeApp: def __init__(self, on_start_polling) -> None: self.bot = _FakeBot() self.updater = _FakeUpdater(on_start_polling) self.handlers = [] self.error_handlers = [] def add_error_handler(self, handler) -> None: self.error_handlers.append(handler) def add_handler(self, handler) -> None: self.handlers.append(handler) async def initialize(self) -> None: pass async def start(self) -> None: pass class _FakeBuilder: def __init__(self, app: _FakeApp) -> None: self.app = app self.token_value = None self.request_value = None self.get_updates_request_value = None def token(self, token: str): self.token_value = token return self def request(self, request): self.request_value = request return self def get_updates_request(self, request): self.get_updates_request_value = request return self def proxy(self, _proxy): raise AssertionError("builder.proxy should not be called when request is set") def get_updates_proxy(self, _proxy): raise AssertionError("builder.get_updates_proxy should not be called when request is set") def build(self): return self.app def _make_telegram_update( *, chat_type: str = "group", text: str | None = None, caption: str | None = None, entities=None, caption_entities=None, reply_to_message=None, ): user = SimpleNamespace(id=12345, username="alice", first_name="Alice") message = SimpleNamespace( chat=SimpleNamespace(type=chat_type, is_forum=False), chat_id=-100123, text=text, caption=caption, entities=entities or [], caption_entities=caption_entities or [], reply_to_message=reply_to_message, photo=None, voice=None, audio=None, document=None, media_group_id=None, message_thread_id=None, message_id=1, ) return SimpleNamespace(message=message, effective_user=user) @pytest.mark.asyncio async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None: config = TelegramConfig( enabled=True, token="123:abc", allow_from=["*"], proxy="http://127.0.0.1:7890", ) bus = MessageBus() channel = TelegramChannel(config, bus) app = _FakeApp(lambda: setattr(channel, "_running", False)) builder = _FakeBuilder(app) monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest) monkeypatch.setattr( "nanobot.channels.telegram.Application", SimpleNamespace(builder=lambda: builder), ) await channel.start() assert len(_FakeHTTPXRequest.instances) == 1 assert _FakeHTTPXRequest.instances[0].kwargs["proxy"] == config.proxy assert builder.request_value is _FakeHTTPXRequest.instances[0] assert builder.get_updates_request_value is _FakeHTTPXRequest.instances[0] def test_derive_topic_session_key_uses_thread_id() -> None: message = SimpleNamespace( chat=SimpleNamespace(type="supergroup"), chat_id=-100123, message_thread_id=42, ) assert TelegramChannel._derive_topic_session_key(message) == "telegram:-100123:topic:42" def test_get_extension_falls_back_to_original_filename() -> None: channel = TelegramChannel(TelegramConfig(), MessageBus()) assert channel._get_extension("file", None, "report.pdf") == ".pdf" assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz" def test_telegram_group_policy_defaults_to_mention() -> None: assert TelegramConfig().group_policy == "mention" def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None: channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus()) assert channel.is_allowed("12345|carol") is True assert channel.is_allowed("99999|alice") is True assert channel.is_allowed("67890|bob") is True def test_is_allowed_rejects_invalid_legacy_telegram_sender_shapes() -> None: channel = TelegramChannel(TelegramConfig(allow_from=["alice"]), MessageBus()) assert channel.is_allowed("attacker|alice|extra") is False assert channel.is_allowed("not-a-number|alice") is False @pytest.mark.asyncio async def test_send_progress_keeps_message_in_topic() -> None: config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]) channel = TelegramChannel(config, MessageBus()) channel._app = _FakeApp(lambda: None) await channel.send( OutboundMessage( channel="telegram", chat_id="123", content="hello", metadata={"_progress": True, "message_thread_id": 42}, ) ) assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42 @pytest.mark.asyncio async def test_send_reply_infers_topic_from_message_id_cache() -> None: config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], reply_to_message=True) channel = TelegramChannel(config, MessageBus()) channel._app = _FakeApp(lambda: None) channel._message_threads[("123", 10)] = 42 await channel.send( OutboundMessage( channel="telegram", chat_id="123", content="hello", metadata={"message_id": 10}, ) ) assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42 assert channel._app.bot.sent_messages[0]["reply_parameters"].message_id == 10 @pytest.mark.asyncio async def test_group_policy_mention_ignores_unmentioned_group_message() -> None: channel = TelegramChannel( TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"), MessageBus(), ) channel._app = _FakeApp(lambda: None) handled = [] async def capture_handle(**kwargs) -> None: handled.append(kwargs) channel._handle_message = capture_handle channel._start_typing = lambda _chat_id: None await channel._on_message(_make_telegram_update(text="hello everyone"), None) assert handled == [] assert channel._app.bot.get_me_calls == 1 @pytest.mark.asyncio async def test_group_policy_mention_accepts_text_mention_and_caches_bot_identity() -> None: channel = TelegramChannel( TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"), MessageBus(), ) channel._app = _FakeApp(lambda: None) handled = [] async def capture_handle(**kwargs) -> None: handled.append(kwargs) channel._handle_message = capture_handle channel._start_typing = lambda _chat_id: None mention = SimpleNamespace(type="mention", offset=0, length=13) await channel._on_message(_make_telegram_update(text="@nanobot_test hi", entities=[mention]), None) await channel._on_message(_make_telegram_update(text="@nanobot_test again", entities=[mention]), None) assert len(handled) == 2 assert channel._app.bot.get_me_calls == 1 @pytest.mark.asyncio async def test_group_policy_mention_accepts_caption_mention() -> None: channel = TelegramChannel( TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"), MessageBus(), ) channel._app = _FakeApp(lambda: None) handled = [] async def capture_handle(**kwargs) -> None: handled.append(kwargs) channel._handle_message = capture_handle channel._start_typing = lambda _chat_id: None mention = SimpleNamespace(type="mention", offset=0, length=13) await channel._on_message( _make_telegram_update(caption="@nanobot_test photo", caption_entities=[mention]), None, ) assert len(handled) == 1 assert handled[0]["content"] == "@nanobot_test photo" @pytest.mark.asyncio async def test_group_policy_mention_accepts_reply_to_bot() -> None: channel = TelegramChannel( TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"), MessageBus(), ) channel._app = _FakeApp(lambda: None) handled = [] async def capture_handle(**kwargs) -> None: handled.append(kwargs) channel._handle_message = capture_handle channel._start_typing = lambda _chat_id: None reply = SimpleNamespace(from_user=SimpleNamespace(id=999)) await channel._on_message(_make_telegram_update(text="reply", reply_to_message=reply), None) assert len(handled) == 1 @pytest.mark.asyncio async def test_group_policy_open_accepts_plain_group_message() -> None: channel = TelegramChannel( TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"), MessageBus(), ) channel._app = _FakeApp(lambda: None) handled = [] async def capture_handle(**kwargs) -> None: handled.append(kwargs) channel._handle_message = capture_handle channel._start_typing = lambda _chat_id: None await channel._on_message(_make_telegram_update(text="hello group"), None) assert len(handled) == 1 assert channel._app.bot.get_me_calls == 0 def test_extract_reply_context_no_reply() -> None: """When there is no reply_to_message, _extract_reply_context returns None.""" message = SimpleNamespace(reply_to_message=None) assert TelegramChannel._extract_reply_context(message) is None def test_extract_reply_context_with_text() -> None: """When reply has text, return prefixed string.""" reply = SimpleNamespace(text="Hello world", caption=None) message = SimpleNamespace(reply_to_message=reply) assert TelegramChannel._extract_reply_context(message) == "[Reply to: Hello world]" def test_extract_reply_context_with_caption_only() -> None: """When reply has only caption (no text), caption is used.""" reply = SimpleNamespace(text=None, caption="Photo caption") message = SimpleNamespace(reply_to_message=reply) assert TelegramChannel._extract_reply_context(message) == "[Reply to: Photo caption]" def test_extract_reply_context_truncation() -> None: """Reply text is truncated at TELEGRAM_REPLY_CONTEXT_MAX_LEN.""" long_text = "x" * (TELEGRAM_REPLY_CONTEXT_MAX_LEN + 100) reply = SimpleNamespace(text=long_text, caption=None) message = SimpleNamespace(reply_to_message=reply) result = TelegramChannel._extract_reply_context(message) assert result is not None assert result.startswith("[Reply to: ") assert result.endswith("...]") assert len(result) == len("[Reply to: ]") + TELEGRAM_REPLY_CONTEXT_MAX_LEN + len("...") def test_extract_reply_context_no_text_no_media() -> None: """When reply has no text/caption and no media, return (no text) placeholder.""" reply = SimpleNamespace( text=None, caption=None, photo=None, document=None, voice=None, video_note=None, video=None, audio=None, animation=None, ) message = SimpleNamespace(reply_to_message=reply) assert TelegramChannel._extract_reply_context(message) == "[Reply to: (no text)]" def test_extract_reply_context_reply_to_photo() -> None: """When reply has photo but no text/caption, return (image) placeholder.""" reply = SimpleNamespace( text=None, caption=None, photo=[SimpleNamespace(file_id="x")], ) message = SimpleNamespace(reply_to_message=reply) assert TelegramChannel._extract_reply_context(message) == "[Reply to: (image)]" @pytest.mark.asyncio async def test_on_message_includes_reply_context() -> None: """When user replies to a message, content passed to bus starts with reply context.""" channel = TelegramChannel( TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"), MessageBus(), ) channel._app = _FakeApp(lambda: None) handled = [] async def capture_handle(**kwargs) -> None: handled.append(kwargs) channel._handle_message = capture_handle channel._start_typing = lambda _chat_id: None reply = SimpleNamespace(text="Hello", message_id=2, from_user=SimpleNamespace(id=1)) update = _make_telegram_update(text="translate this", reply_to_message=reply) await channel._on_message(update, None) assert len(handled) == 1 assert handled[0]["content"].startswith("[Reply to: Hello]") assert "translate this" in handled[0]["content"]