feat(weixin): add outbound media file sending via CDN upload

Previously the WeChat channel's send() method only handled text messages,
completely ignoring msg.media. When the agent called message(media=[...]),
the file was never delivered to the user.

Implement the full WeChat CDN upload protocol following the reference
@tencent-weixin/openclaw-weixin v1.0.2:
  1. Generate a client-side AES-128 key (16 random bytes)
  2. Call getuploadurl with file metadata + hex-encoded AES key
  3. AES-128-ECB encrypt the file and POST to CDN with filekey param
  4. Read x-encrypted-param from CDN response header as download param
  5. Send message with the media item (image/video/file) referencing
     the CDN upload

Also adds:
- _encrypt_aes_ecb() for AES-128-ECB encryption (reverse of existing
  _decrypt_aes_ecb)
- Media type detection from file extension (image/video/file)
- Graceful error handling: failed media sends notify the user via text
  without blocking subsequent text delivery

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
ZhangYuanhan-AI
2026-03-23 10:20:15 +08:00
committed by Xubin Ren
parent 8abbe8a6df
commit 11e1bbbab7

View File

@@ -11,7 +11,9 @@ from __future__ import annotations
import asyncio
import base64
import hashlib
import json
import mimetypes
import os
import re
import time
@@ -64,6 +66,15 @@ RETRY_DELAY_S = 2
# Default long-poll timeout; overridden by server via longpolling_timeout_ms.
DEFAULT_LONG_POLL_TIMEOUT_S = 35
# Media-type codes for getuploadurl (1=image, 2=video, 3=file)
UPLOAD_MEDIA_IMAGE = 1
UPLOAD_MEDIA_VIDEO = 2
UPLOAD_MEDIA_FILE = 3
# File extensions considered as images / videos for outbound media
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff", ".ico", ".svg"}
_VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv"}
class WeixinConfig(Base):
"""Personal WeChat channel configuration."""
@@ -617,18 +628,30 @@ class WeixinChannel(BaseChannel):
return
content = msg.content.strip()
if not content:
return
ctx_token = self._context_tokens.get(msg.chat_id, "")
if not ctx_token:
# Reference plugin refuses to send without context_token (send.ts:88-91)
logger.warning(
"WeChat: no context_token for chat_id={}, cannot send",
msg.chat_id,
)
return
# --- Send media files first (following Telegram channel pattern) ---
for media_path in (msg.media or []):
try:
await self._send_media_file(msg.chat_id, media_path, ctx_token)
except Exception as e:
filename = Path(media_path).name
logger.error("Failed to send WeChat media {}: {}", media_path, e)
# Notify user about failure via text
await self._send_text(
msg.chat_id, f"[Failed to send: {filename}]", ctx_token,
)
# --- Send text content ---
if not content:
return
try:
chunks = split_message(content, WEIXIN_MAX_MESSAGE_LEN)
for chunk in chunks:
@@ -675,9 +698,152 @@ class WeixinChannel(BaseChannel):
data.get("errmsg", ""),
)
async def _send_media_file(
self,
to_user_id: str,
media_path: str,
context_token: str,
) -> None:
"""Upload a local file to WeChat CDN and send it as a media message.
Follows the exact protocol from ``@tencent-weixin/openclaw-weixin`` v1.0.2:
1. Generate a random 16-byte AES key (client-side).
2. Call ``getuploadurl`` with file metadata + hex-encoded AES key.
3. AES-128-ECB encrypt the file and POST to CDN (``{cdnBaseUrl}/upload``).
4. Read ``x-encrypted-param`` header from CDN response as the download param.
5. Send a ``sendmessage`` with the appropriate media item referencing the upload.
"""
p = Path(media_path)
if not p.is_file():
raise FileNotFoundError(f"Media file not found: {media_path}")
raw_data = p.read_bytes()
raw_size = len(raw_data)
raw_md5 = hashlib.md5(raw_data).hexdigest()
# Determine upload media type from extension
ext = p.suffix.lower()
if ext in _IMAGE_EXTS:
upload_type = UPLOAD_MEDIA_IMAGE
item_type = ITEM_IMAGE
item_key = "image_item"
elif ext in _VIDEO_EXTS:
upload_type = UPLOAD_MEDIA_VIDEO
item_type = ITEM_VIDEO
item_key = "video_item"
else:
upload_type = UPLOAD_MEDIA_FILE
item_type = ITEM_FILE
item_key = "file_item"
# Generate client-side AES-128 key (16 random bytes)
aes_key_raw = os.urandom(16)
aes_key_hex = aes_key_raw.hex()
# Compute encrypted size: PKCS7 padding to 16-byte boundary
# Matches aesEcbPaddedSize: Math.ceil((size + 1) / 16) * 16
padded_size = ((raw_size + 1 + 15) // 16) * 16
# Step 1: Get upload URL (upload_param) from server
file_key = os.urandom(16).hex()
upload_body: dict[str, Any] = {
"filekey": file_key,
"media_type": upload_type,
"to_user_id": to_user_id,
"rawsize": raw_size,
"rawfilemd5": raw_md5,
"filesize": padded_size,
"no_need_thumb": True,
"aeskey": aes_key_hex,
}
assert self._client is not None
upload_resp = await self._api_post("ilink/bot/getuploadurl", upload_body)
logger.debug("WeChat getuploadurl response: {}", upload_resp)
upload_param = upload_resp.get("upload_param", "")
if not upload_param:
raise RuntimeError(f"getuploadurl returned no upload_param: {upload_resp}")
# Step 2: AES-128-ECB encrypt and POST to CDN
aes_key_b64 = base64.b64encode(aes_key_raw).decode()
encrypted_data = _encrypt_aes_ecb(raw_data, aes_key_b64)
cdn_upload_url = (
f"{self.config.cdn_base_url}/upload"
f"?encrypted_query_param={quote(upload_param)}"
f"&filekey={quote(file_key)}"
)
logger.debug("WeChat CDN POST url={} ciphertextSize={}", cdn_upload_url[:80], len(encrypted_data))
cdn_resp = await self._client.post(
cdn_upload_url,
content=encrypted_data,
headers={"Content-Type": "application/octet-stream"},
)
cdn_resp.raise_for_status()
# The download encrypted_query_param comes from CDN response header
download_param = cdn_resp.headers.get("x-encrypted-param", "")
if not download_param:
raise RuntimeError(
"CDN upload response missing x-encrypted-param header; "
f"status={cdn_resp.status_code} headers={dict(cdn_resp.headers)}"
)
logger.debug("WeChat CDN upload success for {}, got download_param", p.name)
# Step 3: Send message with the media item
# aes_key for CDNMedia is the hex key encoded as base64
# (matches: Buffer.from(uploaded.aeskey).toString("base64"))
cdn_aes_key_b64 = base64.b64encode(aes_key_hex.encode()).decode()
media_item: dict[str, Any] = {
"media": {
"encrypt_query_param": download_param,
"aes_key": cdn_aes_key_b64,
"encrypt_type": 1,
},
}
if item_type == ITEM_IMAGE:
media_item["mid_size"] = padded_size
elif item_type == ITEM_VIDEO:
media_item["video_size"] = padded_size
elif item_type == ITEM_FILE:
media_item["file_name"] = p.name
media_item["len"] = str(raw_size)
# Send each media item as its own message (matching reference plugin)
client_id = f"nanobot-{uuid.uuid4().hex[:12]}"
item_list: list[dict] = [{"type": item_type, item_key: media_item}]
weixin_msg: dict[str, Any] = {
"from_user_id": "",
"to_user_id": to_user_id,
"client_id": client_id,
"message_type": MESSAGE_TYPE_BOT,
"message_state": MESSAGE_STATE_FINISH,
"item_list": item_list,
}
if context_token:
weixin_msg["context_token"] = context_token
body: dict[str, Any] = {
"msg": weixin_msg,
"base_info": BASE_INFO,
}
data = await self._api_post("ilink/bot/sendmessage", body)
errcode = data.get("errcode", 0)
if errcode and errcode != 0:
raise RuntimeError(
f"WeChat send media error (code {errcode}): {data.get('errmsg', '')}"
)
logger.info("WeChat media sent: {} (type={})", p.name, item_key)
# ---------------------------------------------------------------------------
# AES-128-ECB decryption (matches pic-decrypt.ts parseAesKey + aes-ecb.ts)
# AES-128-ECB encryption / decryption (matches pic-decrypt.ts / aes-ecb.ts)
# ---------------------------------------------------------------------------
@@ -703,6 +869,37 @@ def _parse_aes_key(aes_key_b64: str) -> bytes:
)
def _encrypt_aes_ecb(data: bytes, aes_key_b64: str) -> bytes:
"""Encrypt data with AES-128-ECB and PKCS7 padding for CDN upload."""
try:
key = _parse_aes_key(aes_key_b64)
except Exception as e:
logger.warning("Failed to parse AES key for encryption, sending raw: {}", e)
return data
# PKCS7 padding
pad_len = 16 - len(data) % 16
padded = data + bytes([pad_len] * pad_len)
try:
from Crypto.Cipher import AES
cipher = AES.new(key, AES.MODE_ECB)
return cipher.encrypt(padded)
except ImportError:
pass
try:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
cipher_obj = Cipher(algorithms.AES(key), modes.ECB())
encryptor = cipher_obj.encryptor()
return encryptor.update(padded) + encryptor.finalize()
except ImportError:
logger.warning("Cannot encrypt media: install 'pycryptodome' or 'cryptography'")
return data
def _decrypt_aes_ecb(data: bytes, aes_key_b64: str) -> bytes:
"""Decrypt AES-128-ECB media data.