Merge PR #1677: fix(auth): prevent allowlist bypass via sender_id token splitting
fix(auth): prevent allowlist bypass via sender_id token splitting
This commit is contained in:
@@ -66,10 +66,7 @@ class BaseChannel(ABC):
|
|||||||
return False
|
return False
|
||||||
if "*" in allow_list:
|
if "*" in allow_list:
|
||||||
return True
|
return True
|
||||||
sender_str = str(sender_id)
|
return str(sender_id) in allow_list
|
||||||
return sender_str in allow_list or any(
|
|
||||||
p in allow_list for p in sender_str.split("|") if p
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _handle_message(
|
async def _handle_message(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -179,6 +179,25 @@ class TelegramChannel(BaseChannel):
|
|||||||
self._media_group_tasks: dict[str, asyncio.Task] = {}
|
self._media_group_tasks: dict[str, asyncio.Task] = {}
|
||||||
self._message_threads: dict[tuple[str, int], int] = {}
|
self._message_threads: dict[tuple[str, int], int] = {}
|
||||||
|
|
||||||
|
def is_allowed(self, sender_id: str) -> bool:
|
||||||
|
"""Preserve Telegram's legacy id|username allowlist matching."""
|
||||||
|
if super().is_allowed(sender_id):
|
||||||
|
return True
|
||||||
|
|
||||||
|
allow_list = getattr(self.config, "allow_from", [])
|
||||||
|
if not allow_list or "*" in allow_list:
|
||||||
|
return False
|
||||||
|
|
||||||
|
sender_str = str(sender_id)
|
||||||
|
if sender_str.count("|") != 1:
|
||||||
|
return False
|
||||||
|
|
||||||
|
sid, username = sender_str.split("|", 1)
|
||||||
|
if not sid.isdigit() or not username:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return sid in allow_list or username in allow_list
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Start the Telegram bot with long polling."""
|
"""Start the Telegram bot with long polling."""
|
||||||
if not self.config.token:
|
if not self.config.token:
|
||||||
|
|||||||
25
tests/test_base_channel.py
Normal file
25
tests/test_base_channel.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
from nanobot.bus.events import OutboundMessage
|
||||||
|
from nanobot.bus.queue import MessageBus
|
||||||
|
from nanobot.channels.base import BaseChannel
|
||||||
|
|
||||||
|
|
||||||
|
class _DummyChannel(BaseChannel):
|
||||||
|
name = "dummy"
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def send(self, msg: OutboundMessage) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_allowed_requires_exact_match() -> None:
|
||||||
|
channel = _DummyChannel(SimpleNamespace(allow_from=["allow@email.com"]), MessageBus())
|
||||||
|
|
||||||
|
assert channel.is_allowed("allow@email.com") is True
|
||||||
|
assert channel.is_allowed("attacker|allow@email.com") is False
|
||||||
@@ -131,6 +131,21 @@ def test_get_extension_falls_back_to_original_filename() -> None:
|
|||||||
assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz"
|
assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz"
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None:
|
||||||
|
channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus())
|
||||||
|
|
||||||
|
assert channel.is_allowed("12345|carol") is True
|
||||||
|
assert channel.is_allowed("99999|alice") is True
|
||||||
|
assert channel.is_allowed("67890|bob") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_allowed_rejects_invalid_legacy_telegram_sender_shapes() -> None:
|
||||||
|
channel = TelegramChannel(TelegramConfig(allow_from=["alice"]), MessageBus())
|
||||||
|
|
||||||
|
assert channel.is_allowed("attacker|alice|extra") is False
|
||||||
|
assert channel.is_allowed("not-a-number|alice") is False
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_send_progress_keeps_message_in_topic() -> None:
|
async def test_send_progress_keeps_message_in_topic() -> None:
|
||||||
config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"])
|
config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"])
|
||||||
|
|||||||
Reference in New Issue
Block a user