Merge PR #2386: feat(channel): enhance Telegram, QQ, Feishu, and WhatsApp

feat: telegram/qq/whatsapp/feishu enhancement
This commit is contained in:
Xubin Ren
2026-03-24 11:40:15 +08:00
committed by GitHub
7 changed files with 664 additions and 65 deletions

View File

@@ -29,6 +29,7 @@ export interface InboundMessage {
content: string; content: string;
timestamp: number; timestamp: number;
isGroup: boolean; isGroup: boolean;
wasMentioned?: boolean;
media?: string[]; media?: string[];
} }
@@ -48,6 +49,31 @@ export class WhatsAppClient {
this.options = options; this.options = options;
} }
private normalizeJid(jid: string | undefined | null): string {
return (jid || '').split(':')[0];
}
private wasMentioned(msg: any): boolean {
if (!msg?.key?.remoteJid?.endsWith('@g.us')) return false;
const candidates = [
msg?.message?.extendedTextMessage?.contextInfo?.mentionedJid,
msg?.message?.imageMessage?.contextInfo?.mentionedJid,
msg?.message?.videoMessage?.contextInfo?.mentionedJid,
msg?.message?.documentMessage?.contextInfo?.mentionedJid,
msg?.message?.audioMessage?.contextInfo?.mentionedJid,
];
const mentioned = candidates.flatMap((items) => (Array.isArray(items) ? items : []));
if (mentioned.length === 0) return false;
const selfIds = new Set(
[this.sock?.user?.id, this.sock?.user?.lid, this.sock?.user?.jid]
.map((jid) => this.normalizeJid(jid))
.filter(Boolean),
);
return mentioned.some((jid: string) => selfIds.has(this.normalizeJid(jid)));
}
async connect(): Promise<void> { async connect(): Promise<void> {
const logger = pino({ level: 'silent' }); const logger = pino({ level: 'silent' });
const { state, saveCreds } = await useMultiFileAuthState(this.options.authDir); const { state, saveCreds } = await useMultiFileAuthState(this.options.authDir);
@@ -145,6 +171,7 @@ export class WhatsAppClient {
if (!finalContent && mediaPaths.length === 0) continue; if (!finalContent && mediaPaths.length === 0) continue;
const isGroup = msg.key.remoteJid?.endsWith('@g.us') || false; const isGroup = msg.key.remoteJid?.endsWith('@g.us') || false;
const wasMentioned = this.wasMentioned(msg);
this.options.onMessage({ this.options.onMessage({
id: msg.key.id || '', id: msg.key.id || '',
@@ -153,6 +180,7 @@ export class WhatsAppClient {
content: finalContent, content: finalContent,
timestamp: msg.messageTimestamp as number, timestamp: msg.messageTimestamp as number,
isGroup, isGroup,
...(isGroup ? { wasMentioned } : {}),
...(mediaPaths.length > 0 ? { media: mediaPaths } : {}), ...(mediaPaths.length > 0 ? { media: mediaPaths } : {}),
}); });
} }

View File

@@ -960,6 +960,9 @@ class FeishuChannel(BaseChannel):
and not msg.metadata.get("_progress", False) and not msg.metadata.get("_progress", False)
): ):
reply_message_id = msg.metadata.get("message_id") or None reply_message_id = msg.metadata.get("message_id") or None
# For topic group messages, always reply to keep context in thread
elif msg.metadata.get("thread_id"):
reply_message_id = msg.metadata.get("root_id") or msg.metadata.get("message_id") or None
first_send = True # tracks whether the reply has already been used first_send = True # tracks whether the reply has already been used
@@ -1121,6 +1124,7 @@ class FeishuChannel(BaseChannel):
# Extract reply context (parent/root message IDs) # Extract reply context (parent/root message IDs)
parent_id = getattr(message, "parent_id", None) or None parent_id = getattr(message, "parent_id", None) or None
root_id = getattr(message, "root_id", None) or None root_id = getattr(message, "root_id", None) or None
thread_id = getattr(message, "thread_id", None) or None
# Prepend quoted message text when the user replied to another message # Prepend quoted message text when the user replied to another message
if parent_id and self._client: if parent_id and self._client:
@@ -1149,6 +1153,7 @@ class FeishuChannel(BaseChannel):
"msg_type": msg_type, "msg_type": msg_type,
"parent_id": parent_id, "parent_id": parent_id,
"root_id": root_id, "root_id": root_id,
"thread_id": thread_id,
} }
) )

