"""Base channel interface for chat platforms.""" from __future__ import annotations from abc import ABC, abstractmethod from pathlib import Path from typing import Any from loguru import logger from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.queue import MessageBus class BaseChannel(ABC): """ Abstract base class for chat channel implementations. Each channel (Telegram, Discord, etc.) should implement this interface to integrate with the nanobot message bus. """ name: str = "base" display_name: str = "Base" transcription_api_key: str = "" def __init__(self, config: Any, bus: MessageBus): """ Initialize the channel. Args: config: Channel-specific configuration. bus: The message bus for communication. """ self.config = config self.bus = bus self._running = False async def transcribe_audio(self, file_path: str | Path) -> str: """Transcribe an audio file via Groq Whisper. Returns empty string on failure.""" if not self.transcription_api_key: return "" try: from nanobot.providers.transcription import GroqTranscriptionProvider provider = GroqTranscriptionProvider(api_key=self.transcription_api_key) return await provider.transcribe(file_path) except Exception as e: logger.warning("{}: audio transcription failed: {}", self.name, e) return "" async def login(self, force: bool = False) -> bool: """ Perform channel-specific interactive login (e.g. QR code scan). Args: force: If True, ignore existing credentials and force re-authentication. Returns True if already authenticated or login succeeds. Override in subclasses that support interactive login. """ return True @abstractmethod async def start(self) -> None: """ Start the channel and begin listening for messages. This should be a long-running async task that: 1. Connects to the chat platform 2. Listens for incoming messages 3. Forwards messages to the bus via _handle_message() """ pass @abstractmethod async def stop(self) -> None: """Stop the channel and clean up resources.""" pass @abstractmethod async def send(self, msg: OutboundMessage) -> None: """ Send a message through this channel. Args: msg: The message to send. """ pass async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: """Deliver a streaming text chunk. Override in subclass to enable streaming.""" pass @property def supports_streaming(self) -> bool: """True when config enables streaming AND this subclass implements send_delta.""" cfg = self.config streaming = cfg.get("streaming", False) if isinstance(cfg, dict) else getattr(cfg, "streaming", False) return bool(streaming) and type(self).send_delta is not BaseChannel.send_delta def is_allowed(self, sender_id: str) -> bool: """Check if *sender_id* is permitted. Empty list → deny all; ``"*"`` → allow all.""" allow_list = getattr(self.config, "allow_from", []) if not allow_list: logger.warning("{}: allow_from is empty — all access denied", self.name) return False if "*" in allow_list: return True return str(sender_id) in allow_list async def _handle_message( self, sender_id: str, chat_id: str, content: str, media: list[str] | None = None, metadata: dict[str, Any] | None = None, session_key: str | None = None, ) -> None: """ Handle an incoming message from the chat platform. This method checks permissions and forwards to the bus. Args: sender_id: The sender's identifier. chat_id: The chat/channel identifier. content: Message text content. media: Optional list of media URLs. metadata: Optional channel-specific metadata. session_key: Optional session key override (e.g. thread-scoped sessions). """ if not self.is_allowed(sender_id): logger.warning( "Access denied for sender {} on channel {}. " "Add them to allowFrom list in config to grant access.", sender_id, self.name, ) return meta = metadata or {} if self.supports_streaming: meta = {**meta, "_wants_stream": True} msg = InboundMessage( channel=self.name, sender_id=str(sender_id), chat_id=str(chat_id), content=content, media=media or [], metadata=meta, session_key_override=session_key, ) await self.bus.publish_inbound(msg) @classmethod def default_config(cls) -> dict[str, Any]: """Return default config for onboard. Override in plugins to auto-populate config.json.""" return {"enabled": False} @property def is_running(self) -> bool: """Check if the channel is running.""" return self._running