From 057927cd24871c73ecd46e47291ec0aa4ac3a2ce Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sat, 7 Mar 2026 16:36:12 +0000 Subject: [PATCH] fix(auth): prevent allowlist bypass via sender_id token splitting --- nanobot/channels/base.py | 5 +---- nanobot/channels/telegram.py | 19 +++++++++++++++++++ tests/test_base_channel.py | 25 +++++++++++++++++++++++++ tests/test_telegram_channel.py | 15 +++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 tests/test_base_channel.py diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index b38fcaf..dc53ba4 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -66,10 +66,7 @@ class BaseChannel(ABC): return False if "*" in allow_list: return True - sender_str = str(sender_id) - return sender_str in allow_list or any( - p in allow_list for p in sender_str.split("|") if p - ) + return str(sender_id) in allow_list async def _handle_message( self, diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 81cf0ca..501a3c1 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -179,6 +179,25 @@ class TelegramChannel(BaseChannel): self._media_group_tasks: dict[str, asyncio.Task] = {} self._message_threads: dict[tuple[str, int], int] = {} + def is_allowed(self, sender_id: str) -> bool: + """Preserve Telegram's legacy id|username allowlist matching.""" + if super().is_allowed(sender_id): + return True + + allow_list = getattr(self.config, "allow_from", []) + if not allow_list or "*" in allow_list: + return False + + sender_str = str(sender_id) + if sender_str.count("|") != 1: + return False + + sid, username = sender_str.split("|", 1) + if not sid.isdigit() or not username: + return False + + return sid in allow_list or username in allow_list + async def start(self) -> None: """Start the Telegram bot with long polling.""" if not self.config.token: diff --git a/tests/test_base_channel.py b/tests/test_base_channel.py new file mode 100644 index 0000000..5d10d4e --- /dev/null +++ b/tests/test_base_channel.py @@ -0,0 +1,25 @@ +from types import SimpleNamespace + +from nanobot.bus.events import OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.channels.base import BaseChannel + + +class _DummyChannel(BaseChannel): + name = "dummy" + + async def start(self) -> None: + return None + + async def stop(self) -> None: + return None + + async def send(self, msg: OutboundMessage) -> None: + return None + + +def test_is_allowed_requires_exact_match() -> None: + channel = _DummyChannel(SimpleNamespace(allow_from=["allow@email.com"]), MessageBus()) + + assert channel.is_allowed("allow@email.com") is True + assert channel.is_allowed("attacker|allow@email.com") is False diff --git a/tests/test_telegram_channel.py b/tests/test_telegram_channel.py index acd2a96..88c3f54 100644 --- a/tests/test_telegram_channel.py +++ b/tests/test_telegram_channel.py @@ -131,6 +131,21 @@ def test_get_extension_falls_back_to_original_filename() -> None: assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz" +def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None: + channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus()) + + assert channel.is_allowed("12345|carol") is True + assert channel.is_allowed("99999|alice") is True + assert channel.is_allowed("67890|bob") is True + + +def test_is_allowed_rejects_invalid_legacy_telegram_sender_shapes() -> None: + channel = TelegramChannel(TelegramConfig(allow_from=["alice"]), MessageBus()) + + assert channel.is_allowed("attacker|alice|extra") is False + assert channel.is_allowed("not-a-number|alice") is False + + @pytest.mark.asyncio async def test_send_progress_keeps_message_in_topic() -> None: config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"])