"""QQ channel implementation using botpy SDK.""" import asyncio from collections import deque from typing import TYPE_CHECKING, Dict from loguru import logger from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.schema import QQConfig try: import botpy from botpy.message import C2CMessage, GroupMessage # 1. Import GroupMessage QQ_AVAILABLE = True except ImportError: QQ_AVAILABLE = False botpy = None C2CMessage = None GroupMessage = None if TYPE_CHECKING: from botpy.message import C2CMessage, GroupMessage def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": """Create a botpy Client subclass bound to the given channel.""" # 2. Ensure intents enable public_messages (required for group messages) intents = botpy.Intents(public_messages=True, direct_message=True) class _Bot(botpy.Client): def __init__(self): super().__init__(intents=intents) async def on_ready(self): logger.info(f"QQ bot ready: {self.robot.name}") async def on_c2c_message_create(self, message: "C2CMessage"): # C2C (Private) message await channel._on_message(message, is_group=False) async def on_group_at_message_create(self, message: "GroupMessage"): # 3. Added: Listen for group @messages # Note: Official bots only receive messages @mentioning them unless privileged await channel._on_message(message, is_group=True) async def on_direct_message_create(self, message): # Guild Direct Message await channel._on_message(message, is_group=False) return _Bot class QQChannel(BaseChannel): """QQ channel using botpy SDK with WebSocket connection.""" name = "qq" def __init__(self, config: QQConfig, bus: MessageBus): super().__init__(config, bus) self.config: QQConfig = config self._client: "botpy.Client | None" = None self._processed_ids: deque = deque(maxlen=1000) self._bot_task: asyncio.Task | None = None # Cache to track if chat_id is a group or individual to select the correct reply API # Format: {chat_id: "group" | "c2c"} self._chat_type_cache: Dict[str, str] = {} async def start(self) -> None: """Start the QQ bot.""" if not QQ_AVAILABLE: logger.error("QQ SDK not installed. Run: pip install qq-botpy") return if not self.config.app_id or not self.config.secret: logger.error("QQ app_id and secret not configured") return self._running = True BotClass = _make_bot_class(self) self._client = BotClass() self._bot_task = asyncio.create_task(self._run_bot()) logger.info("QQ bot started (C2C & Group supported)") async def _run_bot(self) -> None: """Run the bot connection.""" try: await self._client.start(appid=self.config.app_id, secret=self.config.secret) except Exception as e: logger.error(f"QQ auth failed: {e}") self._running = False async def stop(self) -> None: """Stop the QQ bot.""" self._running = False if self._bot_task: self._bot_task.cancel() try: await self._bot_task except asyncio.CancelledError: pass logger.info("QQ bot stopped") async def send(self, msg: OutboundMessage) -> None: """Send a message through QQ.""" if not self._client: logger.warning("QQ client not initialized") return # 4. Modified send logic: Check chat_id type to call the correct API msg_type = self._chat_type_cache.get(msg.chat_id, "c2c") # Default to c2c try: if msg_type == "group": # Send group message await self._client.api.post_group_message( group_openid=msg.chat_id, msg_type=0, msg_id=msg.metadata.get("message_id"), # Reply to specific message ID (optional but recommended) content=msg.content ) else: # Send C2C (private) message await self._client.api.post_c2c_message( openid=msg.chat_id, msg_type=0, msg_id=msg.metadata.get("message_id"), content=msg.content, ) except Exception as e: logger.error(f"Error sending QQ message ({msg_type}): {e}") async def _on_message(self, data: "C2CMessage | GroupMessage", is_group: bool = False) -> None: """Handle incoming message from QQ.""" try: # Dedup by message ID if data.id in self._processed_ids: return self._processed_ids.append(data.id) content = (data.content or "").strip() if not content: return # 5. Extract ID and cache type if is_group: # Group message: chat_id uses group_openid chat_id = data.group_openid user_id = data.author.member_openid # Sender's ID self._chat_type_cache[chat_id] = "group" # Remove @bot text (optional, prevents Nanobot from treating the name as prompt) # content = content.replace("@BotName", "").strip() else: # Private message: chat_id uses user_openid chat_id = str(getattr(data.author, 'id', None) or getattr(data.author, 'user_openid', 'unknown')) user_id = chat_id self._chat_type_cache[chat_id] = "c2c" await self._handle_message( sender_id=user_id, chat_id=chat_id, content=content, metadata={"message_id": data.id}, ) except Exception as e: logger.error(f"Error handling QQ message: {e}")