From 988a85d8de2bcb5dcef9356f933343774f898b1e Mon Sep 17 00:00:00 2001 From: Re-bin Date: Thu, 26 Feb 2026 03:04:01 +0000 Subject: [PATCH] =?UTF-8?q?refactor:=20optimize=20matrix=20channel=20?= =?UTF-8?q?=E2=80=94=20optional=20deps,=20trim=20comments,=20simplify=20me?= =?UTF-8?q?thods?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +- nanobot/agent/loop.py | 2 - nanobot/channels/matrix.py | 936 +++++++++---------------------------- pyproject.toml | 6 +- 4 files changed, 236 insertions(+), 714 deletions(-) diff --git a/README.md b/README.md index 00ffdc4..4ddfc33 100644 --- a/README.md +++ b/README.md @@ -312,7 +312,11 @@ nanobot gateway
Matrix (Element) -Uses Matrix sync via `matrix-nio` (inbound media + outbound file attachments). +Install Matrix dependencies first: + +```bash +pip install nanobot-ai[matrix] +``` **1. Create/choose a Matrix account** diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index b402ea0..3e513cb 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -229,8 +229,6 @@ class AgentLoop: ) else: clean = self._strip_think(response.content) - if on_progress and clean: - await on_progress(clean) messages = self.context.add_assistant_message( messages, clean, reasoning_content=response.reasoning_content, ) diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py index f85aab5..21192e9 100644 --- a/nanobot/channels/matrix.py +++ b/nanobot/channels/matrix.py @@ -1,4 +1,4 @@ -"""Matrix channel implementation for inbound sync and outbound message/media delivery.""" +"""Matrix (Element) channel — inbound sync + outbound message/media delivery.""" import asyncio import logging @@ -6,109 +6,56 @@ import mimetypes from pathlib import Path from typing import Any, TypeAlias -import nh3 from loguru import logger -from mistune import create_markdown -from nio import ( - AsyncClient, - AsyncClientConfig, - ContentRepositoryConfigError, - DownloadError, - InviteEvent, - JoinError, - MatrixRoom, - MemoryDownloadResponse, - RoomEncryptedMedia, - RoomMessage, - RoomMessageMedia, - RoomMessageText, - RoomSendError, - RoomTypingError, - SyncError, - UploadError, -) -from nio.crypto.attachments import decrypt_attachment -from nio.exceptions import EncryptionError + +try: + import nh3 + from mistune import create_markdown + from nio import ( + AsyncClient, AsyncClientConfig, ContentRepositoryConfigError, + DownloadError, InviteEvent, JoinError, MatrixRoom, MemoryDownloadResponse, + RoomEncryptedMedia, RoomMessage, RoomMessageMedia, RoomMessageText, + RoomSendError, RoomTypingError, SyncError, UploadError, + ) + from nio.crypto.attachments import decrypt_attachment + 
from nio.exceptions import EncryptionError +except ImportError as e: + raise ImportError( + "Matrix dependencies not installed. Run: pip install nanobot-ai[matrix]" + ) from e from nanobot.bus.events import OutboundMessage from nanobot.channels.base import BaseChannel from nanobot.config.loader import get_data_dir from nanobot.utils.helpers import safe_filename -LOGGING_STACK_BASE_DEPTH = 2 -# Typing state lifetime advertised to Matrix clients/servers. TYPING_NOTICE_TIMEOUT_MS = 30_000 -# Matrix typing notifications are ephemeral; spec guidance is to keep -# refreshing while work is ongoing (practically ~20-30s cadence). -# https://spec.matrix.org/v1.17/client-server-api/#typing-notifications -# Keepalive interval must stay below TYPING_NOTICE_TIMEOUT_MS so the typing -# indicator does not expire while the agent is still processing. +# Must stay below TYPING_NOTICE_TIMEOUT_MS so the indicator doesn't expire mid-processing. TYPING_KEEPALIVE_INTERVAL_MS = 20_000 MATRIX_HTML_FORMAT = "org.matrix.custom.html" -MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]" -MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]" -MATRIX_ATTACHMENT_FAILED_TEMPLATE = "[attachment: {} - download failed]" -MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE = "[attachment: {} - upload failed]" -MATRIX_DEFAULT_ATTACHMENT_NAME = "attachment" +_ATTACH_MARKER = "[attachment: {}]" +_ATTACH_TOO_LARGE = "[attachment: {} - too large]" +_ATTACH_FAILED = "[attachment: {} - download failed]" +_ATTACH_UPLOAD_FAILED = "[attachment: {} - upload failed]" +_DEFAULT_ATTACH_NAME = "attachment" +_MSGTYPE_MAP = {"m.image": "image", "m.audio": "audio", "m.video": "video", "m.file": "file"} -# Runtime callback filter for nio event dispatch (checked via isinstance). MATRIX_MEDIA_EVENT_FILTER = (RoomMessageMedia, RoomEncryptedMedia) -# Static typing alias for media-specific handlers/helpers. 
MatrixMediaEvent: TypeAlias = RoomMessageMedia | RoomEncryptedMedia -# Markdown renderer policy: -# https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes -# - Only enable portable features that map cleanly to Matrix-compatible HTML. -# - escape=True ensures raw model HTML is treated as text unless we explicitly -# add structured support for Matrix-specific HTML features later. MATRIX_MARKDOWN = create_markdown( escape=True, plugins=["table", "strikethrough", "url", "superscript", "subscript"], ) -# Sanitizer policy: -# https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes -# - Start from Matrix formatted-message guidance, but keep a smaller allowlist -# to reduce risk and keep client behavior predictable for LLM output. -# - Enforce mxc:// for img src to align media rendering with Matrix content -# repository semantics. -# - Unused spec-permitted features (e.g. some href schemes and data-mx-* attrs) -# are intentionally deferred until explicitly needed. 
MATRIX_ALLOWED_HTML_TAGS = { - "p", - "a", - "strong", - "em", - "del", - "code", - "pre", - "blockquote", - "ul", - "ol", - "li", - "h1", - "h2", - "h3", - "h4", - "h5", - "h6", - "hr", - "br", - "table", - "thead", - "tbody", - "tr", - "th", - "td", - "caption", - "sup", - "sub", - "img", + "p", "a", "strong", "em", "del", "code", "pre", "blockquote", + "ul", "ol", "li", "h1", "h2", "h3", "h4", "h5", "h6", + "hr", "br", "table", "thead", "tbody", "tr", "th", "td", + "caption", "sup", "sub", "img", } MATRIX_ALLOWED_HTML_ATTRIBUTES: dict[str, set[str]] = { - "a": {"href"}, - "code": {"class"}, - "ol": {"start"}, + "a": {"href"}, "code": {"class"}, "ol": {"start"}, "img": {"src", "alt", "title", "width", "height"}, } MATRIX_ALLOWED_URL_SCHEMES = {"https", "http", "matrix", "mailto", "mxc"} @@ -117,22 +64,12 @@ MATRIX_ALLOWED_URL_SCHEMES = {"https", "http", "matrix", "mailto", "mxc"} def _filter_matrix_html_attribute(tag: str, attr: str, value: str) -> str | None: """Filter attribute values to a safe Matrix-compatible subset.""" if tag == "a" and attr == "href": - lower_value = value.lower() - if lower_value.startswith(("https://", "http://", "matrix:", "mailto:")): - return value - return None - + return value if value.lower().startswith(("https://", "http://", "matrix:", "mailto:")) else None if tag == "img" and attr == "src": return value if value.lower().startswith("mxc://") else None - if tag == "code" and attr == "class": - classes = [ - cls - for cls in value.split() - if cls.startswith("language-") and not cls.startswith("language-_") - ] + classes = [c for c in value.split() if c.startswith("language-") and not c.startswith("language-_")] return " ".join(classes) if classes else None - return value @@ -147,100 +84,59 @@ MATRIX_HTML_CLEANER = nh3.Cleaner( def _render_markdown_html(text: str) -> str | None: - """Render markdown to HTML for Matrix formatted messages.""" + """Render markdown to sanitized HTML; returns None for plain text.""" try: - rendered = 
MATRIX_MARKDOWN(text) - formatted = MATRIX_HTML_CLEANER.clean(rendered).strip() - except Exception as e: - logger.debug( - "Matrix markdown rendering failed ({}): {}", - type(e).__name__, - str(e), - ) + formatted = MATRIX_HTML_CLEANER.clean(MATRIX_MARKDOWN(text)).strip() + except Exception: return None - if not formatted: return None - - # Skip formatted_body for plain output (

<p>...</p>) to keep payload minimal. - stripped = formatted.strip() - if stripped.startswith("<p>") and stripped.endswith("</p>

"): - paragraph_inner = stripped[3:-4] - # Keep plaintext-only paragraphs minimal, but preserve inline markup/links. - if "<" not in paragraph_inner and ">" not in paragraph_inner: + # Skip formatted_body for plain

<p>text</p> to keep payload minimal. + if formatted.startswith("<p>") and formatted.endswith("</p>

"): + inner = formatted[3:-4] + if "<" not in inner and ">" not in inner: return None - return formatted def _build_matrix_text_content(text: str) -> dict[str, object]: - """Build Matrix m.text payload with plaintext fallback and optional HTML.""" - content: dict[str, object] = { - "msgtype": "m.text", - # Note: When `formatted_body` is present, Matrix spec expects `body` to - # be its plaintext representation (fallback for clients without HTML). - # We currently keep raw text (often markdown) for simplicity. - # https://spec.matrix.org/v1.17/client-server-api/#mroommessage-msgtypes - "body": text, - # Matrix spec recommends always including m.mentions for message - # semantics/interoperability, even when no mentions are present. - # https://spec.matrix.org/v1.17/client-server-api/#mmentions - "m.mentions": {}, - } - formatted_html = _render_markdown_html(text) - if not formatted_html: - return content - - content["format"] = MATRIX_HTML_FORMAT - content["formatted_body"] = formatted_html + """Build Matrix m.text payload with optional HTML formatted_body.""" + content: dict[str, object] = {"msgtype": "m.text", "body": text, "m.mentions": {}} + if html := _render_markdown_html(text): + content["format"] = MATRIX_HTML_FORMAT + content["formatted_body"] = html return content class _NioLoguruHandler(logging.Handler): - """Route stdlib logging records from matrix-nio into Loguru output.""" + """Route matrix-nio stdlib logs into Loguru.""" def emit(self, record: logging.LogRecord) -> None: try: level = logger.level(record.levelname).name except ValueError: level = record.levelno - - frame = logging.currentframe() - # Skip logging internals plus this handler frame when forwarding to Loguru. 
- depth = LOGGING_STACK_BASE_DEPTH + frame, depth = logging.currentframe(), 2 while frame and frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 - + frame, depth = frame.f_back, depth + 1 logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) def _configure_nio_logging_bridge() -> None: - """Ensure matrix-nio logs are emitted through the project's Loguru format.""" + """Bridge matrix-nio logs to Loguru (idempotent).""" nio_logger = logging.getLogger("nio") - if any(isinstance(handler, _NioLoguruHandler) for handler in nio_logger.handlers): - return - - nio_logger.handlers = [_NioLoguruHandler()] - nio_logger.propagate = False + if not any(isinstance(h, _NioLoguruHandler) for h in nio_logger.handlers): + nio_logger.handlers = [_NioLoguruHandler()] + nio_logger.propagate = False class MatrixChannel(BaseChannel): - """ - Matrix (Element) channel using long-polling sync. - """ + """Matrix (Element) channel using long-polling sync.""" name = "matrix" - def __init__( - self, - config: Any, - bus, - *, - restrict_to_workspace: bool = False, - workspace: Path | None = None, - ): - """Store Matrix client settings, task handles, and outbound media policy flags.""" + def __init__(self, config: Any, bus, *, restrict_to_workspace: bool = False, + workspace: Path | None = None): super().__init__(config, bus) self.client: AsyncClient | None = None self._sync_task: asyncio.Task | None = None @@ -259,15 +155,10 @@ class MatrixChannel(BaseChannel): store_path.mkdir(parents=True, exist_ok=True) self.client = AsyncClient( - homeserver=self.config.homeserver, - user=self.config.user_id, - store_path=store_path, # Where tokens are saved - config=AsyncClientConfig( - store_sync_tokens=True, # Auto-persists next_batch tokens - encryption_enabled=self.config.e2ee_enabled, - ), + homeserver=self.config.homeserver, user=self.config.user_id, + store_path=store_path, + config=AsyncClientConfig(store_sync_tokens=True, 
encryption_enabled=self.config.e2ee_enabled), ) - self.client.user_id = self.config.user_id self.client.access_token = self.config.access_token self.client.device_id = self.config.device_id @@ -275,78 +166,43 @@ class MatrixChannel(BaseChannel): self._register_event_callbacks() self._register_response_callbacks() - if self.config.e2ee_enabled: - logger.info("Matrix E2EE is enabled.") - else: - logger.warning( - "Matrix E2EE is disabled; encrypted room messages may be undecryptable and " - "encrypted-device verification is not applied on send." - ) + if not self.config.e2ee_enabled: + logger.warning("Matrix E2EE disabled; encrypted rooms may be undecryptable.") if self.config.device_id: try: self.client.load_store() - except Exception as e: - logger.warning( - "Matrix store load failed ({}: {}); sync token restore is disabled and " - "restart may replay recent messages.", - type(e).__name__, - str(e), - ) + except Exception: + logger.exception("Matrix store load failed; restart may replay recent messages.") else: - logger.warning( - "Matrix device_id is empty; sync token restore is disabled and restart may " - "replay recent messages." - ) + logger.warning("Matrix device_id empty; restart may replay recent messages.") self._sync_task = asyncio.create_task(self._sync_loop()) async def stop(self) -> None: """Stop the Matrix channel with graceful sync shutdown.""" self._running = False - for room_id in list(self._typing_tasks): await self._stop_typing_keepalive(room_id, clear_typing=False) - if self.client: - # Request sync_forever loop to exit cleanly. 
self.client.stop_sync_forever() - if self._sync_task: try: - await asyncio.wait_for( - asyncio.shield(self._sync_task), - timeout=self.config.sync_stop_grace_seconds, - ) - except asyncio.TimeoutError: + await asyncio.wait_for(asyncio.shield(self._sync_task), + timeout=self.config.sync_stop_grace_seconds) + except (asyncio.TimeoutError, asyncio.CancelledError): self._sync_task.cancel() try: await self._sync_task except asyncio.CancelledError: pass - except asyncio.CancelledError: - pass - if self.client: await self.client.close() - @staticmethod - def _path_dedupe_key(path: Path) -> str: - """Return a stable deduplication key for attachment paths.""" - expanded = path.expanduser() - try: - return str(expanded.resolve(strict=False)) - except OSError: - return str(expanded) - def _is_workspace_path_allowed(self, path: Path) -> bool: - """Enforce optional workspace-only outbound attachment policy.""" - if not self._restrict_to_workspace: + """Check path is inside workspace (when restriction enabled).""" + if not self._restrict_to_workspace or not self._workspace: return True - - if self._workspace is None: - return False - try: path.resolve(strict=False).relative_to(self._workspace) return True @@ -354,288 +210,150 @@ class MatrixChannel(BaseChannel): return False def _collect_outbound_media_candidates(self, media: list[str]) -> list[Path]: - """Collect unique outbound attachment paths from OutboundMessage.media.""" - candidates: list[Path] = [] + """Deduplicate and resolve outbound attachment paths.""" seen: set[str] = set() - + candidates: list[Path] = [] for raw in media: if not isinstance(raw, str) or not raw.strip(): continue path = Path(raw.strip()).expanduser() - key = self._path_dedupe_key(path) - if key in seen: - continue - seen.add(key) - candidates.append(path) - + try: + key = str(path.resolve(strict=False)) + except OSError: + key = str(path) + if key not in seen: + seen.add(key) + candidates.append(path) return candidates @staticmethod def 
_build_outbound_attachment_content( - *, - filename: str, - mime: str, - size_bytes: int, - mxc_url: str, - encryption_info: dict[str, Any] | None = None, + *, filename: str, mime: str, size_bytes: int, + mxc_url: str, encryption_info: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build Matrix content payload for an uploaded file/image/audio/video.""" - msgtype = "m.file" - if mime.startswith("image/"): - msgtype = "m.image" - elif mime.startswith("audio/"): - msgtype = "m.audio" - elif mime.startswith("video/"): - msgtype = "m.video" - + prefix = mime.split("/")[0] + msgtype = {"image": "m.image", "audio": "m.audio", "video": "m.video"}.get(prefix, "m.file") content: dict[str, Any] = { - "msgtype": msgtype, - "body": filename, - "filename": filename, - "info": { - "mimetype": mime, - "size": size_bytes, - }, - "m.mentions": {}, + "msgtype": msgtype, "body": filename, "filename": filename, + "info": {"mimetype": mime, "size": size_bytes}, "m.mentions": {}, } - if encryption_info: - # Encrypted media events use `file` metadata (with url/hash/key/iv), - # while unencrypted media events use top-level `url`. 
- file_info = dict(encryption_info) - file_info["url"] = mxc_url - content["file"] = file_info + content["file"] = {**encryption_info, "url": mxc_url} else: content["url"] = mxc_url - return content def _is_encrypted_room(self, room_id: str) -> bool: - """Return True if the Matrix room is known as encrypted.""" if not self.client: return False room = getattr(self.client, "rooms", {}).get(room_id) return bool(getattr(room, "encrypted", False)) async def _send_room_content(self, room_id: str, content: dict[str, Any]) -> None: - """Send Matrix m.room.message content with configured E2EE send options.""" + """Send m.room.message with E2EE options.""" if not self.client: return - - room_send_kwargs: dict[str, Any] = { - "room_id": room_id, - "message_type": "m.room.message", - "content": content, - } + kwargs: dict[str, Any] = {"room_id": room_id, "message_type": "m.room.message", "content": content} if self.config.e2ee_enabled: - # TODO(matrix): Add explicit config for strict verified-device sending mode. 
- room_send_kwargs["ignore_unverified_devices"] = True - - await self.client.room_send(**room_send_kwargs) + kwargs["ignore_unverified_devices"] = True + await self.client.room_send(**kwargs) async def _resolve_server_upload_limit_bytes(self) -> int | None: - """Resolve homeserver-advertised upload limit once per channel lifecycle.""" + """Query homeserver upload limit once per channel lifecycle.""" if self._server_upload_limit_checked: return self._server_upload_limit_bytes - self._server_upload_limit_checked = True if not self.client: return None - try: response = await self.client.content_repository_config() - except Exception as e: - logger.debug( - "Matrix media config lookup failed ({}): {}", - type(e).__name__, - str(e), - ) + except Exception: return None - upload_size = getattr(response, "upload_size", None) if isinstance(upload_size, int) and upload_size > 0: self._server_upload_limit_bytes = upload_size - return self._server_upload_limit_bytes - - if isinstance(response, ContentRepositoryConfigError): - logger.debug("Matrix media config lookup failed: {}", response) - return None - - logger.debug( - "Matrix media config lookup returned unexpected response {}", - type(response).__name__, - ) + return upload_size return None async def _effective_media_limit_bytes(self) -> int: - """ - Compute effective Matrix media size cap. - - `m.upload.size` (if advertised) is treated as the homeserver-side cap. - `maxMediaBytes` is a local hard limit/fallback. Using the stricter value - keeps resource usage predictable while honoring server constraints. 
- """ + """min(local config, server advertised) — 0 blocks all uploads.""" local_limit = max(int(self.config.max_media_bytes), 0) server_limit = await self._resolve_server_upload_limit_bytes() if server_limit is None: return local_limit - if local_limit == 0: - return 0 - return min(local_limit, server_limit) - - def _configured_media_limit_bytes(self) -> int: - """Resolve the configured local media limit with backward compatibility.""" - for name in ("max_inbound_media_bytes", "max_media_bytes"): - value = getattr(self.config, name, None) - if isinstance(value, int): - return value - return 0 + return min(local_limit, server_limit) if local_limit else 0 async def _upload_and_send_attachment( - self, - room_id: str, - path: Path, - limit_bytes: int, + self, room_id: str, path: Path, limit_bytes: int, relates_to: dict[str, Any] | None = None, ) -> str | None: - """Upload one local file to Matrix and send it as a media message.""" + """Upload one local file to Matrix and send it as a media message. 
Returns failure marker or None.""" if not self.client: - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(path.name or MATRIX_DEFAULT_ATTACHMENT_NAME) + return _ATTACH_UPLOAD_FAILED.format(path.name or _DEFAULT_ATTACH_NAME) resolved = path.expanduser().resolve(strict=False) - filename = safe_filename(resolved.name) or MATRIX_DEFAULT_ATTACHMENT_NAME - - if not resolved.is_file(): - logger.warning("Matrix outbound attachment missing file: {}", resolved) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) - - if not self._is_workspace_path_allowed(resolved): - logger.warning( - "Matrix outbound attachment denied by workspace restriction: {}", - resolved, - ) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) + filename = safe_filename(resolved.name) or _DEFAULT_ATTACH_NAME + fail = _ATTACH_UPLOAD_FAILED.format(filename) + if not resolved.is_file() or not self._is_workspace_path_allowed(resolved): + return fail try: size_bytes = resolved.stat().st_size - except OSError as e: - logger.warning( - "Matrix outbound attachment stat failed for {} ({}): {}", - resolved, - type(e).__name__, - str(e), - ) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) - - if limit_bytes <= 0: - logger.warning( - "Matrix outbound attachment skipped: media limit {} blocks all uploads for {}", - limit_bytes, - resolved, - ) - return MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename) - - if size_bytes > limit_bytes: - logger.warning( - "Matrix outbound attachment skipped: {} bytes exceeds limit {} for {}", - size_bytes, - limit_bytes, - resolved, - ) - return MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename) + except OSError: + return fail + if limit_bytes <= 0 or size_bytes > limit_bytes: + return _ATTACH_TOO_LARGE.format(filename) mime = mimetypes.guess_type(filename, strict=False)[0] or "application/octet-stream" - encrypt_upload = self.config.e2ee_enabled and self._is_encrypted_room(room_id) try: - with resolved.open("rb") as 
data_provider: + with resolved.open("rb") as f: upload_result = await self.client.upload( - data_provider, - content_type=mime, - filename=filename, - encrypt=encrypt_upload, + f, content_type=mime, filename=filename, + encrypt=self.config.e2ee_enabled and self._is_encrypted_room(room_id), filesize=size_bytes, ) - except Exception as e: - logger.warning( - "Matrix outbound attachment upload failed for {} ({}): {}", - resolved, - type(e).__name__, - str(e), - ) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) - upload_response = upload_result[0] if isinstance(upload_result, tuple) else upload_result - encryption_info: dict[str, Any] | None = None - if isinstance(upload_result, tuple) and isinstance(upload_result[1], dict): - encryption_info = upload_result[1] - if isinstance(upload_response, UploadError): - logger.warning( - "Matrix outbound attachment upload failed for {}: {}", - resolved, - upload_response, - ) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) + except Exception: + return fail + upload_response = upload_result[0] if isinstance(upload_result, tuple) else upload_result + encryption_info = upload_result[1] if isinstance(upload_result, tuple) and isinstance(upload_result[1], dict) else None + if isinstance(upload_response, UploadError): + return fail mxc_url = getattr(upload_response, "content_uri", None) if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): - logger.warning( - "Matrix outbound attachment upload returned unexpected response {} for {}", - type(upload_response).__name__, - resolved, - ) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) + return fail content = self._build_outbound_attachment_content( - filename=filename, - mime=mime, - size_bytes=size_bytes, - mxc_url=mxc_url, - encryption_info=encryption_info, + filename=filename, mime=mime, size_bytes=size_bytes, + mxc_url=mxc_url, encryption_info=encryption_info, ) if relates_to: content["m.relates_to"] = relates_to 
try: await self._send_room_content(room_id, content) - except Exception as e: - logger.warning( - "Matrix outbound attachment send failed for {} ({}): {}", - resolved, - type(e).__name__, - str(e), - ) - return MATRIX_ATTACHMENT_UPLOAD_FAILED_TEMPLATE.format(filename) + except Exception: + return fail return None async def send(self, msg: OutboundMessage) -> None: - """Send Matrix outbound content and clear typing only for non-progress messages.""" + """Send outbound content; clear typing for non-progress messages.""" if not self.client: return - text = msg.content or "" candidates = self._collect_outbound_media_candidates(msg.media) relates_to = self._build_thread_relates_to(msg.metadata) is_progress = bool((msg.metadata or {}).get("_progress")) - try: failures: list[str] = [] - if candidates: limit_bytes = await self._effective_media_limit_bytes() for path in candidates: - failure_marker = await self._upload_and_send_attachment( - room_id=msg.chat_id, - path=path, - limit_bytes=limit_bytes, - relates_to=relates_to, - ) - if failure_marker: - failures.append(failure_marker) - + if fail := await self._upload_and_send_attachment( + msg.chat_id, path, limit_bytes, relates_to): + failures.append(fail) if failures: - if text.strip(): - text = f"{text.rstrip()}\n" + "\n".join(failures) - else: - text = "\n".join(failures) - + text = f"{text.rstrip()}\n{chr(10).join(failures)}" if text.strip() else "\n".join(failures) if text or not candidates: content = _build_matrix_text_content(text) if relates_to: @@ -646,73 +364,51 @@ class MatrixChannel(BaseChannel): await self._stop_typing_keepalive(msg.chat_id, clear_typing=True) def _register_event_callbacks(self) -> None: - """Register Matrix event callbacks used by this channel.""" self.client.add_event_callback(self._on_message, RoomMessageText) self.client.add_event_callback(self._on_media_message, MATRIX_MEDIA_EVENT_FILTER) self.client.add_event_callback(self._on_room_invite, InviteEvent) def 
_register_response_callbacks(self) -> None: - """Register response callbacks for operational error observability.""" self.client.add_response_callback(self._on_sync_error, SyncError) self.client.add_response_callback(self._on_join_error, JoinError) self.client.add_response_callback(self._on_send_error, RoomSendError) - @staticmethod - def _is_auth_error(errcode: str | None) -> bool: - """Return True if the Matrix errcode indicates auth/token problems.""" - return errcode in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"} + def _log_response_error(self, label: str, response: Any) -> None: + """Log Matrix response errors — auth errors at ERROR level, rest at WARNING.""" + code = getattr(response, "status_code", None) + is_auth = code in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"} + is_fatal = is_auth or getattr(response, "soft_logout", False) + (logger.error if is_fatal else logger.warning)("Matrix {} failed: {}", label, response) async def _on_sync_error(self, response: SyncError) -> None: - """Log sync errors with clear severity.""" - if self._is_auth_error(response.status_code) or response.soft_logout: - logger.error("Matrix sync failed: {}", response) - return - logger.warning("Matrix sync warning: {}", response) + self._log_response_error("sync", response) async def _on_join_error(self, response: JoinError) -> None: - """Log room-join errors from invite handling.""" - if self._is_auth_error(response.status_code): - logger.error("Matrix join failed: {}", response) - return - logger.warning("Matrix join warning: {}", response) + self._log_response_error("join", response) async def _on_send_error(self, response: RoomSendError) -> None: - """Log message send failures.""" - if self._is_auth_error(response.status_code): - logger.error("Matrix send failed: {}", response) - return - logger.warning("Matrix send warning: {}", response) + self._log_response_error("send", response) async def _set_typing(self, room_id: str, typing: bool) -> None: - 
"""Best-effort typing indicator update that never blocks message flow.""" + """Best-effort typing indicator update.""" if not self.client: return - try: - response = await self.client.room_typing( - room_id=room_id, - typing_state=typing, - timeout=TYPING_NOTICE_TIMEOUT_MS, - ) + response = await self.client.room_typing(room_id=room_id, typing_state=typing, + timeout=TYPING_NOTICE_TIMEOUT_MS) if isinstance(response, RoomTypingError): - logger.debug("Matrix typing update failed for room {}: {}", room_id, response) - except Exception as e: - logger.debug( - "Matrix typing update failed for room {} (typing={}): {}: {}", - room_id, - typing, - type(e).__name__, - str(e), - ) + logger.debug("Matrix typing failed for {}: {}", room_id, response) + except Exception: + pass async def _start_typing_keepalive(self, room_id: str) -> None: - """Start periodic Matrix typing refresh for a room (spec-recommended keepalive).""" + """Start periodic typing refresh (spec-recommended keepalive).""" await self._stop_typing_keepalive(room_id, clear_typing=False) await self._set_typing(room_id, True) if not self._running: return - async def _typing_loop() -> None: + async def loop() -> None: try: while self._running: await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_MS / 1000) @@ -720,31 +416,21 @@ class MatrixChannel(BaseChannel): except asyncio.CancelledError: pass - self._typing_tasks[room_id] = asyncio.create_task(_typing_loop()) + self._typing_tasks[room_id] = asyncio.create_task(loop()) - async def _stop_typing_keepalive( - self, - room_id: str, - *, - clear_typing: bool, - ) -> None: - """Stop periodic Matrix typing refresh for a room.""" - task = self._typing_tasks.pop(room_id, None) - if task: + async def _stop_typing_keepalive(self, room_id: str, *, clear_typing: bool) -> None: + if task := self._typing_tasks.pop(room_id, None): task.cancel() try: await task except asyncio.CancelledError: pass - if clear_typing: await self._set_typing(room_id, False) async def _sync_loop(self) -> 
None: while self._running: try: - # full_state applies only to the first sync inside sync_forever and helps - # rebuild room state when restoring from stored sync tokens. await self.client.sync_forever(timeout=30000, full_state=True) except asyncio.CancelledError: break @@ -753,63 +439,48 @@ class MatrixChannel(BaseChannel): async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None: allow_from = self.config.allow_from or [] - if allow_from and event.sender not in allow_from: - return - - await self.client.join(room.room_id) + if not allow_from or event.sender in allow_from: + await self.client.join(room.room_id) def _is_direct_room(self, room: MatrixRoom) -> bool: - """Return True if the room behaves like a DM (2 or fewer members).""" - member_count = getattr(room, "member_count", None) - return isinstance(member_count, int) and member_count <= 2 + count = getattr(room, "member_count", None) + return isinstance(count, int) and count <= 2 - def _is_bot_mentioned_from_mx_mentions(self, event: RoomMessage) -> bool: - """Resolve mentions strictly from Matrix-native m.mentions payload.""" + def _is_bot_mentioned(self, event: RoomMessage) -> bool: + """Check m.mentions payload for bot mention.""" source = getattr(event, "source", None) if not isinstance(source, dict): return False - - content = source.get("content") - if not isinstance(content, dict): - return False - - mentions = content.get("m.mentions") + mentions = (source.get("content") or {}).get("m.mentions") if not isinstance(mentions, dict): return False - user_ids = mentions.get("user_ids") if isinstance(user_ids, list) and self.config.user_id in user_ids: return True - return bool(self.config.allow_room_mentions and mentions.get("room") is True) def _should_process_message(self, room: MatrixRoom, event: RoomMessage) -> bool: - """Apply sender and room policy checks before processing Matrix messages.""" + """Apply sender and room policy checks.""" if not self.is_allowed(event.sender): return 
False - if self._is_direct_room(room): return True - policy = self.config.group_policy if policy == "open": return True if policy == "allowlist": return room.room_id in (self.config.group_allow_from or []) if policy == "mention": - return self._is_bot_mentioned_from_mx_mentions(event) - + return self._is_bot_mentioned(event) return False def _media_dir(self) -> Path: - """Return directory used to persist downloaded Matrix attachments.""" - media_dir = get_data_dir() / "media" / "matrix" - media_dir.mkdir(parents=True, exist_ok=True) - return media_dir + d = get_data_dir() / "media" / "matrix" + d.mkdir(parents=True, exist_ok=True) + return d @staticmethod def _event_source_content(event: RoomMessage) -> dict[str, Any]: - """Extract Matrix event content payload when available.""" source = getattr(event, "source", None) if not isinstance(source, dict): return {} @@ -817,30 +488,22 @@ class MatrixChannel(BaseChannel): return content if isinstance(content, dict) else {} def _event_thread_root_id(self, event: RoomMessage) -> str | None: - """Return thread root event_id if this message is inside a thread.""" - content = self._event_source_content(event) - relates_to = content.get("m.relates_to") - if not isinstance(relates_to, dict): - return None - if relates_to.get("rel_type") != "m.thread": + relates_to = self._event_source_content(event).get("m.relates_to") + if not isinstance(relates_to, dict) or relates_to.get("rel_type") != "m.thread": return None root_id = relates_to.get("event_id") return root_id if isinstance(root_id, str) and root_id else None def _thread_metadata(self, event: RoomMessage) -> dict[str, str] | None: - """Build metadata used to reply within a thread.""" - root_id = self._event_thread_root_id(event) - if not root_id: + if not (root_id := self._event_thread_root_id(event)): return None - reply_to = getattr(event, "event_id", None) meta: dict[str, str] = {"thread_root_event_id": root_id} - if isinstance(reply_to, str) and reply_to: + if 
isinstance(reply_to := getattr(event, "event_id", None), str) and reply_to: meta["thread_reply_to_event_id"] = reply_to return meta @staticmethod def _build_thread_relates_to(metadata: dict[str, Any] | None) -> dict[str, Any] | None: - """Build m.relates_to payload for Matrix thread replies.""" if not metadata: return None root_id = metadata.get("thread_root_event_id") @@ -849,315 +512,170 @@ class MatrixChannel(BaseChannel): reply_to = metadata.get("thread_reply_to_event_id") or metadata.get("event_id") if not isinstance(reply_to, str) or not reply_to: return None - return { - "rel_type": "m.thread", - "event_id": root_id, - "m.in_reply_to": {"event_id": reply_to}, - "is_falling_back": True, - } + return {"rel_type": "m.thread", "event_id": root_id, + "m.in_reply_to": {"event_id": reply_to}, "is_falling_back": True} def _event_attachment_type(self, event: MatrixMediaEvent) -> str: - """Map Matrix event payload/type to a stable attachment kind.""" msgtype = self._event_source_content(event).get("msgtype") - if msgtype == "m.image": - return "image" - if msgtype == "m.audio": - return "audio" - if msgtype == "m.video": - return "video" - if msgtype == "m.file": - return "file" - - class_name = type(event).__name__.lower() - if "image" in class_name: - return "image" - if "audio" in class_name: - return "audio" - if "video" in class_name: - return "video" - return "file" + return _MSGTYPE_MAP.get(msgtype, "file") @staticmethod def _is_encrypted_media_event(event: MatrixMediaEvent) -> bool: - """Return True for encrypted Matrix media events.""" - return ( - isinstance(getattr(event, "key", None), dict) - and isinstance(getattr(event, "hashes", None), dict) - and isinstance(getattr(event, "iv", None), str) - ) + return (isinstance(getattr(event, "key", None), dict) + and isinstance(getattr(event, "hashes", None), dict) + and isinstance(getattr(event, "iv", None), str)) def _event_declared_size_bytes(self, event: MatrixMediaEvent) -> int | None: - """Return declared 
media size from Matrix event info, if present.""" info = self._event_source_content(event).get("info") - if not isinstance(info, dict): - return None - size = info.get("size") - if isinstance(size, int) and size >= 0: - return size - return None + size = info.get("size") if isinstance(info, dict) else None + return size if isinstance(size, int) and size >= 0 else None def _event_mime(self, event: MatrixMediaEvent) -> str | None: - """Best-effort MIME extraction from Matrix media event.""" info = self._event_source_content(event).get("info") - if isinstance(info, dict): - mime = info.get("mimetype") - if isinstance(mime, str) and mime: - return mime - - mime = getattr(event, "mimetype", None) - if isinstance(mime, str) and mime: - return mime - return None + if isinstance(info, dict) and isinstance(m := info.get("mimetype"), str) and m: + return m + m = getattr(event, "mimetype", None) + return m if isinstance(m, str) and m else None def _event_filename(self, event: MatrixMediaEvent, attachment_type: str) -> str: - """Build a safe filename for a Matrix attachment.""" body = getattr(event, "body", None) if isinstance(body, str) and body.strip(): - candidate = safe_filename(Path(body).name) - if candidate: + if candidate := safe_filename(Path(body).name): return candidate - return MATRIX_DEFAULT_ATTACHMENT_NAME if attachment_type == "file" else attachment_type + return _DEFAULT_ATTACH_NAME if attachment_type == "file" else attachment_type - def _build_attachment_path( - self, - event: MatrixMediaEvent, - attachment_type: str, - filename: str, - mime: str | None, - ) -> Path: - """Compute a deterministic local file path for a downloaded attachment.""" - safe_name = safe_filename(Path(filename).name) or MATRIX_DEFAULT_ATTACHMENT_NAME + def _build_attachment_path(self, event: MatrixMediaEvent, attachment_type: str, + filename: str, mime: str | None) -> Path: + safe_name = safe_filename(Path(filename).name) or _DEFAULT_ATTACH_NAME suffix = Path(safe_name).suffix if not 
suffix and mime: - guessed = mimetypes.guess_extension(mime, strict=False) or "" - if guessed: - safe_name = f"{safe_name}{guessed}" - suffix = guessed - - stem = Path(safe_name).stem or attachment_type - stem = stem[:72] + if guessed := mimetypes.guess_extension(mime, strict=False): + safe_name, suffix = f"{safe_name}{guessed}", guessed + stem = (Path(safe_name).stem or attachment_type)[:72] suffix = suffix[:16] - event_id = safe_filename(str(getattr(event, "event_id", "") or "evt").lstrip("$")) event_prefix = (event_id[:24] or "evt").strip("_") return self._media_dir() / f"{event_prefix}_{stem}{suffix}" async def _download_media_bytes(self, mxc_url: str) -> bytes | None: - """Download media bytes from Matrix content repository.""" if not self.client: return None - response = await self.client.download(mxc=mxc_url) if isinstance(response, DownloadError): - logger.warning("Matrix attachment download failed for {}: {}", mxc_url, response) + logger.warning("Matrix download failed for {}: {}", mxc_url, response) return None - body = getattr(response, "body", None) if isinstance(body, (bytes, bytearray)): return bytes(body) - if isinstance(response, MemoryDownloadResponse): return bytes(response.body) - if isinstance(body, (str, Path)): path = Path(body) if path.is_file(): try: return path.read_bytes() - except OSError as e: - logger.warning( - "Matrix attachment read failed for {} ({}): {}", - mxc_url, - type(e).__name__, - str(e), - ) + except OSError: return None - - logger.warning( - "Matrix attachment download failed for {}: unexpected response type {}", - mxc_url, - type(response).__name__, - ) return None def _decrypt_media_bytes(self, event: MatrixMediaEvent, ciphertext: bytes) -> bytes | None: - """Decrypt encrypted Matrix attachment bytes.""" - key_obj = getattr(event, "key", None) - hashes = getattr(event, "hashes", None) - iv = getattr(event, "iv", None) - + key_obj, hashes, iv = getattr(event, "key", None), getattr(event, "hashes", None), getattr(event, 
"iv", None) key = key_obj.get("k") if isinstance(key_obj, dict) else None sha256 = hashes.get("sha256") if isinstance(hashes, dict) else None - if not isinstance(key, str) or not isinstance(sha256, str) or not isinstance(iv, str): - logger.warning( - "Matrix encrypted attachment missing key material for event {}", - getattr(event, "event_id", ""), - ) + if not all(isinstance(v, str) for v in (key, sha256, iv)): return None - try: return decrypt_attachment(ciphertext, key, sha256, iv) - except (EncryptionError, ValueError, TypeError) as e: - logger.warning( - "Matrix encrypted attachment decryption failed for event {} ({}): {}", - getattr(event, "event_id", ""), - type(e).__name__, - str(e), - ) + except (EncryptionError, ValueError, TypeError): + logger.warning("Matrix decrypt failed for event {}", getattr(event, "event_id", "")) return None async def _fetch_media_attachment( - self, - room: MatrixRoom, - event: MatrixMediaEvent, + self, room: MatrixRoom, event: MatrixMediaEvent, ) -> tuple[dict[str, Any] | None, str]: - """Download and prepare a Matrix attachment for inbound processing.""" - attachment_type = self._event_attachment_type(event) + """Download, decrypt if needed, and persist a Matrix attachment.""" + atype = self._event_attachment_type(event) mime = self._event_mime(event) - filename = self._event_filename(event, attachment_type) + filename = self._event_filename(event, atype) mxc_url = getattr(event, "url", None) + fail = _ATTACH_FAILED.format(filename) if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): - logger.warning( - "Matrix attachment skipped in room {}: invalid mxc URL {}", - room.room_id, - mxc_url, - ) - return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + return None, fail limit_bytes = await self._effective_media_limit_bytes() - declared_size = self._event_declared_size_bytes(event) - if ( - declared_size is not None - and declared_size > limit_bytes - ): - logger.warning( - "Matrix attachment skipped in 
room {}: declared size {} exceeds limit {}", - room.room_id, - declared_size, - limit_bytes, - ) - return None, MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename) + declared = self._event_declared_size_bytes(event) + if declared is not None and declared > limit_bytes: + return None, _ATTACH_TOO_LARGE.format(filename) downloaded = await self._download_media_bytes(mxc_url) if downloaded is None: - return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + return None, fail encrypted = self._is_encrypted_media_event(event) data = downloaded if encrypted: - decrypted = self._decrypt_media_bytes(event, downloaded) - if decrypted is None: - return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) - data = decrypted + if (data := self._decrypt_media_bytes(event, downloaded)) is None: + return None, fail if len(data) > limit_bytes: - logger.warning( - "Matrix attachment skipped in room {}: downloaded size {} exceeds limit {}", - room.room_id, - len(data), - limit_bytes, - ) - return None, MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE.format(filename) + return None, _ATTACH_TOO_LARGE.format(filename) - path = self._build_attachment_path( - event, - attachment_type, - filename, - mime, - ) + path = self._build_attachment_path(event, atype, filename, mime) try: path.write_bytes(data) - except OSError as e: - logger.warning( - "Matrix attachment persist failed for room {} ({}): {}", - room.room_id, - type(e).__name__, - str(e), - ) - return None, MATRIX_ATTACHMENT_FAILED_TEMPLATE.format(filename) + except OSError: + return None, fail attachment = { - "type": attachment_type, - "mime": mime, - "filename": filename, + "type": atype, "mime": mime, "filename": filename, "event_id": str(getattr(event, "event_id", "") or ""), - "encrypted": encrypted, - "size_bytes": len(data), - "path": str(path), - "mxc_url": mxc_url, + "encrypted": encrypted, "size_bytes": len(data), + "path": str(path), "mxc_url": mxc_url, } - return attachment, 
MATRIX_ATTACHMENT_MARKER_TEMPLATE.format(path) + return attachment, _ATTACH_MARKER.format(path) + + def _base_metadata(self, room: MatrixRoom, event: RoomMessage) -> dict[str, Any]: + """Build common metadata for text and media handlers.""" + meta: dict[str, Any] = {"room": getattr(room, "display_name", room.room_id)} + if isinstance(eid := getattr(event, "event_id", None), str) and eid: + meta["event_id"] = eid + if thread := self._thread_metadata(event): + meta.update(thread) + return meta async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None: - # Ignore self messages - if event.sender == self.config.user_id: + if event.sender == self.config.user_id or not self._should_process_message(room, event): return - - if not self._should_process_message(room, event): - return - await self._start_typing_keepalive(room.room_id) try: - metadata: dict[str, Any] = { - "room": getattr(room, "display_name", room.room_id), - } - event_id = getattr(event, "event_id", None) - if isinstance(event_id, str) and event_id: - metadata["event_id"] = event_id - thread_meta = self._thread_metadata(event) - if thread_meta: - metadata.update(thread_meta) await self._handle_message( - sender_id=event.sender, - chat_id=room.room_id, - content=event.body, - metadata=metadata, + sender_id=event.sender, chat_id=room.room_id, + content=event.body, metadata=self._base_metadata(room, event), ) except Exception: await self._stop_typing_keepalive(room.room_id, clear_typing=True) raise async def _on_media_message(self, room: MatrixRoom, event: MatrixMediaEvent) -> None: - """Handle inbound Matrix media events and forward local attachment paths.""" - if event.sender == self.config.user_id: + if event.sender == self.config.user_id or not self._should_process_message(room, event): return - - if not self._should_process_message(room, event): - return - attachment, marker = await self._fetch_media_attachment(room, event) - attachments = [attachment] if attachment else [] - markers = 
[marker] - media_paths = [a["path"] for a in attachments] - - body = getattr(event, "body", None) - content_parts: list[str] = [] - if isinstance(body, str) and body.strip(): - content_parts.append(body.strip()) - content_parts.extend(markers) - - # TODO: Optionally add audio transcription support for Matrix attachments, - # behind explicit config. + parts: list[str] = [] + if isinstance(body := getattr(event, "body", None), str) and body.strip(): + parts.append(body.strip()) + parts.append(marker) await self._start_typing_keepalive(room.room_id) try: - metadata: dict[str, Any] = { - "room": getattr(room, "display_name", room.room_id), - "attachments": attachments, - } - event_id = getattr(event, "event_id", None) - if isinstance(event_id, str) and event_id: - metadata["event_id"] = event_id - thread_meta = self._thread_metadata(event) - if thread_meta: - metadata.update(thread_meta) + meta = self._base_metadata(room, event) + if attachment: + meta["attachments"] = [attachment] await self._handle_message( - sender_id=event.sender, - chat_id=room.room_id, - content="\n".join(content_parts), - media=media_paths, - metadata=metadata, + sender_id=event.sender, chat_id=room.room_id, + content="\n".join(parts), + media=[attachment["path"]] if attachment else [], + metadata=meta, ) except Exception: await self._stop_typing_keepalive(room.room_id, clear_typing=True) diff --git a/pyproject.toml b/pyproject.toml index fc5ecc6..20dcb1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,12 +42,14 @@ dependencies = [ "prompt-toolkit>=3.0.50,<4.0.0", "mcp>=1.26.0,<2.0.0", "json-repair>=0.57.0,<1.0.0", +] + +[project.optional-dependencies] +matrix = [ "matrix-nio[e2e]>=0.25.2", "mistune>=3.0.0,<4.0.0", "nh3>=0.2.17,<1.0.0", ] - -[project.optional-dependencies] dev = [ "pytest>=9.0.0,<10.0.0", "pytest-asyncio>=1.3.0,<2.0.0",