From ca66ddb0bf9886a9663f41defc7ffc942fd7131c Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Tue, 10 Feb 2026 17:21:13 +0100 Subject: [PATCH] feat(matrix): refresh typing indicator while processing --- nanobot/channels/matrix.py | 50 ++++++++++++++++++++++++++++++++---- tests/test_matrix_channel.py | 37 ++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py index 3edcf63..5893bb2 100644 --- a/nanobot/channels/matrix.py +++ b/nanobot/channels/matrix.py @@ -38,6 +38,7 @@ from nanobot.utils.helpers import safe_filename LOGGING_STACK_BASE_DEPTH = 2 TYPING_NOTICE_TIMEOUT_MS = 30_000 +TYPING_KEEPALIVE_INTERVAL_SECONDS = 20.0 MATRIX_HTML_FORMAT = "org.matrix.custom.html" MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]" MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]" @@ -224,6 +225,7 @@ class MatrixChannel(BaseChannel): super().__init__(config, bus) self.client: AsyncClient | None = None self._sync_task: asyncio.Task | None = None + self._typing_tasks: dict[str, asyncio.Task] = {} async def start(self) -> None: """Start Matrix client and begin sync loop.""" @@ -272,6 +274,9 @@ class MatrixChannel(BaseChannel): """Stop the Matrix channel with graceful sync shutdown.""" self._running = False + for room_id in list(self._typing_tasks): + await self._stop_typing_keepalive(room_id, clear_typing=False) + if self.client: # Request sync_forever loop to exit cleanly. self.client.stop_sync_forever() @@ -306,7 +311,7 @@ class MatrixChannel(BaseChannel): ignore_unverified_devices=True, ) finally: - await self._set_typing(msg.chat_id, False) + await self._stop_typing_keepalive(msg.chat_id, clear_typing=True) def _register_event_callbacks(self) -> None: """Register Matrix event callbacks used by this channel.""" @@ -368,6 +373,41 @@ class MatrixChannel(BaseChannel): str(e), ) + async def _start_typing_keepalive(self, room_id: str) -> None: + """Start periodic Matrix typing refresh for a room.""" + await self._stop_typing_keepalive(room_id, clear_typing=False) + await self._set_typing(room_id, True) + if not self._running: + return + + async def _typing_loop() -> None: + try: + while self._running: + await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_SECONDS) + await self._set_typing(room_id, True) + except asyncio.CancelledError: + pass + + self._typing_tasks[room_id] = asyncio.create_task(_typing_loop()) + + async def _stop_typing_keepalive( + self, + room_id: str, + *, + clear_typing: bool, + ) -> None: + """Stop periodic Matrix typing refresh for a room.""" + task = self._typing_tasks.pop(room_id, None) + if task: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + if clear_typing: + await self._set_typing(room_id, False) + async def _sync_loop(self) -> None: while self._running: try: @@ -684,7 +724,7 @@ class MatrixChannel(BaseChannel): if not self._should_process_message(room, event): return - await self._set_typing(room.room_id, True) + await self._start_typing_keepalive(room.room_id) try: await self._handle_message( sender_id=event.sender, @@ -693,7 +733,7 @@ class MatrixChannel(BaseChannel): metadata={"room": getattr(room, "display_name", room.room_id)}, ) except Exception: - await self._set_typing(room.room_id, False) + await self._stop_typing_keepalive(room.room_id, clear_typing=True) raise async def _on_media_message(self, room: MatrixRoom, event: Any) -> None: @@ -718,7 +758,7 @@ class MatrixChannel(BaseChannel): # TODO: Optionally add audio transcription support for Matrix attachments, # similar to Telegram's voice/audio flow, behind explicit config. - await self._set_typing(room.room_id, True) + await self._start_typing_keepalive(room.room_id) try: await self._handle_message( sender_id=event.sender, @@ -731,5 +771,5 @@ class MatrixChannel(BaseChannel): }, ) except Exception: - await self._set_typing(room.room_id, False) + await self._stop_typing_keepalive(room.room_id, clear_typing=True) raise diff --git a/tests/test_matrix_channel.py b/tests/test_matrix_channel.py index 932e612..6a33a5e 100644 --- a/tests/test_matrix_channel.py +++ b/tests/test_matrix_channel.py @@ -1,3 +1,4 @@ +import asyncio from pathlib import Path from types import SimpleNamespace @@ -224,6 +225,24 @@ async def test_on_message_sets_typing_for_allowed_sender() -> None: ] +@pytest.mark.asyncio +async def test_typing_keepalive_refreshes_periodically(monkeypatch) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._running = True + + monkeypatch.setattr(matrix_module, "TYPING_KEEPALIVE_INTERVAL_SECONDS", 0.01) + + await channel._start_typing_keepalive("!room:matrix.org") + await asyncio.sleep(0.03) + await channel._stop_typing_keepalive("!room:matrix.org", clear_typing=True) + + true_updates = [call for call in client.typing_calls if call[1] is True] + assert len(true_updates) >= 2 + assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) + + @pytest.mark.asyncio async def test_on_message_skips_typing_for_self_message() -> None: channel = MatrixChannel(_make_config(), MessageBus()) @@ -612,6 +631,24 @@ async def test_send_clears_typing_after_send() -> None: assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) +@pytest.mark.asyncio +async def test_send_stops_typing_keepalive_task() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._running = True + + await channel._start_typing_keepalive("!room:matrix.org") + assert "!room:matrix.org" in channel._typing_tasks + + await channel.send( + OutboundMessage(channel="matrix", chat_id="!room:matrix.org", content="Hi") + ) + + assert "!room:matrix.org" not in channel._typing_tasks + assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS) + + @pytest.mark.asyncio async def test_send_clears_typing_when_send_fails() -> None: channel = MatrixChannel(_make_config(), MessageBus())