View File

@@ -1,33 +1,108 @@
"""QQ channel implementation using botpy SDK.""" """QQ channel implementation using botpy SDK.
Inbound:
- Parse QQ botpy messages (C2C / Group)
- Download attachments to media dir using chunked streaming write (memory-safe)
- Publish to Nanobot bus via BaseChannel._handle_message()
- Content includes a clear, actionable "Received files:" list with local paths
Outbound:
- Send attachments (msg.media) first via QQ rich media API (base64 upload + msg_type=7)
- Then send text (plain or markdown)
- msg.media supports local paths, file:// paths, and http(s) URLs
Notes:
- QQ restricts many audio/video formats. We conservatively classify as image vs file.
- Attachment structures differ across botpy versions; we try multiple field candidates.
"""
from __future__ import annotations
import asyncio import asyncio
import base64
import mimetypes
import os
import re
import time
from collections import deque from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
from urllib.parse import unquote, urlparse
import aiohttp
from loguru import logger from loguru import logger
from pydantic import Field
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import Base from nanobot.config.schema import Base
from pydantic import Field from nanobot.security.network import validate_url_target
try:
from nanobot.config.paths import get_media_dir
except Exception: # pragma: no cover
get_media_dir = None # type: ignore
try: try:
import botpy import botpy
from botpy.message import C2CMessage, GroupMessage from botpy.http import Route
QQ_AVAILABLE = True QQ_AVAILABLE = True
except ImportError: except ImportError: # pragma: no cover
QQ_AVAILABLE = False QQ_AVAILABLE = False
botpy = None botpy = None
C2CMessage = None Route = None
GroupMessage = None
if TYPE_CHECKING: if TYPE_CHECKING:
from botpy.message import C2CMessage, GroupMessage from botpy.message import BaseMessage, C2CMessage, GroupMessage
from botpy.types.message import Media
def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": # QQ rich media file_type: 1=image, 4=file
# (2=voice, 3=video are restricted; we only use image vs file)
QQ_FILE_TYPE_IMAGE = 1
QQ_FILE_TYPE_FILE = 4
_IMAGE_EXTS = {
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".webp",
".tif",
".tiff",
".ico",
".svg",
}
# Replace unsafe characters with "_", keep Chinese and common safe punctuation.
_SAFE_NAME_RE = re.compile(r"[^\w.\-()\[\]()【】\u4e00-\u9fff]+", re.UNICODE)
def _sanitize_filename(name: str) -> str:
"""Sanitize filename to avoid traversal and problematic chars."""
name = (name or "").strip()
name = Path(name).name
name = _SAFE_NAME_RE.sub("_", name).strip("._ ")
return name
def _is_image_name(name: str) -> bool:
return Path(name).suffix.lower() in _IMAGE_EXTS
def _guess_send_file_type(filename: str) -> int:
"""Conservative send type: images -> 1, else -> 4."""
ext = Path(filename).suffix.lower()
mime, _ = mimetypes.guess_type(filename)
if ext in _IMAGE_EXTS or (mime and mime.startswith("image/")):
return QQ_FILE_TYPE_IMAGE
return QQ_FILE_TYPE_FILE
def _make_bot_class(channel: QQChannel) -> type[botpy.Client]:
"""Create a botpy Client subclass bound to the given channel.""" """Create a botpy Client subclass bound to the given channel."""
intents = botpy.Intents(public_messages=True, direct_message=True) intents = botpy.Intents(public_messages=True, direct_message=True)
@@ -39,10 +114,10 @@ def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]":
async def on_ready(self): async def on_ready(self):
logger.info("QQ bot ready: {}", self.robot.name) logger.info("QQ bot ready: {}", self.robot.name)
async def on_c2c_message_create(self, message: "C2CMessage"): async def on_c2c_message_create(self, message: C2CMessage):
await channel._on_message(message, is_group=False) await channel._on_message(message, is_group=False)
async def on_group_at_message_create(self, message: "GroupMessage"): async def on_group_at_message_create(self, message: GroupMessage):
await channel._on_message(message, is_group=True) await channel._on_message(message, is_group=True)
async def on_direct_message_create(self, message): async def on_direct_message_create(self, message):
@@ -60,6 +135,13 @@ class QQConfig(Base):
allow_from: list[str] = Field(default_factory=list) allow_from: list[str] = Field(default_factory=list)
msg_format: Literal["plain", "markdown"] = "plain" msg_format: Literal["plain", "markdown"] = "plain"
# Optional: directory to save inbound attachments. If empty, use nanobot get_media_dir("qq").
media_dir: str = ""
# Download tuning
download_chunk_size: int = 1024 * 256 # 256KB
download_max_bytes: int = 1024 * 1024 * 200 # 200MB safety limit
class QQChannel(BaseChannel): class QQChannel(BaseChannel):
"""QQ channel using botpy SDK with WebSocket connection.""" """QQ channel using botpy SDK with WebSocket connection."""
@@ -76,13 +158,38 @@ class QQChannel(BaseChannel):
config = QQConfig.model_validate(config) config = QQConfig.model_validate(config)
super().__init__(config, bus) super().__init__(config, bus)
self.config: QQConfig = config self.config: QQConfig = config
self._client: "botpy.Client | None" = None
self._processed_ids: deque = deque(maxlen=1000) self._client: botpy.Client | None = None
self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重 self._http: aiohttp.ClientSession | None = None
self._processed_ids: deque[str] = deque(maxlen=1000)
self._msg_seq: int = 1 # used to avoid QQ API dedup
self._chat_type_cache: dict[str, str] = {} self._chat_type_cache: dict[str, str] = {}
self._media_root: Path = self._init_media_root()
# ---------------------------
# Lifecycle
# ---------------------------
def _init_media_root(self) -> Path:
"""Choose a directory for saving inbound attachments."""
if self.config.media_dir:
root = Path(self.config.media_dir).expanduser()
elif get_media_dir:
try:
root = Path(get_media_dir("qq"))
except Exception:
root = Path.home() / ".nanobot" / "media" / "qq"
else:
root = Path.home() / ".nanobot" / "media" / "qq"
root.mkdir(parents=True, exist_ok=True)
logger.info("QQ media directory: {}", str(root))
return root
async def start(self) -> None: async def start(self) -> None:
"""Start the QQ bot.""" """Start the QQ bot with auto-reconnect loop."""
if not QQ_AVAILABLE: if not QQ_AVAILABLE:
logger.error("QQ SDK not installed. Run: pip install qq-botpy") logger.error("QQ SDK not installed. Run: pip install qq-botpy")
return return
@@ -92,8 +199,9 @@ class QQChannel(BaseChannel):
return return
self._running = True self._running = True
BotClass = _make_bot_class(self) self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))
self._client = BotClass()
self._client = _make_bot_class(self)()
logger.info("QQ bot started (C2C & Group supported)") logger.info("QQ bot started (C2C & Group supported)")
await self._run_bot() await self._run_bot()
@@ -109,23 +217,79 @@ class QQChannel(BaseChannel):
await asyncio.sleep(5) await asyncio.sleep(5)
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the QQ bot.""" """Stop bot and cleanup resources."""
self._running = False self._running = False
if self._client: if self._client:
try: try:
await self._client.close() await self._client.close()
except Exception: except Exception:
pass pass
self._client = None
if self._http:
try:
await self._http.close()
except Exception:
pass
self._http = None
logger.info("QQ bot stopped") logger.info("QQ bot stopped")
# ---------------------------
# Outbound (send)
# ---------------------------
async def send(self, msg: OutboundMessage) -> None: async def send(self, msg: OutboundMessage) -> None:
"""Send a message through QQ.""" """Send attachments first, then text."""
if not self._client: if not self._client:
logger.warning("QQ client not initialized") logger.warning("QQ client not initialized")
return return
try:
msg_id = msg.metadata.get("message_id") msg_id = msg.metadata.get("message_id")
chat_type = self._chat_type_cache.get(msg.chat_id, "c2c")
is_group = chat_type == "group"
# 1) Send media
for media_ref in msg.media or []:
ok = await self._send_media(
chat_id=msg.chat_id,
media_ref=media_ref,
msg_id=msg_id,
is_group=is_group,
)
if not ok:
filename = (
os.path.basename(urlparse(media_ref).path)
or os.path.basename(media_ref)
or "file"
)
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=f"[Attachment send failed: {filename}]",
)
# 2) Send text
if msg.content and msg.content.strip():
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=msg.content.strip(),
)
async def _send_text_only(
self,
chat_id: str,
is_group: bool,
msg_id: str | None,
content: str,
) -> None:
"""Send a plain/markdown text message."""
if not self._client:
return
self._msg_seq += 1 self._msg_seq += 1
use_markdown = self.config.msg_format == "markdown" use_markdown = self.config.msg_format == "markdown"
payload: dict[str, Any] = { payload: dict[str, Any] = {
@@ -134,50 +298,342 @@ class QQChannel(BaseChannel):
"msg_seq": self._msg_seq, "msg_seq": self._msg_seq,
} }
if use_markdown: if use_markdown:
payload["markdown"] = {"content": msg.content} payload["markdown"] = {"content": content}
else: else:
payload["content"] = msg.content payload["content"] = content
chat_type = self._chat_type_cache.get(msg.chat_id, "c2c") if is_group:
if chat_type == "group": await self._client.api.post_group_message(group_openid=chat_id, **payload)
else:
await self._client.api.post_c2c_message(openid=chat_id, **payload)
async def _send_media(
self,
chat_id: str,
media_ref: str,
msg_id: str | None,
is_group: bool,
) -> bool:
"""Read bytes -> base64 upload -> msg_type=7 send."""
if not self._client:
return False
data, filename = await self._read_media_bytes(media_ref)
if not data or not filename:
return False
try:
file_type = _guess_send_file_type(filename)
file_data_b64 = base64.b64encode(data).decode()
media_obj = await self._post_base64file(
chat_id=chat_id,
is_group=is_group,
file_type=file_type,
file_data=file_data_b64,
file_name=filename,
srv_send_msg=False,
)
if not media_obj:
logger.error("QQ media upload failed: empty response")
return False
self._msg_seq += 1
if is_group:
await self._client.api.post_group_message( await self._client.api.post_group_message(
group_openid=msg.chat_id, group_openid=chat_id,
**payload, msg_type=7,
msg_id=msg_id,
msg_seq=self._msg_seq,
media=media_obj,
) )
else: else:
await self._client.api.post_c2c_message( await self._client.api.post_c2c_message(
openid=msg.chat_id, openid=chat_id,
**payload, msg_type=7,
msg_id=msg_id,
msg_seq=self._msg_seq,
media=media_obj,
) )
except Exception as e:
logger.error("Error sending QQ message: {}", e)
async def _on_message(self, data: "C2CMessage | GroupMessage", is_group: bool = False) -> None: logger.info("QQ media sent: {}", filename)
"""Handle incoming message from QQ.""" return True
except Exception as e:
logger.error("QQ send media failed filename={} err={}", filename, e)
return False
async def _read_media_bytes(self, media_ref: str) -> tuple[bytes | None, str | None]:
"""Read bytes from http(s) or local file path; return (data, filename)."""
media_ref = (media_ref or "").strip()
if not media_ref:
return None, None
# Local file: plain path or file:// URI
if not media_ref.startswith("http://") and not media_ref.startswith("https://"):
try: try:
# Dedup by message ID if media_ref.startswith("file://"):
parsed = urlparse(media_ref)
# Windows: path in netloc; Unix: path in path
raw = parsed.path or parsed.netloc
local_path = Path(unquote(raw))
else:
local_path = Path(os.path.expanduser(media_ref))
if not local_path.is_file():
logger.warning("QQ outbound media file not found: {}", str(local_path))
return None, None
data = await asyncio.to_thread(local_path.read_bytes)
return data, local_path.name
except Exception as e:
logger.warning("QQ outbound media read error ref={} err={}", media_ref, e)
return None, None
# Remote URL
ok, err = validate_url_target(media_ref)
if not ok:
logger.warning("QQ outbound media URL validation failed url={} err={}", media_ref, err)
return None, None
if not self._http:
self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))
try:
async with self._http.get(media_ref, allow_redirects=True) as resp:
if resp.status >= 400:
logger.warning(
"QQ outbound media download failed status={} url={}",
resp.status,
media_ref,
)
return None, None
data = await resp.read()
if not data:
return None, None
filename = os.path.basename(urlparse(media_ref).path) or "file.bin"
return data, filename
except Exception as e:
logger.warning("QQ outbound media download error url={} err={}", media_ref, e)
return None, None
# https://github.com/tencent-connect/botpy/issues/198
# https://bot.q.qq.com/wiki/develop/api-v2/server-inter/message/send-receive/rich-media.html
async def _post_base64file(
self,
chat_id: str,
is_group: bool,
file_type: int,
file_data: str,
file_name: str | None = None,
srv_send_msg: bool = False,
) -> Media:
"""Upload base64-encoded file and return Media object."""
if not self._client:
raise RuntimeError("QQ client not initialized")
if is_group:
endpoint = "/v2/groups/{group_openid}/files"
id_key = "group_openid"
else:
endpoint = "/v2/users/{openid}/files"
id_key = "openid"
payload = {
id_key: chat_id,
"file_type": file_type,
"file_data": file_data,
"file_name": file_name,
"srv_send_msg": srv_send_msg,
}
route = Route("POST", endpoint, **{id_key: chat_id})
return await self._client.api._http.request(route, json=payload)
# ---------------------------
# Inbound (receive)
# ---------------------------
async def _on_message(self, data: C2CMessage | GroupMessage, is_group: bool = False) -> None:
"""Parse inbound message, download attachments, and publish to the bus."""
if data.id in self._processed_ids: if data.id in self._processed_ids:
return return
self._processed_ids.append(data.id) self._processed_ids.append(data.id)
content = (data.content or "").strip()
if not content:
return
if is_group: if is_group:
chat_id = data.group_openid chat_id = data.group_openid
user_id = data.author.member_openid user_id = data.author.member_openid
self._chat_type_cache[chat_id] = "group" self._chat_type_cache[chat_id] = "group"
else: else:
chat_id = str(getattr(data.author, 'id', None) or getattr(data.author, 'user_openid', 'unknown')) chat_id = str(
getattr(data.author, "id", None) or getattr(data.author, "user_openid", "unknown")
)
user_id = chat_id user_id = chat_id
self._chat_type_cache[chat_id] = "c2c" self._chat_type_cache[chat_id] = "c2c"
content = (data.content or "").strip()
# the data used by tests don't contain attachments property
# so we use getattr with a default of [] to avoid AttributeError in tests
attachments = getattr(data, "attachments", None) or []
media_paths, recv_lines, att_meta = await self._handle_attachments(attachments)
# Compose content that always contains actionable saved paths
if recv_lines:
tag = "[Image]" if any(_is_image_name(Path(p).name) for p in media_paths) else "[File]"
file_block = "Received files:\n" + "\n".join(recv_lines)
content = f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}"
if not content and not media_paths:
return
await self._handle_message( await self._handle_message(
sender_id=user_id, sender_id=user_id,
chat_id=chat_id, chat_id=chat_id,
content=content, content=content,
metadata={"message_id": data.id}, media=media_paths if media_paths else None,
metadata={
"message_id": data.id,
"attachments": att_meta,
},
) )
async def _handle_attachments(
self,
attachments: list[BaseMessage._Attachments],
) -> tuple[list[str], list[str], list[dict[str, Any]]]:
"""Extract, download (chunked), and format attachments for agent consumption."""
media_paths: list[str] = []
recv_lines: list[str] = []
att_meta: list[dict[str, Any]] = []
if not attachments:
return media_paths, recv_lines, att_meta
for att in attachments:
url, filename, ctype = att.url, att.filename, att.content_type
logger.info("Downloading file from QQ: {}", filename or url)
local_path = await self._download_to_media_dir_chunked(url, filename_hint=filename)
att_meta.append(
{
"url": url,
"filename": filename,
"content_type": ctype,
"saved_path": local_path,
}
)
if local_path:
media_paths.append(local_path)
shown_name = filename or os.path.basename(local_path)
recv_lines.append(f"- {shown_name}\n saved: {local_path}")
else:
shown_name = filename or url
recv_lines.append(f"- {shown_name}\n saved: [download failed]")
return media_paths, recv_lines, att_meta
async def _download_to_media_dir_chunked(
self,
url: str,
filename_hint: str = "",
) -> str | None:
"""Download an inbound attachment using streaming chunk write.
Uses chunked streaming to avoid loading large files into memory.
Enforces a max download size and writes to a .part temp file
that is atomically renamed on success.
"""
if not self._http:
self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))
safe = _sanitize_filename(filename_hint)
ts = int(time.time() * 1000)
tmp_path: Path | None = None
try:
async with self._http.get(
url,
timeout=aiohttp.ClientTimeout(total=120),
allow_redirects=True,
) as resp:
if resp.status != 200:
logger.warning("QQ download failed: status={} url={}", resp.status, url)
return None
ctype = (resp.headers.get("Content-Type") or "").lower()
# Infer extension: url -> filename_hint -> content-type -> fallback
ext = Path(urlparse(url).path).suffix
if not ext:
ext = Path(filename_hint).suffix
if not ext:
if "png" in ctype:
ext = ".png"
elif "jpeg" in ctype or "jpg" in ctype:
ext = ".jpg"
elif "gif" in ctype:
ext = ".gif"
elif "webp" in ctype:
ext = ".webp"
elif "pdf" in ctype:
ext = ".pdf"
else:
ext = ".bin"
if safe:
if not Path(safe).suffix:
safe = safe + ext
filename = safe
else:
filename = f"qq_file_{ts}{ext}"
target = self._media_root / filename
if target.exists():
target = self._media_root / f"{target.stem}_{ts}{target.suffix}"
tmp_path = target.with_suffix(target.suffix + ".part")
# Stream write
downloaded = 0
chunk_size = max(1024, int(self.config.download_chunk_size or 262144))
max_bytes = max(
1024 * 1024, int(self.config.download_max_bytes or (200 * 1024 * 1024))
)
def _open_tmp():
tmp_path.parent.mkdir(parents=True, exist_ok=True)
return open(tmp_path, "wb") # noqa: SIM115
f = await asyncio.to_thread(_open_tmp)
try:
async for chunk in resp.content.iter_chunked(chunk_size):
if not chunk:
continue
downloaded += len(chunk)
if downloaded > max_bytes:
logger.warning(
"QQ download exceeded max_bytes={} url={} -> abort",
max_bytes,
url,
)
return None
await asyncio.to_thread(f.write, chunk)
finally:
await asyncio.to_thread(f.close)
# Atomic rename
await asyncio.to_thread(os.replace, tmp_path, target)
tmp_path = None # mark as moved
logger.info("QQ file saved: {}", str(target))
return str(target)
except Exception as e:
logger.error("QQ download error: {}", e)
return None
finally:
# Cleanup partial file
if tmp_path is not None:
try:
tmp_path.unlink(missing_ok=True)
except Exception: except Exception:
logger.exception("Error handling QQ message") pass

View File

@@ -11,7 +11,7 @@ from typing import Any, Literal
from loguru import logger from loguru import logger
from pydantic import Field from pydantic import Field
from telegram import BotCommand, ReplyParameters, Update from telegram import BotCommand, ReactionTypeEmoji, ReplyParameters, Update
from telegram.error import TimedOut from telegram.error import TimedOut
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.request import HTTPXRequest from telegram.request import HTTPXRequest
@@ -173,6 +173,7 @@ class TelegramConfig(Base):
allow_from: list[str] = Field(default_factory=list) allow_from: list[str] = Field(default_factory=list)
proxy: str | None = None proxy: str | None = None
reply_to_message: bool = False reply_to_message: bool = False
react_emoji: str = "👀"
group_policy: Literal["open", "mention"] = "mention" group_policy: Literal["open", "mention"] = "mention"
connection_pool_size: int = 32 connection_pool_size: int = 32
pool_timeout: float = 5.0 pool_timeout: float = 5.0
@@ -812,6 +813,7 @@ class TelegramChannel(BaseChannel):
"session_key": session_key, "session_key": session_key,
} }
self._start_typing(str_chat_id) self._start_typing(str_chat_id)
await self._add_reaction(str_chat_id, message.message_id, self.config.react_emoji)
buf = self._media_group_buffers[key] buf = self._media_group_buffers[key]
if content and content != "[empty message]": if content and content != "[empty message]":
buf["contents"].append(content) buf["contents"].append(content)
@@ -822,6 +824,7 @@ class TelegramChannel(BaseChannel):
# Start typing indicator before processing # Start typing indicator before processing
self._start_typing(str_chat_id) self._start_typing(str_chat_id)
await self._add_reaction(str_chat_id, message.message_id, self.config.react_emoji)
# Forward to the message bus # Forward to the message bus
await self._handle_message( await self._handle_message(
@@ -861,6 +864,19 @@ class TelegramChannel(BaseChannel):
if task and not task.done(): if task and not task.done():
task.cancel() task.cancel()
async def _add_reaction(self, chat_id: str, message_id: int, emoji: str) -> None:
"""Add emoji reaction to a message (best-effort, non-blocking)."""
if not self._app or not emoji:
return
try:
await self._app.bot.set_message_reaction(
chat_id=int(chat_id),
message_id=message_id,
reaction=[ReactionTypeEmoji(emoji=emoji)],
)
except Exception as e:
logger.debug("Telegram reaction failed: {}", e)
async def _typing_loop(self, chat_id: str) -> None: async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action until cancelled.""" """Repeatedly send 'typing' action until cancelled."""
try: try:

View File

@@ -26,6 +26,7 @@ class WhatsAppConfig(Base):
bridge_url: str = "ws://localhost:3001" bridge_url: str = "ws://localhost:3001"
bridge_token: str = "" bridge_token: str = ""
allow_from: list[str] = Field(default_factory=list) allow_from: list[str] = Field(default_factory=list)
group_policy: Literal["open", "mention"] = "open" # "open" responds to all, "mention" only when @mentioned
class WhatsAppChannel(BaseChannel): class WhatsAppChannel(BaseChannel):
@@ -187,6 +188,13 @@ class WhatsAppChannel(BaseChannel):
self._processed_message_ids.popitem(last=False) self._processed_message_ids.popitem(last=False)
# Extract just the phone number or lid as chat_id # Extract just the phone number or lid as chat_id
is_group = data.get("isGroup", False)
was_mentioned = data.get("wasMentioned", False)
if is_group and getattr(self.config, "group_policy", "open") == "mention":
if not was_mentioned:
return
user_id = pn if pn else sender user_id = pn if pn else sender
sender_id = user_id.split("@")[0] if "@" in user_id else user_id sender_id = user_id.split("@")[0] if "@" in user_id else user_id
logger.info("Sender {}", sender) logger.info("Sender {}", sender)

View File

@@ -1,11 +1,12 @@
import tempfile
from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
import pytest import pytest
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.qq import QQChannel from nanobot.channels.qq import QQChannel, QQConfig
from nanobot.channels.qq import QQConfig
class _FakeApi: class _FakeApi:
@@ -34,6 +35,7 @@ async def test_on_group_message_routes_to_group_chat_id() -> None:
content="hello", content="hello",
group_openid="group123", group_openid="group123",
author=SimpleNamespace(member_openid="user1"), author=SimpleNamespace(member_openid="user1"),
attachments=[],
) )
await channel._on_message(data, is_group=True) await channel._on_message(data, is_group=True)
@@ -123,3 +125,38 @@ async def test_send_group_message_uses_markdown_when_configured() -> None:
"msg_id": "msg1", "msg_id": "msg1",
"msg_seq": 2, "msg_seq": 2,
} }
@pytest.mark.asyncio
async def test_read_media_bytes_local_path() -> None:
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
tmp_path = f.name
data, filename = await channel._read_media_bytes(tmp_path)
assert data == b"\x89PNG\r\n"
assert filename == Path(tmp_path).name
@pytest.mark.asyncio
async def test_read_media_bytes_file_uri() -> None:
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
f.write(b"JFIF")
tmp_path = f.name
data, filename = await channel._read_media_bytes(f"file://{tmp_path}")
assert data == b"JFIF"
assert filename == Path(tmp_path).name
@pytest.mark.asyncio
async def test_read_media_bytes_missing_file() -> None:
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
data, filename = await channel._read_media_bytes("/nonexistent/path/image.png")
assert data is None
assert filename is None

View File

@@ -106,3 +106,52 @@ async def test_send_when_disconnected_is_noop():
await ch.send(msg) await ch.send(msg)
ch._ws.send.assert_not_called() ch._ws.send.assert_not_called()
@pytest.mark.asyncio
async def test_group_policy_mention_skips_unmentioned_group_message():
ch = WhatsAppChannel({"enabled": True, "groupPolicy": "mention"}, MagicMock())
ch._handle_message = AsyncMock()
await ch._handle_bridge_message(
json.dumps(
{
"type": "message",
"id": "m1",
"sender": "12345@g.us",
"pn": "user@s.whatsapp.net",
"content": "hello group",
"timestamp": 1,
"isGroup": True,
"wasMentioned": False,
}
)
)
ch._handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_mentioned_group_message():
ch = WhatsAppChannel({"enabled": True, "groupPolicy": "mention"}, MagicMock())
ch._handle_message = AsyncMock()
await ch._handle_bridge_message(
json.dumps(
{
"type": "message",
"id": "m1",
"sender": "12345@g.us",
"pn": "user@s.whatsapp.net",
"content": "hello @bot",
"timestamp": 1,
"isGroup": True,
"wasMentioned": True,
}
)
)
ch._handle_message.assert_awaited_once()
kwargs = ch._handle_message.await_args.kwargs
assert kwargs["chat_id"] == "12345@g.us"
assert kwargs["sender_id"] == "user"