feat(matrix): reply in threads with fallback relations
Propagate Matrix thread metadata from inbound events and attach m.relates_to (rel_type=m.thread, m.in_reply_to, is_falling_back=true) to outbound messages including attachments. Add tests for thread metadata and thread replies.
This commit is contained in:
committed by
Alexander Minges
parent
6a40665753
commit
705d5738e3
@@ -480,8 +480,20 @@ class MatrixChannel(BaseChannel):
|
|||||||
return 0
|
return 0
|
||||||
return min(local_limit, server_limit)
|
return min(local_limit, server_limit)
|
||||||
|
|
||||||
|
def _configured_media_limit_bytes(self) -> int:
|
||||||
|
"""Resolve the configured local media limit with backward compatibility."""
|
||||||
|
for name in ("max_inbound_media_bytes", "max_media_bytes"):
|
||||||
|
value = getattr(self.config, name, None)
|
||||||
|
if isinstance(value, int):
|
||||||
|
return value
|
||||||
|
return 0
|
||||||
|
|
||||||
async def _upload_and_send_attachment(
|
async def _upload_and_send_attachment(
|
||||||
self, room_id: str, path: Path, limit_bytes: int
|
self,
|
||||||
|
room_id: str,
|
||||||
|
path: Path,
|
||||||
|
limit_bytes: int,
|
||||||
|
relates_to: dict[str, Any] | None = None,
|
||||||
) -> str | None:
|
) -> str | None:
|
||||||
"""Upload one local file to Matrix and send it as a media message."""
|
"""Upload one local file to Matrix and send it as a media message."""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
@@ -578,6 +590,8 @@ class MatrixChannel(BaseChannel):
|
|||||||
mxc_url=mxc_url,
|
mxc_url=mxc_url,
|
||||||
encryption_info=encryption_info,
|
encryption_info=encryption_info,
|
||||||
)
|
)
|
||||||
|
if relates_to:
|
||||||
|
content["m.relates_to"] = relates_to
|
||||||
try:
|
try:
|
||||||
await self._send_room_content(room_id, content)
|
await self._send_room_content(room_id, content)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -596,6 +610,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
|
|
||||||
text = msg.content or ""
|
text = msg.content or ""
|
||||||
candidates = self._collect_outbound_media_candidates(msg.media)
|
candidates = self._collect_outbound_media_candidates(msg.media)
|
||||||
|
relates_to = self._build_thread_relates_to(msg.metadata)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
failures: list[str] = []
|
failures: list[str] = []
|
||||||
@@ -607,6 +622,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
room_id=msg.chat_id,
|
room_id=msg.chat_id,
|
||||||
path=path,
|
path=path,
|
||||||
limit_bytes=limit_bytes,
|
limit_bytes=limit_bytes,
|
||||||
|
relates_to=relates_to,
|
||||||
)
|
)
|
||||||
if failure_marker:
|
if failure_marker:
|
||||||
failures.append(failure_marker)
|
failures.append(failure_marker)
|
||||||
@@ -618,7 +634,10 @@ class MatrixChannel(BaseChannel):
|
|||||||
text = "\n".join(failures)
|
text = "\n".join(failures)
|
||||||
|
|
||||||
if text or not candidates:
|
if text or not candidates:
|
||||||
await self._send_room_content(msg.chat_id, _build_matrix_text_content(text))
|
content = _build_matrix_text_content(text)
|
||||||
|
if relates_to:
|
||||||
|
content["m.relates_to"] = relates_to
|
||||||
|
await self._send_room_content(msg.chat_id, content)
|
||||||
finally:
|
finally:
|
||||||
await self._stop_typing_keepalive(msg.chat_id, clear_typing=True)
|
await self._stop_typing_keepalive(msg.chat_id, clear_typing=True)
|
||||||
|
|
||||||
@@ -793,7 +812,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
content = source.get("content")
|
content = source.get("content")
|
||||||
return content if isinstance(content, dict) else {}
|
return content if isinstance(content, dict) else {}
|
||||||
|
|
||||||
def _event_thread_root_id(self, event: RoomMessage) -> str | None:
|
def _event_thread_root_id(self, event: Any) -> str | None:
|
||||||
"""Return thread root event_id if this message is inside a thread."""
|
"""Return thread root event_id if this message is inside a thread."""
|
||||||
content = self._event_source_content(event)
|
content = self._event_source_content(event)
|
||||||
relates_to = content.get("m.relates_to")
|
relates_to = content.get("m.relates_to")
|
||||||
@@ -804,7 +823,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
root_id = relates_to.get("event_id")
|
root_id = relates_to.get("event_id")
|
||||||
return root_id if isinstance(root_id, str) and root_id else None
|
return root_id if isinstance(root_id, str) and root_id else None
|
||||||
|
|
||||||
def _thread_metadata(self, event: RoomMessage) -> dict[str, str] | None:
|
def _thread_metadata(self, event: Any) -> dict[str, str] | None:
|
||||||
"""Build metadata used to reply within a thread."""
|
"""Build metadata used to reply within a thread."""
|
||||||
root_id = self._event_thread_root_id(event)
|
root_id = self._event_thread_root_id(event)
|
||||||
if not root_id:
|
if not root_id:
|
||||||
@@ -833,7 +852,7 @@ class MatrixChannel(BaseChannel):
|
|||||||
"is_falling_back": True,
|
"is_falling_back": True,
|
||||||
}
|
}
|
||||||
|
|
||||||
def _event_attachment_type(self, event: MatrixMediaEvent) -> str:
|
def _event_attachment_type(self, event: Any) -> str:
|
||||||
"""Map Matrix event payload/type to a stable attachment kind."""
|
"""Map Matrix event payload/type to a stable attachment kind."""
|
||||||
msgtype = self._event_source_content(event).get("msgtype")
|
msgtype = self._event_source_content(event).get("msgtype")
|
||||||
if msgtype == "m.image":
|
if msgtype == "m.image":
|
||||||
@@ -1073,11 +1092,20 @@ class MatrixChannel(BaseChannel):
|
|||||||
|
|
||||||
await self._start_typing_keepalive(room.room_id)
|
await self._start_typing_keepalive(room.room_id)
|
||||||
try:
|
try:
|
||||||
|
metadata: dict[str, Any] = {
|
||||||
|
"room": getattr(room, "display_name", room.room_id),
|
||||||
|
}
|
||||||
|
event_id = getattr(event, "event_id", None)
|
||||||
|
if isinstance(event_id, str) and event_id:
|
||||||
|
metadata["event_id"] = event_id
|
||||||
|
thread_meta = self._thread_metadata(event)
|
||||||
|
if thread_meta:
|
||||||
|
metadata.update(thread_meta)
|
||||||
await self._handle_message(
|
await self._handle_message(
|
||||||
sender_id=event.sender,
|
sender_id=event.sender,
|
||||||
chat_id=room.room_id,
|
chat_id=room.room_id,
|
||||||
content=event.body,
|
content=event.body,
|
||||||
metadata={"room": getattr(room, "display_name", room.room_id)},
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
await self._stop_typing_keepalive(room.room_id, clear_typing=True)
|
await self._stop_typing_keepalive(room.room_id, clear_typing=True)
|
||||||
@@ -1107,15 +1135,22 @@ class MatrixChannel(BaseChannel):
|
|||||||
|
|
||||||
await self._start_typing_keepalive(room.room_id)
|
await self._start_typing_keepalive(room.room_id)
|
||||||
try:
|
try:
|
||||||
|
metadata: dict[str, Any] = {
|
||||||
|
"room": getattr(room, "display_name", room.room_id),
|
||||||
|
"attachments": attachments,
|
||||||
|
}
|
||||||
|
event_id = getattr(event, "event_id", None)
|
||||||
|
if isinstance(event_id, str) and event_id:
|
||||||
|
metadata["event_id"] = event_id
|
||||||
|
thread_meta = self._thread_metadata(event)
|
||||||
|
if thread_meta:
|
||||||
|
metadata.update(thread_meta)
|
||||||
await self._handle_message(
|
await self._handle_message(
|
||||||
sender_id=event.sender,
|
sender_id=event.sender,
|
||||||
chat_id=room.room_id,
|
chat_id=room.room_id,
|
||||||
content="\n".join(content_parts),
|
content="\n".join(content_parts),
|
||||||
media=media_paths,
|
media=media_paths,
|
||||||
metadata={
|
metadata=metadata,
|
||||||
"room": getattr(room, "display_name", room.room_id),
|
|
||||||
"attachments": attachments,
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
await self._stop_typing_keepalive(room.room_id, clear_typing=True)
|
await self._stop_typing_keepalive(room.room_id, clear_typing=True)
|
||||||
|
|||||||
@@ -510,6 +510,43 @@ async def test_on_message_room_mention_requires_opt_in() -> None:
|
|||||||
assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)]
|
assert client.typing_calls == [("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS)]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_message_sets_thread_metadata_when_threaded_event() -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
channel.client = client
|
||||||
|
|
||||||
|
handled: list[dict[str, object]] = []
|
||||||
|
|
||||||
|
async def _fake_handle_message(**kwargs) -> None:
|
||||||
|
handled.append(kwargs)
|
||||||
|
|
||||||
|
channel._handle_message = _fake_handle_message # type: ignore[method-assign]
|
||||||
|
|
||||||
|
room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=3)
|
||||||
|
event = SimpleNamespace(
|
||||||
|
sender="@alice:matrix.org",
|
||||||
|
body="Hello",
|
||||||
|
event_id="$reply1",
|
||||||
|
source={
|
||||||
|
"content": {
|
||||||
|
"m.relates_to": {
|
||||||
|
"rel_type": "m.thread",
|
||||||
|
"event_id": "$root1",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
await channel._on_message(room, event)
|
||||||
|
|
||||||
|
assert len(handled) == 1
|
||||||
|
metadata = handled[0]["metadata"]
|
||||||
|
assert metadata["thread_root_event_id"] == "$root1"
|
||||||
|
assert metadata["thread_reply_to_event_id"] == "$reply1"
|
||||||
|
assert metadata["event_id"] == "$reply1"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_on_media_message_downloads_attachment_and_sets_metadata(
|
async def test_on_media_message_downloads_attachment_and_sets_metadata(
|
||||||
monkeypatch, tmp_path
|
monkeypatch, tmp_path
|
||||||
@@ -563,6 +600,51 @@ async def test_on_media_message_downloads_attachment_and_sets_metadata(
|
|||||||
assert "[attachment: " in handled[0]["content"]
|
assert "[attachment: " in handled[0]["content"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_media_message_sets_thread_metadata_when_threaded_event(
|
||||||
|
monkeypatch, tmp_path
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setattr("nanobot.channels.matrix.get_data_dir", lambda: tmp_path)
|
||||||
|
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
client.download_bytes = b"image"
|
||||||
|
channel.client = client
|
||||||
|
|
||||||
|
handled: list[dict[str, object]] = []
|
||||||
|
|
||||||
|
async def _fake_handle_message(**kwargs) -> None:
|
||||||
|
handled.append(kwargs)
|
||||||
|
|
||||||
|
channel._handle_message = _fake_handle_message # type: ignore[method-assign]
|
||||||
|
|
||||||
|
room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room", member_count=2)
|
||||||
|
event = SimpleNamespace(
|
||||||
|
sender="@alice:matrix.org",
|
||||||
|
body="photo.png",
|
||||||
|
url="mxc://example.org/mediaid",
|
||||||
|
event_id="$event1",
|
||||||
|
source={
|
||||||
|
"content": {
|
||||||
|
"msgtype": "m.image",
|
||||||
|
"info": {"mimetype": "image/png", "size": 5},
|
||||||
|
"m.relates_to": {
|
||||||
|
"rel_type": "m.thread",
|
||||||
|
"event_id": "$root1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
await channel._on_media_message(room, event)
|
||||||
|
|
||||||
|
assert len(handled) == 1
|
||||||
|
metadata = handled[0]["metadata"]
|
||||||
|
assert metadata["thread_root_event_id"] == "$root1"
|
||||||
|
assert metadata["thread_reply_to_event_id"] == "$event1"
|
||||||
|
assert metadata["event_id"] == "$event1"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_on_media_message_respects_declared_size_limit(
|
async def test_on_media_message_respects_declared_size_limit(
|
||||||
monkeypatch, tmp_path
|
monkeypatch, tmp_path
|
||||||
@@ -801,6 +883,34 @@ async def test_send_uploads_media_and_sends_file_event(tmp_path) -> None:
|
|||||||
assert client.room_send_calls[1]["content"]["body"] == "Please review."
|
assert client.room_send_calls[1]["content"]["body"] == "Please review."
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_adds_thread_relates_to_for_thread_metadata() -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
channel.client = client
|
||||||
|
|
||||||
|
metadata = {
|
||||||
|
"thread_root_event_id": "$root1",
|
||||||
|
"thread_reply_to_event_id": "$reply1",
|
||||||
|
}
|
||||||
|
await channel.send(
|
||||||
|
OutboundMessage(
|
||||||
|
channel="matrix",
|
||||||
|
chat_id="!room:matrix.org",
|
||||||
|
content="Hi",
|
||||||
|
metadata=metadata,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
content = client.room_send_calls[0]["content"]
|
||||||
|
assert content["m.relates_to"] == {
|
||||||
|
"rel_type": "m.thread",
|
||||||
|
"event_id": "$root1",
|
||||||
|
"m.in_reply_to": {"event_id": "$reply1"},
|
||||||
|
"is_falling_back": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_send_uses_encrypted_media_payload_in_encrypted_room(tmp_path) -> None:
|
async def test_send_uses_encrypted_media_payload_in_encrypted_room(tmp_path) -> None:
|
||||||
channel = MatrixChannel(_make_config(e2ee_enabled=True), MessageBus())
|
channel = MatrixChannel(_make_config(e2ee_enabled=True), MessageBus())
|
||||||
@@ -851,6 +961,50 @@ async def test_send_does_not_parse_attachment_marker_without_media(tmp_path) ->
|
|||||||
assert client.room_send_calls[0]["content"]["body"] == f"[attachment: {missing_path}]"
|
assert client.room_send_calls[0]["content"]["body"] == f"[attachment: {missing_path}]"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_passes_thread_relates_to_to_attachment_upload(monkeypatch) -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
channel.client = client
|
||||||
|
channel._server_upload_limit_checked = True
|
||||||
|
channel._server_upload_limit_bytes = None
|
||||||
|
|
||||||
|
captured: dict[str, object] = {}
|
||||||
|
|
||||||
|
async def _fake_upload_and_send_attachment(
|
||||||
|
*,
|
||||||
|
room_id: str,
|
||||||
|
path: Path,
|
||||||
|
limit_bytes: int,
|
||||||
|
relates_to: dict[str, object] | None = None,
|
||||||
|
) -> str | None:
|
||||||
|
captured["relates_to"] = relates_to
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr(channel, "_upload_and_send_attachment", _fake_upload_and_send_attachment)
|
||||||
|
|
||||||
|
metadata = {
|
||||||
|
"thread_root_event_id": "$root1",
|
||||||
|
"thread_reply_to_event_id": "$reply1",
|
||||||
|
}
|
||||||
|
await channel.send(
|
||||||
|
OutboundMessage(
|
||||||
|
channel="matrix",
|
||||||
|
chat_id="!room:matrix.org",
|
||||||
|
content="Hi",
|
||||||
|
media=["/tmp/fake.txt"],
|
||||||
|
metadata=metadata,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert captured["relates_to"] == {
|
||||||
|
"rel_type": "m.thread",
|
||||||
|
"event_id": "$root1",
|
||||||
|
"m.in_reply_to": {"event_id": "$reply1"},
|
||||||
|
"is_falling_back": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_send_workspace_restriction_blocks_external_attachment(tmp_path) -> None:
|
async def test_send_workspace_restriction_blocks_external_attachment(tmp_path) -> None:
|
||||||
workspace = tmp_path / "workspace"
|
workspace = tmp_path / "workspace"
|
||||||
|
|||||||
Reference in New Issue
Block a user