Files
nanobot/nanobot/channels/dingtalk.py
2026-03-07 16:07:57 +00:00

472 lines
18 KiB
Python

"""DingTalk/DingDing channel implementation using Stream Mode."""
import asyncio
import json
import mimetypes
import os
import time
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse
import httpx
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import DingTalkConfig
try:
from dingtalk_stream import (
AckMessage,
CallbackHandler,
CallbackMessage,
Credential,
DingTalkStreamClient,
)
from dingtalk_stream.chatbot import ChatbotMessage
DINGTALK_AVAILABLE = True
except ImportError:
DINGTALK_AVAILABLE = False
# Fallback so class definitions don't crash at module level
CallbackHandler = object # type: ignore[assignment,misc]
CallbackMessage = None # type: ignore[assignment,misc]
AckMessage = None # type: ignore[assignment,misc]
ChatbotMessage = None # type: ignore[assignment,misc]
class NanobotDingTalkHandler(CallbackHandler):
    """Callback handler registered with the DingTalk Stream SDK.

    Parses each incoming chatbot callback and hands its text over to the
    owning ``DingTalkChannel`` without blocking the stream loop.
    """

    def __init__(self, channel: "DingTalkChannel"):
        """Remember the channel that receives the parsed messages."""
        super().__init__()
        self.channel = channel

    async def process(self, message: CallbackMessage):
        """Process one incoming stream message.

        Args:
            message: Callback delivered by the SDK; ``message.data`` is read
                as the raw payload dict.

        Returns:
            A ``(AckMessage.STATUS_OK, text)`` tuple in every case. Failures
            are acknowledged as well so the server does not redeliver the
            same message in a loop.
        """
        try:
            raw = message.data
            # Parse using SDK's ChatbotMessage for robust handling
            chatbot_msg = ChatbotMessage.from_dict(raw)

            # Extract text content; fall back to raw dict if SDK object is
            # empty. Both lookups tolerate a missing or null value so that a
            # non-text message ends up in the "unsupported" branch below
            # rather than raising AttributeError.
            content = ""
            if chatbot_msg.text:
                content = (chatbot_msg.text.content or "").strip()
            if not content:
                raw_text = raw.get("text")
                if isinstance(raw_text, dict):
                    content = (raw_text.get("content") or "").strip()

            if not content:
                logger.warning(
                    "Received empty or unsupported message type: {}",
                    chatbot_msg.message_type,
                )
                return AckMessage.STATUS_OK, "OK"

            sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id
            sender_name = chatbot_msg.sender_nick or "Unknown"
            conversation_type = raw.get("conversationType")
            conversation_id = raw.get("conversationId") or raw.get("openConversationId")

            logger.info(
                "Received DingTalk message from {} ({}): {}",
                sender_name,
                sender_id,
                content,
            )

            # Forward to Nanobot via _on_message (non-blocking).
            # Store reference to prevent GC before task completes.
            task = asyncio.create_task(
                self.channel._on_message(
                    content,
                    sender_id,
                    sender_name,
                    conversation_type,
                    conversation_id,
                )
            )
            self.channel._background_tasks.add(task)
            task.add_done_callback(self.channel._background_tasks.discard)

            return AckMessage.STATUS_OK, "OK"
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Error processing DingTalk message: {}", e)
            # Return OK to avoid retry loop from DingTalk server
            return AckMessage.STATUS_OK, "Error"
