refactor(channels): abstract login() into BaseChannel, unify CLI commands

Move channel-specific login logic from CLI into each channel class via a
new `login(force=False)` method on BaseChannel. The `channels login <name>`
command now dynamically loads the channel and calls its login() method.

- WeixinChannel.login(): calls existing _qr_login(), with force to clear saved token
- WhatsAppChannel.login(): sets up bridge and spawns npm process for QR login
- CLI no longer contains duplicate login logic per channel
- Update CHANNEL_PLUGIN_GUIDE to document the login() hook

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chengyongru
2026-03-23 13:50:43 +08:00
committed by Xubin Ren
parent 11e1bbbab7
commit 556b21d011
5 changed files with 184 additions and 156 deletions

View File

@@ -178,6 +178,35 @@ The agent receives the message and processes it. Replies arrive in your `send()`
| `async stop()` | Set `self._running = False` and clean up. Called when gateway shuts down. | | `async stop()` | Set `self._running = False` and clean up. Called when gateway shuts down. |
| `async send(msg: OutboundMessage)` | Deliver an outbound message to the platform. | | `async send(msg: OutboundMessage)` | Deliver an outbound message to the platform. |
### Interactive Login
If your channel requires interactive authentication (e.g. QR code scan), override `login(force=False)`:
```python
async def login(self, force: bool = False) -> bool:
"""
Perform channel-specific interactive login.
Args:
force: If True, ignore existing credentials and re-authenticate.
Returns True if already authenticated or login succeeds.
"""
# For QR-code-based login:
# 1. If force, clear saved credentials
# 2. Check if already authenticated (load from disk/state)
# 3. If not, show QR code and poll for confirmation
# 4. Save token on success
```
Channels that don't need interactive login (e.g. Telegram with bot token, Discord with bot token) inherit the default `login()` which just returns `True`.
Users trigger interactive login via:
```bash
nanobot channels login <channel_name>
nanobot channels login <channel_name> --force # re-authenticate
```
### Provided by Base ### Provided by Base
| Method / Property | Description | | Method / Property | Description |
@@ -188,6 +217,7 @@ The agent receives the message and processes it. Replies arrive in your `send()`
| `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). | | `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). |
| `supports_streaming` (property) | `True` when config has `"streaming": true` **and** subclass overrides `send_delta()`. | | `supports_streaming` (property) | `True` when config has `"streaming": true` **and** subclass overrides `send_delta()`. |
| `is_running` | Returns `self._running`. | | `is_running` | Returns `self._running`. |
| `login(force=False)` | Perform interactive login (e.g. QR code scan). Returns `True` if already authenticated or login succeeds. Override in subclasses that support interactive login. |
### Optional (streaming) ### Optional (streaming)

View File

@@ -49,6 +49,18 @@ class BaseChannel(ABC):
logger.warning("{}: audio transcription failed: {}", self.name, e) logger.warning("{}: audio transcription failed: {}", self.name, e)
return "" return ""
async def login(self, force: bool = False) -> bool:
"""
Perform channel-specific interactive login (e.g. QR code scan).
Args:
force: If True, ignore existing credentials and force re-authentication.
Returns True if already authenticated or login succeeds.
Override in subclasses that support interactive login.
"""
return True
@abstractmethod @abstractmethod
async def start(self) -> None: async def start(self) -> None:
""" """

View File

@@ -311,6 +311,31 @@ class WeixinChannel(BaseChannel):
# Channel lifecycle # Channel lifecycle
# ------------------------------------------------------------------ # ------------------------------------------------------------------
async def login(self, force: bool = False) -> bool:
"""Perform QR code login and save token. Returns True on success."""
if force:
self._token = ""
self._get_updates_buf = ""
state_file = self._get_state_dir() / "account.json"
if state_file.exists():
state_file.unlink()
if self._token or self._load_state():
return True
# Initialize HTTP client for the login flow
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(60, connect=30),
follow_redirects=True,
)
self._running = True # Enable polling loop in _qr_login()
try:
return await self._qr_login()
finally:
self._running = False
if self._client:
await self._client.aclose()
self._client = None
async def start(self) -> None: async def start(self) -> None:
self._running = True self._running = True
self._next_poll_timeout_s = self.config.poll_timeout self._next_poll_timeout_s = self.config.poll_timeout
@@ -323,7 +348,7 @@ class WeixinChannel(BaseChannel):
self._token = self.config.token self._token = self.config.token
elif not self._load_state(): elif not self._load_state():
if not await self._qr_login(): if not await self._qr_login():
logger.error("WeChat login failed. Run 'nanobot weixin login' to authenticate.") logger.error("WeChat login failed. Run 'nanobot channels login weixin' to authenticate.")
self._running = False self._running = False
return return

