fix: complete ensure_ascii=False and UTF-8 encoding migration

This commit is contained in:
Re-bin
2026-02-20 07:59:32 +00:00
25 changed files with 356 additions and 235 deletions

View File

@@ -219,7 +219,7 @@ class AgentLoop:
for tool_call in response.tool_calls: for tool_call in response.tool_calls:
tools_used.append(tool_call.name) tools_used.append(tool_call.name)
args_str = json.dumps(tool_call.arguments, ensure_ascii=False) args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info(f"Tool call: {tool_call.name}({args_str[:200]})") logger.info("Tool call: {}({})", tool_call.name, args_str[:200])
result = await self.tools.execute(tool_call.name, tool_call.arguments) result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result( messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result messages, tool_call.id, tool_call.name, result
@@ -247,7 +247,7 @@ class AgentLoop:
if response: if response:
await self.bus.publish_outbound(response) await self.bus.publish_outbound(response)
except Exception as e: except Exception as e:
logger.error(f"Error processing message: {e}") logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage( await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
@@ -292,7 +292,7 @@ class AgentLoop:
return await self._process_system_message(msg) return await self._process_system_message(msg)
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}") logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
key = session_key or msg.session_key key = session_key or msg.session_key
session = self.sessions.get_or_create(key) session = self.sessions.get_or_create(key)
@@ -344,7 +344,7 @@ class AgentLoop:
final_content = "I've completed processing but have no response to give." final_content = "I've completed processing but have no response to give."
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}") logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
session.add_message("user", msg.content) session.add_message("user", msg.content)
session.add_message("assistant", final_content, session.add_message("assistant", final_content,
@@ -365,7 +365,7 @@ class AgentLoop:
The chat_id field contains "original_channel:original_chat_id" to route The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination. the response back to the correct destination.
""" """
logger.info(f"Processing system message from {msg.sender_id}") logger.info("Processing system message from {}", msg.sender_id)
# Parse origin from chat_id (format: "channel:chat_id") # Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id: if ":" in msg.chat_id:
@@ -388,7 +388,6 @@ class AgentLoop:
) )
final_content, _ = await self._run_agent_loop(initial_messages) final_content, _ = await self._run_agent_loop(initial_messages)
if final_content is None: if final_content is None:
final_content = "Background task completed." final_content = "Background task completed."
@@ -414,22 +413,22 @@ class AgentLoop:
if archive_all: if archive_all:
old_messages = session.messages old_messages = session.messages
keep_count = 0 keep_count = 0
logger.info(f"Memory consolidation (archive_all): {len(session.messages)} total messages archived") logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
else: else:
keep_count = self.memory_window // 2 keep_count = self.memory_window // 2
if len(session.messages) <= keep_count: if len(session.messages) <= keep_count:
logger.debug(f"Session {session.key}: No consolidation needed (messages={len(session.messages)}, keep={keep_count})") logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
return return
messages_to_process = len(session.messages) - session.last_consolidated messages_to_process = len(session.messages) - session.last_consolidated
if messages_to_process <= 0: if messages_to_process <= 0:
logger.debug(f"Session {session.key}: No new messages to consolidate (last_consolidated={session.last_consolidated}, total={len(session.messages)})") logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages))
return return
old_messages = session.messages[session.last_consolidated:-keep_count] old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages: if not old_messages:
return return
logger.info(f"Memory consolidation started: {len(session.messages)} total, {len(old_messages)} new to consolidate, {keep_count} keep") logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count)
lines = [] lines = []
for m in old_messages: for m in old_messages:
@@ -470,7 +469,7 @@ Respond with ONLY valid JSON, no markdown fences."""
text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
result = json_repair.loads(text) result = json_repair.loads(text)
if not isinstance(result, dict): if not isinstance(result, dict):
logger.warning(f"Memory consolidation: unexpected response type, skipping. Response: {text[:200]}") logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200])
return return
if entry := result.get("history_entry"): if entry := result.get("history_entry"):
@@ -483,9 +482,9 @@ Respond with ONLY valid JSON, no markdown fences."""
session.last_consolidated = 0 session.last_consolidated = 0
else: else:
session.last_consolidated = len(session.messages) - keep_count session.last_consolidated = len(session.messages) - keep_count
logger.info(f"Memory consolidation done: {len(session.messages)} messages, last_consolidated={session.last_consolidated}") logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
except Exception as e: except Exception as e:
logger.error(f"Memory consolidation failed: {e}") logger.error("Memory consolidation failed: {}", e)
async def process_direct( async def process_direct(
self, self,

View File

@@ -86,7 +86,7 @@ class SubagentManager:
# Cleanup when done # Cleanup when done
bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None)) bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None))
logger.info(f"Spawned subagent [{task_id}]: {display_label}") logger.info("Spawned subagent [{}]: {}", task_id, display_label)
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
async def _run_subagent( async def _run_subagent(
@@ -97,7 +97,7 @@ class SubagentManager:
origin: dict[str, str], origin: dict[str, str],
) -> None: ) -> None:
"""Execute the subagent task and announce the result.""" """Execute the subagent task and announce the result."""
logger.info(f"Subagent [{task_id}] starting task: {label}") logger.info("Subagent [{}] starting task: {}", task_id, label)
try: try:
# Build subagent tools (no message tool, no spawn tool) # Build subagent tools (no message tool, no spawn tool)
@@ -160,7 +160,7 @@ class SubagentManager:
# Execute tools # Execute tools
for tool_call in response.tool_calls: for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments, ensure_ascii=False) args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.debug(f"Subagent [{task_id}] executing: {tool_call.name} with arguments: {args_str}") logger.debug("Subagent [{}] executing: {} with arguments: {}", task_id, tool_call.name, args_str)
result = await tools.execute(tool_call.name, tool_call.arguments) result = await tools.execute(tool_call.name, tool_call.arguments)
messages.append({ messages.append({
"role": "tool", "role": "tool",
@@ -175,12 +175,12 @@ class SubagentManager:
if final_result is None: if final_result is None:
final_result = "Task completed but no final response was generated." final_result = "Task completed but no final response was generated."
logger.info(f"Subagent [{task_id}] completed successfully") logger.info("Subagent [{}] completed successfully", task_id)
await self._announce_result(task_id, label, task, final_result, origin, "ok") await self._announce_result(task_id, label, task, final_result, origin, "ok")
except Exception as e: except Exception as e:
error_msg = f"Error: {str(e)}" error_msg = f"Error: {str(e)}"
logger.error(f"Subagent [{task_id}] failed: {e}") logger.error("Subagent [{}] failed: {}", task_id, e)
await self._announce_result(task_id, label, task, error_msg, origin, "error") await self._announce_result(task_id, label, task, error_msg, origin, "error")
async def _announce_result( async def _announce_result(
@@ -213,7 +213,7 @@ Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not men
) )
await self.bus.publish_inbound(msg) await self.bus.publish_inbound(msg)
logger.debug(f"Subagent [{task_id}] announced result to {origin['channel']}:{origin['chat_id']}") logger.debug("Subagent [{}] announced result to {}:{}", task_id, origin['channel'], origin['chat_id'])
def _build_subagent_prompt(self, task: str) -> str: def _build_subagent_prompt(self, task: str) -> str:
"""Build a focused system prompt for the subagent.""" """Build a focused system prompt for the subagent."""

View File

@@ -63,7 +63,7 @@ async def connect_mcp_servers(
streamable_http_client(cfg.url) streamable_http_client(cfg.url)
) )
else: else:
logger.warning(f"MCP server '{name}': no command or url configured, skipping") logger.warning("MCP server '{}': no command or url configured, skipping", name)
continue continue
session = await stack.enter_async_context(ClientSession(read, write)) session = await stack.enter_async_context(ClientSession(read, write))
@@ -73,8 +73,8 @@ async def connect_mcp_servers(
for tool_def in tools.tools: for tool_def in tools.tools:
wrapper = MCPToolWrapper(session, name, tool_def) wrapper = MCPToolWrapper(session, name, tool_def)
registry.register(wrapper) registry.register(wrapper)
logger.debug(f"MCP: registered tool '{wrapper.name}' from server '{name}'") logger.debug("MCP: registered tool '{}' from server '{}'", wrapper.name, name)
logger.info(f"MCP server '{name}': connected, {len(tools.tools)} tools registered") logger.info("MCP server '{}': connected, {} tools registered", name, len(tools.tools))
except Exception as e: except Exception as e:
logger.error(f"MCP server '{name}': failed to connect: {e}") logger.error("MCP server '{}': failed to connect: {}", name, e)

View File

@@ -26,7 +26,8 @@ class ExecTool(Tool):
r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr
r"\bdel\s+/[fq]\b", # del /f, del /q r"\bdel\s+/[fq]\b", # del /f, del /q
r"\brmdir\s+/s\b", # rmdir /s r"\brmdir\s+/s\b", # rmdir /s
r"\b(format|mkfs|diskpart)\b", # disk operations r"(?:^|[;&|]\s*)format\b", # format (as standalone command only)
r"\b(mkfs|diskpart)\b", # disk operations
r"\bdd\s+if=", # dd r"\bdd\s+if=", # dd
r">\s*/dev/sd", # write to disk r">\s*/dev/sd", # write to disk
r"\b(shutdown|reboot|poweroff)\b", # system power r"\b(shutdown|reboot|poweroff)\b", # system power
@@ -81,6 +82,12 @@ class ExecTool(Tool):
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
process.kill() process.kill()
# Wait for the process to fully terminate so pipes are
# drained and file descriptors are released.
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
return f"Error: Command timed out after {self.timeout} seconds" return f"Error: Command timed out after {self.timeout} seconds"
output_parts = [] output_parts = []

View File

@@ -1,9 +1,6 @@
"""Async message queue for decoupled channel-agent communication.""" """Async message queue for decoupled channel-agent communication."""
import asyncio import asyncio
from typing import Callable, Awaitable
from loguru import logger
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
@@ -11,70 +8,36 @@ from nanobot.bus.events import InboundMessage, OutboundMessage
class MessageBus: class MessageBus:
""" """
Async message bus that decouples chat channels from the agent core. Async message bus that decouples chat channels from the agent core.
Channels push messages to the inbound queue, and the agent processes Channels push messages to the inbound queue, and the agent processes
them and pushes responses to the outbound queue. them and pushes responses to the outbound queue.
""" """
def __init__(self): def __init__(self):
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue() self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue() self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
self._outbound_subscribers: dict[str, list[Callable[[OutboundMessage], Awaitable[None]]]] = {}
self._running = False
async def publish_inbound(self, msg: InboundMessage) -> None: async def publish_inbound(self, msg: InboundMessage) -> None:
"""Publish a message from a channel to the agent.""" """Publish a message from a channel to the agent."""
await self.inbound.put(msg) await self.inbound.put(msg)
async def consume_inbound(self) -> InboundMessage: async def consume_inbound(self) -> InboundMessage:
"""Consume the next inbound message (blocks until available).""" """Consume the next inbound message (blocks until available)."""
return await self.inbound.get() return await self.inbound.get()
async def publish_outbound(self, msg: OutboundMessage) -> None: async def publish_outbound(self, msg: OutboundMessage) -> None:
"""Publish a response from the agent to channels.""" """Publish a response from the agent to channels."""
await self.outbound.put(msg) await self.outbound.put(msg)
async def consume_outbound(self) -> OutboundMessage: async def consume_outbound(self) -> OutboundMessage:
"""Consume the next outbound message (blocks until available).""" """Consume the next outbound message (blocks until available)."""
return await self.outbound.get() return await self.outbound.get()
def subscribe_outbound(
self,
channel: str,
callback: Callable[[OutboundMessage], Awaitable[None]]
) -> None:
"""Subscribe to outbound messages for a specific channel."""
if channel not in self._outbound_subscribers:
self._outbound_subscribers[channel] = []
self._outbound_subscribers[channel].append(callback)
async def dispatch_outbound(self) -> None:
"""
Dispatch outbound messages to subscribed channels.
Run this as a background task.
"""
self._running = True
while self._running:
try:
msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
subscribers = self._outbound_subscribers.get(msg.channel, [])
for callback in subscribers:
try:
await callback(msg)
except Exception as e:
logger.error(f"Error dispatching to {msg.channel}: {e}")
except asyncio.TimeoutError:
continue
def stop(self) -> None:
"""Stop the dispatcher loop."""
self._running = False
@property @property
def inbound_size(self) -> int: def inbound_size(self) -> int:
"""Number of pending inbound messages.""" """Number of pending inbound messages."""
return self.inbound.qsize() return self.inbound.qsize()
@property @property
def outbound_size(self) -> int: def outbound_size(self) -> int:
"""Number of pending outbound messages.""" """Number of pending outbound messages."""

View File

@@ -65,7 +65,7 @@ class NanobotDingTalkHandler(CallbackHandler):
sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id
sender_name = chatbot_msg.sender_nick or "Unknown" sender_name = chatbot_msg.sender_nick or "Unknown"
logger.info(f"Received DingTalk message from {sender_name} ({sender_id}): {content}") logger.info("Received DingTalk message from {} ({}): {}", sender_name, sender_id, content)
# Forward to Nanobot via _on_message (non-blocking). # Forward to Nanobot via _on_message (non-blocking).
# Store reference to prevent GC before task completes. # Store reference to prevent GC before task completes.
@@ -78,7 +78,7 @@ class NanobotDingTalkHandler(CallbackHandler):
return AckMessage.STATUS_OK, "OK" return AckMessage.STATUS_OK, "OK"
except Exception as e: except Exception as e:
logger.error(f"Error processing DingTalk message: {e}") logger.error("Error processing DingTalk message: {}", e)
# Return OK to avoid retry loop from DingTalk server # Return OK to avoid retry loop from DingTalk server
return AckMessage.STATUS_OK, "Error" return AckMessage.STATUS_OK, "Error"
@@ -142,13 +142,13 @@ class DingTalkChannel(BaseChannel):
try: try:
await self._client.start() await self._client.start()
except Exception as e: except Exception as e:
logger.warning(f"DingTalk stream error: {e}") logger.warning("DingTalk stream error: {}", e)
if self._running: if self._running:
logger.info("Reconnecting DingTalk stream in 5 seconds...") logger.info("Reconnecting DingTalk stream in 5 seconds...")
await asyncio.sleep(5) await asyncio.sleep(5)
except Exception as e: except Exception as e:
logger.exception(f"Failed to start DingTalk channel: {e}") logger.exception("Failed to start DingTalk channel: {}", e)
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the DingTalk bot.""" """Stop the DingTalk bot."""
@@ -186,7 +186,7 @@ class DingTalkChannel(BaseChannel):
self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60 self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60
return self._access_token return self._access_token
except Exception as e: except Exception as e:
logger.error(f"Failed to get DingTalk access token: {e}") logger.error("Failed to get DingTalk access token: {}", e)
return None return None
async def send(self, msg: OutboundMessage) -> None: async def send(self, msg: OutboundMessage) -> None:
@@ -218,11 +218,11 @@ class DingTalkChannel(BaseChannel):
try: try:
resp = await self._http.post(url, json=data, headers=headers) resp = await self._http.post(url, json=data, headers=headers)
if resp.status_code != 200: if resp.status_code != 200:
logger.error(f"DingTalk send failed: {resp.text}") logger.error("DingTalk send failed: {}", resp.text)
else: else:
logger.debug(f"DingTalk message sent to {msg.chat_id}") logger.debug("DingTalk message sent to {}", msg.chat_id)
except Exception as e: except Exception as e:
logger.error(f"Error sending DingTalk message: {e}") logger.error("Error sending DingTalk message: {}", e)
async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None: async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None:
"""Handle incoming message (called by NanobotDingTalkHandler). """Handle incoming message (called by NanobotDingTalkHandler).
@@ -231,7 +231,7 @@ class DingTalkChannel(BaseChannel):
permission checks before publishing to the bus. permission checks before publishing to the bus.
""" """
try: try:
logger.info(f"DingTalk inbound: {content} from {sender_name}") logger.info("DingTalk inbound: {} from {}", content, sender_name)
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,
chat_id=sender_id, # For private chat, chat_id == sender_id chat_id=sender_id, # For private chat, chat_id == sender_id
@@ -242,4 +242,4 @@ class DingTalkChannel(BaseChannel):
}, },
) )
except Exception as e: except Exception as e:
logger.error(f"Error publishing DingTalk message: {e}") logger.error("Error publishing DingTalk message: {}", e)

View File

@@ -51,7 +51,7 @@ class DiscordChannel(BaseChannel):
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
logger.warning(f"Discord gateway error: {e}") logger.warning("Discord gateway error: {}", e)
if self._running: if self._running:
logger.info("Reconnecting to Discord gateway in 5 seconds...") logger.info("Reconnecting to Discord gateway in 5 seconds...")
await asyncio.sleep(5) await asyncio.sleep(5)
@@ -94,14 +94,14 @@ class DiscordChannel(BaseChannel):
if response.status_code == 429: if response.status_code == 429:
data = response.json() data = response.json()
retry_after = float(data.get("retry_after", 1.0)) retry_after = float(data.get("retry_after", 1.0))
logger.warning(f"Discord rate limited, retrying in {retry_after}s") logger.warning("Discord rate limited, retrying in {}s", retry_after)
await asyncio.sleep(retry_after) await asyncio.sleep(retry_after)
continue continue
response.raise_for_status() response.raise_for_status()
return return
except Exception as e: except Exception as e:
if attempt == 2: if attempt == 2:
logger.error(f"Error sending Discord message: {e}") logger.error("Error sending Discord message: {}", e)
else: else:
await asyncio.sleep(1) await asyncio.sleep(1)
finally: finally:
@@ -116,7 +116,7 @@ class DiscordChannel(BaseChannel):
try: try:
data = json.loads(raw) data = json.loads(raw)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning(f"Invalid JSON from Discord gateway: {raw[:100]}") logger.warning("Invalid JSON from Discord gateway: {}", raw[:100])
continue continue
op = data.get("op") op = data.get("op")
@@ -175,7 +175,7 @@ class DiscordChannel(BaseChannel):
try: try:
await self._ws.send(json.dumps(payload)) await self._ws.send(json.dumps(payload))
except Exception as e: except Exception as e:
logger.warning(f"Discord heartbeat failed: {e}") logger.warning("Discord heartbeat failed: {}", e)
break break
await asyncio.sleep(interval_s) await asyncio.sleep(interval_s)
@@ -219,7 +219,7 @@ class DiscordChannel(BaseChannel):
media_paths.append(str(file_path)) media_paths.append(str(file_path))
content_parts.append(f"[attachment: {file_path}]") content_parts.append(f"[attachment: {file_path}]")
except Exception as e: except Exception as e:
logger.warning(f"Failed to download Discord attachment: {e}") logger.warning("Failed to download Discord attachment: {}", e)
content_parts.append(f"[attachment: {filename} - download failed]") content_parts.append(f"[attachment: {filename} - download failed]")
reply_to = (payload.get("referenced_message") or {}).get("id") reply_to = (payload.get("referenced_message") or {}).get("id")

View File

@@ -94,7 +94,7 @@ class EmailChannel(BaseChannel):
metadata=item.get("metadata", {}), metadata=item.get("metadata", {}),
) )
except Exception as e: except Exception as e:
logger.error(f"Email polling error: {e}") logger.error("Email polling error: {}", e)
await asyncio.sleep(poll_seconds) await asyncio.sleep(poll_seconds)
@@ -143,7 +143,7 @@ class EmailChannel(BaseChannel):
try: try:
await asyncio.to_thread(self._smtp_send, email_msg) await asyncio.to_thread(self._smtp_send, email_msg)
except Exception as e: except Exception as e:
logger.error(f"Error sending email to {to_addr}: {e}") logger.error("Error sending email to {}: {}", to_addr, e)
raise raise
def _validate_config(self) -> bool: def _validate_config(self) -> bool:
@@ -162,7 +162,7 @@ class EmailChannel(BaseChannel):
missing.append("smtp_password") missing.append("smtp_password")
if missing: if missing:
logger.error(f"Email channel not configured, missing: {', '.join(missing)}") logger.error("Email channel not configured, missing: {}", ', '.join(missing))
return False return False
return True return True

View File

@@ -2,6 +2,7 @@
import asyncio import asyncio
import json import json
import os
import re import re
import threading import threading
from collections import OrderedDict from collections import OrderedDict
@@ -17,6 +18,10 @@ from nanobot.config.schema import FeishuConfig
try: try:
import lark_oapi as lark import lark_oapi as lark
from lark_oapi.api.im.v1 import ( from lark_oapi.api.im.v1 import (
CreateFileRequest,
CreateFileRequestBody,
CreateImageRequest,
CreateImageRequestBody,
CreateMessageRequest, CreateMessageRequest,
CreateMessageRequestBody, CreateMessageRequestBody,
CreateMessageReactionRequest, CreateMessageReactionRequest,
@@ -151,7 +156,7 @@ class FeishuChannel(BaseChannel):
try: try:
self._ws_client.start() self._ws_client.start()
except Exception as e: except Exception as e:
logger.warning(f"Feishu WebSocket error: {e}") logger.warning("Feishu WebSocket error: {}", e)
if self._running: if self._running:
import time; time.sleep(5) import time; time.sleep(5)
@@ -172,7 +177,7 @@ class FeishuChannel(BaseChannel):
try: try:
self._ws_client.stop() self._ws_client.stop()
except Exception as e: except Exception as e:
logger.warning(f"Error stopping WebSocket client: {e}") logger.warning("Error stopping WebSocket client: {}", e)
logger.info("Feishu bot stopped") logger.info("Feishu bot stopped")
def _add_reaction_sync(self, message_id: str, emoji_type: str) -> None: def _add_reaction_sync(self, message_id: str, emoji_type: str) -> None:
@@ -189,11 +194,11 @@ class FeishuChannel(BaseChannel):
response = self._client.im.v1.message_reaction.create(request) response = self._client.im.v1.message_reaction.create(request)
if not response.success(): if not response.success():
logger.warning(f"Failed to add reaction: code={response.code}, msg={response.msg}") logger.warning("Failed to add reaction: code={}, msg={}", response.code, response.msg)
else: else:
logger.debug(f"Added {emoji_type} reaction to message {message_id}") logger.debug("Added {} reaction to message {}", emoji_type, message_id)
except Exception as e: except Exception as e:
logger.warning(f"Error adding reaction: {e}") logger.warning("Error adding reaction: {}", e)
async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> None: async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> None:
""" """
@@ -263,7 +268,6 @@ class FeishuChannel(BaseChannel):
before = protected[last_end:m.start()].strip() before = protected[last_end:m.start()].strip()
if before: if before:
elements.append({"tag": "markdown", "content": before}) elements.append({"tag": "markdown", "content": before})
level = len(m.group(1))
text = m.group(2).strip() text = m.group(2).strip()
elements.append({ elements.append({
"tag": "div", "tag": "div",
@@ -284,50 +288,128 @@ class FeishuChannel(BaseChannel):
return elements or [{"tag": "markdown", "content": content}] return elements or [{"tag": "markdown", "content": content}]
async def send(self, msg: OutboundMessage) -> None: _IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif"}
"""Send a message through Feishu.""" _AUDIO_EXTS = {".opus"}
if not self._client: _FILE_TYPE_MAP = {
logger.warning("Feishu client not initialized") ".opus": "opus", ".mp4": "mp4", ".pdf": "pdf", ".doc": "doc", ".docx": "doc",
return ".xls": "xls", ".xlsx": "xls", ".ppt": "ppt", ".pptx": "ppt",
}
def _upload_image_sync(self, file_path: str) -> str | None:
"""Upload an image to Feishu and return the image_key."""
try:
with open(file_path, "rb") as f:
request = CreateImageRequest.builder() \
.request_body(
CreateImageRequestBody.builder()
.image_type("message")
.image(f)
.build()
).build()
response = self._client.im.v1.image.create(request)
if response.success():
image_key = response.data.image_key
logger.debug("Uploaded image {}: {}", os.path.basename(file_path), image_key)
return image_key
else:
logger.error("Failed to upload image: code={}, msg={}", response.code, response.msg)
return None
except Exception as e:
logger.error("Error uploading image {}: {}", file_path, e)
return None
def _upload_file_sync(self, file_path: str) -> str | None:
"""Upload a file to Feishu and return the file_key."""
ext = os.path.splitext(file_path)[1].lower()
file_type = self._FILE_TYPE_MAP.get(ext, "stream")
file_name = os.path.basename(file_path)
try:
with open(file_path, "rb") as f:
request = CreateFileRequest.builder() \
.request_body(
CreateFileRequestBody.builder()
.file_type(file_type)
.file_name(file_name)
.file(f)
.build()
).build()
response = self._client.im.v1.file.create(request)
if response.success():
file_key = response.data.file_key
logger.debug("Uploaded file {}: {}", file_name, file_key)
return file_key
else:
logger.error("Failed to upload file: code={}, msg={}", response.code, response.msg)
return None
except Exception as e:
logger.error("Error uploading file {}: {}", file_path, e)
return None
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool:
"""Send a single message (text/image/file/interactive) synchronously."""
try: try:
# Determine receive_id_type based on chat_id format
# open_id starts with "ou_", chat_id starts with "oc_"
if msg.chat_id.startswith("oc_"):
receive_id_type = "chat_id"
else:
receive_id_type = "open_id"
# Build card with markdown + table support
elements = self._build_card_elements(msg.content)
card = {
"config": {"wide_screen_mode": True},
"elements": elements,
}
content = json.dumps(card, ensure_ascii=False)
request = CreateMessageRequest.builder() \ request = CreateMessageRequest.builder() \
.receive_id_type(receive_id_type) \ .receive_id_type(receive_id_type) \
.request_body( .request_body(
CreateMessageRequestBody.builder() CreateMessageRequestBody.builder()
.receive_id(msg.chat_id) .receive_id(receive_id)
.msg_type("interactive") .msg_type(msg_type)
.content(content) .content(content)
.build() .build()
).build() ).build()
response = self._client.im.v1.message.create(request) response = self._client.im.v1.message.create(request)
if not response.success(): if not response.success():
logger.error( logger.error(
f"Failed to send Feishu message: code={response.code}, " "Failed to send Feishu {} message: code={}, msg={}, log_id={}",
f"msg={response.msg}, log_id={response.get_log_id()}" msg_type, response.code, response.msg, response.get_log_id()
) )
else: return False
logger.debug(f"Feishu message sent to {msg.chat_id}") logger.debug("Feishu {} message sent to {}", msg_type, receive_id)
return True
except Exception as e: except Exception as e:
logger.error(f"Error sending Feishu message: {e}") logger.error("Error sending Feishu {} message: {}", msg_type, e)
return False
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Feishu, including media (images/files) if present."""
if not self._client:
logger.warning("Feishu client not initialized")
return
try:
receive_id_type = "chat_id" if msg.chat_id.startswith("oc_") else "open_id"
loop = asyncio.get_running_loop()
for file_path in msg.media:
if not os.path.isfile(file_path):
logger.warning("Media file not found: {}", file_path)
continue
ext = os.path.splitext(file_path)[1].lower()
if ext in self._IMAGE_EXTS:
key = await loop.run_in_executor(None, self._upload_image_sync, file_path)
if key:
await loop.run_in_executor(
None, self._send_message_sync,
receive_id_type, msg.chat_id, "image", json.dumps({"image_key": key}, ensure_ascii=False),
)
else:
key = await loop.run_in_executor(None, self._upload_file_sync, file_path)
if key:
media_type = "audio" if ext in self._AUDIO_EXTS else "file"
await loop.run_in_executor(
None, self._send_message_sync,
receive_id_type, msg.chat_id, media_type, json.dumps({"file_key": key}, ensure_ascii=False),
)
if msg.content and msg.content.strip():
card = {"config": {"wide_screen_mode": True}, "elements": self._build_card_elements(msg.content)}
await loop.run_in_executor(
None, self._send_message_sync,
receive_id_type, msg.chat_id, "interactive", json.dumps(card, ensure_ascii=False),
)
except Exception as e:
logger.error("Error sending Feishu message: {}", e)
def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None: def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None:
""" """
@@ -399,4 +481,4 @@ class FeishuChannel(BaseChannel):
) )
except Exception as e: except Exception as e:
logger.error(f"Error processing Feishu message: {e}") logger.error("Error processing Feishu message: {}", e)

View File

@@ -45,7 +45,7 @@ class ChannelManager:
) )
logger.info("Telegram channel enabled") logger.info("Telegram channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"Telegram channel not available: {e}") logger.warning("Telegram channel not available: {}", e)
# WhatsApp channel # WhatsApp channel
if self.config.channels.whatsapp.enabled: if self.config.channels.whatsapp.enabled:
@@ -56,7 +56,7 @@ class ChannelManager:
) )
logger.info("WhatsApp channel enabled") logger.info("WhatsApp channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"WhatsApp channel not available: {e}") logger.warning("WhatsApp channel not available: {}", e)
# Discord channel # Discord channel
if self.config.channels.discord.enabled: if self.config.channels.discord.enabled:
@@ -67,7 +67,7 @@ class ChannelManager:
) )
logger.info("Discord channel enabled") logger.info("Discord channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"Discord channel not available: {e}") logger.warning("Discord channel not available: {}", e)
# Feishu channel # Feishu channel
if self.config.channels.feishu.enabled: if self.config.channels.feishu.enabled:
@@ -78,7 +78,7 @@ class ChannelManager:
) )
logger.info("Feishu channel enabled") logger.info("Feishu channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"Feishu channel not available: {e}") logger.warning("Feishu channel not available: {}", e)
# Mochat channel # Mochat channel
if self.config.channels.mochat.enabled: if self.config.channels.mochat.enabled:
@@ -90,7 +90,7 @@ class ChannelManager:
) )
logger.info("Mochat channel enabled") logger.info("Mochat channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"Mochat channel not available: {e}") logger.warning("Mochat channel not available: {}", e)
# DingTalk channel # DingTalk channel
if self.config.channels.dingtalk.enabled: if self.config.channels.dingtalk.enabled:
@@ -101,7 +101,7 @@ class ChannelManager:
) )
logger.info("DingTalk channel enabled") logger.info("DingTalk channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"DingTalk channel not available: {e}") logger.warning("DingTalk channel not available: {}", e)
# Email channel # Email channel
if self.config.channels.email.enabled: if self.config.channels.email.enabled:
@@ -112,7 +112,7 @@ class ChannelManager:
) )
logger.info("Email channel enabled") logger.info("Email channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"Email channel not available: {e}") logger.warning("Email channel not available: {}", e)
# Slack channel # Slack channel
if self.config.channels.slack.enabled: if self.config.channels.slack.enabled:
@@ -123,7 +123,7 @@ class ChannelManager:
) )
logger.info("Slack channel enabled") logger.info("Slack channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"Slack channel not available: {e}") logger.warning("Slack channel not available: {}", e)
# QQ channel # QQ channel
if self.config.channels.qq.enabled: if self.config.channels.qq.enabled:
@@ -135,14 +135,14 @@ class ChannelManager:
) )
logger.info("QQ channel enabled") logger.info("QQ channel enabled")
except ImportError as e: except ImportError as e:
logger.warning(f"QQ channel not available: {e}") logger.warning("QQ channel not available: {}", e)
async def _start_channel(self, name: str, channel: BaseChannel) -> None: async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions.""" """Start a channel and log any exceptions."""
try: try:
await channel.start() await channel.start()
except Exception as e: except Exception as e:
logger.error(f"Failed to start channel {name}: {e}") logger.error("Failed to start channel {}: {}", name, e)
async def start_all(self) -> None: async def start_all(self) -> None:
"""Start all channels and the outbound dispatcher.""" """Start all channels and the outbound dispatcher."""
@@ -156,7 +156,7 @@ class ChannelManager:
# Start channels # Start channels
tasks = [] tasks = []
for name, channel in self.channels.items(): for name, channel in self.channels.items():
logger.info(f"Starting {name} channel...") logger.info("Starting {} channel...", name)
tasks.append(asyncio.create_task(self._start_channel(name, channel))) tasks.append(asyncio.create_task(self._start_channel(name, channel)))
# Wait for all to complete (they should run forever) # Wait for all to complete (they should run forever)
@@ -178,9 +178,9 @@ class ChannelManager:
for name, channel in self.channels.items(): for name, channel in self.channels.items():
try: try:
await channel.stop() await channel.stop()
logger.info(f"Stopped {name} channel") logger.info("Stopped {} channel", name)
except Exception as e: except Exception as e:
logger.error(f"Error stopping {name}: {e}") logger.error("Error stopping {}: {}", name, e)
async def _dispatch_outbound(self) -> None: async def _dispatch_outbound(self) -> None:
"""Dispatch outbound messages to the appropriate channel.""" """Dispatch outbound messages to the appropriate channel."""
@@ -198,9 +198,9 @@ class ChannelManager:
try: try:
await channel.send(msg) await channel.send(msg)
except Exception as e: except Exception as e:
logger.error(f"Error sending to {msg.channel}: {e}") logger.error("Error sending to {}: {}", msg.channel, e)
else: else:
logger.warning(f"Unknown channel: {msg.channel}") logger.warning("Unknown channel: {}", msg.channel)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue

View File

@@ -322,7 +322,7 @@ class MochatChannel(BaseChannel):
await self._api_send("/api/claw/sessions/send", "sessionId", target.id, await self._api_send("/api/claw/sessions/send", "sessionId", target.id,
content, msg.reply_to) content, msg.reply_to)
except Exception as e: except Exception as e:
logger.error(f"Failed to send Mochat message: {e}") logger.error("Failed to send Mochat message: {}", e)
# ---- config / init helpers --------------------------------------------- # ---- config / init helpers ---------------------------------------------
@@ -380,7 +380,7 @@ class MochatChannel(BaseChannel):
@client.event @client.event
async def connect_error(data: Any) -> None: async def connect_error(data: Any) -> None:
logger.error(f"Mochat websocket connect error: {data}") logger.error("Mochat websocket connect error: {}", data)
@client.on("claw.session.events") @client.on("claw.session.events")
async def on_session_events(payload: dict[str, Any]) -> None: async def on_session_events(payload: dict[str, Any]) -> None:
@@ -407,7 +407,7 @@ class MochatChannel(BaseChannel):
) )
return True return True
except Exception as e: except Exception as e:
logger.error(f"Failed to connect Mochat websocket: {e}") logger.error("Failed to connect Mochat websocket: {}", e)
try: try:
await client.disconnect() await client.disconnect()
except Exception: except Exception:
@@ -444,7 +444,7 @@ class MochatChannel(BaseChannel):
"limit": self.config.watch_limit, "limit": self.config.watch_limit,
}) })
if not ack.get("result"): if not ack.get("result"):
logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") logger.error("Mochat subscribeSessions failed: {}", ack.get('message', 'unknown error'))
return False return False
data = ack.get("data") data = ack.get("data")
@@ -466,7 +466,7 @@ class MochatChannel(BaseChannel):
return True return True
ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids}) ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids})
if not ack.get("result"): if not ack.get("result"):
logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") logger.error("Mochat subscribePanels failed: {}", ack.get('message', 'unknown error'))
return False return False
return True return True
@@ -488,7 +488,7 @@ class MochatChannel(BaseChannel):
try: try:
await self._refresh_targets(subscribe_new=self._ws_ready) await self._refresh_targets(subscribe_new=self._ws_ready)
except Exception as e: except Exception as e:
logger.warning(f"Mochat refresh failed: {e}") logger.warning("Mochat refresh failed: {}", e)
if self._fallback_mode: if self._fallback_mode:
await self._ensure_fallback_workers() await self._ensure_fallback_workers()
@@ -502,7 +502,7 @@ class MochatChannel(BaseChannel):
try: try:
response = await self._post_json("/api/claw/sessions/list", {}) response = await self._post_json("/api/claw/sessions/list", {})
except Exception as e: except Exception as e:
logger.warning(f"Mochat listSessions failed: {e}") logger.warning("Mochat listSessions failed: {}", e)
return return
sessions = response.get("sessions") sessions = response.get("sessions")
@@ -536,7 +536,7 @@ class MochatChannel(BaseChannel):
try: try:
response = await self._post_json("/api/claw/groups/get", {}) response = await self._post_json("/api/claw/groups/get", {})
except Exception as e: except Exception as e:
logger.warning(f"Mochat getWorkspaceGroup failed: {e}") logger.warning("Mochat getWorkspaceGroup failed: {}", e)
return return
raw_panels = response.get("panels") raw_panels = response.get("panels")
@@ -598,7 +598,7 @@ class MochatChannel(BaseChannel):
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
logger.warning(f"Mochat watch fallback error ({session_id}): {e}") logger.warning("Mochat watch fallback error ({}): {}", session_id, e)
await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0))
async def _panel_poll_worker(self, panel_id: str) -> None: async def _panel_poll_worker(self, panel_id: str) -> None:
@@ -625,7 +625,7 @@ class MochatChannel(BaseChannel):
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
logger.warning(f"Mochat panel polling error ({panel_id}): {e}") logger.warning("Mochat panel polling error ({}): {}", panel_id, e)
await asyncio.sleep(sleep_s) await asyncio.sleep(sleep_s)
# ---- inbound event processing ------------------------------------------ # ---- inbound event processing ------------------------------------------
@@ -836,7 +836,7 @@ class MochatChannel(BaseChannel):
try: try:
data = json.loads(self._cursor_path.read_text("utf-8")) data = json.loads(self._cursor_path.read_text("utf-8"))
except Exception as e: except Exception as e:
logger.warning(f"Failed to read Mochat cursor file: {e}") logger.warning("Failed to read Mochat cursor file: {}", e)
return return
cursors = data.get("cursors") if isinstance(data, dict) else None cursors = data.get("cursors") if isinstance(data, dict) else None
if isinstance(cursors, dict): if isinstance(cursors, dict):
@@ -852,7 +852,7 @@ class MochatChannel(BaseChannel):
"cursors": self._session_cursor, "cursors": self._session_cursor,
}, ensure_ascii=False, indent=2) + "\n", "utf-8") }, ensure_ascii=False, indent=2) + "\n", "utf-8")
except Exception as e: except Exception as e:
logger.warning(f"Failed to save Mochat cursor file: {e}") logger.warning("Failed to save Mochat cursor file: {}", e)
# ---- HTTP helpers ------------------------------------------------------ # ---- HTTP helpers ------------------------------------------------------

View File

@@ -34,7 +34,7 @@ def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]":
super().__init__(intents=intents) super().__init__(intents=intents)
async def on_ready(self): async def on_ready(self):
logger.info(f"QQ bot ready: {self.robot.name}") logger.info("QQ bot ready: {}", self.robot.name)
async def on_c2c_message_create(self, message: "C2CMessage"): async def on_c2c_message_create(self, message: "C2CMessage"):
await channel._on_message(message) await channel._on_message(message)
@@ -80,7 +80,7 @@ class QQChannel(BaseChannel):
try: try:
await self._client.start(appid=self.config.app_id, secret=self.config.secret) await self._client.start(appid=self.config.app_id, secret=self.config.secret)
except Exception as e: except Exception as e:
logger.warning(f"QQ bot error: {e}") logger.warning("QQ bot error: {}", e)
if self._running: if self._running:
logger.info("Reconnecting QQ bot in 5 seconds...") logger.info("Reconnecting QQ bot in 5 seconds...")
await asyncio.sleep(5) await asyncio.sleep(5)
@@ -108,7 +108,7 @@ class QQChannel(BaseChannel):
content=msg.content, content=msg.content,
) )
except Exception as e: except Exception as e:
logger.error(f"Error sending QQ message: {e}") logger.error("Error sending QQ message: {}", e)
async def _on_message(self, data: "C2CMessage") -> None: async def _on_message(self, data: "C2CMessage") -> None:
"""Handle incoming message from QQ.""" """Handle incoming message from QQ."""
@@ -131,4 +131,4 @@ class QQChannel(BaseChannel):
metadata={"message_id": data.id}, metadata={"message_id": data.id},
) )
except Exception as e: except Exception as e:
logger.error(f"Error handling QQ message: {e}") logger.error("Error handling QQ message: {}", e)

View File

@@ -36,7 +36,7 @@ class SlackChannel(BaseChannel):
logger.error("Slack bot/app token not configured") logger.error("Slack bot/app token not configured")
return return
if self.config.mode != "socket": if self.config.mode != "socket":
logger.error(f"Unsupported Slack mode: {self.config.mode}") logger.error("Unsupported Slack mode: {}", self.config.mode)
return return
self._running = True self._running = True
@@ -53,9 +53,9 @@ class SlackChannel(BaseChannel):
try: try:
auth = await self._web_client.auth_test() auth = await self._web_client.auth_test()
self._bot_user_id = auth.get("user_id") self._bot_user_id = auth.get("user_id")
logger.info(f"Slack bot connected as {self._bot_user_id}") logger.info("Slack bot connected as {}", self._bot_user_id)
except Exception as e: except Exception as e:
logger.warning(f"Slack auth_test failed: {e}") logger.warning("Slack auth_test failed: {}", e)
logger.info("Starting Slack Socket Mode client...") logger.info("Starting Slack Socket Mode client...")
await self._socket_client.connect() await self._socket_client.connect()
@@ -70,7 +70,7 @@ class SlackChannel(BaseChannel):
try: try:
await self._socket_client.close() await self._socket_client.close()
except Exception as e: except Exception as e:
logger.warning(f"Slack socket close failed: {e}") logger.warning("Slack socket close failed: {}", e)
self._socket_client = None self._socket_client = None
async def send(self, msg: OutboundMessage) -> None: async def send(self, msg: OutboundMessage) -> None:
@@ -90,7 +90,7 @@ class SlackChannel(BaseChannel):
thread_ts=thread_ts if use_thread else None, thread_ts=thread_ts if use_thread else None,
) )
except Exception as e: except Exception as e:
logger.error(f"Error sending Slack message: {e}") logger.error("Error sending Slack message: {}", e)
async def _on_socket_request( async def _on_socket_request(
self, self,
@@ -164,7 +164,7 @@ class SlackChannel(BaseChannel):
timestamp=event.get("ts"), timestamp=event.get("ts"),
) )
except Exception as e: except Exception as e:
logger.debug(f"Slack reactions_add failed: {e}") logger.debug("Slack reactions_add failed: {}", e)
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,

View File

@@ -165,13 +165,13 @@ class TelegramChannel(BaseChannel):
# Get bot info and register command menu # Get bot info and register command menu
bot_info = await self._app.bot.get_me() bot_info = await self._app.bot.get_me()
logger.info(f"Telegram bot @{bot_info.username} connected") logger.info("Telegram bot @{} connected", bot_info.username)
try: try:
await self._app.bot.set_my_commands(self.BOT_COMMANDS) await self._app.bot.set_my_commands(self.BOT_COMMANDS)
logger.debug("Telegram bot commands registered") logger.debug("Telegram bot commands registered")
except Exception as e: except Exception as e:
logger.warning(f"Failed to register bot commands: {e}") logger.warning("Failed to register bot commands: {}", e)
# Start polling (this runs until stopped) # Start polling (this runs until stopped)
await self._app.updater.start_polling( await self._app.updater.start_polling(
@@ -221,7 +221,7 @@ class TelegramChannel(BaseChannel):
try: try:
chat_id = int(msg.chat_id) chat_id = int(msg.chat_id)
except ValueError: except ValueError:
logger.error(f"Invalid chat_id: {msg.chat_id}") logger.error("Invalid chat_id: {}", msg.chat_id)
return return
# Send media files # Send media files
@@ -238,7 +238,7 @@ class TelegramChannel(BaseChannel):
await sender(chat_id=chat_id, **{param: f}) await sender(chat_id=chat_id, **{param: f})
except Exception as e: except Exception as e:
filename = media_path.rsplit("/", 1)[-1] filename = media_path.rsplit("/", 1)[-1]
logger.error(f"Failed to send media {media_path}: {e}") logger.error("Failed to send media {}: {}", media_path, e)
await self._app.bot.send_message(chat_id=chat_id, text=f"[Failed to send: {filename}]") await self._app.bot.send_message(chat_id=chat_id, text=f"[Failed to send: {filename}]")
# Send text content # Send text content
@@ -248,11 +248,11 @@ class TelegramChannel(BaseChannel):
html = _markdown_to_telegram_html(chunk) html = _markdown_to_telegram_html(chunk)
await self._app.bot.send_message(chat_id=chat_id, text=html, parse_mode="HTML") await self._app.bot.send_message(chat_id=chat_id, text=html, parse_mode="HTML")
except Exception as e: except Exception as e:
logger.warning(f"HTML parse failed, falling back to plain text: {e}") logger.warning("HTML parse failed, falling back to plain text: {}", e)
try: try:
await self._app.bot.send_message(chat_id=chat_id, text=chunk) await self._app.bot.send_message(chat_id=chat_id, text=chunk)
except Exception as e2: except Exception as e2:
logger.error(f"Error sending Telegram message: {e2}") logger.error("Error sending Telegram message: {}", e2)
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command.""" """Handle /start command."""
@@ -344,21 +344,21 @@ class TelegramChannel(BaseChannel):
transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key) transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key)
transcription = await transcriber.transcribe(file_path) transcription = await transcriber.transcribe(file_path)
if transcription: if transcription:
logger.info(f"Transcribed {media_type}: {transcription[:50]}...") logger.info("Transcribed {}: {}...", media_type, transcription[:50])
content_parts.append(f"[transcription: {transcription}]") content_parts.append(f"[transcription: {transcription}]")
else: else:
content_parts.append(f"[{media_type}: {file_path}]") content_parts.append(f"[{media_type}: {file_path}]")
else: else:
content_parts.append(f"[{media_type}: {file_path}]") content_parts.append(f"[{media_type}: {file_path}]")
logger.debug(f"Downloaded {media_type} to {file_path}") logger.debug("Downloaded {} to {}", media_type, file_path)
except Exception as e: except Exception as e:
logger.error(f"Failed to download media: {e}") logger.error("Failed to download media: {}", e)
content_parts.append(f"[{media_type}: download failed]") content_parts.append(f"[{media_type}: download failed]")
content = "\n".join(content_parts) if content_parts else "[empty message]" content = "\n".join(content_parts) if content_parts else "[empty message]"
logger.debug(f"Telegram message from {sender_id}: {content[:50]}...") logger.debug("Telegram message from {}: {}...", sender_id, content[:50])
str_chat_id = str(chat_id) str_chat_id = str(chat_id)
@@ -401,11 +401,11 @@ class TelegramChannel(BaseChannel):
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except Exception as e: except Exception as e:
logger.debug(f"Typing indicator stopped for {chat_id}: {e}") logger.debug("Typing indicator stopped for {}: {}", chat_id, e)
async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Log polling / handler errors instead of silently swallowing them.""" """Log polling / handler errors instead of silently swallowing them."""
logger.error(f"Telegram error: {context.error}") logger.error("Telegram error: {}", context.error)
def _get_extension(self, media_type: str, mime_type: str | None) -> str: def _get_extension(self, media_type: str, mime_type: str | None) -> str:
"""Get file extension based on media type.""" """Get file extension based on media type."""

View File

@@ -34,7 +34,7 @@ class WhatsAppChannel(BaseChannel):
bridge_url = self.config.bridge_url bridge_url = self.config.bridge_url
logger.info(f"Connecting to WhatsApp bridge at {bridge_url}...") logger.info("Connecting to WhatsApp bridge at {}...", bridge_url)
self._running = True self._running = True
@@ -53,14 +53,14 @@ class WhatsAppChannel(BaseChannel):
try: try:
await self._handle_bridge_message(message) await self._handle_bridge_message(message)
except Exception as e: except Exception as e:
logger.error(f"Error handling bridge message: {e}") logger.error("Error handling bridge message: {}", e)
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
self._connected = False self._connected = False
self._ws = None self._ws = None
logger.warning(f"WhatsApp bridge connection error: {e}") logger.warning("WhatsApp bridge connection error: {}", e)
if self._running: if self._running:
logger.info("Reconnecting in 5 seconds...") logger.info("Reconnecting in 5 seconds...")
@@ -89,14 +89,14 @@ class WhatsAppChannel(BaseChannel):
} }
await self._ws.send(json.dumps(payload, ensure_ascii=False)) await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e: except Exception as e:
logger.error(f"Error sending WhatsApp message: {e}") logger.error("Error sending WhatsApp message: {}", e)
async def _handle_bridge_message(self, raw: str) -> None: async def _handle_bridge_message(self, raw: str) -> None:
"""Handle a message from the bridge.""" """Handle a message from the bridge."""
try: try:
data = json.loads(raw) data = json.loads(raw)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning(f"Invalid JSON from bridge: {raw[:100]}") logger.warning("Invalid JSON from bridge: {}", raw[:100])
return return
msg_type = data.get("type") msg_type = data.get("type")
@@ -112,11 +112,11 @@ class WhatsAppChannel(BaseChannel):
# Extract just the phone number or lid as chat_id # Extract just the phone number or lid as chat_id
user_id = pn if pn else sender user_id = pn if pn else sender
sender_id = user_id.split("@")[0] if "@" in user_id else user_id sender_id = user_id.split("@")[0] if "@" in user_id else user_id
logger.info(f"Sender {sender}") logger.info("Sender {}", sender)
# Handle voice transcription if it's a voice message # Handle voice transcription if it's a voice message
if content == "[Voice Message]": if content == "[Voice Message]":
logger.info(f"Voice message received from {sender_id}, but direct download from bridge is not yet supported.") logger.info("Voice message received from {}, but direct download from bridge is not yet supported.", sender_id)
content = "[Voice Message: Transcription not available for WhatsApp yet]" content = "[Voice Message: Transcription not available for WhatsApp yet]"
await self._handle_message( await self._handle_message(
@@ -133,7 +133,7 @@ class WhatsAppChannel(BaseChannel):
elif msg_type == "status": elif msg_type == "status":
# Connection status update # Connection status update
status = data.get("status") status = data.get("status")
logger.info(f"WhatsApp status: {status}") logger.info("WhatsApp status: {}", status)
if status == "connected": if status == "connected":
self._connected = True self._connected = True
@@ -145,4 +145,4 @@ class WhatsAppChannel(BaseChannel):
logger.info("Scan QR code in the bridge terminal to connect WhatsApp") logger.info("Scan QR code in the bridge terminal to connect WhatsApp")
elif msg_type == "error": elif msg_type == "error":
logger.error(f"WhatsApp bridge error: {data.get('error')}") logger.error("WhatsApp bridge error: {}", data.get('error'))

View File

@@ -287,11 +287,25 @@ class Config(BaseSettings):
from nanobot.providers.registry import PROVIDERS from nanobot.providers.registry import PROVIDERS
model_lower = (model or self.agents.defaults.model).lower() model_lower = (model or self.agents.defaults.model).lower()
model_normalized = model_lower.replace("-", "_")
model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else ""
normalized_prefix = model_prefix.replace("-", "_")
def _kw_matches(kw: str) -> bool:
kw = kw.lower()
return kw in model_lower or kw.replace("-", "_") in model_normalized
# Explicit provider prefix wins — prevents `github-copilot/...codex` matching openai_codex.
for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None)
if p and model_prefix and normalized_prefix == spec.name:
if spec.is_oauth or p.api_key:
return p, spec.name
# Match by keyword (order follows PROVIDERS registry) # Match by keyword (order follows PROVIDERS registry)
for spec in PROVIDERS: for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None) p = getattr(self.providers, spec.name, None)
if p and any(kw in model_lower for kw in spec.keywords): if p and any(_kw_matches(kw) for kw in spec.keywords):
if spec.is_oauth or p.api_key: if spec.is_oauth or p.api_key:
return p, spec.name return p, spec.name

View File

@@ -99,7 +99,7 @@ class CronService:
)) ))
self._store = CronStore(jobs=jobs) self._store = CronStore(jobs=jobs)
except Exception as e: except Exception as e:
logger.warning(f"Failed to load cron store: {e}") logger.warning("Failed to load cron store: {}", e)
self._store = CronStore() self._store = CronStore()
else: else:
self._store = CronStore() self._store = CronStore()
@@ -157,7 +157,7 @@ class CronService:
self._recompute_next_runs() self._recompute_next_runs()
self._save_store() self._save_store()
self._arm_timer() self._arm_timer()
logger.info(f"Cron service started with {len(self._store.jobs if self._store else [])} jobs") logger.info("Cron service started with {} jobs", len(self._store.jobs if self._store else []))
def stop(self) -> None: def stop(self) -> None:
"""Stop the cron service.""" """Stop the cron service."""
@@ -222,7 +222,7 @@ class CronService:
async def _execute_job(self, job: CronJob) -> None: async def _execute_job(self, job: CronJob) -> None:
"""Execute a single job.""" """Execute a single job."""
start_ms = _now_ms() start_ms = _now_ms()
logger.info(f"Cron: executing job '{job.name}' ({job.id})") logger.info("Cron: executing job '{}' ({})", job.name, job.id)
try: try:
response = None response = None
@@ -231,12 +231,12 @@ class CronService:
job.state.last_status = "ok" job.state.last_status = "ok"
job.state.last_error = None job.state.last_error = None
logger.info(f"Cron: job '{job.name}' completed") logger.info("Cron: job '{}' completed", job.name)
except Exception as e: except Exception as e:
job.state.last_status = "error" job.state.last_status = "error"
job.state.last_error = str(e) job.state.last_error = str(e)
logger.error(f"Cron: job '{job.name}' failed: {e}") logger.error("Cron: job '{}' failed: {}", job.name, e)
job.state.last_run_at_ms = start_ms job.state.last_run_at_ms = start_ms
job.updated_at_ms = _now_ms() job.updated_at_ms = _now_ms()
@@ -296,7 +296,7 @@ class CronService:
self._save_store() self._save_store()
self._arm_timer() self._arm_timer()
logger.info(f"Cron: added job '{name}' ({job.id})") logger.info("Cron: added job '{}' ({})", name, job.id)
return job return job
def remove_job(self, job_id: str) -> bool: def remove_job(self, job_id: str) -> bool:
@@ -309,7 +309,7 @@ class CronService:
if removed: if removed:
self._save_store() self._save_store()
self._arm_timer() self._arm_timer()
logger.info(f"Cron: removed job {job_id}") logger.info("Cron: removed job {}", job_id)
return removed return removed

View File

@@ -78,7 +78,7 @@ class HeartbeatService:
self._running = True self._running = True
self._task = asyncio.create_task(self._run_loop()) self._task = asyncio.create_task(self._run_loop())
logger.info(f"Heartbeat started (every {self.interval_s}s)") logger.info("Heartbeat started (every {}s)", self.interval_s)
def stop(self) -> None: def stop(self) -> None:
"""Stop the heartbeat service.""" """Stop the heartbeat service."""
@@ -97,7 +97,7 @@ class HeartbeatService:
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
logger.error(f"Heartbeat error: {e}") logger.error("Heartbeat error: {}", e)
async def _tick(self) -> None: async def _tick(self) -> None:
"""Execute a single heartbeat tick.""" """Execute a single heartbeat tick."""
@@ -118,10 +118,10 @@ class HeartbeatService:
if HEARTBEAT_OK_TOKEN.replace("_", "") in response.upper().replace("_", ""): if HEARTBEAT_OK_TOKEN.replace("_", "") in response.upper().replace("_", ""):
logger.info("Heartbeat: OK (no action needed)") logger.info("Heartbeat: OK (no action needed)")
else: else:
logger.info(f"Heartbeat: completed task") logger.info("Heartbeat: completed task")
except Exception as e: except Exception as e:
logger.error(f"Heartbeat execution failed: {e}") logger.error("Heartbeat execution failed: {}", e)
async def trigger_now(self) -> str | None: async def trigger_now(self) -> str | None:
"""Manually trigger a heartbeat.""" """Manually trigger a heartbeat."""

View File

@@ -88,10 +88,21 @@ class LiteLLMProvider(LLMProvider):
# Standard mode: auto-prefix for known providers # Standard mode: auto-prefix for known providers
spec = find_by_model(model) spec = find_by_model(model)
if spec and spec.litellm_prefix: if spec and spec.litellm_prefix:
model = self._canonicalize_explicit_prefix(model, spec.name, spec.litellm_prefix)
if not any(model.startswith(s) for s in spec.skip_prefixes): if not any(model.startswith(s) for s in spec.skip_prefixes):
model = f"{spec.litellm_prefix}/{model}" model = f"{spec.litellm_prefix}/{model}"
return model return model
@staticmethod
def _canonicalize_explicit_prefix(model: str, spec_name: str, canonical_prefix: str) -> str:
"""Normalize explicit provider prefixes like `github-copilot/...`."""
if "/" not in model:
return model
prefix, remainder = model.split("/", 1)
if prefix.lower().replace("-", "_") != spec_name:
return model
return f"{canonical_prefix}/{remainder}"
def _apply_model_overrides(self, model: str, kwargs: dict[str, Any]) -> None: def _apply_model_overrides(self, model: str, kwargs: dict[str, Any]) -> None:
"""Apply model-specific parameter overrides from the registry.""" """Apply model-specific parameter overrides from the registry."""

View File

@@ -80,7 +80,7 @@ class OpenAICodexProvider(LLMProvider):
def _strip_model_prefix(model: str) -> str: def _strip_model_prefix(model: str) -> str:
if model.startswith("openai-codex/"): if model.startswith("openai-codex/") or model.startswith("openai_codex/"):
return model.split("/", 1)[1] return model.split("/", 1)[1]
return model return model
@@ -176,7 +176,7 @@ def _convert_messages(messages: list[dict[str, Any]]) -> tuple[str, list[dict[st
if role == "tool": if role == "tool":
call_id, _ = _split_tool_call_id(msg.get("tool_call_id")) call_id, _ = _split_tool_call_id(msg.get("tool_call_id"))
output_text = content if isinstance(content, str) else json.dumps(content) output_text = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False)
input_items.append( input_items.append(
{ {
"type": "function_call_output", "type": "function_call_output",

View File

@@ -384,10 +384,18 @@ def find_by_model(model: str) -> ProviderSpec | None:
"""Match a standard provider by model-name keyword (case-insensitive). """Match a standard provider by model-name keyword (case-insensitive).
Skips gateways/local — those are matched by api_key/api_base instead.""" Skips gateways/local — those are matched by api_key/api_base instead."""
model_lower = model.lower() model_lower = model.lower()
for spec in PROVIDERS: model_normalized = model_lower.replace("-", "_")
if spec.is_gateway or spec.is_local: model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else ""
continue normalized_prefix = model_prefix.replace("-", "_")
if any(kw in model_lower for kw in spec.keywords): std_specs = [s for s in PROVIDERS if not s.is_gateway and not s.is_local]
# Prefer explicit provider prefix — prevents `github-copilot/...codex` matching openai_codex.
for spec in std_specs:
if model_prefix and normalized_prefix == spec.name:
return spec
for spec in std_specs:
if any(kw in model_lower or kw.replace("-", "_") in model_normalized for kw in spec.keywords):
return spec return spec
return None return None

View File

@@ -35,7 +35,7 @@ class GroqTranscriptionProvider:
path = Path(file_path) path = Path(file_path)
if not path.exists(): if not path.exists():
logger.error(f"Audio file not found: {file_path}") logger.error("Audio file not found: {}", file_path)
return "" return ""
try: try:
@@ -61,5 +61,5 @@ class GroqTranscriptionProvider:
return data.get("text", "") return data.get("text", "")
except Exception as e: except Exception as e:
logger.error(f"Groq transcription error: {e}") logger.error("Groq transcription error: {}", e)
return "" return ""

View File

@@ -110,7 +110,7 @@ class SessionManager:
if legacy_path.exists(): if legacy_path.exists():
import shutil import shutil
shutil.move(str(legacy_path), str(path)) shutil.move(str(legacy_path), str(path))
logger.info(f"Migrated session {key} from legacy path") logger.info("Migrated session {} from legacy path", key)
if not path.exists(): if not path.exists():
return None return None
@@ -144,7 +144,7 @@ class SessionManager:
last_consolidated=last_consolidated last_consolidated=last_consolidated
) )
except Exception as e: except Exception as e:
logger.warning(f"Failed to load session {key}: {e}") logger.warning("Failed to load session {}: {}", key, e)
return None return None
def save(self, session: Session) -> None: def save(self, session: Session) -> None:
@@ -163,7 +163,6 @@ class SessionManager:
for msg in session.messages: for msg in session.messages:
f.write(json.dumps(msg, ensure_ascii=False) + "\n") f.write(json.dumps(msg, ensure_ascii=False) + "\n")
self._cache[session.key] = session self._cache[session.key] = session
def invalidate(self, key: str) -> None: def invalidate(self, key: str) -> None:

View File

@@ -17,37 +17,37 @@ classifiers = [
] ]
dependencies = [ dependencies = [
"typer>=0.9.0", "typer>=0.20.0,<1.0.0",
"litellm>=1.0.0", "litellm>=1.81.5,<2.0.0",
"pydantic>=2.0.0", "pydantic>=2.12.0,<3.0.0",
"pydantic-settings>=2.0.0", "pydantic-settings>=2.12.0,<3.0.0",
"websockets>=12.0", "websockets>=16.0,<17.0",
"websocket-client>=1.6.0", "websocket-client>=1.9.0,<2.0.0",
"httpx>=0.25.0", "httpx>=0.28.0,<1.0.0",
"oauth-cli-kit>=0.1.1", "oauth-cli-kit>=0.1.3,<1.0.0",
"loguru>=0.7.0", "loguru>=0.7.3,<1.0.0",
"readability-lxml>=0.8.0", "readability-lxml>=0.8.4,<1.0.0",
"rich>=13.0.0", "rich>=14.0.0,<15.0.0",
"croniter>=2.0.0", "croniter>=6.0.0,<7.0.0",
"dingtalk-stream>=0.4.0", "dingtalk-stream>=0.24.0,<1.0.0",
"python-telegram-bot[socks]>=21.0", "python-telegram-bot[socks]>=22.0,<23.0",
"lark-oapi>=1.0.0", "lark-oapi>=1.5.0,<2.0.0",
"socksio>=1.0.0", "socksio>=1.0.0,<2.0.0",
"python-socketio>=5.11.0", "python-socketio>=5.16.0,<6.0.0",
"msgpack>=1.0.8", "msgpack>=1.1.0,<2.0.0",
"slack-sdk>=3.26.0", "slack-sdk>=3.39.0,<4.0.0",
"slackify-markdown>=0.2.0", "slackify-markdown>=0.2.0,<1.0.0",
"qq-botpy>=1.0.0", "qq-botpy>=1.2.0,<2.0.0",
"python-socks[asyncio]>=2.4.0", "python-socks[asyncio]>=2.8.0,<3.0.0",
"prompt-toolkit>=3.0.0", "prompt-toolkit>=3.0.50,<4.0.0",
"mcp>=1.0.0", "mcp>=1.26.0,<2.0.0",
"json-repair>=0.30.0", "json-repair>=0.57.0,<1.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"pytest>=7.0.0", "pytest>=9.0.0,<10.0.0",
"pytest-asyncio>=0.21.0", "pytest-asyncio>=1.3.0,<2.0.0",
"ruff>=0.1.0", "ruff>=0.1.0",
] ]

View File

@@ -6,6 +6,10 @@ import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from nanobot.cli.commands import app from nanobot.cli.commands import app
from nanobot.config.schema import Config
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.providers.openai_codex_provider import _strip_model_prefix
from nanobot.providers.registry import find_by_model
runner = CliRunner() runner = CliRunner()
@@ -90,3 +94,37 @@ def test_onboard_existing_workspace_safe_create(mock_paths):
assert "Created workspace" not in result.stdout assert "Created workspace" not in result.stdout
assert "Created AGENTS.md" in result.stdout assert "Created AGENTS.md" in result.stdout
assert (workspace_dir / "AGENTS.md").exists() assert (workspace_dir / "AGENTS.md").exists()
def test_config_matches_github_copilot_codex_with_hyphen_prefix():
config = Config()
config.agents.defaults.model = "github-copilot/gpt-5.3-codex"
assert config.get_provider_name() == "github_copilot"
def test_config_matches_openai_codex_with_hyphen_prefix():
config = Config()
config.agents.defaults.model = "openai-codex/gpt-5.1-codex"
assert config.get_provider_name() == "openai_codex"
def test_find_by_model_prefers_explicit_prefix_over_generic_codex_keyword():
spec = find_by_model("github-copilot/gpt-5.3-codex")
assert spec is not None
assert spec.name == "github_copilot"
def test_litellm_provider_canonicalizes_github_copilot_hyphen_prefix():
provider = LiteLLMProvider(default_model="github-copilot/gpt-5.3-codex")
resolved = provider._resolve_model("github-copilot/gpt-5.3-codex")
assert resolved == "github_copilot/gpt-5.3-codex"
def test_openai_codex_strip_prefix_supports_hyphen_and_underscore():
assert _strip_model_prefix("openai-codex/gpt-5.1-codex") == "gpt-5.1-codex"
assert _strip_model_prefix("openai_codex/gpt-5.1-codex") == "gpt-5.1-codex"