feat(matrix): add startup warnings and response error logging
This commit is contained in:
@@ -1,7 +1,17 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from nio import AsyncClient, AsyncClientConfig, InviteEvent, MatrixRoom, RoomMessageText
|
from loguru import logger
|
||||||
|
from nio import (
|
||||||
|
AsyncClient,
|
||||||
|
AsyncClientConfig,
|
||||||
|
InviteEvent,
|
||||||
|
JoinError,
|
||||||
|
MatrixRoom,
|
||||||
|
RoomMessageText,
|
||||||
|
RoomSendError,
|
||||||
|
SyncError,
|
||||||
|
)
|
||||||
|
|
||||||
from nanobot.bus.events import OutboundMessage
|
from nanobot.bus.events import OutboundMessage
|
||||||
from nanobot.channels.base import BaseChannel
|
from nanobot.channels.base import BaseChannel
|
||||||
@@ -21,6 +31,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
self._sync_task: asyncio.Task | None = None
|
self._sync_task: asyncio.Task | None = None
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
|
"""Start Matrix client and begin sync loop."""
|
||||||
self._running = True
|
self._running = True
|
||||||
|
|
||||||
store_path = get_data_dir() / "matrix-store"
|
store_path = get_data_dir() / "matrix-store"
|
||||||
@@ -40,11 +51,24 @@ class MatrixChannel(BaseChannel):
|
|||||||
self.client.access_token = self.config.access_token
|
self.client.access_token = self.config.access_token
|
||||||
self.client.device_id = self.config.device_id
|
self.client.device_id = self.config.device_id
|
||||||
|
|
||||||
self.client.add_event_callback(self._on_message, RoomMessageText)
|
self._register_event_callbacks()
|
||||||
self.client.add_event_callback(self._on_room_invite, InviteEvent)
|
self._register_response_callbacks()
|
||||||
|
|
||||||
if self.config.device_id:
|
if self.config.device_id:
|
||||||
self.client.load_store()
|
try:
|
||||||
|
self.client.load_store()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"Matrix store load failed ({}: {}); sync token restore is disabled and "
|
||||||
|
"restart may replay recent messages.",
|
||||||
|
type(e).__name__,
|
||||||
|
str(e),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Matrix device_id is empty; sync token restore is disabled and restart may "
|
||||||
|
"replay recent messages."
|
||||||
|
)
|
||||||
|
|
||||||
self._sync_task = asyncio.create_task(self._sync_loop())
|
self._sync_task = asyncio.create_task(self._sync_loop())
|
||||||
|
|
||||||
@@ -85,9 +109,48 @@ class MatrixChannel(BaseChannel):
|
|||||||
ignore_unverified_devices=True,
|
ignore_unverified_devices=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _register_event_callbacks(self) -> None:
|
||||||
|
"""Register Matrix event callbacks used by this channel."""
|
||||||
|
self.client.add_event_callback(self._on_message, RoomMessageText)
|
||||||
|
self.client.add_event_callback(self._on_room_invite, InviteEvent)
|
||||||
|
|
||||||
|
def _register_response_callbacks(self) -> None:
|
||||||
|
"""Register response callbacks for operational error observability."""
|
||||||
|
self.client.add_response_callback(self._on_sync_error, SyncError)
|
||||||
|
self.client.add_response_callback(self._on_join_error, JoinError)
|
||||||
|
self.client.add_response_callback(self._on_send_error, RoomSendError)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_auth_error(errcode: str | None) -> bool:
|
||||||
|
"""Return True if the Matrix errcode indicates auth/token problems."""
|
||||||
|
return errcode in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"}
|
||||||
|
|
||||||
|
async def _on_sync_error(self, response: SyncError) -> None:
|
||||||
|
"""Log sync errors with clear severity."""
|
||||||
|
if self._is_auth_error(response.status_code) or response.soft_logout:
|
||||||
|
logger.error("Matrix sync failed: {}", response)
|
||||||
|
return
|
||||||
|
logger.warning("Matrix sync warning: {}", response)
|
||||||
|
|
||||||
|
async def _on_join_error(self, response: JoinError) -> None:
|
||||||
|
"""Log room-join errors from invite handling."""
|
||||||
|
if self._is_auth_error(response.status_code):
|
||||||
|
logger.error("Matrix join failed: {}", response)
|
||||||
|
return
|
||||||
|
logger.warning("Matrix join warning: {}", response)
|
||||||
|
|
||||||
|
async def _on_send_error(self, response: RoomSendError) -> None:
|
||||||
|
"""Log message send failures."""
|
||||||
|
if self._is_auth_error(response.status_code):
|
||||||
|
logger.error("Matrix send failed: {}", response)
|
||||||
|
return
|
||||||
|
logger.warning("Matrix send warning: {}", response)
|
||||||
|
|
||||||
async def _sync_loop(self) -> None:
|
async def _sync_loop(self) -> None:
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
|
# full_state applies only to the first sync inside sync_forever and helps
|
||||||
|
# rebuild room state when restoring from stored sync tokens.
|
||||||
await self.client.sync_forever(timeout=30000, full_state=True)
|
await self.client.sync_forever(timeout=30000, full_state=True)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
break
|
break
|
||||||
|
|||||||
Reference in New Issue
Block a user