feat(matrix): support inbound media attachments

This commit is contained in:
Alexander Minges
2026-02-10 17:09:06 +01:00
parent 7b2adf9d9d
commit a482a89df6
3 changed files with 556 additions and 26 deletions

View File

@@ -1,5 +1,7 @@
import asyncio
import logging
import mimetypes
from pathlib import Path
from typing import Any
import nh3
@@ -8,52 +10,69 @@ from mistune import create_markdown
from nio import (
AsyncClient,
AsyncClientConfig,
DownloadError,
InviteEvent,
JoinError,
MatrixRoom,
MemoryDownloadResponse,
RoomEncryptedAudio,
RoomEncryptedFile,
RoomEncryptedImage,
RoomEncryptedVideo,
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageText,
RoomMessageVideo,
RoomSendError,
RoomTypingError,
SyncError,
)
from nio.crypto.attachments import decrypt_attachment
from nio.exceptions import EncryptionError
from nanobot.bus.events import OutboundMessage
from nanobot.channels.base import BaseChannel
from nanobot.config.loader import get_data_dir
from nanobot.utils.helpers import safe_filename
# NOTE(review): not referenced in the visible code; presumably a stack-depth
# offset used for log-call attribution — confirm at the usage site.
LOGGING_STACK_BASE_DEPTH = 2
# Timeout, in milliseconds, sent along with typing notices (30 seconds).
TYPING_NOTICE_TIMEOUT_MS = 30_000
# Matrix `format` identifier for HTML bodies.
# NOTE(review): presumably set on outbound formatted messages — confirm.
MATRIX_HTML_FORMAT = "org.matrix.custom.html"
# Text markers appended to inbound message content for each media event.
# The success marker is filled with the local file path; the two failure
# markers are filled with the attachment filename.
MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]"
MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]"
MATRIX_ATTACHMENT_FAILED_TEMPLATE = "[attachment: {} - download failed]"
# Fallback name for generic file attachments that carry no usable filename.
MATRIX_DEFAULT_ATTACHMENT_NAME = "attachment"
# Keep plugin output aligned with Matrix recommended HTML tags:
# https://spec.matrix.org/latest/client-server-api/#mroommessage-msgtypes
# - table/strikethrough are already used in replies.
# - url, superscript, and subscript map to common tags (<a>, <sup>, <sub>)
# that Matrix clients (e.g. Element/FluffyChat) can render consistently.
# We intentionally avoid plugins that emit less-portable tags to keep output
# predictable across clients.
# escape=True is intentional: raw HTML from model output is rendered as text,
# not as live HTML. This includes Matrix-specific raw snippets such as
# <span data-mx-...> and <div data-mx-maths>, unless we later add explicit
# structured support for those features.
# nio event classes treated as inbound media, in both plain and encrypted
# variants. The tuple is used as the event filter when the media-message
# callback is registered on the client.
MATRIX_MEDIA_EVENT_TYPES = (
    RoomMessageImage,
    RoomMessageFile,
    RoomMessageAudio,
    RoomMessageVideo,
    RoomEncryptedImage,
    RoomEncryptedFile,
    RoomEncryptedAudio,
    RoomEncryptedVideo,
)
# Markdown renderer policy:
# https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes
# - Only enable portable features that map cleanly to Matrix-compatible HTML.
# - escape=True ensures raw model HTML is treated as text unless we explicitly
# add structured support for Matrix-specific HTML features later.
MATRIX_MARKDOWN = create_markdown(
    # Raw HTML in the markdown input is escaped and rendered as text.
    escape=True,
    # Only plugins whose output maps to portable, Matrix-compatible HTML tags.
    plugins=["table", "strikethrough", "url", "superscript", "subscript"],
)
# Sanitizer policy rationale:
# - Baseline follows Matrix formatted message guidance:
# https://spec.matrix.org/latest/client-server-api/#mroommessage-msgtypes
# - We intentionally use a tighter subset than the full spec to keep behavior
# predictable across clients and reduce risk from LLM-generated content.
# - URLs are restricted to common safe schemes for links, and image sources are
# additionally constrained to mxc:// for Matrix-native media handling.
# - Spec items intentionally NOT enabled yet:
# - href schemes ftp/magnet (we keep link schemes smaller for now).
# - a[target] (clients already control link-opening behavior).
# - span[data-mx-bg-color|data-mx-color|data-mx-spoiler|data-mx-maths]
# - div[data-mx-maths]
# These can be added later when we explicitly support those Matrix features.
# Sanitizer policy:
# https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes
# - Start from Matrix formatted-message guidance, but keep a smaller allowlist
# to reduce risk and keep client behavior predictable for LLM output.
# - Enforce mxc:// for img src to align media rendering with Matrix content
# repository semantics.
# - Unused spec-permitted features (e.g. some href schemes and data-mx-* attrs)
# are intentionally deferred until explicitly needed.
MATRIX_ALLOWED_HTML_TAGS = {
"p",
"a",
@@ -292,6 +311,7 @@ class MatrixChannel(BaseChannel):
def _register_event_callbacks(self) -> None:
    """Attach this channel's inbound event handlers to the Matrix client.

    Text messages, media messages and room invites are each routed to their
    own handler, registered in that order.
    """
    bindings = (
        (self._on_message, RoomMessageText),
        (self._on_media_message, MATRIX_MEDIA_EVENT_TYPES),
        (self._on_room_invite, InviteEvent),
    )
    for handler, event_filter in bindings:
        self.client.add_event_callback(handler, event_filter)
def _register_response_callbacks(self) -> None:
@@ -371,7 +391,7 @@ class MatrixChannel(BaseChannel):
member_count = getattr(room, "member_count", None)
return isinstance(member_count, int) and member_count <= 2
def _is_bot_mentioned_from_mx_mentions(self, event: RoomMessageText) -> bool:
def _is_bot_mentioned_from_mx_mentions(self, event: Any) -> bool:
"""Resolve mentions strictly from Matrix-native m.mentions payload."""
source = getattr(event, "source", None)
if not isinstance(source, dict):
@@ -391,7 +411,7 @@ class MatrixChannel(BaseChannel):
return bool(self.config.allow_room_mentions and mentions.get("room") is True)
def _should_process_message(self, room: MatrixRoom, event: RoomMessageText) -> bool:
def _should_process_message(self, room: MatrixRoom, event: Any) -> bool:
"""Apply sender and room policy checks before processing Matrix messages."""
if not self.is_allowed(event.sender):
return False
@@ -409,6 +429,253 @@ class MatrixChannel(BaseChannel):
return False
def _media_dir(self) -> Path:
    """Return the folder for downloaded Matrix attachments, creating it if missing.

    The folder is ``<data dir>/media/matrix``; parent directories are created
    as needed and an already existing folder is not an error.
    """
    target = get_data_dir() / "media" / "matrix"
    target.mkdir(parents=True, exist_ok=True)
    return target
@staticmethod
def _event_source_content(event: Any) -> dict[str, Any]:
"""Extract Matrix event content payload when available."""
source = getattr(event, "source", None)
if not isinstance(source, dict):
return {}
content = source.get("content")
return content if isinstance(content, dict) else {}
def _event_attachment_type(self, event: Any) -> str:
"""Map Matrix event payload/type to a stable attachment kind."""
msgtype = self._event_source_content(event).get("msgtype")
if msgtype == "m.image":
return "image"
if msgtype == "m.audio":
return "audio"
if msgtype == "m.video":
return "video"
if msgtype == "m.file":
return "file"
class_name = type(event).__name__.lower()
if "image" in class_name:
return "image"
if "audio" in class_name:
return "audio"
if "video" in class_name:
return "video"
return "file"
@staticmethod
def _is_encrypted_media_event(event: Any) -> bool:
"""Return True for encrypted Matrix media events."""
return (
isinstance(getattr(event, "key", None), dict)
and isinstance(getattr(event, "hashes", None), dict)
and isinstance(getattr(event, "iv", None), str)
)
def _event_declared_size_bytes(self, event: Any) -> int | None:
"""Return declared media size from Matrix event info, if present."""
info = self._event_source_content(event).get("info")
if not isinstance(info, dict):
return None
size = info.get("size")
if isinstance(size, int) and size >= 0:
return size
return None
def _event_mime(self, event: Any) -> str | None:
"""Best-effort MIME extraction from Matrix media event."""
info = self._event_source_content(event).get("info")
if isinstance(info, dict):
mime = info.get("mimetype")
if isinstance(mime, str) and mime:
return mime
mime = getattr(event, "mimetype", None)
if isinstance(mime, str) and mime:
return mime
return None
def _event_filename(self, event: Any, attachment_type: str) -> str:
"""Build a safe filename for a Matrix attachment."""
body = getattr(event, "body", None)
if isinstance(body, str) and body.strip():
candidate = safe_filename(Path(body).name)
if candidate:
return candidate
return MATRIX_DEFAULT_ATTACHMENT_NAME if attachment_type == "file" else attachment_type
def _build_attachment_path(
    self,
    event: Any,
    attachment_type: str,
    filename: str,
    mime: str | None,
) -> Path:
    """Derive the local path under the media folder for an attachment.

    The file name has the form ``<event prefix>_<stem><suffix>``. The prefix
    comes from the sanitised event id (leading ``$`` removed, at most 24
    characters). The stem is capped at 72 characters and the suffix at 16.
    When the name has no extension, one is guessed from ``mime`` if possible.
    """
    base_name = safe_filename(Path(filename).name) or MATRIX_DEFAULT_ATTACHMENT_NAME
    extension = Path(base_name).suffix
    if mime and not extension:
        inferred = mimetypes.guess_extension(mime, strict=False) or ""
        if inferred:
            base_name = f"{base_name}{inferred}"
            extension = inferred

    stem = (Path(base_name).stem or attachment_type)[:72]
    extension = extension[:16]

    raw_event_id = str(getattr(event, "event_id", "") or "evt").lstrip("$")
    event_id = safe_filename(raw_event_id)
    event_prefix = (event_id[:24] or "evt").strip("_")

    return self._media_dir() / f"{event_prefix}_{stem}{extension}"
async def _download_media_bytes(self, mxc_url: str) -> bytes | None:
    """Fetch the raw bytes behind an ``mxc://`` URL via the Matrix client.

    Returns:
        The downloaded bytes, or ``None`` when there is no client, the
        download yields an error response, a file-backed response cannot be
        read, or the response has an unexpected shape. Failures are logged
        as warnings.
    """
    if not self.client:
        return None

    result = await self.client.download(mxc=mxc_url)
    if isinstance(result, DownloadError):
        logger.warning("Matrix attachment download failed for {}: {}", mxc_url, result)
        return None

    payload = getattr(result, "body", None)
    if isinstance(payload, (bytes, bytearray)):
        return bytes(payload)
    if isinstance(result, MemoryDownloadResponse):
        return bytes(result.body)

    # A str/Path body is treated as the location of a file on disk.
    if isinstance(payload, (str, Path)):
        disk_path = Path(payload)
        if disk_path.is_file():
            try:
                return disk_path.read_bytes()
            except OSError as exc:
                logger.warning(
                    "Matrix attachment read failed for {} ({}): {}",
                    mxc_url,
                    type(exc).__name__,
                    str(exc),
                )
                return None

    logger.warning(
        "Matrix attachment download failed for {}: unexpected response type {}",
        mxc_url,
        type(result).__name__,
    )
    return None
def _decrypt_media_bytes(self, event: Any, ciphertext: bytes) -> bytes | None:
    """Decrypt an encrypted attachment using key material from the event.

    Reads ``key["k"]``, ``hashes["sha256"]`` and ``iv`` from the event.

    Returns:
        The plaintext bytes, or ``None`` (with a logged warning) when the key
        material is incomplete or decryption raises.
    """
    event_id = getattr(event, "event_id", "")
    key_info = getattr(event, "key", None)
    hash_info = getattr(event, "hashes", None)
    iv = getattr(event, "iv", None)

    key = key_info.get("k") if isinstance(key_info, dict) else None
    digest = hash_info.get("sha256") if isinstance(hash_info, dict) else None

    material_complete = (
        isinstance(key, str) and isinstance(digest, str) and isinstance(iv, str)
    )
    if not material_complete:
        logger.warning(
            "Matrix encrypted attachment missing key material for event {}",
            event_id,
        )
        return None

    try:
        plaintext = decrypt_attachment(ciphertext, key, digest, iv)
    except (EncryptionError, ValueError, TypeError) as exc:
        logger.warning(
            "Matrix encrypted attachment decryption failed for event {} ({}): {}",
            event_id,
            type(exc).__name__,
            str(exc),
        )
        return None
    return plaintext
async def _fetch_media_attachment(
    self,
    room: MatrixRoom,
    event: Any,
) -> tuple[dict[str, Any] | None, str]:
    """Download, optionally decrypt, size-check and persist one attachment.

    Returns:
        A ``(attachment, marker)`` pair. On success ``attachment`` is a dict
        describing the stored file and ``marker`` names its local path. On
        failure ``attachment`` is ``None`` and ``marker`` is a "too large" or
        "download failed" marker carrying the attachment filename.
    """
    kind = self._event_attachment_type(event)
    mime = self._event_mime(event)
    filename = self._event_filename(event, kind)
    failed_marker = MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename)
    too_large_marker = MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename)

    mxc_url = getattr(event, "url", None)
    if not (isinstance(mxc_url, str) and mxc_url.startswith("mxc://")):
        logger.warning(
            "Matrix attachment skipped in room {}: invalid mxc URL {}",
            room.room_id,
            mxc_url,
        )
        return None, failed_marker

    # Reject on the sender-declared size first so nothing is downloaded.
    declared_size = self._event_declared_size_bytes(event)
    if declared_size is not None and declared_size > self.config.max_inbound_media_bytes:
        logger.warning(
            "Matrix attachment skipped in room {}: declared size {} exceeds limit {}",
            room.room_id,
            declared_size,
            self.config.max_inbound_media_bytes,
        )
        return None, too_large_marker

    payload = await self._download_media_bytes(mxc_url)
    if payload is None:
        return None, failed_marker

    is_encrypted = self._is_encrypted_media_event(event)
    if is_encrypted:
        payload = self._decrypt_media_bytes(event, payload)
        if payload is None:
            return None, failed_marker

    # The declared size may be absent or wrong, so check the actual size too.
    size_bytes = len(payload)
    if size_bytes > self.config.max_inbound_media_bytes:
        logger.warning(
            "Matrix attachment skipped in room {}: downloaded size {} exceeds limit {}",
            room.room_id,
            size_bytes,
            self.config.max_inbound_media_bytes,
        )
        return None, too_large_marker

    destination = self._build_attachment_path(event, kind, filename, mime)
    try:
        destination.write_bytes(payload)
    except OSError as exc:
        logger.warning(
            "Matrix attachment persist failed for room {} ({}): {}",
            room.room_id,
            type(exc).__name__,
            str(exc),
        )
        return None, failed_marker

    attachment = {
        "type": kind,
        "mime": mime,
        "filename": filename,
        "event_id": str(getattr(event, "event_id", "") or ""),
        "encrypted": is_encrypted,
        "size_bytes": size_bytes,
        "path": str(destination),
        "mxc_url": mxc_url,
    }
    return attachment, MATRIX_ATTACHMENT_MARKER_TEMPLATE.format(destination)
