diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py index 7e84626..89e7616 100644 --- a/nanobot/channels/matrix.py +++ b/nanobot/channels/matrix.py @@ -1,7 +1,17 @@ import asyncio from typing import Any -from nio import AsyncClient, AsyncClientConfig, InviteEvent, MatrixRoom, RoomMessageText +from loguru import logger +from nio import ( + AsyncClient, + AsyncClientConfig, + InviteEvent, + JoinError, + MatrixRoom, + RoomMessageText, + RoomSendError, + SyncError, +) from nanobot.bus.events import OutboundMessage from nanobot.channels.base import BaseChannel @@ -21,6 +31,7 @@ class MatrixChannel(BaseChannel): self._sync_task: asyncio.Task | None = None async def start(self) -> None: + """Start Matrix client and begin sync loop.""" self._running = True store_path = get_data_dir() / "matrix-store" @@ -40,11 +51,24 @@ class MatrixChannel(BaseChannel): self.client.access_token = self.config.access_token self.client.device_id = self.config.device_id - self.client.add_event_callback(self._on_message, RoomMessageText) - self.client.add_event_callback(self._on_room_invite, InviteEvent) + self._register_event_callbacks() + self._register_response_callbacks() if self.config.device_id: - self.client.load_store() + try: + self.client.load_store() + except Exception as e: + logger.warning( + "Matrix store load failed ({}: {}); sync token restore is disabled and " + "restart may replay recent messages.", + type(e).__name__, + str(e), + ) + else: + logger.warning( + "Matrix device_id is empty; sync token restore is disabled and restart may " + "replay recent messages." + ) self._sync_task = asyncio.create_task(self._sync_loop()) @@ -85,9 +109,48 @@ class MatrixChannel(BaseChannel): ignore_unverified_devices=True, ) + def _register_event_callbacks(self) -> None: + """Register Matrix event callbacks used by this channel.""" + self.client.add_event_callback(self._on_message, RoomMessageText) + self.client.add_event_callback(self._on_room_invite, InviteEvent) + + def _register_response_callbacks(self) -> None: + """Register response callbacks for operational error observability.""" + self.client.add_response_callback(self._on_sync_error, SyncError) + self.client.add_response_callback(self._on_join_error, JoinError) + self.client.add_response_callback(self._on_send_error, RoomSendError) + + @staticmethod + def _is_auth_error(errcode: str | None) -> bool: + """Return True if the Matrix errcode indicates auth/token problems.""" + return errcode in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"} + + async def _on_sync_error(self, response: SyncError) -> None: + """Log sync errors with clear severity.""" + if self._is_auth_error(response.status_code) or response.soft_logout: + logger.error("Matrix sync failed: {}", response) + return + logger.warning("Matrix sync warning: {}", response) + + async def _on_join_error(self, response: JoinError) -> None: + """Log room-join errors from invite handling.""" + if self._is_auth_error(response.status_code): + logger.error("Matrix join failed: {}", response) + return + logger.warning("Matrix join warning: {}", response) + + async def _on_send_error(self, response: RoomSendError) -> None: + """Log message send failures.""" + if self._is_auth_error(response.status_code): + logger.error("Matrix send failed: {}", response) + return + logger.warning("Matrix send warning: {}", response) + async def _sync_loop(self) -> None: while self._running: try: + # full_state applies only to the first sync inside sync_forever and helps + # rebuild room state when restoring from stored sync tokens. await self.client.sync_forever(timeout=30000, full_state=True) except asyncio.CancelledError: break