security: deny-by-default allowFrom with wildcard support and startup validation

This commit is contained in:
Re-bin
2026-03-02 06:13:37 +00:00
parent d447be5ca2
commit bbfc1b40c1
6 changed files with 31 additions and 39 deletions

View File

@@ -59,36 +59,17 @@ class BaseChannel(ABC):
pass
def is_allowed(self, sender_id: str) -> bool:
"""
Check if a sender is allowed to use this bot.
Args:
sender_id: The sender's identifier.
Returns:
True if allowed, False otherwise.
"""
"""Check if *sender_id* is permitted. Empty list → deny all; ``"*"`` → allow all."""
allow_list = getattr(self.config, "allow_from", [])
# Security fix: If no allow list, deny everyone by default
# This prevents unauthorized access when allow_from is not configured
if not allow_list:
logger.warning(
"Channel {} has no allow_from configured - "
"blocking all access by default for security. "
"Add allowed senders to config to enable access.",
self.name,
)
logger.warning("{}: allow_from is empty — all access denied", self.name)
return False
sender_str = str(sender_id)
if sender_str in allow_list:
if "*" in allow_list:
return True
if "|" in sender_str:
for part in sender_str.split("|"):
if part and part in allow_list:
return True
return False
sender_str = str(sender_id)
return sender_str in allow_list or any(
p in allow_list for p in sender_str.split("|") if p
)
async def _handle_message(
self,

View File

@@ -149,6 +149,16 @@ class ChannelManager:
except ImportError as e:
logger.warning("Matrix channel not available: {}", e)
self._validate_allow_from()
def _validate_allow_from(self) -> None:
for name, ch in self.channels.items():
if getattr(ch.config, "allow_from", None) == []:
raise SystemExit(
f'Error: "{name}" has empty allowFrom (denies all). '
f'Set ["*"] to allow everyone, or add specific user IDs.'
)
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""
try:

View File

@@ -450,8 +450,7 @@ class MatrixChannel(BaseChannel):
await asyncio.sleep(2)
async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None:
allow_from = self.config.allow_from or []
if not allow_from or event.sender in allow_from:
if self.is_allowed(event.sender):
await self.client.join(room.room_id)
def _is_direct_room(self, room: MatrixRoom) -> bool: