feat(channels): support multi-instance channel configs

This commit is contained in:
Hua
2026-03-13 22:41:24 +08:00
parent 12cffa248f
commit b24ad7b526
16 changed files with 955 additions and 54 deletions

View File

@@ -15,7 +15,7 @@ from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import DingTalkConfig
from nanobot.config.schema import DingTalkConfig, DingTalkInstanceConfig
try:
from dingtalk_stream import (
@@ -119,9 +119,9 @@ class DingTalkChannel(BaseChannel):
_AUDIO_EXTS = {".amr", ".mp3", ".wav", ".ogg", ".m4a", ".aac"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
def __init__(self, config: DingTalkConfig, bus: MessageBus):
def __init__(self, config: DingTalkConfig | DingTalkInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: DingTalkConfig = config
self.config: DingTalkConfig | DingTalkInstanceConfig = config
self._client: Any = None
self._http: httpx.AsyncClient | None = None

View File

@@ -13,7 +13,7 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import DiscordConfig
from nanobot.config.schema import DiscordConfig, DiscordInstanceConfig
from nanobot.utils.helpers import split_message
DISCORD_API_BASE = "https://discord.com/api/v10"
@@ -27,9 +27,9 @@ class DiscordChannel(BaseChannel):
name = "discord"
display_name = "Discord"
def __init__(self, config: DiscordConfig, bus: MessageBus):
def __init__(self, config: DiscordConfig | DiscordInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: DiscordConfig = config
self.config: DiscordConfig | DiscordInstanceConfig = config
self._ws: websockets.WebSocketClientProtocol | None = None
self._seq: int | None = None
self._heartbeat_task: asyncio.Task | None = None

View File

@@ -19,7 +19,7 @@ from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import EmailConfig
from nanobot.config.schema import EmailConfig, EmailInstanceConfig
class EmailChannel(BaseChannel):
@@ -51,9 +51,9 @@ class EmailChannel(BaseChannel):
"Dec",
)
def __init__(self, config: EmailConfig, bus: MessageBus):
def __init__(self, config: EmailConfig | EmailInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: EmailConfig = config
self.config: EmailConfig | EmailInstanceConfig = config
self._last_subject_by_chat: dict[str, str] = {}
self._last_message_id_by_chat: dict[str, str] = {}
self._processed_uids: set[str] = set() # Capped to prevent unbounded growth

View File

@@ -15,7 +15,7 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import FeishuConfig
from nanobot.config.schema import FeishuConfig, FeishuInstanceConfig
import importlib.util
@@ -246,9 +246,9 @@ class FeishuChannel(BaseChannel):
name = "feishu"
display_name = "Feishu"
def __init__(self, config: FeishuConfig, bus: MessageBus):
def __init__(self, config: FeishuConfig | FeishuInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: FeishuConfig = config
self.config: FeishuConfig | FeishuInstanceConfig = config
self._client: Any = None
self._ws_client: Any = None
self._ws_thread: threading.Thread | None = None

View File

@@ -42,10 +42,41 @@ class ChannelManager:
continue
try:
cls = load_channel_class(modname)
channel = cls(section, self.bus)
channel.transcription_api_key = groq_key
self.channels[modname] = channel
logger.info("{} channel enabled", cls.display_name)
instances = getattr(section, "instances", None)
if instances is not None:
if not instances:
logger.warning(
"{} channel enabled but no instances configured",
cls.display_name,
)
continue
for inst in instances:
inst_name = getattr(inst, "name", None)
if not inst_name:
raise ValueError(
f'{modname}.instances item missing required field "name"'
)
# Session keys use "channel:chat_id", so instance names cannot use ":".
channel_name = f"{modname}/{inst_name}"
if channel_name in self.channels:
raise ValueError(f"Duplicate channel instance name: {channel_name}")
channel = cls(inst, self.bus)
channel.name = channel_name
channel.transcription_api_key = groq_key
self.channels[channel_name] = channel
logger.info(
"{} channel instance enabled: {}",
cls.display_name,
channel_name,
)
else:
channel = cls(section, self.bus)
channel.transcription_api_key = groq_key
self.channels[modname] = channel
logger.info("{} channel enabled", cls.display_name)
except ImportError as e:
logger.warning("{} channel not available: {}", modname, e)

View File

@@ -40,6 +40,7 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_data_dir, get_media_dir
from nanobot.config.schema import MatrixConfig, MatrixInstanceConfig
from nanobot.utils.helpers import safe_filename
TYPING_NOTICE_TIMEOUT_MS = 30_000
@@ -149,8 +150,9 @@ class MatrixChannel(BaseChannel):
name = "matrix"
display_name = "Matrix"
def __init__(self, config: Any, bus: MessageBus):
def __init__(self, config: MatrixConfig | MatrixInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: MatrixConfig | MatrixInstanceConfig = config
self.client: AsyncClient | None = None
self._sync_task: asyncio.Task | None = None
self._typing_tasks: dict[str, asyncio.Task] = {}
@@ -159,12 +161,23 @@ class MatrixChannel(BaseChannel):
self._server_upload_limit_bytes: int | None = None
self._server_upload_limit_checked = False
def _get_store_path(self) -> Path:
"""Return the Matrix sync/encryption store path for this channel instance."""
base = get_data_dir() / "matrix-store"
instance_name = (
getattr(self.config, "name", "")
or (self.name.split("/", 1)[1] if "/" in self.name else "")
)
if not instance_name:
return base
return base / safe_filename(instance_name)
async def start(self) -> None:
"""Start Matrix client and begin sync loop."""
self._running = True
_configure_nio_logging_bridge()
store_path = get_data_dir() / "matrix-store"
store_path = self._get_store_path()
store_path.mkdir(parents=True, exist_ok=True)
self.client = AsyncClient(

View File

@@ -16,7 +16,8 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_runtime_subdir
from nanobot.config.schema import MochatConfig
from nanobot.config.schema import MochatConfig, MochatInstanceConfig
from nanobot.utils.helpers import safe_filename
try:
import socketio
@@ -218,14 +219,14 @@ class MochatChannel(BaseChannel):
name = "mochat"
display_name = "Mochat"
def __init__(self, config: MochatConfig, bus: MessageBus):
def __init__(self, config: MochatConfig | MochatInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: MochatConfig = config
self.config: MochatConfig | MochatInstanceConfig = config
self._http: httpx.AsyncClient | None = None
self._socket: Any = None
self._ws_connected = self._ws_ready = False
self._state_dir = get_runtime_subdir("mochat")
self._state_dir = self._get_state_dir()
self._cursor_path = self._state_dir / "session_cursors.json"
self._session_cursor: dict[str, int] = {}
self._cursor_save_task: asyncio.Task | None = None
@@ -247,6 +248,17 @@ class MochatChannel(BaseChannel):
self._refresh_task: asyncio.Task | None = None
self._target_locks: dict[str, asyncio.Lock] = {}
def _get_state_dir(self):
"""Return the runtime state directory for this channel instance."""
base = get_runtime_subdir("mochat")
instance_name = (
getattr(self.config, "name", "")
or (self.name.split("/", 1)[1] if "/" in self.name else "")
)
if not instance_name:
return base
return base / safe_filename(instance_name)
# ---- lifecycle ---------------------------------------------------------
async def start(self) -> None:

View File

@@ -9,7 +9,7 @@ from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import QQConfig
from nanobot.config.schema import QQConfig, QQInstanceConfig
try:
import botpy
@@ -56,9 +56,9 @@ class QQChannel(BaseChannel):
name = "qq"
display_name = "QQ"
def __init__(self, config: QQConfig, bus: MessageBus):
def __init__(self, config: QQConfig | QQInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: QQConfig = config
self.config: QQConfig | QQInstanceConfig = config
self._client: "botpy.Client | None" = None
self._processed_ids: deque = deque(maxlen=1000)
self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重

View File

@@ -14,7 +14,7 @@ from slackify_markdown import slackify_markdown
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import SlackConfig
from nanobot.config.schema import SlackConfig, SlackInstanceConfig
class SlackChannel(BaseChannel):
@@ -23,9 +23,9 @@ class SlackChannel(BaseChannel):
name = "slack"
display_name = "Slack"
def __init__(self, config: SlackConfig, bus: MessageBus):
def __init__(self, config: SlackConfig | SlackInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: SlackConfig = config
self.config: SlackConfig | SlackInstanceConfig = config
self._web_client: AsyncWebClient | None = None
self._socket_client: SocketModeClient | None = None
self._bot_user_id: str | None = None

View File

@@ -17,7 +17,7 @@ from nanobot.bus.queue import MessageBus
from nanobot.agent.i18n import help_lines, normalize_language_code, telegram_command_descriptions, text
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import TelegramConfig
from nanobot.config.schema import TelegramConfig, TelegramInstanceConfig
from nanobot.utils.helpers import split_message
TELEGRAM_MAX_MESSAGE_LEN = 4000 # Telegram message character limit
@@ -161,9 +161,9 @@ class TelegramChannel(BaseChannel):
COMMAND_NAMES = ("start", "new", "lang", "persona", "stop", "help", "restart")
def __init__(self, config: TelegramConfig, bus: MessageBus):
def __init__(self, config: TelegramConfig | TelegramInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: TelegramConfig = config
self.config: TelegramConfig | TelegramInstanceConfig = config
self._app: Application | None = None
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task

View File

@@ -12,7 +12,7 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import WecomConfig
from nanobot.config.schema import WecomConfig, WecomInstanceConfig
WECOM_AVAILABLE = importlib.util.find_spec("wecom_aibot_sdk") is not None
@@ -38,9 +38,9 @@ class WecomChannel(BaseChannel):
name = "wecom"
display_name = "WeCom"
def __init__(self, config: WecomConfig, bus: MessageBus):
def __init__(self, config: WecomConfig | WecomInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: WecomConfig = config
self.config: WecomConfig | WecomInstanceConfig = config
self._client: Any = None
self._processed_message_ids: OrderedDict[str, None] = OrderedDict()
self._loop: asyncio.AbstractEventLoop | None = None

View File

@@ -10,7 +10,7 @@ from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import WhatsAppConfig
from nanobot.config.schema import WhatsAppConfig, WhatsAppInstanceConfig
class WhatsAppChannel(BaseChannel):
@@ -24,9 +24,9 @@ class WhatsAppChannel(BaseChannel):
name = "whatsapp"
display_name = "WhatsApp"
def __init__(self, config: WhatsAppConfig, bus: MessageBus):
def __init__(self, config: WhatsAppConfig | WhatsAppInstanceConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: WhatsAppConfig = config
self.config: WhatsAppConfig | WhatsAppInstanceConfig = config
self._ws = None
self._connected = False
self._processed_message_ids: OrderedDict[str, None] = OrderedDict()

View File

@@ -1,9 +1,9 @@
"""Configuration schema using Pydantic."""
from pathlib import Path
from typing import Literal
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator
from pydantic.alias_generators import to_camel
from pydantic_settings import BaseSettings
@@ -23,6 +23,19 @@ class WhatsAppConfig(Base):
allow_from: list[str] = Field(default_factory=list) # Allowed phone numbers
class WhatsAppInstanceConfig(WhatsAppConfig):
"""WhatsApp bridge instance config for multi-bot mode."""
name: str = Field(min_length=1)
class WhatsAppMultiConfig(Base):
"""WhatsApp channel configuration supporting multiple bridge instances."""
enabled: bool = False
instances: list[WhatsAppInstanceConfig] = Field(default_factory=list)
class TelegramConfig(Base):
"""Telegram channel configuration."""
@@ -36,6 +49,19 @@ class TelegramConfig(Base):
group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all
class TelegramInstanceConfig(TelegramConfig):
"""Telegram bot instance config for multi-bot mode."""
name: str = Field(min_length=1)
class TelegramMultiConfig(Base):
"""Telegram channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[TelegramInstanceConfig] = Field(default_factory=list)
class FeishuConfig(Base):
"""Feishu/Lark channel configuration using WebSocket long connection."""
@@ -51,6 +77,19 @@ class FeishuConfig(Base):
group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned, "open" responds to all
class FeishuInstanceConfig(FeishuConfig):
"""Feishu bot instance config for multi-bot mode."""
name: str = Field(min_length=1)
class FeishuMultiConfig(Base):
"""Feishu channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[FeishuInstanceConfig] = Field(default_factory=list)
class DingTalkConfig(Base):
"""DingTalk channel configuration using Stream mode."""
@@ -60,6 +99,19 @@ class DingTalkConfig(Base):
allow_from: list[str] = Field(default_factory=list) # Allowed staff_ids
class DingTalkInstanceConfig(DingTalkConfig):
"""DingTalk bot instance config for multi-bot mode."""
name: str = Field(min_length=1)
class DingTalkMultiConfig(Base):
"""DingTalk channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[DingTalkInstanceConfig] = Field(default_factory=list)
class DiscordConfig(Base):
"""Discord channel configuration."""
@@ -71,6 +123,19 @@ class DiscordConfig(Base):
group_policy: Literal["mention", "open"] = "mention"
class DiscordInstanceConfig(DiscordConfig):
"""Discord bot instance config for multi-bot mode."""
name: str = Field(min_length=1)
class DiscordMultiConfig(Base):
"""Discord channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[DiscordInstanceConfig] = Field(default_factory=list)
class MatrixConfig(Base):
"""Matrix (Element) channel configuration."""
@@ -92,6 +157,19 @@ class MatrixConfig(Base):
allow_room_mentions: bool = False
class MatrixInstanceConfig(MatrixConfig):
"""Matrix bot/account instance config for multi-account mode."""
name: str = Field(min_length=1)
class MatrixMultiConfig(Base):
"""Matrix channel configuration supporting multiple accounts."""
enabled: bool = False
instances: list[MatrixInstanceConfig] = Field(default_factory=list)
class EmailConfig(Base):
"""Email channel configuration (IMAP inbound + SMTP outbound)."""
@@ -126,6 +204,19 @@ class EmailConfig(Base):
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
class EmailInstanceConfig(EmailConfig):
"""Email account instance config for multi-account mode."""
name: str = Field(min_length=1)
class EmailMultiConfig(Base):
"""Email channel configuration supporting multiple accounts."""
enabled: bool = False
instances: list[EmailInstanceConfig] = Field(default_factory=list)
class MochatMentionConfig(Base):
"""Mochat mention behavior configuration."""
@@ -165,6 +256,19 @@ class MochatConfig(Base):
reply_delay_ms: int = 120000
class MochatInstanceConfig(MochatConfig):
"""Mochat account instance config for multi-account mode."""
name: str = Field(min_length=1)
class MochatMultiConfig(Base):
"""Mochat channel configuration supporting multiple accounts."""
enabled: bool = False
instances: list[MochatInstanceConfig] = Field(default_factory=list)
class SlackDMConfig(Base):
"""Slack DM policy configuration."""
@@ -190,15 +294,39 @@ class SlackConfig(Base):
dm: SlackDMConfig = Field(default_factory=SlackDMConfig)
class SlackInstanceConfig(SlackConfig):
"""Slack bot instance config for multi-bot mode."""
name: str = Field(min_length=1)
class SlackMultiConfig(Base):
"""Slack channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[SlackInstanceConfig] = Field(default_factory=list)
class QQConfig(Base):
"""QQ channel configuration using botpy SDK."""
"""QQ channel configuration using botpy SDK (single instance)."""
enabled: bool = False
app_id: str = "" # 机器人 ID (AppID) from q.qq.com
secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com
allow_from: list[str] = Field(
default_factory=list
) # Allowed user openids (empty = public access)
allow_from: list[str] = Field(default_factory=list) # Allowed user openids
class QQInstanceConfig(QQConfig):
"""QQ bot instance config for multi-bot mode."""
name: str = Field(min_length=1) # instance key, routed as channel name "qq/<name>"
class QQMultiConfig(Base):
"""QQ channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[QQInstanceConfig] = Field(default_factory=list)
class WecomConfig(Base):
@@ -211,22 +339,82 @@ class WecomConfig(Base):
welcome_message: str = "" # Welcome message for enter_chat event
class WecomInstanceConfig(WecomConfig):
"""WeCom bot instance config for multi-bot mode."""
name: str = Field(min_length=1)
class WecomMultiConfig(Base):
"""WeCom channel configuration supporting multiple bot instances."""
enabled: bool = False
instances: list[WecomInstanceConfig] = Field(default_factory=list)
def _coerce_multi_channel_config(
value: Any,
single_cls: type[BaseModel],
multi_cls: type[BaseModel],
) -> BaseModel:
"""Parse a channel config into single- or multi-instance form."""
if isinstance(value, (single_cls, multi_cls)):
return value
if value is None:
return single_cls()
if isinstance(value, dict) and "instances" in value:
return multi_cls.model_validate(value)
return single_cls.model_validate(value)
class ChannelsConfig(Base):
"""Configuration for chat channels."""
send_progress: bool = True # stream agent's text progress to the channel
send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…"))
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
mochat: MochatConfig = Field(default_factory=MochatConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig = Field(default_factory=EmailConfig)
slack: SlackConfig = Field(default_factory=SlackConfig)
qq: QQConfig = Field(default_factory=QQConfig)
matrix: MatrixConfig = Field(default_factory=MatrixConfig)
wecom: WecomConfig = Field(default_factory=WecomConfig)
whatsapp: WhatsAppConfig | WhatsAppMultiConfig = Field(default_factory=WhatsAppConfig)
telegram: TelegramConfig | TelegramMultiConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig | DiscordMultiConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig | FeishuMultiConfig = Field(default_factory=FeishuConfig)
mochat: MochatConfig | MochatMultiConfig = Field(default_factory=MochatConfig)
dingtalk: DingTalkConfig | DingTalkMultiConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig | EmailMultiConfig = Field(default_factory=EmailConfig)
slack: SlackConfig | SlackMultiConfig = Field(default_factory=SlackConfig)
qq: QQConfig | QQMultiConfig = Field(default_factory=QQConfig)
matrix: MatrixConfig | MatrixMultiConfig = Field(default_factory=MatrixConfig)
wecom: WecomConfig | WecomMultiConfig = Field(default_factory=WecomConfig)
@field_validator(
"whatsapp",
"telegram",
"discord",
"feishu",
"mochat",
"dingtalk",
"email",
"slack",
"qq",
"matrix",
"wecom",
mode="before",
)
@classmethod
def _parse_multi_instance_channels(cls, value: Any, info: ValidationInfo) -> BaseModel:
mapping: dict[str, tuple[type[BaseModel], type[BaseModel]]] = {
"whatsapp": (WhatsAppConfig, WhatsAppMultiConfig),
"telegram": (TelegramConfig, TelegramMultiConfig),
"discord": (DiscordConfig, DiscordMultiConfig),
"feishu": (FeishuConfig, FeishuMultiConfig),
"mochat": (MochatConfig, MochatMultiConfig),
"dingtalk": (DingTalkConfig, DingTalkMultiConfig),
"email": (EmailConfig, EmailMultiConfig),
"slack": (SlackConfig, SlackMultiConfig),
"qq": (QQConfig, QQMultiConfig),
"matrix": (MatrixConfig, MatrixMultiConfig),
"wecom": (WecomConfig, WecomMultiConfig),
}
single_cls, multi_cls = mapping[info.field_name]
return _coerce_multi_channel_config(value, single_cls, multi_cls)
class AgentDefaults(Base):