async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
# Ignore self messages
if event.sender == self.config.user_id:
@@ -428,3 +695,41 @@ class MatrixChannel(BaseChannel):
except Exception:
await self._set_typing(room.room_id, False)
raise
async def _on_media_message(self, room: MatrixRoom, event: Any) -> None:
    """Process an inbound media event and hand it to the message pipeline.

    The bot's own events and events rejected by the sender/room policy are
    ignored. Otherwise the attachment is fetched and the message is forwarded
    with the event body (if any) followed by the attachment marker as content,
    the stored file path as media, and the attachment description in metadata.
    """
    if event.sender == self.config.user_id:
        return
    if not self._should_process_message(room, event):
        return

    attachment, marker = await self._fetch_media_attachment(room, event)
    attachments = [attachment] if attachment else []
    media_paths = [item["path"] for item in attachments]

    caption = getattr(event, "body", None)
    has_caption = isinstance(caption, str) and caption.strip()
    content_lines: list[str] = [caption.strip()] if has_caption else []
    content_lines.append(marker)

    # TODO: Optionally add audio transcription support for Matrix attachments,
    # similar to Telegram's voice/audio flow, behind explicit config.

    await self._set_typing(room.room_id, True)
    try:
        metadata = {
            "room": getattr(room, "display_name", room.room_id),
            "attachments": attachments,
        }
        await self._handle_message(
            sender_id=event.sender,
            chat_id=room.room_id,
            content="\n".join(content_lines),
            media=media_paths,
            metadata=metadata,
        )
    except Exception:
        # Clear the typing notice before propagating the failure.
        await self._set_typing(room.room_id, False)
        raise

View File

@@ -74,6 +74,8 @@ class MatrixConfig(Base):
device_id: str = ""
# Max seconds to wait for sync_forever to stop gracefully before cancellation fallback.
sync_stop_grace_seconds: int = 2
# Max attachment size accepted from inbound Matrix media events.
max_inbound_media_bytes: int = 20 * 1024 * 1024
allow_from: list[str] = Field(default_factory=list)
group_policy: Literal["open", "mention", "allowlist"] = "open"
group_allow_from: list[str] = Field(default_factory=list)

View File

@@ -1,3 +1,4 @@
from pathlib import Path
from types import SimpleNamespace
import pytest
@@ -43,6 +44,11 @@ class _FakeAsyncClient:
self.response_callbacks: list[tuple[object, object]] = []
self.room_send_calls: list[dict[str, object]] = []
self.typing_calls: list[tuple[str, bool, int]] = []
self.download_calls: list[dict[str, object]] = []
self.download_response: object | None = None
self.download_bytes: bytes = b"media"
self.download_content_type: str = "application/octet-stream"
self.download_filename: str | None = None
self.raise_on_send = False
self.raise_on_typing = False
@@ -89,6 +95,16 @@ class _FakeAsyncClient:
if self.raise_on_typing:
raise RuntimeError("typing failed")
async def download(self, **kwargs):
    """Record the call and return a download response.

    A preset ``download_response`` is returned as-is; otherwise an in-memory
    response is built from the configured bytes, content type and filename.
    """
    self.download_calls.append(kwargs)
    preset = self.download_response
    if preset is None:
        return matrix_module.MemoryDownloadResponse(
            body=self.download_bytes,
            content_type=self.download_content_type,
            filename=self.download_filename,
        )
    return preset
async def close(self) -> None:
    """No-op close for the fake client."""
    return None
@@ -133,6 +149,7 @@ async def test_start_skips_load_store_when_device_id_missing(
assert len(clients) == 1
assert clients[0].load_store_called is False
assert len(clients[0].callbacks) == 3
assert len(clients[0].response_callbacks) == 3
await channel.stop()
@@ -374,6 +391,212 @@ async def test_on_message_room_mention_requires_opt_in() -> None:
assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)]
@pytest.mark.asyncio
async def test_on_media_message_downloads_attachment_and_sets_metadata(
    monkeypatch, tmp_path
) -> None:
    """A plain image event is downloaded, stored on disk and forwarded with metadata."""
    # Redirect the data directory so the attachment is written under tmp_path.
    monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path)
    channel = MatrixChannel(_make_config(), MessageBus())
    client = _FakeAsyncClient("", "", "", None)
    client.download_bytes = b"image"
    channel.client = client
    handled: list[dict[str, object]] = []

    # Capture the kwargs the channel forwards instead of running the real handler.
    async def _fake_handle_message(**kwargs) -> None:
        handled.append(kwargs)

    channel._handle_message = _fake_handle_message # type: ignore[method-assign]
    room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2)
    event = SimpleNamespace(
        sender="@alice:matrix.org",
        body="photo.png",
        url="mxc://example.org/mediaid",
        event_id="$event1",
        source={
            "content": {
                "msgtype": "m.image",
                "info": {"mimetype": "image/png", "size": 5},
            }
        },
    )
    await channel._on_media_message(room, event)
    assert len(client.download_calls) == 1
    assert len(handled) == 1
    assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)]
    # The forwarded media path must point at a real file holding the downloaded bytes.
    media_paths = handled[0]["media"]
    assert isinstance(media_paths, list) and len(media_paths) == 1
    media_path = Path(media_paths[0])
    assert media_path.is_file()
    assert media_path.read_bytes() == b"image"
    metadata = handled[0]["metadata"]
    attachments = metadata["attachments"]
    assert isinstance(attachments, list) and len(attachments) == 1
    assert attachments[0]["type"] == "image"
    assert attachments[0]["mxc_url"] == "mxc://example.org/mediaid"
    assert attachments[0]["path"] == str(media_path)
    assert "[attachment: " in handled[0]["content"]
@pytest.mark.asyncio
async def test_on_media_message_respects_declared_size_limit(
    monkeypatch, tmp_path
) -> None:
    """An attachment whose declared size exceeds the limit is skipped without a download."""
    monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path)
    # Limit of 3 bytes; the event below declares a size of 10.
    channel = MatrixChannel(_make_config(max_inbound_media_bytes=3), MessageBus())
    client = _FakeAsyncClient("", "", "", None)
    channel.client = client
    handled: list[dict[str, object]] = []

    # Capture the kwargs the channel forwards instead of running the real handler.
    async def _fake_handle_message(**kwargs) -> None:
        handled.append(kwargs)

    channel._handle_message = _fake_handle_message # type: ignore[method-assign]
    room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2)
    event = SimpleNamespace(
        sender="@alice:matrix.org",
        body="large.bin",
        url="mxc://example.org/large",
        event_id="$event2",
        source={"content": {"msgtype": "m.file", "info": {"size": 10}}},
    )
    await channel._on_media_message(room, event)
    # No download is attempted, but the message is still forwarded with a marker.
    assert client.download_calls == []
    assert len(handled) == 1
    assert handled[0]["media"] == []
    assert handled[0]["metadata"]["attachments"] == []
    assert "[attachment: large.bin - too large]" in handled[0]["content"]
@pytest.mark.asyncio
async def test_on_media_message_handles_download_error(monkeypatch, tmp_path) -> None:
    """A DownloadError response yields a 'download failed' marker and no media."""
    monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path)
    channel = MatrixChannel(_make_config(), MessageBus())
    client = _FakeAsyncClient("", "", "", None)
    # Make the fake client answer every download with an error response.
    client.download_response = matrix_module.DownloadError("download failed")
    channel.client = client
    handled: list[dict[str, object]] = []

    # Capture the kwargs the channel forwards instead of running the real handler.
    async def _fake_handle_message(**kwargs) -> None:
        handled.append(kwargs)

    channel._handle_message = _fake_handle_message # type: ignore[method-assign]
    room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2)
    event = SimpleNamespace(
        sender="@alice:matrix.org",
        body="photo.png",
        url="mxc://example.org/mediaid",
        event_id="$event3",
        source={"content": {"msgtype": "m.image"}},
    )
    await channel._on_media_message(room, event)
    assert len(client.download_calls) == 1
    assert len(handled) == 1
    assert handled[0]["media"] == []
    assert handled[0]["metadata"]["attachments"] == []
    assert "[attachment: photo.png - download failed]" in handled[0]["content"]
@pytest.mark.asyncio
async def test_on_media_message_decrypts_encrypted_media(monkeypatch, tmp_path) -> None:
    """Encrypted media is decrypted before it is stored and described in metadata."""
    monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path)
    # Replace real decryption with a stub that always yields b"plain".
    monkeypatch.setattr(
        matrix_module,
        "decrypt_attachment",
        lambda ciphertext, key, sha256, iv: b"plain",
    )
    channel = MatrixChannel(_make_config(), MessageBus())
    client = _FakeAsyncClient("", "", "", None)
    client.download_bytes = b"cipher"
    channel.client = client
    handled: list[dict[str, object]] = []

    # Capture the kwargs the channel forwards instead of running the real handler.
    async def _fake_handle_message(**kwargs) -> None:
        handled.append(kwargs)

    channel._handle_message = _fake_handle_message # type: ignore[method-assign]
    room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2)
    # key/hashes/iv mark the event as encrypted.
    event = SimpleNamespace(
        sender="@alice:matrix.org",
        body="secret.txt",
        url="mxc://example.org/encrypted",
        event_id="$event4",
        key={"k": "key"},
        hashes={"sha256": "hash"},
        iv="iv",
        source={"content": {"msgtype": "m.file", "info": {"size": 6}}},
    )
    await channel._on_media_message(room, event)
    assert len(handled) == 1
    media_path = Path(handled[0]["media"][0])
    assert media_path.read_bytes() == b"plain"
    attachment = handled[0]["metadata"]["attachments"][0]
    assert attachment["encrypted"] is True
    # The size reflects the decrypted payload (b"plain"), not the ciphertext.
    assert attachment["size_bytes"] == 5
@pytest.mark.asyncio
async def test_on_media_message_handles_decrypt_error(monkeypatch, tmp_path) -> None:
    """A decryption failure yields a 'download failed' marker and no media."""
    monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path)

    # Stand-in for decrypt_attachment that always fails.
    def _raise(*args, **kwargs):
        raise matrix_module.EncryptionError("boom")

    monkeypatch.setattr(matrix_module, "decrypt_attachment", _raise)
    channel = MatrixChannel(_make_config(), MessageBus())
    client = _FakeAsyncClient("", "", "", None)
    client.download_bytes = b"cipher"
    channel.client = client
    handled: list[dict[str, object]] = []

    # Capture the kwargs the channel forwards instead of running the real handler.
    async def _fake_handle_message(**kwargs) -> None:
        handled.append(kwargs)

    channel._handle_message = _fake_handle_message # type: ignore[method-assign]
    room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2)
    # key/hashes/iv mark the event as encrypted.
    event = SimpleNamespace(
        sender="@alice:matrix.org",
        body="secret.txt",
        url="mxc://example.org/encrypted",
        event_id="$event5",
        key={"k": "key"},
        hashes={"sha256": "hash"},
        iv="iv",
        source={"content": {"msgtype": "m.file"}},
    )
    await channel._on_media_message(room, event)
    assert len(handled) == 1
    assert handled[0]["media"] == []
    assert handled[0]["metadata"]["attachments"] == []
    assert "[attachment: secret.txt - download failed]" in handled[0]["content"]
@pytest.mark.asyncio
async def test_send_clears_typing_after_send() -> None:
channel = MatrixChannel(_make_config(), MessageBus())