View File

@@ -3,11 +3,14 @@
import asyncio import asyncio
import json import json
import mimetypes import mimetypes
import os
import shutil
import subprocess
from collections import OrderedDict from collections import OrderedDict
from typing import Any from pathlib import Path
from typing import Any, Literal
from loguru import logger from loguru import logger
from pydantic import Field from pydantic import Field
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
@@ -48,6 +51,37 @@ class WhatsAppChannel(BaseChannel):
self._connected = False self._connected = False
self._processed_message_ids: OrderedDict[str, None] = OrderedDict() self._processed_message_ids: OrderedDict[str, None] = OrderedDict()
async def login(self, force: bool = False) -> bool:
"""
Set up and run the WhatsApp bridge for QR code login.
This spawns the Node.js bridge process which handles the WhatsApp
authentication flow. The process blocks until the user scans the QR code
or interrupts with Ctrl+C.
"""
from nanobot.config.paths import get_runtime_subdir
try:
bridge_dir = _ensure_bridge_setup()
except RuntimeError as e:
logger.error("{}", e)
return False
env = {**os.environ}
if self.config.bridge_token:
env["BRIDGE_TOKEN"] = self.config.bridge_token
env["AUTH_DIR"] = str(get_runtime_subdir("whatsapp-auth"))
logger.info("Starting WhatsApp bridge for QR login...")
try:
subprocess.run(
[shutil.which("npm"), "start"], cwd=bridge_dir, check=True, env=env
)
except subprocess.CalledProcessError:
return False
return True
async def start(self) -> None: async def start(self) -> None:
"""Start the WhatsApp channel by connecting to the bridge.""" """Start the WhatsApp channel by connecting to the bridge."""
import websockets import websockets
@@ -64,7 +98,9 @@ class WhatsAppChannel(BaseChannel):
self._ws = ws self._ws = ws
# Send auth token if configured # Send auth token if configured
if self.config.bridge_token: if self.config.bridge_token:
await ws.send(json.dumps({"type": "auth", "token": self.config.bridge_token})) await ws.send(
json.dumps({"type": "auth", "token": self.config.bridge_token})
)
self._connected = True self._connected = True
logger.info("Connected to WhatsApp bridge") logger.info("Connected to WhatsApp bridge")
@@ -102,11 +138,7 @@ class WhatsAppChannel(BaseChannel):
return return
try: try:
payload = { payload = {"type": "send", "to": msg.chat_id, "text": msg.content}
"type": "send",
"to": msg.chat_id,
"text": msg.content
}
await self._ws.send(json.dumps(payload, ensure_ascii=False)) await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e: except Exception as e:
logger.error("Error sending WhatsApp message: {}", e) logger.error("Error sending WhatsApp message: {}", e)
@@ -144,7 +176,10 @@ class WhatsAppChannel(BaseChannel):
# Handle voice transcription if it's a voice message # Handle voice transcription if it's a voice message
if content == "[Voice Message]": if content == "[Voice Message]":
logger.info("Voice message received from {}, but direct download from bridge is not yet supported.", sender_id) logger.info(
"Voice message received from {}, but direct download from bridge is not yet supported.",
sender_id,
)
content = "[Voice Message: Transcription not available for WhatsApp yet]" content = "[Voice Message: Transcription not available for WhatsApp yet]"
# Extract media paths (images/documents/videos downloaded by the bridge) # Extract media paths (images/documents/videos downloaded by the bridge)
@@ -166,8 +201,8 @@ class WhatsAppChannel(BaseChannel):
metadata={ metadata={
"message_id": message_id, "message_id": message_id,
"timestamp": data.get("timestamp"), "timestamp": data.get("timestamp"),
"is_group": data.get("isGroup", False) "is_group": data.get("isGroup", False),
} },
) )
elif msg_type == "status": elif msg_type == "status":
@@ -185,4 +220,55 @@ class WhatsAppChannel(BaseChannel):
logger.info("Scan QR code in the bridge terminal to connect WhatsApp") logger.info("Scan QR code in the bridge terminal to connect WhatsApp")
elif msg_type == "error": elif msg_type == "error":
logger.error("WhatsApp bridge error: {}", data.get('error')) logger.error("WhatsApp bridge error: {}", data.get("error"))
def _ensure_bridge_setup() -> Path:
"""
Ensure the WhatsApp bridge is set up and built.
Returns the bridge directory. Raises RuntimeError if npm is not found
or bridge cannot be built.
"""
from nanobot.config.paths import get_bridge_install_dir
user_bridge = get_bridge_install_dir()
if (user_bridge / "dist" / "index.js").exists():
return user_bridge
npm_path = shutil.which("npm")
if not npm_path:
raise RuntimeError("npm not found. Please install Node.js >= 18.")
# Find source bridge
current_file = Path(__file__)
pkg_bridge = current_file.parent.parent / "bridge"
src_bridge = current_file.parent.parent.parent / "bridge"
source = None
if (pkg_bridge / "package.json").exists():
source = pkg_bridge
elif (src_bridge / "package.json").exists():
source = src_bridge
if not source:
raise RuntimeError(
"WhatsApp bridge source not found. "
"Try reinstalling: pip install --force-reinstall nanobot"
)
logger.info("Setting up WhatsApp bridge...")
user_bridge.parent.mkdir(parents=True, exist_ok=True)
if user_bridge.exists():
shutil.rmtree(user_bridge)
shutil.copytree(source, user_bridge, ignore=shutil.ignore_patterns("node_modules", "dist"))
logger.info(" Installing dependencies...")
subprocess.run([npm_path, "install"], cwd=user_bridge, check=True, capture_output=True)
logger.info(" Building...")
subprocess.run([npm_path, "run", "build"], cwd=user_bridge, check=True, capture_output=True)
logger.info("Bridge ready")
return user_bridge

View File

@@ -1004,158 +1004,33 @@ def _get_bridge_dir() -> Path:
@channels_app.command("login") @channels_app.command("login")
def channels_login(): def channels_login(
"""Link device via QR code.""" channel_name: str = typer.Argument(..., help="Channel name (e.g. weixin, whatsapp)"),
import shutil force: bool = typer.Option(False, "--force", "-f", help="Force re-authentication even if already logged in"),
import subprocess ):
"""Authenticate with a channel via QR code or other interactive login."""
from nanobot.channels.registry import discover_all, load_channel_class
from nanobot.config.loader import load_config from nanobot.config.loader import load_config
from nanobot.config.paths import get_runtime_subdir
config = load_config() config = load_config()
bridge_dir = _get_bridge_dir() channel_cfg = getattr(config.channels, channel_name, None) or {}
console.print(f"{__logo__} Starting bridge...") # Validate channel exists
console.print("Scan the QR code to connect.\n") all_channels = discover_all()
if channel_name not in all_channels:
env = {**os.environ} available = ", ".join(all_channels.keys())
wa_cfg = getattr(config.channels, "whatsapp", None) or {} console.print(f"[red]Unknown channel: {channel_name}[/red] Available: {available}")
bridge_token = wa_cfg.get("bridgeToken", "") if isinstance(wa_cfg, dict) else getattr(wa_cfg, "bridge_token", "")
if bridge_token:
env["BRIDGE_TOKEN"] = bridge_token
env["AUTH_DIR"] = str(get_runtime_subdir("whatsapp-auth"))
npm_path = shutil.which("npm")
if not npm_path:
console.print("[red]npm not found. Please install Node.js.[/red]")
raise typer.Exit(1) raise typer.Exit(1)
try: console.print(f"{__logo__} {all_channels[channel_name].display_name} Login\n")
subprocess.run([npm_path, "start"], cwd=bridge_dir, check=True, env=env)
except subprocess.CalledProcessError as e:
console.print(f"[red]Bridge failed: {e}[/red]")
channel_cls = load_channel_class(channel_name)
channel = channel_cls(channel_cfg, bus=None)
# ============================================================================ success = asyncio.run(channel.login(force=force))
# WeChat (WeXin) Commands
# ============================================================================
weixin_app = typer.Typer(help="WeChat (微信) account management") if not success:
app.add_typer(weixin_app, name="weixin") raise typer.Exit(1)
@weixin_app.command("login")
def weixin_login():
"""Authenticate with personal WeChat via QR code scan."""
import json as _json
from nanobot.config.loader import load_config
from nanobot.config.paths import get_runtime_subdir
config = load_config()
weixin_cfg = getattr(config.channels, "weixin", None) or {}
base_url = (
weixin_cfg.get("baseUrl", "https://ilinkai.weixin.qq.com")
if isinstance(weixin_cfg, dict)
else getattr(weixin_cfg, "base_url", "https://ilinkai.weixin.qq.com")
)
state_dir = get_runtime_subdir("weixin")
account_file = state_dir / "account.json"
console.print(f"{__logo__} WeChat QR Code Login\n")
async def _run_login():
import httpx as _httpx
headers = {
"Content-Type": "application/json",
}
async with _httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
# Step 1: Get QR code
console.print("[cyan]Fetching QR code...[/cyan]")
resp = await client.get(
f"{base_url}/ilink/bot/get_bot_qrcode",
params={"bot_type": "3"},
headers=headers,
)
resp.raise_for_status()
data = resp.json()
# qrcode_img_content is the scannable URL; qrcode is the poll ID
qrcode_img_content = data.get("qrcode_img_content", "")
qrcode_id = data.get("qrcode", "")
if not qrcode_id:
console.print(f"[red]Failed to get QR code: {data}[/red]")
return
scan_url = qrcode_img_content or qrcode_id
# Print QR code
try:
import qrcode as qr_lib
qr = qr_lib.QRCode(border=1)
qr.add_data(scan_url)
qr.make(fit=True)
qr.print_ascii(invert=True)
except ImportError:
console.print("\n[yellow]Install 'qrcode' for terminal QR display[/yellow]")
console.print(f"\nLogin URL: {scan_url}\n")
console.print("\n[cyan]Scan the QR code with WeChat...[/cyan]")
# Step 2: Poll for scan (iLink-App-ClientVersion header per login-qr.ts)
poll_headers = {**headers, "iLink-App-ClientVersion": "1"}
for _ in range(120): # ~4 minute timeout
try:
resp = await client.get(
f"{base_url}/ilink/bot/get_qrcode_status",
params={"qrcode": qrcode_id},
headers=poll_headers,
)
resp.raise_for_status()
status_data = resp.json()
except _httpx.TimeoutException:
continue
status = status_data.get("status", "")
if status == "confirmed":
token = status_data.get("bot_token", "")
bot_id = status_data.get("ilink_bot_id", "")
base_url_resp = status_data.get("baseurl", "")
user_id = status_data.get("ilink_user_id", "")
if token:
account = {
"token": token,
"get_updates_buf": "",
}
if base_url_resp:
account["base_url"] = base_url_resp
account_file.write_text(_json.dumps(account, ensure_ascii=False))
console.print("\n[green]✓ WeChat login successful![/green]")
if bot_id:
console.print(f"[dim]Bot ID: {bot_id}[/dim]")
if user_id:
console.print(
f"[dim]User ID: {user_id} (add to allowFrom in config)[/dim]"
)
console.print(f"[dim]Credentials saved to {account_file}[/dim]")
return
else:
console.print("[red]Login confirmed but no token received.[/red]")
return
elif status == "scaned":
console.print("[cyan]Scanned! Confirm on your phone...[/cyan]")
elif status == "expired":
console.print("[red]QR code expired. Please try again.[/red]")
return
await asyncio.sleep(2)
console.print("[red]Login timed out. Please try again.[/red]")
asyncio.run(_run_login())
# ============================================================================ # ============================================================================