feat(matrix): add outbound media uploads and unify media limits with maxMediaBytes

- Use OutboundMessage.media for Matrix file/image/audio/video sends
- Apply effective media limit as min(m.upload.size, maxMediaBytes)
- Rename matrix config key maxInboundMediaBytes -> maxMediaBytes (no legacy fallback)
This commit is contained in:
Alexander Minges
2026-02-11 10:45:28 +01:00
parent bfd2018095
commit 97cb85ee0b
5 changed files with 482 additions and 64 deletions

View File

@@ -10,6 +10,7 @@ from mistune import create_markdown
from nio import (
AsyncClient,
AsyncClientConfig,
ContentRepositoryConfigError,
DownloadError,
InviteEvent,
JoinError,
@@ -22,6 +23,7 @@ from nio import (
RoomSendError,
RoomTypingError,
SyncError,
UploadError,
)
from nio.crypto.attachments import decrypt_attachment
from nio.exceptions import EncryptionError
@@ -44,6 +46,7 @@ MATRIX_HTML_FORMAT = "org.matrix.custom.html"
MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]"
MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]"
MATRIX_ATTACHMENT_FAILED_TEMPLATE = "[attachment: {} - download failed]"
MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE = "[attachment: {} - upload failed]"
MATRIX_DEFAULT_ATTACHMENT_NAME = "attachment"
# Runtime callback filter for nio event dispatch (checked via isinstance).
@@ -227,11 +230,22 @@ class MatrixChannel(BaseChannel):
name = "matrix"
def __init__(self, config: Any, bus):
def __init__(
self,
config: Any,
bus,
*,
restrict_to_workspace: bool = False,
workspace: Path | None = None,
):
super().__init__(config, bus)
self.client: AsyncClient | None = None
self._sync_task: asyncio.Task | None = None
self._typing_tasks: dict[str, asyncio.Task] = {}
self._restrict_to_workspace = restrict_to_workspace
self._workspace = workspace.expanduser().resolve() if workspace else None
self._server_upload_limit_bytes: int | None = None
self._server_upload_limit_checked = False
async def start(self) -> None:
"""Start Matrix client and begin sync loop."""
@@ -313,21 +327,266 @@ class MatrixChannel(BaseChannel):
if self.client:
await self.client.close()
async def send(self, msg: OutboundMessage) -> None:
@staticmethod
def _path_dedupe_key(path: Path) -> str:
"""Return a stable deduplication key for attachment paths."""
expanded = path.expanduser()
try:
return str(expanded.resolve(strict=False))
except OSError:
return str(expanded)
def _is_workspace_path_allowed(self, path: Path) -> bool:
"""Enforce optional workspace-only outbound attachment policy."""
if not self._restrict_to_workspace:
return True
if self._workspace is None:
return False
try:
path.resolve(strict=False).relative_to(self._workspace)
return True
except ValueError:
return False
def _collect_outbound_media_candidates(self, media: list[str]) -> list[Path]:
"""Collect unique outbound attachment paths from OutboundMessage.media."""
candidates: list[Path] = []
seen: set[str] = set()
for raw in media:
if not isinstance(raw, str) or not raw.strip():
continue
path = Path(raw.strip()).expanduser()
key = self._path_dedupe_key(path)
if key in seen:
continue
seen.add(key)
candidates.append(path)
return candidates
@staticmethod
def _build_outbound_attachment_content(
*,
filename: str,
mime: str,
size_bytes: int,
mxc_url: str,
) -> dict[str, Any]:
"""Build Matrix content payload for an uploaded file/image/audio/video."""
msgtype = "m.file"
if mime.startswith("image/"):
msgtype = "m.image"
elif mime.startswith("audio/"):
msgtype = "m.audio"
elif mime.startswith("video/"):
msgtype = "m.video"
return {
"msgtype": msgtype,
"body": filename,
"filename": filename,
"url": mxc_url,
"info": {
"mimetype": mime,
"size": size_bytes,
},
"m.mentions": {},
}
async def _send_room_content(self, room_id: str, content: dict[str, Any]) -> None:
"""Send Matrix m.room.message content with configured E2EE send options."""
if not self.client:
return
room_send_kwargs: dict[str, Any] = {
"room_id": msg.chat_id,
"room_id": room_id,
"message_type": "m.room.message",
"content": _build_matrix_text_content(msg.content),
"content": content,
}
if self.config.e2ee_enabled:
# TODO(matrix): Add explicit config for strict verified-device sending mode.
room_send_kwargs["ignore_unverified_devices"] = True
await self.client.room_send(**room_send_kwargs)
async def _resolve_server_upload_limit_bytes(self) -> int | None:
"""Resolve homeserver-advertised upload limit once per channel lifecycle."""
if self._server_upload_limit_checked:
return self._server_upload_limit_bytes
self._server_upload_limit_checked = True
if not self.client:
return None
try:
await self.client.room_send(**room_send_kwargs)
response = await self.client.content_repository_config()
except Exception as e:
logger.debug(
"Matrix media config lookup failed ({}): {}",
type(e).__name__,
str(e),
)
return None
upload_size = getattr(response, "upload_size", None)
if isinstance(upload_size, int) and upload_size > 0:
self._server_upload_limit_bytes = upload_size
return self._server_upload_limit_bytes
if isinstance(response, ContentRepositoryConfigError):
logger.debug("Matrix media config lookup failed: {}", response)
return None
logger.debug(
"Matrix media config lookup returned unexpected response {}",
type(response).__name__,
)
return None
async def _effective_media_limit_bytes(self) -> int:
"""
Compute effective Matrix media size cap.
`m.upload.size` (if advertised) is treated as the homeserver-side cap.
`maxMediaBytes` is a local hard limit/fallback. Using the stricter value
keeps resource usage predictable while honoring server constraints.
"""
local_limit = max(int(self.config.max_media_bytes), 0)
server_limit = await self._resolve_server_upload_limit_bytes()
if server_limit is None:
return local_limit
if local_limit == 0:
return 0
return min(local_limit, server_limit)
async def _upload_and_send_attachment(
self, room_id: str, path: Path, limit_bytes: int
) -> str | None:
"""Upload one local file to Matrix and send it as a media message."""
if not self.client:
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(
path.name or MATRIX_DEFAULT_ATTACHMENT_NAME
)
resolved = path.expanduser().resolve(strict=False)
filename = safe_filename(resolved.name) or MATRIX_DEFAULT_ATTACHMENT_NAME
if not resolved.is_file():
logger.warning("Matrix outbound attachment missing file: {}", resolved)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
if not self._is_workspace_path_allowed(resolved):
logger.warning(
"Matrix outbound attachment denied by workspace restriction: {}",
resolved,
)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
try:
size_bytes = resolved.stat().st_size
except OSError as e:
logger.warning(
"Matrix outbound attachment stat failed for {} ({}): {}",
resolved,
type(e).__name__,
str(e),
)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
if limit_bytes and size_bytes > limit_bytes:
logger.warning(
"Matrix outbound attachment skipped: {} bytes exceeds limit {} for {}",
size_bytes,
limit_bytes,
resolved,
)
return MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename)
try:
data = resolved.read_bytes()
except OSError as e:
logger.warning(
"Matrix outbound attachment read failed for {} ({}): {}",
resolved,
type(e).__name__,
str(e),
)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
mime = mimetypes.guess_type(filename, strict=False)[0] or "application/octet-stream"
upload_response = await self.client.upload(
data,
content_type=mime,
filename=filename,
filesize=len(data),
)
if isinstance(upload_response, UploadError):
logger.warning(
"Matrix outbound attachment upload failed for {}: {}",
resolved,
upload_response,
)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
mxc_url = getattr(upload_response, "content_uri", None)
if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"):
logger.warning(
"Matrix outbound attachment upload returned unexpected response {} for {}",
type(upload_response).__name__,
resolved,
)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
content = self._build_outbound_attachment_content(
filename=filename,
mime=mime,
size_bytes=len(data),
mxc_url=mxc_url,
)
try:
await self._send_room_content(room_id, content)
except Exception as e:
logger.warning(
"Matrix outbound attachment send failed for {} ({}): {}",
resolved,
type(e).__name__,
str(e),
)
return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename)
return None
async def send(self, msg: OutboundMessage) -> None:
if not self.client:
return
text = msg.content or ""
candidates = self._collect_outbound_media_candidates(msg.media)
try:
failures: list[str] = []
if candidates:
limit_bytes = await self._effective_media_limit_bytes()
for path in candidates:
failure_marker = await self._upload_and_send_attachment(
room_id=msg.chat_id,
path=path,
limit_bytes=limit_bytes,
)
if failure_marker:
failures.append(failure_marker)
if failures:
if text.strip():
text = f"{text.rstrip()}\n" + "\n".join(failures)
else:
text = "\n".join(failures)
if text or not candidates:
await self._send_room_content(msg.chat_id, _build_matrix_text_content(text))
finally:
await self._stop_typing_keepalive(msg.chat_id, clear_typing=True)
@@ -711,13 +970,14 @@ class MatrixChannel(BaseChannel):
)
return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename)
limit_bytes = await self._effective_media_limit_bytes()
declared_size = self._event_declared_size_bytes(event)
if declared_size is not None and declared_size > self.config.max_inbound_media_bytes:
if declared_size is not None and declared_size > limit_bytes:
logger.warning(
"Matrix attachment skipped in room {}: declared size {} exceeds limit {}",
room.room_id,
declared_size,
self.config.max_inbound_media_bytes,
limit_bytes,
)
return None, MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename)
@@ -733,12 +993,12 @@ class MatrixChannel(BaseChannel):
return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename)
data = decrypted
if len(data) > self.config.max_inbound_media_bytes:
if len(data) > limit_bytes:
logger.warning(
"Matrix attachment skipped in room {}: downloaded size {} exceeds limit {}",
room.room_id,
len(data),
self.config.max_inbound_media_bytes,
limit_bytes,
)
return None, MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename)