diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py index f00f321..3edcf63 100644 --- a/nanobot/channels/matrix.py +++ b/nanobot/channels/matrix.py @@ -1,5 +1,7 @@ import asyncio import logging +import mimetypes +from pathlib import Path from typing import Any import nh3 @@ -8,52 +10,69 @@ from mistune import create_markdown from nio import ( AsyncClient, AsyncClientConfig, + DownloadError, InviteEvent, JoinError, MatrixRoom, + MemoryDownloadResponse, + RoomEncryptedAudio, + RoomEncryptedFile, + RoomEncryptedImage, + RoomEncryptedVideo, + RoomMessageAudio, + RoomMessageFile, + RoomMessageImage, RoomMessageText, + RoomMessageVideo, RoomSendError, RoomTypingError, SyncError, ) +from nio.crypto.attachments import decrypt_attachment +from nio.exceptions import EncryptionError from nanobot.bus.events import OutboundMessage from nanobot.channels.base import BaseChannel from nanobot.config.loader import get_data_dir +from nanobot.utils.helpers import safe_filename LOGGING_STACK_BASE_DEPTH = 2 TYPING_NOTICE_TIMEOUT_MS = 30_000 MATRIX_HTML_FORMAT = "org.matrix.custom.html" +MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]" +MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]" +MATRIX_ATTACHMENT_FAILED_TEMPLATE = "[attachment: {} - download failed]" +MATRIX_DEFAULT_ATTACHMENT_NAME = "attachment" -# Keep plugin output aligned with Matrix recommended HTML tags: -# https://spec.matrix.org/latest/client-server-api/#mroommessage-msgtypes -# - table/strikethrough are already used in replies. -# - url, superscript, and subscript map to common tags (, , ) -# that Matrix clients (e.g. Element/FluffyChat) can render consistently. -# We intentionally avoid plugins that emit less-portable tags to keep output -# predictable across clients. -# escape=True is intentional: raw HTML from model output is rendered as text, -# not as live HTML. This includes Matrix-specific raw snippets such as -# and
, unless we later add explicit -# structured support for those features. +MATRIX_MEDIA_EVENT_TYPES = ( + RoomMessageImage, + RoomMessageFile, + RoomMessageAudio, + RoomMessageVideo, + RoomEncryptedImage, + RoomEncryptedFile, + RoomEncryptedAudio, + RoomEncryptedVideo, +) + +# Markdown renderer policy: +# https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes +# - Only enable portable features that map cleanly to Matrix-compatible HTML. +# - escape=True ensures raw model HTML is treated as text unless we explicitly +# add structured support for Matrix-specific HTML features later. MATRIX_MARKDOWN = create_markdown( escape=True, plugins=["table", "strikethrough", "url", "superscript", "subscript"], ) -# Sanitizer policy rationale: -# - Baseline follows Matrix formatted message guidance: -# https://spec.matrix.org/latest/client-server-api/#mroommessage-msgtypes -# - We intentionally use a tighter subset than the full spec to keep behavior -# predictable across clients and reduce risk from LLM-generated content. -# - URLs are restricted to common safe schemes for links, and image sources are -# additionally constrained to mxc:// for Matrix-native media handling. -# - Spec items intentionally NOT enabled yet: -# - href schemes ftp/magnet (we keep link schemes smaller for now). -# - a[target] (clients already control link-opening behavior). -# - span[data-mx-bg-color|data-mx-color|data-mx-spoiler|data-mx-maths] -# - div[data-mx-maths] -# These can be added later when we explicitly support those Matrix features. +# Sanitizer policy: +# https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes +# - Start from Matrix formatted-message guidance, but keep a smaller allowlist +# to reduce risk and keep client behavior predictable for LLM output. +# - Enforce mxc:// for img src to align media rendering with Matrix content +# repository semantics. +# - Unused spec-permitted features (e.g. some href schemes and data-mx-* attrs) +# are intentionally deferred until explicitly needed. MATRIX_ALLOWED_HTML_TAGS = { "p", "a", @@ -292,6 +311,7 @@ class MatrixChannel(BaseChannel): def _register_event_callbacks(self) -> None: """Register Matrix event callbacks used by this channel.""" self.client.add_event_callback(self._on_message, RoomMessageText) + self.client.add_event_callback(self._on_media_message, MATRIX_MEDIA_EVENT_TYPES) self.client.add_event_callback(self._on_room_invite, InviteEvent) def _register_response_callbacks(self) -> None: @@ -371,7 +391,7 @@ class MatrixChannel(BaseChannel): member_count = getattr(room, "member_count", None) return isinstance(member_count, int) and member_count <= 2 - def _is_bot_mentioned_from_mx_mentions(self, event: RoomMessageText) -> bool: + def _is_bot_mentioned_from_mx_mentions(self, event: Any) -> bool: """Resolve mentions strictly from Matrix-native m.mentions payload.""" source = getattr(event, "source", None) if not isinstance(source, dict): @@ -391,7 +411,7 @@ class MatrixChannel(BaseChannel): return bool(self.config.allow_room_mentions and mentions.get("room") is True) - def _should_process_message(self, room: MatrixRoom, event: RoomMessageText) -> bool: + def _should_process_message(self, room: MatrixRoom, event: Any) -> bool: """Apply sender and room policy checks before processing Matrix messages.""" if not self.is_allowed(event.sender): return False @@ -409,6 +429,253 @@ class MatrixChannel(BaseChannel): return False + def _media_dir(self) -> Path: + """Return directory used to persist downloaded Matrix attachments.""" + media_dir = get_data_dir() / "media" / "matrix" + media_dir.mkdir(parents=True, exist_ok=True) + return media_dir + + @staticmethod + def _event_source_content(event: Any) -> dict[str, Any]: + """Extract Matrix event content payload when available.""" + source = getattr(event, "source", None) + if not isinstance(source, dict): + return {} + content = source.get("content") + return content if isinstance(content, dict) else {} + + def _event_attachment_type(self, event: Any) -> str: + """Map Matrix event payload/type to a stable attachment kind.""" + msgtype = self._event_source_content(event).get("msgtype") + if msgtype == "m.image": + return "image" + if msgtype == "m.audio": + return "audio" + if msgtype == "m.video": + return "video" + if msgtype == "m.file": + return "file" + + class_name = type(event).__name__.lower() + if "image" in class_name: + return "image" + if "audio" in class_name: + return "audio" + if "video" in class_name: + return "video" + return "file" + + @staticmethod + def _is_encrypted_media_event(event: Any) -> bool: + """Return True for encrypted Matrix media events.""" + return ( + isinstance(getattr(event, "key", None), dict) + and isinstance(getattr(event, "hashes", None), dict) + and isinstance(getattr(event, "iv", None), str) + ) + + def _event_declared_size_bytes(self, event: Any) -> int | None: + """Return declared media size from Matrix event info, if present.""" + info = self._event_source_content(event).get("info") + if not isinstance(info, dict): + return None + size = info.get("size") + if isinstance(size, int) and size >= 0: + return size + return None + + def _event_mime(self, event: Any) -> str | None: + """Best-effort MIME extraction from Matrix media event.""" + info = self._event_source_content(event).get("info") + if isinstance(info, dict): + mime = info.get("mimetype") + if isinstance(mime, str) and mime: + return mime + + mime = getattr(event, "mimetype", None) + if isinstance(mime, str) and mime: + return mime + return None + + def _event_filename(self, event: Any, attachment_type: str) -> str: + """Build a safe filename for a Matrix attachment.""" + body = getattr(event, "body", None) + if isinstance(body, str) and body.strip(): + candidate = safe_filename(Path(body).name) + if candidate: + return candidate + return MATRIX_DEFAULT_ATTACHMENT_NAME if attachment_type == "file" else attachment_type + + def _build_attachment_path( + self, + event: Any, + attachment_type: str, + filename: str, + mime: str | None, + ) -> Path: + """Compute a deterministic local file path for a downloaded attachment.""" + safe_name = safe_filename(Path(filename).name) or MATRIX_DEFAULT_ATTACHMENT_NAME + suffix = Path(safe_name).suffix + if not suffix and mime: + guessed = mimetypes.guess_extension(mime, strict=False) or "" + if guessed: + safe_name = f"{safe_name}{guessed}" + suffix = guessed + + stem = Path(safe_name).stem or attachment_type + stem = stem[:72] + suffix = suffix[:16] + + event_id = safe_filename(str(getattr(event, "event_id", "") or "evt").lstrip("$")) + event_prefix = (event_id[:24] or "evt").strip("_") + return self._media_dir() / f"{event_prefix}_{stem}{suffix}" + + async def _download_media_bytes(self, mxc_url: str) -> bytes | None: + """Download media bytes from Matrix content repository.""" + if not self.client: + return None + + response = await self.client.download(mxc=mxc_url) + if isinstance(response, DownloadError): + logger.warning("Matrix attachment download failed for {}: {}", mxc_url, response) + return None + + body = getattr(response, "body", None) + if isinstance(body, (bytes, bytearray)): + return bytes(body) + + if isinstance(response, MemoryDownloadResponse): + return bytes(response.body) + + if isinstance(body, (str, Path)): + path = Path(body) + if path.is_file(): + try: + return path.read_bytes() + except OSError as e: + logger.warning( + "Matrix attachment read failed for {} ({}): {}", + mxc_url, + type(e).__name__, + str(e), + ) + return None + + logger.warning( + "Matrix attachment download failed for {}: unexpected response type {}", + mxc_url, + type(response).__name__, + ) + return None + + def _decrypt_media_bytes(self, event: Any, ciphertext: bytes) -> bytes | None: + """Decrypt encrypted Matrix attachment bytes.""" + key_obj = getattr(event, "key", None) + hashes = getattr(event, "hashes", None) + iv = getattr(event, "iv", None) + + key = key_obj.get("k") if isinstance(key_obj, dict) else None + sha256 = hashes.get("sha256") if isinstance(hashes, dict) else None + if not isinstance(key, str) or not isinstance(sha256, str) or not isinstance(iv, str): + logger.warning( + "Matrix encrypted attachment missing key material for event {}", + getattr(event, "event_id", ""), + ) + return None + + try: + return decrypt_attachment(ciphertext, key, sha256, iv) + except (EncryptionError, ValueError, TypeError) as e: + logger.warning( + "Matrix encrypted attachment decryption failed for event {} ({}): {}", + getattr(event, "event_id", ""), + type(e).__name__, + str(e), + ) + return None + + async def _fetch_media_attachment( + self, + room: MatrixRoom, + event: Any, + ) -> tuple[dict[str, Any] | None, str]: + """Download and prepare a Matrix attachment for inbound processing.""" + attachment_type = self._event_attachment_type(event) + mime = self._event_mime(event) + filename = self._event_filename(event, attachment_type) + mxc_url = getattr(event, "url", None) + + if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): + logger.warning( + "Matrix attachment skipped in room {}: invalid mxc URL {}", + room.room_id, + mxc_url, + ) + return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + + declared_size = self._event_declared_size_bytes(event) + if ( + declared_size is not None + and declared_size > self.config.max_inbound_media_bytes + ): + logger.warning( + "Matrix attachment skipped in room {}: declared size {} exceeds limit {}", + room.room_id, + declared_size, + self.config.max_inbound_media_bytes, + ) + return None, MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename) + + downloaded = await self._download_media_bytes(mxc_url) + if downloaded is None: + return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + + encrypted = self._is_encrypted_media_event(event) + data = downloaded + if encrypted: + decrypted = self._decrypt_media_bytes(event, downloaded) + if decrypted is None: + return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + data = decrypted + + if len(data) > self.config.max_inbound_media_bytes: + logger.warning( + "Matrix attachment skipped in room {}: downloaded size {} exceeds limit {}", + room.room_id, + len(data), + self.config.max_inbound_media_bytes, + ) + return None, MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename) + + path = self._build_attachment_path( + event, + attachment_type, + filename, + mime, + ) + try: + path.write_bytes(data) + except OSError as e: + logger.warning( + "Matrix attachment persist failed for room {} ({}): {}", + room.room_id, + type(e).__name__, + str(e), + ) + return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + + attachment = { + "type": attachment_type, + "mime": mime, + "filename": filename, + "event_id": str(getattr(event, "event_id", "") or ""), + "encrypted": encrypted, + "size_bytes": len(data), + "path": str(path), + "mxc_url": mxc_url, + } + return attachment, MATRIX_ATTACHMENT_MARKER_TEMPLATE.format(path) + async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None: # Ignore self messages if event.sender == self.config.user_id: @@ -428,3 +695,41 @@ class MatrixChannel(BaseChannel): except Exception: await self._set_typing(room.room_id, False) raise + + async def _on_media_message(self, room: MatrixRoom, event: Any) -> None: + """Handle inbound Matrix media events and forward local attachment paths.""" + if event.sender == self.config.user_id: + return + + if not self._should_process_message(room, event): + return + + attachment, marker = await self._fetch_media_attachment(room, event) + attachments = [attachment] if attachment else [] + markers = [marker] + media_paths = [a["path"] for a in attachments] + + body = getattr(event, "body", None) + content_parts: list[str] = [] + if isinstance(body, str) and body.strip(): + content_parts.append(body.strip()) + content_parts.extend(markers) + + # TODO: Optionally add audio transcription support for Matrix attachments, + # similar to Telegram's voice/audio flow, behind explicit config. + + await self._set_typing(room.room_id, True) + try: + await self._handle_message( + sender_id=event.sender, + chat_id=room.room_id, + content="\n".join(content_parts), + media=media_paths, + metadata={ + "room": getattr(room, "display_name", room.room_id), + "attachments": attachments, + }, + ) + except Exception: + await self._set_typing(room.room_id, False) + raise diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index d442104..f0ee410 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -74,6 +74,8 @@ class MatrixConfig(Base): device_id: str = "" # Max seconds to wait for sync_forever to stop gracefully before cancellation fallback. sync_stop_grace_seconds: int = 2 + # Max attachment size accepted from inbound Matrix media events. + max_inbound_media_bytes: int = 20 * 1024 * 1024 allow_from: list[str] = Field(default_factory=list) group_policy: Literal["open", "mention", "allowlist"] = "open" group_allow_from: list[str] = Field(default_factory=list) diff --git a/tests/test_matrix_channel.py b/tests/test_matrix_channel.py index 616b0bc..932e612 100644 --- a/tests/test_matrix_channel.py +++ b/tests/test_matrix_channel.py @@ -1,3 +1,4 @@ +from pathlib import Path from types import SimpleNamespace import pytest @@ -43,6 +44,11 @@ class _FakeAsyncClient: self.response_callbacks: list[tuple[object, object]] = [] self.room_send_calls: list[dict[str, object]] = [] self.typing_calls: list[tuple[str, bool, int]] = [] + self.download_calls: list[dict[str, object]] = [] + self.download_response: object | None = None + self.download_bytes: bytes = b"media" + self.download_content_type: str = "application/octet-stream" + self.download_filename: str | None = None self.raise_on_send = False self.raise_on_typing = False @@ -89,6 +95,16 @@ class _FakeAsyncClient: if self.raise_on_typing: raise RuntimeError("typing failed") + async def download(self, **kwargs): + self.download_calls.append(kwargs) + if self.download_response is not None: + return self.download_response + return matrix_module.MemoryDownloadResponse( + body=self.download_bytes, + content_type=self.download_content_type, + filename=self.download_filename, + ) + async def close(self) -> None: return None @@ -133,6 +149,7 @@ async def test_start_skips_load_store_when_device_id_missing( assert len(clients) == 1 assert clients[0].load_store_called is False + assert len(clients[0].callbacks) == 3 assert len(clients[0].response_callbacks) == 3 await channel.stop() @@ -374,6 +391,212 @@ async def test_on_message_room_mention_requires_opt_in() -> None: assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] +@pytest.mark.asyncio +async def test_on_media_message_downloads_attachment_and_sets_metadata( + monkeypatch, tmp_path +) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"image" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="photo.png", + url="mxc://example.org/mediaid", + event_id="$event1", + source={ + "content": { + "msgtype": "m.image", + "info": {"mimetype": "image/png", "size": 5}, + } + }, + ) + + await channel._on_media_message(room, event) + + assert len(client.download_calls) == 1 + assert len(handled) == 1 + assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)] + + media_paths = handled[0]["media"] + assert isinstance(media_paths, list) and len(media_paths) == 1 + media_path = Path(media_paths[0]) + assert media_path.is_file() + assert media_path.read_bytes() == b"image" + + metadata = handled[0]["metadata"] + attachments = metadata["attachments"] + assert isinstance(attachments, list) and len(attachments) == 1 + assert attachments[0]["type"] == "image" + assert attachments[0]["mxc_url"] == "mxc://example.org/mediaid" + assert attachments[0]["path"] == str(media_path) + assert "[attachment: " in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_respects_declared_size_limit( + monkeypatch, tmp_path +) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(max_inbound_media_bytes=3), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="large.bin", + url="mxc://example.org/large", + event_id="$event2", + source={"content": {"msgtype": "m.file", "info": {"size": 10}}}, + ) + + await channel._on_media_message(room, event) + + assert client.download_calls == [] + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: large.bin - too large]" in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_handles_download_error(monkeypatch, tmp_path) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_response = matrix_module.DownloadError("download failed") + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="photo.png", + url="mxc://example.org/mediaid", + event_id="$event3", + source={"content": {"msgtype": "m.image"}}, + ) + + await channel._on_media_message(room, event) + + assert len(client.download_calls) == 1 + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: photo.png - download failed]" in handled[0]["content"] + + +@pytest.mark.asyncio +async def test_on_media_message_decrypts_encrypted_media(monkeypatch, tmp_path) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + monkeypatch.setattr( + matrix_module, + "decrypt_attachment", + lambda ciphertext, key, sha256, iv: b"plain", + ) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"cipher" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="secret.txt", + url="mxc://example.org/encrypted", + event_id="$event4", + key={"k": "key"}, + hashes={"sha256": "hash"}, + iv="iv", + source={"content": {"msgtype": "m.file", "info": {"size": 6}}}, + ) + + await channel._on_media_message(room, event) + + assert len(handled) == 1 + media_path = Path(handled[0]["media"][0]) + assert media_path.read_bytes() == b"plain" + attachment = handled[0]["metadata"]["attachments"][0] + assert attachment["encrypted"] is True + assert attachment["size_bytes"] == 5 + + +@pytest.mark.asyncio +async def test_on_media_message_handles_decrypt_error(monkeypatch, tmp_path) -> None: + monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path) + + def _raise(*args, **kwargs): + raise matrix_module.EncryptionError("boom") + + monkeypatch.setattr(matrix_module, "decrypt_attachment", _raise) + + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + client.download_bytes = b"cipher" + channel.client = client + + handled: list[dict[str, object]] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2) + event = SimpleNamespace( + sender="@alice:matrix.org", + body="secret.txt", + url="mxc://example.org/encrypted", + event_id="$event5", + key={"k": "key"}, + hashes={"sha256": "hash"}, + iv="iv", + source={"content": {"msgtype": "m.file"}}, + ) + + await channel._on_media_message(room, event) + + assert len(handled) == 1 + assert handled[0]["media"] == [] + assert handled[0]["metadata"]["attachments"] == [] + assert "[attachment: secret.txt - download failed]" in handled[0]["content"] + + @pytest.mark.asyncio async def test_send_clears_typing_after_send() -> None: channel = MatrixChannel(_make_config(), MessageBus())