class DingTalkChannel(BaseChannel):
    """DingTalk channel using Stream Mode.

    Incoming events arrive over the connection managed by the
    ``dingtalk-stream`` SDK; outgoing messages are sent with direct HTTP API
    calls (the SDK is mainly used for receiving).

    Supports both private (1:1) and group chats. A group ``chat_id`` is stored
    with a ``"group:"`` prefix so replies can be routed back to the
    conversation.
    """

    name = "dingtalk"

    # File extensions used to choose the upload type of an attachment.
    _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"}
    _AUDIO_EXTS = {".amr", ".mp3", ".wav", ".ogg", ".m4a", ".aac"}
    _VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}

    def __init__(self, config: DingTalkConfig, bus: MessageBus):
        """Store the configuration and initialise the (not yet started) state."""
        super().__init__(config, bus)
        self.config: DingTalkConfig = config
        self._client: Any = None
        self._http: httpx.AsyncClient | None = None

        # Access Token management for sending messages
        self._access_token: str | None = None
        self._token_expiry: float = 0

        # Hold references to background tasks to prevent GC
        self._background_tasks: set[asyncio.Task] = set()

    async def start(self) -> None:
        """Start the DingTalk bot with Stream Mode.

        Returns early (after logging) when the SDK is not installed or the
        credentials are missing. Otherwise it creates the HTTP client and the
        stream client, registers the message handler and then stays in a
        reconnect loop until ``self._running`` becomes false.
        """
        try:
            if not DINGTALK_AVAILABLE:
                logger.error(
                    "DingTalk Stream SDK not installed. Run: pip install dingtalk-stream"
                )
                return

            if not self.config.client_id or not self.config.client_secret:
                logger.error("DingTalk client_id and client_secret not configured")
                return

            self._running = True
            self._http = httpx.AsyncClient()

            logger.info(
                "Initializing DingTalk Stream Client with Client ID: {}...",
                self.config.client_id,
            )
            credential = Credential(self.config.client_id, self.config.client_secret)
            self._client = DingTalkStreamClient(credential)

            # Register standard handler
            handler = NanobotDingTalkHandler(self)
            self._client.register_callback_handler(ChatbotMessage.TOPIC, handler)

            logger.info("DingTalk bot started with Stream Mode")

            # Reconnect loop: restart stream if SDK exits or crashes
            while self._running:
                try:
                    await self._client.start()
                except Exception as e:
                    logger.warning("DingTalk stream error: {}", e)
                if self._running:
                    logger.info("Reconnecting DingTalk stream in 5 seconds...")
                    await asyncio.sleep(5)
        except Exception as e:
            logger.exception("Failed to start DingTalk channel: {}", e)

    async def stop(self) -> None:
        """Stop the DingTalk bot.

        Clears the running flag, cancels the outstanding message-handling
        tasks and waits for them to finish unwinding, then closes the shared
        HTTP client.
        """
        self._running = False

        # Cancel outstanding background tasks first, and wait for the
        # cancellation to be delivered, so that no task is left pending (or
        # still running) once the HTTP client is gone.
        pending = list(self._background_tasks)
        self._background_tasks.clear()
        for task in pending:
            task.cancel()
        # Never await the task that is executing stop() itself.
        current = asyncio.current_task()
        others = [task for task in pending if task is not current]
        if others:
            await asyncio.gather(*others, return_exceptions=True)

        # Close the shared HTTP client
        if self._http:
            await self._http.aclose()
            self._http = None

    async def _get_access_token(self) -> str | None:
        """Get or refresh Access Token.

        Returns:
            The cached token while it is still valid, otherwise a freshly
            requested one. ``None`` when the HTTP client is not initialised,
            the request fails, or the response carries no token.
        """
        if self._access_token and time.time() < self._token_expiry:
            return self._access_token

        if not self._http:
            logger.warning("DingTalk HTTP client not initialized, cannot refresh token")
            return None

        url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
        data = {
            "appKey": self.config.client_id,
            "appSecret": self.config.client_secret,
        }

        try:
            resp = await self._http.post(url, json=data)
            resp.raise_for_status()
            res_data = resp.json()

            token = res_data.get("accessToken")
            if not token:
                # Do not record an expiry for a token that was never issued,
                # and make the failure visible instead of returning silently.
                self._access_token = None
                logger.error("Failed to get DingTalk access token: missing accessToken in response")
                return None

            # A missing or null expireIn falls back to 7200 seconds.
            lifetime = int(res_data.get("expireIn") or 7200)
            self._access_token = token
            # Expire 60s early to be safe
            self._token_expiry = time.time() + lifetime - 60
            return self._access_token
        except Exception as e:
            logger.error("Failed to get DingTalk access token: {}", e)
            return None

    @staticmethod
    def _is_http_url(value: str) -> bool:
        """Return True when *value* has an ``http`` or ``https`` scheme."""
        return urlparse(value).scheme in ("http", "https")

    def _guess_upload_type(self, media_ref: str) -> str:
        """Map the extension of *media_ref* to an upload type.

        Returns ``"image"``, ``"voice"`` or ``"video"`` for known extensions
        and ``"file"`` for everything else.
        """
        ext = Path(urlparse(media_ref).path).suffix.lower()
        if ext in self._IMAGE_EXTS:
            return "image"
        if ext in self._AUDIO_EXTS:
            return "voice"
        if ext in self._VIDEO_EXTS:
            return "video"
        return "file"

    def _guess_filename(self, media_ref: str, upload_type: str) -> str:
        """Derive a file name for *media_ref*.

        The last path component is used. For ``http``/``https``/``file``
        references it is percent-decoded first, so ``my%20file.pdf`` becomes
        ``my file.pdf``. When the reference has no file name, a default based
        on *upload_type* is returned.
        """
        parsed = urlparse(media_ref)
        path = parsed.path
        if parsed.scheme in ("http", "https", "file"):
            # Decode before taking the basename so an encoded "/" can never
            # end up inside the returned name.
            path = unquote(path)
        name = os.path.basename(path)
        if name:
            return name
        defaults = {"image": "image.jpg", "voice": "audio.amr", "video": "video.mp4"}
        return defaults.get(upload_type, "file.bin")

    async def _read_media_bytes(
        self,
        media_ref: str,
    ) -> tuple[bytes | None, str | None, str | None]:
        """Load the bytes behind *media_ref*.

        *media_ref* may be an http(s) URL, a ``file://`` URL or a local path
        (``~`` is expanded).

        Returns:
            ``(data, filename, content_type)``; all three are ``None`` when
            the reference is empty or the content cannot be read.
        """
        if not media_ref:
            return None, None, None

        if self._is_http_url(media_ref):
            if not self._http:
                return None, None, None
            try:
                resp = await self._http.get(media_ref, follow_redirects=True)
                if resp.status_code >= 400:
                    logger.warning(
                        "DingTalk media download failed status={} ref={}",
                        resp.status_code,
                        media_ref,
                    )
                    return None, None, None
                # Keep only the media type, dropping parameters such as charset.
                content_type = (resp.headers.get("content-type") or "").split(";")[0].strip()
                filename = self._guess_filename(media_ref, self._guess_upload_type(media_ref))
                return resp.content, filename, content_type or None
            except Exception as e:
                logger.error("DingTalk media download error ref={} err={}", media_ref, e)
                return None, None, None

        try:
            if media_ref.startswith("file://"):
                parsed = urlparse(media_ref)
                local_path = Path(unquote(parsed.path))
            else:
                local_path = Path(os.path.expanduser(media_ref))

            if not local_path.is_file():
                logger.warning("DingTalk media file not found: {}", local_path)
                return None, None, None

            # Read in a worker thread so the event loop is not blocked.
            data = await asyncio.to_thread(local_path.read_bytes)
            content_type = mimetypes.guess_type(local_path.name)[0]
            return data, local_path.name, content_type
        except Exception as e:
            logger.error("DingTalk media read error ref={} err={}", media_ref, e)
            return None, None, None

    async def _upload_media(
        self,
        token: str,
        data: bytes,
        media_type: str,
        filename: str,
        content_type: str | None,
    ) -> str | None:
        """Upload *data* to the DingTalk media endpoint.

        Returns:
            The media id as a string, or ``None`` when the HTTP client is not
            initialised, the request fails, the API reports an error code or
            the response contains no media id.
        """
        if not self._http:
            return None

        url = "https://oapi.dingtalk.com/media/upload"
        # Let httpx build the query string so both values are URL-encoded.
        params = {"access_token": token, "type": media_type}
        mime = content_type or mimetypes.guess_type(filename)[0] or "application/octet-stream"
        files = {"media": (filename, data, mime)}

        try:
            resp = await self._http.post(url, params=params, files=files)
            text = resp.text
            is_json = resp.headers.get("content-type", "").startswith("application/json")
            result = resp.json() if is_json else {}

            if resp.status_code >= 400:
                logger.error(
                    "DingTalk media upload failed status={} type={} body={}",
                    resp.status_code,
                    media_type,
                    text[:500],
                )
                return None

            errcode = result.get("errcode", 0)
            if errcode != 0:
                logger.error(
                    "DingTalk media upload api error type={} errcode={} body={}",
                    media_type,
                    errcode,
                    text[:500],
                )
                return None

            # The id is looked up under both spellings, at the top level and
            # under "result".
            sub = result.get("result") or {}
            media_id = (
                result.get("media_id")
                or result.get("mediaId")
                or sub.get("media_id")
                or sub.get("mediaId")
            )
            if not media_id:
                logger.error("DingTalk media upload missing media_id body={}", text[:500])
                return None
            return str(media_id)
        except Exception as e:
            logger.error("DingTalk media upload error type={} err={}", media_type, e)
            return None

    async def _send_batch_message(
        self,
        token: str,
        chat_id: str,
        msg_key: str,
        msg_param: dict[str, Any],
    ) -> bool:
        """Send one robot message to a private or group chat.

        Args:
            token: Access token sent in the request header.
            chat_id: User id, or ``"group:<openConversationId>"`` for groups.
            msg_key: Message template key, e.g. ``"sampleMarkdown"``.
            msg_param: Template parameters; serialised to a JSON string.

        Returns:
            True when the API accepted the message, False otherwise.
        """
        if not self._http:
            logger.warning("DingTalk HTTP client not initialized, cannot send")
            return False

        headers = {"x-acs-dingtalk-access-token": token}
        payload: dict[str, Any] = {
            "robotCode": self.config.client_id,
            "msgKey": msg_key,
            "msgParam": json.dumps(msg_param, ensure_ascii=False),
        }
        if chat_id.startswith("group:"):
            # Group chat
            url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send"
            payload["openConversationId"] = chat_id.removeprefix("group:")
        else:
            # Private chat
            url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
            payload["userIds"] = [chat_id]

        try:
            resp = await self._http.post(url, json=payload, headers=headers)
            body = resp.text
            if resp.status_code != 200:
                logger.error(
                    "DingTalk send failed msgKey={} status={} body={}",
                    msg_key,
                    resp.status_code,
                    body[:500],
                )
                return False

            # A body that is not JSON is treated as carrying no error code.
            try:
                result = resp.json()
            except Exception:
                result = {}

            errcode = result.get("errcode")
            if errcode not in (None, 0):
                logger.error(
                    "DingTalk send api error msgKey={} errcode={} body={}",
                    msg_key,
                    errcode,
                    body[:500],
                )
                return False

            logger.debug("DingTalk message sent to {} with msgKey={}", chat_id, msg_key)
            return True
        except Exception as e:
            logger.error("Error sending DingTalk message msgKey={} err={}", msg_key, e)
            return False

    async def _send_markdown_text(self, token: str, chat_id: str, content: str) -> bool:
        """Send *content* as a ``sampleMarkdown`` message."""
        return await self._send_batch_message(
            token,
            chat_id,
            "sampleMarkdown",
            {"text": content, "title": "Nanobot Reply"},
        )

    async def _send_media_ref(self, token: str, chat_id: str, media_ref: str) -> bool:
        """Send one attachment referenced by URL or local path.

        Images with an http(s) URL are first sent by URL. If that fails, or
        for any other reference, the content is read and uploaded, then sent
        as an image (images only) and finally as a file message.

        Returns:
            True when some message for the attachment was accepted, or when
            *media_ref* is empty; False otherwise.
        """
        media_ref = (media_ref or "").strip()
        if not media_ref:
            return True

        upload_type = self._guess_upload_type(media_ref)
        if upload_type == "image" and self._is_http_url(media_ref):
            ok = await self._send_batch_message(
                token,
                chat_id,
                "sampleImageMsg",
                {"photoURL": media_ref},
            )
            if ok:
                return True
            logger.warning("DingTalk image url send failed, trying upload fallback: {}", media_ref)

        data, filename, content_type = await self._read_media_bytes(media_ref)
        if not data:
            logger.error("DingTalk media read failed: {}", media_ref)
            return False

        filename = filename or self._guess_filename(media_ref, upload_type)
        # File type for the file message: the extension without the dot,
        # guessed from the content type when the name has none.
        file_type = Path(filename).suffix.lower().lstrip(".")
        if not file_type:
            guessed = mimetypes.guess_extension(content_type or "")
            file_type = (guessed or ".bin").lstrip(".")
        if file_type == "jpeg":
            file_type = "jpg"

        media_id = await self._upload_media(
            token=token,
            data=data,
            media_type=upload_type,
            filename=filename,
            content_type=content_type,
        )
        if not media_id:
            return False

        if upload_type == "image":
            # Verified in production: sampleImageMsg accepts media_id in photoURL.
            ok = await self._send_batch_message(
                token,
                chat_id,
                "sampleImageMsg",
                {"photoURL": media_id},
            )
            if ok:
                return True
            logger.warning("DingTalk image media_id send failed, falling back to file: {}", media_ref)

        return await self._send_batch_message(
            token,
            chat_id,
            "sampleFile",
            {"mediaId": media_id, "fileName": filename, "fileType": file_type},
        )

    async def send(self, msg: OutboundMessage) -> None:
        """Send a message through DingTalk.

        The text content (if any) is sent first as markdown, followed by one
        message per attachment. Nothing is sent when no access token can be
        obtained.
        """
        token = await self._get_access_token()
        if not token:
            return

        if msg.content and msg.content.strip():
            await self._send_markdown_text(token, msg.chat_id, msg.content.strip())

        for media_ref in msg.media or []:
            ok = await self._send_media_ref(token, msg.chat_id, media_ref)
            if ok:
                continue
            logger.error("DingTalk media send failed for {}", media_ref)
            # Send visible fallback so failures are observable by the user.
            filename = self._guess_filename(media_ref, self._guess_upload_type(media_ref))
            await self._send_markdown_text(
                token,
                msg.chat_id,
                f"[Attachment send failed: {filename}]",
            )

    async def _on_message(
        self,
        content: str,
        sender_id: str,
        sender_name: str,
        conversation_type: str | None = None,
        conversation_id: str | None = None,
    ) -> None:
        """Handle incoming message (called by NanobotDingTalkHandler).

        Delegates to BaseChannel._handle_message() which enforces allow_from
        permission checks before publishing to the bus.

        A message is treated as a group message when ``conversation_type`` is
        ``"2"`` and a conversation id is present; its ``chat_id`` then becomes
        ``"group:<conversation_id>"``. Otherwise the sender id is used.
        """
        try:
            logger.info("DingTalk inbound: {} from {}", content, sender_name)
            is_group = bool(conversation_type == "2" and conversation_id)
            chat_id = f"group:{conversation_id}" if is_group else sender_id
            await self._handle_message(
                sender_id=sender_id,
                chat_id=chat_id,
                content=str(content),
                metadata={
                    "sender_name": sender_name,
                    "platform": "dingtalk",
                    "conversation_type": conversation_type,
                },
            )
        except Exception as e:
            logger.error("Error publishing DingTalk message: {}", e)