Merge pull request #7 from Athemis/feat/matrix-improvements
fix(matrix): harmonize units and keep typing indicator during tool calls
This commit is contained in:
@@ -43,7 +43,7 @@ TYPING_NOTICE_TIMEOUT_MS = 30_000
|
|||||||
# https://spec.matrix.org/v1.17/client-server-api/#typing-notifications
|
# https://spec.matrix.org/v1.17/client-server-api/#typing-notifications
|
||||||
# Keepalive interval must stay below TYPING_NOTICE_TIMEOUT_MS so the typing
|
# Keepalive interval must stay below TYPING_NOTICE_TIMEOUT_MS so the typing
|
||||||
# indicator does not expire while the agent is still processing.
|
# indicator does not expire while the agent is still processing.
|
||||||
TYPING_KEEPALIVE_INTERVAL_SECONDS = 20.0
|
TYPING_KEEPALIVE_INTERVAL_MS = 20_000
|
||||||
MATRIX_HTML_FORMAT = "org.matrix.custom.html"
|
MATRIX_HTML_FORMAT = "org.matrix.custom.html"
|
||||||
MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]"
|
MATRIX_ATTACHMENT_MARKER_TEMPLATE = "[attachment: {}]"
|
||||||
MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]"
|
MATRIX_ATTACHMENT_TOO_LARGE_TEMPLATE = "[attachment: {} - too large]"
|
||||||
@@ -606,13 +606,14 @@ class MatrixChannel(BaseChannel):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
async def send(self, msg: OutboundMessage) -> None:
|
async def send(self, msg: OutboundMessage) -> None:
|
||||||
"""Send message text and optional attachments to a Matrix room, then clear typing state."""
|
"""Send Matrix outbound content and clear typing only for non-progress messages."""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return
|
return
|
||||||
|
|
||||||
text = msg.content or ""
|
text = msg.content or ""
|
||||||
candidates = self._collect_outbound_media_candidates(msg.media)
|
candidates = self._collect_outbound_media_candidates(msg.media)
|
||||||
relates_to = self._build_thread_relates_to(msg.metadata)
|
relates_to = self._build_thread_relates_to(msg.metadata)
|
||||||
|
is_progress = bool((msg.metadata or {}).get("_progress"))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
failures: list[str] = []
|
failures: list[str] = []
|
||||||
@@ -641,6 +642,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
content["m.relates_to"] = relates_to
|
content["m.relates_to"] = relates_to
|
||||||
await self._send_room_content(msg.chat_id, content)
|
await self._send_room_content(msg.chat_id, content)
|
||||||
finally:
|
finally:
|
||||||
|
if not is_progress:
|
||||||
await self._stop_typing_keepalive(msg.chat_id, clear_typing=True)
|
await self._stop_typing_keepalive(msg.chat_id, clear_typing=True)
|
||||||
|
|
||||||
def _register_event_callbacks(self) -> None:
|
def _register_event_callbacks(self) -> None:
|
||||||
@@ -713,7 +715,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
async def _typing_loop() -> None:
|
async def _typing_loop() -> None:
|
||||||
try:
|
try:
|
||||||
while self._running:
|
while self._running:
|
||||||
await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_SECONDS)
|
await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_MS / 1000)
|
||||||
await self._set_typing(room_id, True)
|
await self._set_typing(room_id, True)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -332,7 +332,7 @@ async def test_typing_keepalive_refreshes_periodically(monkeypatch) -> None:
|
|||||||
channel.client = client
|
channel.client = client
|
||||||
channel._running = True
|
channel._running = True
|
||||||
|
|
||||||
monkeypatch.setattr(matrix_module, "TYPING_KEEPALIVE_INTERVAL_SECONDS", 0.01)
|
monkeypatch.setattr(matrix_module, "TYPING_KEEPALIVE_INTERVAL_MS", 10)
|
||||||
|
|
||||||
await channel._start_typing_keepalive("!room:matrix.org")
|
await channel._start_typing_keepalive("!room:matrix.org")
|
||||||
await asyncio.sleep(0.03)
|
await asyncio.sleep(0.03)
|
||||||
@@ -1141,6 +1141,29 @@ async def test_send_stops_typing_keepalive_task() -> None:
|
|||||||
assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS)
|
assert client.typing_calls[-1] == ("!room:matrix.org", False, TYPING_NOTICE_TIMEOUT_MS)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_progress_keeps_typing_keepalive_running() -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
channel.client = client
|
||||||
|
channel._running = True
|
||||||
|
|
||||||
|
await channel._start_typing_keepalive("!room:matrix.org")
|
||||||
|
assert "!room:matrix.org" in channel._typing_tasks
|
||||||
|
|
||||||
|
await channel.send(
|
||||||
|
OutboundMessage(
|
||||||
|
channel="matrix",
|
||||||
|
chat_id="!room:matrix.org",
|
||||||
|
content="working...",
|
||||||
|
metadata={"_progress": True, "_progress_kind": "reasoning"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert "!room:matrix.org" in channel._typing_tasks
|
||||||
|
assert client.typing_calls[-1] == ("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_send_clears_typing_when_send_fails() -> None:
|
async def test_send_clears_typing_when_send_fails() -> None:
|
||||||
channel = MatrixChannel(_make_config(), MessageBus())
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
|||||||
Reference in New Issue
Block a user