fix(auth): prevent allowlist bypass via sender_id token splitting

This commit is contained in:
Re-bin
2026-03-07 16:36:12 +00:00
parent 74066e2823
commit 057927cd24
4 changed files with 60 additions and 4 deletions

View File

@@ -66,10 +66,7 @@ class BaseChannel(ABC):
return False return False
if "*" in allow_list: if "*" in allow_list:
return True return True
sender_str = str(sender_id) return str(sender_id) in allow_list
return sender_str in allow_list or any(
p in allow_list for p in sender_str.split("|") if p
)
async def _handle_message( async def _handle_message(
self, self,

View File

@@ -179,6 +179,25 @@ class TelegramChannel(BaseChannel):
self._media_group_tasks: dict[str, asyncio.Task] = {} self._media_group_tasks: dict[str, asyncio.Task] = {}
self._message_threads: dict[tuple[str, int], int] = {} self._message_threads: dict[tuple[str, int], int] = {}
def is_allowed(self, sender_id: str) -> bool:
"""Preserve Telegram's legacy id|username allowlist matching."""
if super().is_allowed(sender_id):
return True
allow_list = getattr(self.config, "allow_from", [])
if not allow_list or "*" in allow_list:
return False
sender_str = str(sender_id)
if sender_str.count("|") != 1:
return False
sid, username = sender_str.split("|", 1)
if not sid.isdigit() or not username:
return False
return sid in allow_list or username in allow_list
async def start(self) -> None: async def start(self) -> None:
"""Start the Telegram bot with long polling.""" """Start the Telegram bot with long polling."""
if not self.config.token: if not self.config.token:

View File

@@ -0,0 +1,25 @@
from types import SimpleNamespace
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
class _DummyChannel(BaseChannel):
name = "dummy"
async def start(self) -> None:
return None
async def stop(self) -> None:
return None
async def send(self, msg: OutboundMessage) -> None:
return None
def test_is_allowed_requires_exact_match() -> None:
channel = _DummyChannel(SimpleNamespace(allow_from=["allow@email.com"]), MessageBus())
assert channel.is_allowed("allow@email.com") is True
assert channel.is_allowed("attacker|allow@email.com") is False

View File

@@ -131,6 +131,21 @@ def test_get_extension_falls_back_to_original_filename() -> None:
assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz" assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz"
def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None:
channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus())
assert channel.is_allowed("12345|carol") is True
assert channel.is_allowed("99999|alice") is True
assert channel.is_allowed("67890|bob") is True
def test_is_allowed_rejects_invalid_legacy_telegram_sender_shapes() -> None:
channel = TelegramChannel(TelegramConfig(allow_from=["alice"]), MessageBus())
assert channel.is_allowed("attacker|alice|extra") is False
assert channel.is_allowed("not-a-number|alice") is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_progress_keeps_message_in_topic() -> None: async def test_send_progress_keeps_message_in_topic() -> None:
config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]) config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"])