fix(loop): resolve conflicts with main and improve /new handler

This commit is contained in:
Re-bin
2026-02-22 17:11:59 +00:00
17 changed files with 757 additions and 400 deletions

View File

@@ -16,10 +16,14 @@
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. ⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
📏 Real-time line count: **3,827 lines** (run `bash core_agent_lines.sh` to verify anytime) 📏 Real-time line count: **3,806 lines** (run `bash core_agent_lines.sh` to verify anytime)
## 📢 News ## 📢 News
- **2026-02-21** 🎉 Released **v0.1.4.post1** — new providers, media support across channels, and major stability improvements. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post1) for details.
- **2026-02-20** 🐦 Feishu now receives multimodal files from users. More reliable memory under the hood.
- **2026-02-19** ✨ Slack now sends files, Discord splits long messages, and subagents work in CLI mode.
- **2026-02-18** ⚡️ nanobot now supports VolcEngine, MCP custom auth headers, and Anthropic prompt caching.
- **2026-02-17** 🎉 Released **v0.1.4** — MCP support, progress streaming, new providers, and multiple channel improvements. Please see [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4) for details. - **2026-02-17** 🎉 Released **v0.1.4** — MCP support, progress streaming, new providers, and multiple channel improvements. Please see [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4) for details.
- **2026-02-16** 🦞 nanobot now integrates a [ClawHub](https://clawhub.ai) skill — search and install public agent skills. - **2026-02-16** 🦞 nanobot now integrates a [ClawHub](https://clawhub.ai) skill — search and install public agent skills.
- **2026-02-15** 🔑 nanobot now supports OpenAI Codex provider with OAuth login support. - **2026-02-15** 🔑 nanobot now supports OpenAI Codex provider with OAuth login support.
@@ -27,13 +31,13 @@
- **2026-02-13** 🎉 Released **v0.1.3.post7** — includes security hardening and multiple improvements. **Please upgrade to the latest version to address security issues**. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post7) for more details. - **2026-02-13** 🎉 Released **v0.1.3.post7** — includes security hardening and multiple improvements. **Please upgrade to the latest version to address security issues**. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post7) for more details.
- **2026-02-12** 🧠 Redesigned memory system — Less code, more reliable. Join the [discussion](https://github.com/HKUDS/nanobot/discussions/566) about it! - **2026-02-12** 🧠 Redesigned memory system — Less code, more reliable. Join the [discussion](https://github.com/HKUDS/nanobot/discussions/566) about it!
- **2026-02-11** ✨ Enhanced CLI experience and added MiniMax support! - **2026-02-11** ✨ Enhanced CLI experience and added MiniMax support!
- **2026-02-10** 🎉 Released **v0.1.3.post6** with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431).
- **2026-02-09** 💬 Added Slack, Email, and QQ support — nanobot now supports multiple chat platforms!
- **2026-02-08** 🔧 Refactored Providers—adding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
<details> <details>
<summary>Earlier news</summary> <summary>Earlier news</summary>
- **2026-02-10** 🎉 Released **v0.1.3.post6** with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431).
- **2026-02-09** 💬 Added Slack, Email, and QQ support — nanobot now supports multiple chat platforms!
- **2026-02-08** 🔧 Refactored Providers—adding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
- **2026-02-07** 🚀 Released **v0.1.3.post5** with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details. - **2026-02-07** 🚀 Released **v0.1.3.post5** with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details.
- **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening! - **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening!
- **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support! - **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support!

View File

@@ -82,12 +82,7 @@ Skills with available="false" need dependencies installed first - you can try in
return f"""# nanobot 🐈 return f"""# nanobot 🐈
You are nanobot, a helpful AI assistant. You have access to tools that allow you to: You are nanobot, a helpful AI assistant.
- Read, write, and edit files
- Execute shell commands
- Search the web and fetch web pages
- Send messages to users on chat channels
- Spawn subagents for complex background tasks
## Current Time ## Current Time
{now} ({tz}) {now} ({tz})
@@ -106,6 +101,7 @@ Only use the 'message' tool when you need to send a message to a specific chat c
For normal conversation, just respond with text - do not call the message tool. For normal conversation, just respond with text - do not call the message tool.
Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language). Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language).
If you need to use tools, call them directly — never send a preliminary message like "Let me check" without actually calling a tool.
When remembering something important, write to {workspace_path}/memory/MEMORY.md When remembering something important, write to {workspace_path}/memory/MEMORY.md
To recall past events, grep {workspace_path}/memory/HISTORY.md""" To recall past events, grep {workspace_path}/memory/HISTORY.md"""
@@ -235,7 +231,7 @@ To recall past events, grep {workspace_path}/memory/HISTORY.md"""
msg["tool_calls"] = tool_calls msg["tool_calls"] = tool_calls
# Include reasoning content when provided (required by some thinking models) # Include reasoning content when provided (required by some thinking models)
if reasoning_content: if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content msg["reasoning_content"] = reasoning_content
messages.append(msg) messages.append(msg)

View File

@@ -1,29 +1,35 @@
"""Agent loop: the core processing engine.""" """Agent loop: the core processing engine."""
from __future__ import annotations
import asyncio import asyncio
from contextlib import AsyncExitStack
import json import json
import json_repair
from pathlib import Path
import re import re
from typing import Any, Awaitable, Callable from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
from nanobot.agent.context import ContextBuilder
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.subagent import SubagentManager
from nanobot.session.manager import Session, SessionManager from nanobot.session.manager import Session, SessionManager
if TYPE_CHECKING:
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
class AgentLoop: class AgentLoop:
""" """
@@ -48,14 +54,13 @@ class AgentLoop:
max_tokens: int = 4096, max_tokens: int = 4096,
memory_window: int = 50, memory_window: int = 50,
brave_api_key: str | None = None, brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None, exec_config: ExecToolConfig | None = None,
cron_service: "CronService | None" = None, cron_service: CronService | None = None,
restrict_to_workspace: bool = False, restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None, session_manager: SessionManager | None = None,
mcp_servers: dict | None = None, mcp_servers: dict | None = None,
): ):
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
self.bus = bus self.bus = bus
self.provider = provider self.provider = provider
self.workspace = workspace self.workspace = workspace
@@ -83,57 +88,55 @@ class AgentLoop:
exec_config=self.exec_config, exec_config=self.exec_config,
restrict_to_workspace=restrict_to_workspace, restrict_to_workspace=restrict_to_workspace,
) )
self._running = False self._running = False
self._mcp_servers = mcp_servers or {} self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False self._mcp_connected = False
self._mcp_connecting = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: dict[str, asyncio.Lock] = {} self._consolidation_locks: dict[str, asyncio.Lock] = {}
self._register_default_tools() self._register_default_tools()
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
"""Register the default set of tools.""" """Register the default set of tools."""
# File tools (workspace for relative paths, restrict if configured)
allowed_dir = self.workspace if self.restrict_to_workspace else None allowed_dir = self.workspace if self.restrict_to_workspace else None
self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
# Shell tool
self.tools.register(ExecTool( self.tools.register(ExecTool(
working_dir=str(self.workspace), working_dir=str(self.workspace),
timeout=self.exec_config.timeout, timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace, restrict_to_workspace=self.restrict_to_workspace,
)) ))
# Web tools
self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebSearchTool(api_key=self.brave_api_key))
self.tools.register(WebFetchTool()) self.tools.register(WebFetchTool())
self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
# Message tool self.tools.register(SpawnTool(manager=self.subagents))
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
self.tools.register(message_tool)
# Spawn tool (for subagents)
spawn_tool = SpawnTool(manager=self.subagents)
self.tools.register(spawn_tool)
# Cron tool (for scheduling)
if self.cron_service: if self.cron_service:
self.tools.register(CronTool(self.cron_service)) self.tools.register(CronTool(self.cron_service))
async def _connect_mcp(self) -> None: async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy).""" """Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or not self._mcp_servers: if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
return return
self._mcp_connected = True self._mcp_connecting = True
from nanobot.agent.tools.mcp import connect_mcp_servers from nanobot.agent.tools.mcp import connect_mcp_servers
self._mcp_stack = AsyncExitStack() try:
await self._mcp_stack.__aenter__() self._mcp_stack = AsyncExitStack()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
self._mcp_connected = True
except Exception as e:
logger.error("Failed to connect MCP servers (will retry next message): {}", e)
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except Exception:
pass
self._mcp_stack = None
finally:
self._mcp_connecting = False
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Update context for all tools that need routing info.""" """Update context for all tools that need routing info."""
@@ -171,21 +174,11 @@ class AgentLoop:
initial_messages: list[dict], initial_messages: list[dict],
on_progress: Callable[[str], Awaitable[None]] | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> tuple[str | None, list[str]]: ) -> tuple[str | None, list[str]]:
""" """Run the agent iteration loop. Returns (final_content, tools_used)."""
Run the agent iteration loop.
Args:
initial_messages: Starting messages for the LLM conversation.
on_progress: Optional callback to push intermediate content to the user.
Returns:
Tuple of (final_content, list_of_tools_used).
"""
messages = initial_messages messages = initial_messages
iteration = 0 iteration = 0
final_content = None final_content = None
tools_used: list[str] = [] tools_used: list[str] = []
text_only_retried = False
while iteration < self.max_iterations: while iteration < self.max_iterations:
iteration += 1 iteration += 1
@@ -231,17 +224,6 @@ class AgentLoop:
) )
else: else:
final_content = self._strip_think(response.content) final_content = self._strip_think(response.content)
# Some models send an interim text response before tool calls.
# Give them one retry; don't forward the text to avoid duplicates.
if not tools_used and not text_only_retried and final_content:
text_only_retried = True
logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80])
messages = self.context.add_assistant_message(
messages, response.content,
reasoning_content=response.reasoning_content,
)
final_content = None
continue
break break
return final_content, tools_used return final_content, tools_used
@@ -260,8 +242,12 @@ class AgentLoop:
) )
try: try:
response = await self._process_message(msg) response = await self._process_message(msg)
if response: if response is not None:
await self.bus.publish_outbound(response) await self.bus.publish_outbound(response)
elif msg.channel == "cli":
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {},
))
except Exception as e: except Exception as e:
logger.error("Error processing message: {}", e) logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage( await self.bus.publish_outbound(OutboundMessage(
@@ -271,7 +257,7 @@ class AgentLoop:
)) ))
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
async def close_mcp(self) -> None: async def close_mcp(self) -> None:
"""Close MCP connections.""" """Close MCP connections."""
if self._mcp_stack: if self._mcp_stack:
@@ -294,76 +280,77 @@ class AgentLoop:
return lock return lock
def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None:
"""Drop unused per-session lock entries to avoid unbounded growth.""" """Drop lock entry if no longer in use."""
waiters = getattr(lock, "_waiters", None) if not lock.locked():
has_waiters = bool(waiters) self._consolidation_locks.pop(session_key, None)
if lock.locked() or has_waiters:
return
self._consolidation_locks.pop(session_key, None)
async def _process_message( async def _process_message(
self, self,
msg: InboundMessage, msg: InboundMessage,
session_key: str | None = None, session_key: str | None = None,
on_progress: Callable[[str], Awaitable[None]] | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None: ) -> OutboundMessage | None:
""" """Process a single inbound message and return the response."""
Process a single inbound message. # System messages: parse origin from chat_id ("channel:chat_id")
Args:
msg: The inbound message to process.
session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns:
The response message, or None if no response needed.
"""
# System messages route back via chat_id ("channel:chat_id")
if msg.channel == "system": if msg.channel == "system":
return await self._process_system_message(msg) channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id
else ("cli", msg.chat_id))
logger.info("Processing system message from {}", msg.sender_id)
key = f"{channel}:{chat_id}"
session = self.sessions.get_or_create(key)
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
messages = self.context.build_messages(
history=session.get_history(max_messages=self.memory_window),
current_message=msg.content, channel=channel, chat_id=chat_id,
)
final_content, _ = await self._run_agent_loop(messages)
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
session.add_message("assistant", final_content or "Background task completed.")
self.sessions.save(session)
return OutboundMessage(channel=channel, chat_id=chat_id,
content=final_content or "Background task completed.")
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
key = session_key or msg.session_key key = session_key or msg.session_key
session = self.sessions.get_or_create(key) session = self.sessions.get_or_create(key)
# Handle slash commands # Slash commands
cmd = msg.content.strip().lower() cmd = msg.content.strip().lower()
if cmd == "/new": if cmd == "/new":
lock = self._get_consolidation_lock(session.key) lock = self._get_consolidation_lock(session.key)
messages_to_archive: list[dict[str, Any]] = [] self._consolidating.add(session.key)
try: try:
async with lock: async with lock:
messages_to_archive = session.messages[session.last_consolidated :].copy() snapshot = session.messages[session.last_consolidated:]
temp_session = Session(key=session.key) if snapshot:
temp_session.messages = messages_to_archive temp = Session(key=session.key)
archived = await self._consolidate_memory(temp_session, archive_all=True) temp.messages = list(snapshot)
except Exception as e: if not await self._consolidate_memory(temp, archive_all=True):
logger.error("/new archival failed for {}: {}", session.key, e) return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
except Exception:
logger.exception("/new archival failed for {}", session.key)
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel, chat_id=msg.chat_id,
chat_id=msg.chat_id, content="Memory archival failed, session not cleared. Please try again.",
content="Could not start a new session because memory archival failed. Please try again.",
)
if messages_to_archive and not archived:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="Could not start a new session because memory archival failed. Please try again.",
) )
finally:
self._consolidating.discard(session.key)
self._prune_consolidation_lock(session.key, lock)
session.clear() session.clear()
self.sessions.save(session) self.sessions.save(session)
self.sessions.invalidate(session.key) self.sessions.invalidate(session.key)
self._prune_consolidation_lock(session.key, lock)
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="New session started. Memory consolidation in progress.") content="New session started.")
if cmd == "/help": if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
if len(session.messages) > self.memory_window and session.key not in self._consolidating: if len(session.messages) > self.memory_window and session.key not in self._consolidating:
self._consolidating.add(session.key) self._consolidating.add(session.key)
lock = self._get_consolidation_lock(session.key) lock = self._get_consolidation_lock(session.key)
@@ -383,18 +370,22 @@ class AgentLoop:
self._consolidation_tasks.add(_task) self._consolidation_tasks.add(_task)
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool):
message_tool.start_turn()
initial_messages = self.context.build_messages( initial_messages = self.context.build_messages(
history=session.get_history(max_messages=self.memory_window), history=session.get_history(max_messages=self.memory_window),
current_message=msg.content, current_message=msg.content,
media=msg.media if msg.media else None, media=msg.media if msg.media else None,
channel=msg.channel, channel=msg.channel, chat_id=msg.chat_id,
chat_id=msg.chat_id,
) )
async def _bus_progress(content: str) -> None: async def _bus_progress(content: str) -> None:
meta = dict(msg.metadata or {})
meta["_progress"] = True
await self.bus.publish_outbound(OutboundMessage( await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content, channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta,
metadata=msg.metadata or {},
)) ))
final_content, tools_used = await self._run_agent_loop( final_content, tools_used = await self._run_agent_loop(
@@ -403,165 +394,30 @@ class AgentLoop:
if final_content is None: if final_content is None:
final_content = "I've completed processing but have no response to give." final_content = "I've completed processing but have no response to give."
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
session.add_message("user", msg.content) session.add_message("user", msg.content)
session.add_message("assistant", final_content, session.add_message("assistant", final_content,
tools_used=tools_used if tools_used else None) tools_used=tools_used if tools_used else None)
self.sessions.save(session) self.sessions.save(session)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=final_content,
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
)
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
Process a system message (e.g., subagent announce).
The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination.
"""
logger.info("Processing system message from {}", msg.sender_id)
# Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
origin_channel = parts[0]
origin_chat_id = parts[1]
else:
# Fallback
origin_channel = "cli"
origin_chat_id = msg.chat_id
session_key = f"{origin_channel}:{origin_chat_id}"
session = self.sessions.get_or_create(session_key)
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
initial_messages = self.context.build_messages(
history=session.get_history(max_messages=self.memory_window),
current_message=msg.content,
channel=origin_channel,
chat_id=origin_chat_id,
)
final_content, _ = await self._run_agent_loop(initial_messages)
if final_content is None: if message_tool := self.tools.get("message"):
final_content = "Background task completed." if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
return None
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
session.add_message("assistant", final_content)
self.sessions.save(session)
return OutboundMessage( return OutboundMessage(
channel=origin_channel, channel=msg.channel, chat_id=msg.chat_id, content=final_content,
chat_id=origin_chat_id, metadata=msg.metadata or {},
content=final_content
) )
async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: async def _consolidate_memory(self, session, archive_all: bool = False) -> bool:
"""Consolidate old messages into MEMORY.md + HISTORY.md. """Delegate to MemoryStore.consolidate(). Returns True on success."""
return await MemoryStore(self.workspace).consolidate(
Args: session, self.provider, self.model,
archive_all: If True, clear all messages and reset session (for /new command). archive_all=archive_all, memory_window=self.memory_window,
If False, only write to files without modifying session. )
"""
memory = self.context.memory
if archive_all:
old_messages = session.messages
keep_count = 0
logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
else:
keep_count = self.memory_window // 2
if len(session.messages) <= keep_count:
logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
return True
messages_to_process = len(session.messages) - session.last_consolidated
if messages_to_process <= 0:
logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages))
return True
old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages:
return True
logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count)
lines = []
for m in old_messages:
if not m.get("content"):
continue
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
conversation = "\n".join(lines)
current_memory = memory.read_long_term()
prompt = f"""You are a memory consolidation agent. Process this conversation and return a JSON object with exactly two keys:
1. "history_entry": A paragraph (2-5 sentences) summarizing the key events/decisions/topics. Start with a timestamp like [YYYY-MM-DD HH:MM]. Include enough detail to be useful when found by grep search later.
2. "memory_update": The updated long-term memory content. Add any new facts: user location, preferences, personal info, habits, project context, technical decisions, tools/services used. If nothing new, return the existing content unchanged.
## Current Long-term Memory
{current_memory or "(empty)"}
## Conversation to Process
{conversation}
**IMPORTANT**: Both values MUST be strings, not objects or arrays.
Example:
{{
"history_entry": "[2026-02-14 22:50] User asked about...",
"memory_update": "- Host: HARRYBOOK-T14P\n- Name: Nado"
}}
Respond with ONLY valid JSON, no markdown fences."""
try:
response = await self.provider.chat(
messages=[
{"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."},
{"role": "user", "content": prompt},
],
model=self.model,
)
text = (response.content or "").strip()
if not text:
logger.warning("Memory consolidation: LLM returned empty response, skipping")
return False
if text.startswith("```"):
text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
result = json_repair.loads(text)
if not isinstance(result, dict):
logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200])
return False
if entry := result.get("history_entry"):
# Defensive: ensure entry is a string (LLM may return dict)
if not isinstance(entry, str):
entry = json.dumps(entry, ensure_ascii=False)
memory.append_history(entry)
if update := result.get("memory_update"):
# Defensive: ensure update is a string
if not isinstance(update, str):
update = json.dumps(update, ensure_ascii=False)
if update != current_memory:
memory.write_long_term(update)
if archive_all:
session.last_consolidated = 0
else:
session.last_consolidated = len(session.messages) - keep_count
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
return True
except Exception as e:
logger.error("Memory consolidation failed: {}", e)
return False
async def process_direct( async def process_direct(
self, self,
@@ -571,26 +427,8 @@ Respond with ONLY valid JSON, no markdown fences."""
chat_id: str = "direct", chat_id: str = "direct",
on_progress: Callable[[str], Awaitable[None]] | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str: ) -> str:
""" """Process a message directly (for CLI or cron usage)."""
Process a message directly (for CLI or cron usage).
Args:
content: The message content.
session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output.
Returns:
The agent's response.
"""
await self._connect_mcp() await self._connect_mcp()
msg = InboundMessage( msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content)
channel=channel,
sender_id="user",
chat_id=chat_id,
content=content
)
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else "" return response.content if response else ""

View File

@@ -1,9 +1,46 @@
"""Memory system for persistent agent memory.""" """Memory system for persistent agent memory."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from loguru import logger
from nanobot.utils.helpers import ensure_dir from nanobot.utils.helpers import ensure_dir
if TYPE_CHECKING:
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session
_SAVE_MEMORY_TOOL = [
{
"type": "function",
"function": {
"name": "save_memory",
"description": "Save the memory consolidation result to persistent storage.",
"parameters": {
"type": "object",
"properties": {
"history_entry": {
"type": "string",
"description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. "
"Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.",
},
"memory_update": {
"type": "string",
"description": "Full updated long-term memory as markdown. Include all existing "
"facts plus new ones. Return unchanged if nothing new.",
},
},
"required": ["history_entry", "memory_update"],
},
},
}
]
class MemoryStore: class MemoryStore:
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log).""" """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
@@ -28,3 +65,79 @@ class MemoryStore:
def get_memory_context(self) -> str: def get_memory_context(self) -> str:
long_term = self.read_long_term() long_term = self.read_long_term()
return f"## Long-term Memory\n{long_term}" if long_term else "" return f"## Long-term Memory\n{long_term}" if long_term else ""
async def consolidate(
    self,
    session: Session,
    provider: LLMProvider,
    model: str,
    *,
    archive_all: bool = False,
    memory_window: int = 50,
) -> bool:
    """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call.

    Args:
        session: Session whose ``messages`` are summarised; its
            ``last_consolidated`` marker is advanced on success.
        provider: LLM provider whose ``chat`` coroutine is awaited.
        model: Model identifier forwarded to ``provider.chat``.
        archive_all: When True, every message is consolidated and the marker
            is reset to 0.
        memory_window: Half of this value (integer division) is the number of
            most recent messages left out of the consolidation.

    Returns:
        True on success (including no-op), False on failure.
    """
    total = len(session.messages)

    if archive_all:
        old_messages = session.messages
        keep_count = 0
        logger.info("Memory consolidation (archive_all): {} messages", total)
    else:
        keep_count = max(0, memory_window // 2)
        if total <= keep_count:
            return True
        if total - session.last_consolidated <= 0:
            return True
        # Use an explicit end index: with keep_count == 0 the slice
        # ``[start:-0]`` is ``[start:0]`` and would always be empty.
        old_messages = session.messages[session.last_consolidated:total - keep_count]
        if not old_messages:
            return True
        logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count)

    lines = []
    for m in old_messages:
        if not m.get("content"):
            continue
        tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
        # A present-but-None timestamp must not break the slice below.
        stamp = str(m.get("timestamp") or "?")[:16]
        lines.append(f"[{stamp}] {m['role'].upper()}{tools}: {m['content']}")

    current_memory = self.read_long_term()
    prompt = f"""Process this conversation and call the save_memory tool with your consolidation.
## Current Long-term Memory
{current_memory or "(empty)"}
## Conversation to Process
{chr(10).join(lines)}"""

    try:
        response = await provider.chat(
            messages=[
                {"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."},
                {"role": "user", "content": prompt},
            ],
            tools=_SAVE_MEMORY_TOOL,
            model=model,
        )

        if not response.has_tool_calls:
            logger.warning("Memory consolidation: LLM did not call save_memory, skipping")
            return False

        args = response.tool_calls[0].arguments
        # Arguments may arrive as a JSON-encoded string rather than a dict.
        if isinstance(args, str):
            args = json.loads(args)
        if not isinstance(args, dict):
            logger.warning("Memory consolidation: unexpected arguments type {}", type(args).__name__)
            return False

        if entry := args.get("history_entry"):
            if not isinstance(entry, str):
                entry = json.dumps(entry, ensure_ascii=False)
            self.append_history(entry)

        if update := args.get("memory_update"):
            if not isinstance(update, str):
                update = json.dumps(update, ensure_ascii=False)
            # Skip the write when the model returned the memory unchanged.
            if update != current_memory:
                self.write_long_term(update)

        session.last_consolidated = 0 if archive_all else total - keep_count
        logger.info("Memory consolidation done: {} messages, last_consolidated={}", total, session.last_consolidated)
        return True
    except Exception:
        logger.exception("Memory consolidation failed")
        return False

View File

@@ -1,5 +1,6 @@
"""File system tools: read, write, edit.""" """File system tools: read, write, edit."""
import difflib
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -12,8 +13,11 @@ def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path |
if not p.is_absolute() and workspace: if not p.is_absolute() and workspace:
p = workspace / p p = workspace / p
resolved = p.resolve() resolved = p.resolve()
if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): if allowed_dir:
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") try:
resolved.relative_to(allowed_dir.resolve())
except ValueError:
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
return resolved return resolved
@@ -150,7 +154,7 @@ class EditFileTool(Tool):
content = file_path.read_text(encoding="utf-8") content = file_path.read_text(encoding="utf-8")
if old_text not in content: if old_text not in content:
return f"Error: old_text not found in file. Make sure it matches exactly." return self._not_found_message(old_text, content, path)
# Count occurrences # Count occurrences
count = content.count(old_text) count = content.count(old_text)
@@ -166,6 +170,28 @@ class EditFileTool(Tool):
except Exception as e: except Exception as e:
return f"Error editing file: {str(e)}" return f"Error editing file: {str(e)}"
@staticmethod
def _not_found_message(old_text: str, content: str, path: str) -> str:
"""Build a helpful error when old_text is not found."""
lines = content.splitlines(keepends=True)
old_lines = old_text.splitlines(keepends=True)
window = len(old_lines)
best_ratio, best_start = 0.0, 0
for i in range(max(1, len(lines) - window + 1)):
ratio = difflib.SequenceMatcher(None, old_lines, lines[i : i + window]).ratio()
if ratio > best_ratio:
best_ratio, best_start = ratio, i
if best_ratio > 0.5:
diff = "\n".join(difflib.unified_diff(
old_lines, lines[best_start : best_start + window],
fromfile="old_text (provided)", tofile=f"{path} (actual, line {best_start + 1})",
lineterm="",
))
return f"Error: old_text not found in {path}.\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}"
return f"Error: old_text not found in {path}. No similar text found. Verify the file content."
class ListDirTool(Tool): class ListDirTool(Tool):
"""Tool to list directory contents.""" """Tool to list directory contents."""

View File

@@ -1,6 +1,6 @@
"""Message tool for sending messages to users.""" """Message tool for sending messages to users."""
from typing import Any, Callable, Awaitable from typing import Any, Awaitable, Callable
from nanobot.agent.tools.base import Tool from nanobot.agent.tools.base import Tool
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
@@ -8,37 +8,42 @@ from nanobot.bus.events import OutboundMessage
class MessageTool(Tool): class MessageTool(Tool):
"""Tool to send messages to users on chat channels.""" """Tool to send messages to users on chat channels."""
def __init__( def __init__(
self, self,
send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None,
default_channel: str = "", default_channel: str = "",
default_chat_id: str = "", default_chat_id: str = "",
default_message_id: str | None = None default_message_id: str | None = None,
): ):
self._send_callback = send_callback self._send_callback = send_callback
self._default_channel = default_channel self._default_channel = default_channel
self._default_chat_id = default_chat_id self._default_chat_id = default_chat_id
self._default_message_id = default_message_id self._default_message_id = default_message_id
self._sent_in_turn: bool = False
def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Set the current message context.""" """Set the current message context."""
self._default_channel = channel self._default_channel = channel
self._default_chat_id = chat_id self._default_chat_id = chat_id
self._default_message_id = message_id self._default_message_id = message_id
def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None: def set_send_callback(self, callback: Callable[[OutboundMessage], Awaitable[None]]) -> None:
"""Set the callback for sending messages.""" """Set the callback for sending messages."""
self._send_callback = callback self._send_callback = callback
def start_turn(self) -> None:
"""Reset per-turn send tracking."""
self._sent_in_turn = False
@property @property
def name(self) -> str: def name(self) -> str:
return "message" return "message"
@property @property
def description(self) -> str: def description(self) -> str:
return "Send a message to the user. Use this when you want to communicate something." return "Send a message to the user. Use this when you want to communicate something."
@property @property
def parameters(self) -> dict[str, Any]: def parameters(self) -> dict[str, Any]:
return { return {
@@ -64,11 +69,11 @@ class MessageTool(Tool):
}, },
"required": ["content"] "required": ["content"]
} }
async def execute( async def execute(
self, self,
content: str, content: str,
channel: str | None = None, channel: str | None = None,
chat_id: str | None = None, chat_id: str | None = None,
message_id: str | None = None, message_id: str | None = None,
media: list[str] | None = None, media: list[str] | None = None,
@@ -77,13 +82,13 @@ class MessageTool(Tool):
channel = channel or self._default_channel channel = channel or self._default_channel
chat_id = chat_id or self._default_chat_id chat_id = chat_id or self._default_chat_id
message_id = message_id or self._default_message_id message_id = message_id or self._default_message_id
if not channel or not chat_id: if not channel or not chat_id:
return "Error: No target channel/chat specified" return "Error: No target channel/chat specified"
if not self._send_callback: if not self._send_callback:
return "Error: Message sending not configured" return "Error: Message sending not configured"
msg = OutboundMessage( msg = OutboundMessage(
channel=channel, channel=channel,
chat_id=chat_id, chat_id=chat_id,
@@ -93,9 +98,10 @@ class MessageTool(Tool):
"message_id": message_id, "message_id": message_id,
} }
) )
try: try:
await self._send_callback(msg) await self._send_callback(msg)
self._sent_in_turn = True
media_info = f" with {len(media)} attachments" if media else "" media_info = f" with {len(media)} attachments" if media else ""
return f"Message sent to {channel}:{chat_id}{media_info}" return f"Message sent to {channel}:{chat_id}{media_info}"
except Exception as e: except Exception as e:

View File

@@ -105,8 +105,9 @@ class BaseChannel(ABC):
""" """
if not self.is_allowed(sender_id): if not self.is_allowed(sender_id):
logger.warning( logger.warning(
f"Access denied for sender {sender_id} on channel {self.name}. " "Access denied for sender {} on channel {}. "
f"Add them to allowFrom list in config to grant access." "Add them to allowFrom list in config to grant access.",
sender_id, self.name,
) )
return return

View File

@@ -58,7 +58,8 @@ class NanobotDingTalkHandler(CallbackHandler):
if not content: if not content:
logger.warning( logger.warning(
f"Received empty or unsupported message type: {chatbot_msg.message_type}" "Received empty or unsupported message type: {}",
chatbot_msg.message_type,
) )
return AckMessage.STATUS_OK, "OK" return AckMessage.STATUS_OK, "OK"
@@ -126,7 +127,8 @@ class DingTalkChannel(BaseChannel):
self._http = httpx.AsyncClient() self._http = httpx.AsyncClient()
logger.info( logger.info(
f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..." "Initializing DingTalk Stream Client with Client ID: {}...",
self.config.client_id,
) )
credential = Credential(self.config.client_id, self.config.client_secret) credential = Credential(self.config.client_id, self.config.client_secret)
self._client = DingTalkStreamClient(credential) self._client = DingTalkStreamClient(credential)

View File

@@ -17,6 +17,29 @@ from nanobot.config.schema import DiscordConfig
DISCORD_API_BASE = "https://discord.com/api/v10" DISCORD_API_BASE = "https://discord.com/api/v10"
MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB
MAX_MESSAGE_LEN = 2000 # Discord message character limit
def _split_message(content: str, max_len: int = MAX_MESSAGE_LEN) -> list[str]:
"""Split content into chunks within max_len, preferring line breaks."""
if not content:
return []
if len(content) <= max_len:
return [content]
chunks: list[str] = []
while content:
if len(content) <= max_len:
chunks.append(content)
break
cut = content[:max_len]
pos = cut.rfind('\n')
if pos <= 0:
pos = cut.rfind(' ')
if pos <= 0:
pos = max_len
chunks.append(content[:pos])
content = content[pos:].lstrip()
return chunks
class DiscordChannel(BaseChannel): class DiscordChannel(BaseChannel):
@@ -79,34 +102,48 @@ class DiscordChannel(BaseChannel):
return return
url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages" url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages"
payload: dict[str, Any] = {"content": msg.content}
if msg.reply_to:
payload["message_reference"] = {"message_id": msg.reply_to}
payload["allowed_mentions"] = {"replied_user": False}
headers = {"Authorization": f"Bot {self.config.token}"} headers = {"Authorization": f"Bot {self.config.token}"}
try: try:
for attempt in range(3): chunks = _split_message(msg.content or "")
try: if not chunks:
response = await self._http.post(url, headers=headers, json=payload) return
if response.status_code == 429:
data = response.json() for i, chunk in enumerate(chunks):
retry_after = float(data.get("retry_after", 1.0)) payload: dict[str, Any] = {"content": chunk}
logger.warning("Discord rate limited, retrying in {}s", retry_after)
await asyncio.sleep(retry_after) # Only set reply reference on the first chunk
continue if i == 0 and msg.reply_to:
response.raise_for_status() payload["message_reference"] = {"message_id": msg.reply_to}
return payload["allowed_mentions"] = {"replied_user": False}
except Exception as e:
if attempt == 2: if not await self._send_payload(url, headers, payload):
logger.error("Error sending Discord message: {}", e) break # Abort remaining chunks on failure
else:
await asyncio.sleep(1)
finally: finally:
await self._stop_typing(msg.chat_id) await self._stop_typing(msg.chat_id)
async def _send_payload(
    self, url: str, headers: dict[str, str], payload: dict[str, Any]
) -> bool:
    """Send a single Discord API payload with retry on rate-limit. Returns True on success.

    Up to three attempts are made. A 429 response waits for the
    ``retry_after`` value from the response body (default 1.0s) before the
    next attempt; any exception waits one second. The final failure is always
    logged, and no wait is performed once no attempt remains.

    Args:
        url: Endpoint the payload is POSTed to.
        headers: HTTP headers for the request.
        payload: JSON body of the request.

    Returns:
        True if a request succeeded, False after all attempts failed.
    """
    max_attempts = 3
    for attempt in range(max_attempts):
        is_last = attempt == max_attempts - 1
        try:
            response = await self._http.post(url, headers=headers, json=payload)
            if response.status_code == 429:
                if is_last:
                    # Sleeping here would only delay the failure; report it instead.
                    logger.error("Discord rate limited, giving up after {} attempts", max_attempts)
                    break
                data = response.json()
                retry_after = float(data.get("retry_after", 1.0))
                logger.warning("Discord rate limited, retrying in {}s", retry_after)
                await asyncio.sleep(retry_after)
                continue
            response.raise_for_status()
            return True
        except Exception as e:
            if is_last:
                logger.error("Error sending Discord message: {}", e)
            else:
                await asyncio.sleep(1)
    return False
async def _gateway_loop(self) -> None: async def _gateway_loop(self) -> None:
"""Main gateway loop: identify, heartbeat, dispatch events.""" """Main gateway loop: identify, heartbeat, dispatch events."""
if not self._ws: if not self._ws:

View File

@@ -304,7 +304,8 @@ class EmailChannel(BaseChannel):
self._processed_uids.add(uid) self._processed_uids.add(uid)
# mark_seen is the primary dedup; this set is a safety net # mark_seen is the primary dedup; this set is a safety net
if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
self._processed_uids.clear() # Evict a random half to cap memory; mark_seen is the primary dedup
self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:])
if mark_seen: if mark_seen:
client.store(imap_id, "+FLAGS", "\\Seen") client.store(imap_id, "+FLAGS", "\\Seen")

View File

@@ -6,6 +6,7 @@ import os
import re import re
import threading import threading
from collections import OrderedDict from collections import OrderedDict
from pathlib import Path
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@@ -27,6 +28,8 @@ try:
CreateMessageReactionRequest, CreateMessageReactionRequest,
CreateMessageReactionRequestBody, CreateMessageReactionRequestBody,
Emoji, Emoji,
GetFileRequest,
GetMessageResourceRequest,
P2ImMessageReceiveV1, P2ImMessageReceiveV1,
) )
FEISHU_AVAILABLE = True FEISHU_AVAILABLE = True
@@ -44,6 +47,139 @@ MSG_TYPE_MAP = {
} }
def _extract_share_card_content(content_json: dict, msg_type: str) -> str:
"""Extract text representation from share cards and interactive messages."""
parts = []
if msg_type == "share_chat":
parts.append(f"[shared chat: {content_json.get('chat_id', '')}]")
elif msg_type == "share_user":
parts.append(f"[shared user: {content_json.get('user_id', '')}]")
elif msg_type == "interactive":
parts.extend(_extract_interactive_content(content_json))
elif msg_type == "share_calendar_event":
parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]")
elif msg_type == "system":
parts.append("[system message]")
elif msg_type == "merge_forward":
parts.append("[merged forward messages]")
return "\n".join(parts) if parts else f"[{msg_type}]"
def _extract_interactive_content(content: dict) -> list[str]:
"""Recursively extract text and links from interactive card content."""
parts = []
if isinstance(content, str):
try:
content = json.loads(content)
except (json.JSONDecodeError, TypeError):
return [content] if content.strip() else []
if not isinstance(content, dict):
return parts
if "title" in content:
title = content["title"]
if isinstance(title, dict):
title_content = title.get("content", "") or title.get("text", "")
if title_content:
parts.append(f"title: {title_content}")
elif isinstance(title, str):
parts.append(f"title: {title}")
for element in content.get("elements", []) if isinstance(content.get("elements"), list) else []:
parts.extend(_extract_element_content(element))
card = content.get("card", {})
if card:
parts.extend(_extract_interactive_content(card))
header = content.get("header", {})
if header:
header_title = header.get("title", {})
if isinstance(header_title, dict):
header_text = header_title.get("content", "") or header_title.get("text", "")
if header_text:
parts.append(f"title: {header_text}")
return parts
def _extract_element_content(element: dict) -> list[str]:
"""Extract content from a single card element."""
parts = []
if not isinstance(element, dict):
return parts
tag = element.get("tag", "")
if tag in ("markdown", "lark_md"):
content = element.get("content", "")
if content:
parts.append(content)
elif tag == "div":
text = element.get("text", {})
if isinstance(text, dict):
text_content = text.get("content", "") or text.get("text", "")
if text_content:
parts.append(text_content)
elif isinstance(text, str):
parts.append(text)
for field in element.get("fields", []):
if isinstance(field, dict):
field_text = field.get("text", {})
if isinstance(field_text, dict):
c = field_text.get("content", "")
if c:
parts.append(c)
elif tag == "a":
href = element.get("href", "")
text = element.get("text", "")
if href:
parts.append(f"link: {href}")
if text:
parts.append(text)
elif tag == "button":
text = element.get("text", {})
if isinstance(text, dict):
c = text.get("content", "")
if c:
parts.append(c)
url = element.get("url", "") or element.get("multi_url", {}).get("url", "")
if url:
parts.append(f"link: {url}")
elif tag == "img":
alt = element.get("alt", {})
parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]")
elif tag == "note":
for ne in element.get("elements", []):
parts.extend(_extract_element_content(ne))
elif tag == "column_set":
for col in element.get("columns", []):
for ce in col.get("elements", []):
parts.extend(_extract_element_content(ce))
elif tag == "plain_text":
content = element.get("content", "")
if content:
parts.append(content)
else:
for ne in element.get("elements", []):
parts.extend(_extract_element_content(ne))
return parts
def _extract_post_text(content_json: dict) -> str: def _extract_post_text(content_json: dict) -> str:
"""Extract plain text from Feishu post (rich text) message content. """Extract plain text from Feishu post (rich text) message content.
@@ -345,6 +481,87 @@ class FeishuChannel(BaseChannel):
logger.error("Error uploading file {}: {}", file_path, e) logger.error("Error uploading file {}: {}", file_path, e)
return None return None
def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]:
    """Fetch an image attached to a Feishu message.

    Builds a ``GetMessageResourceRequest`` of type ``"image"`` for the given
    message and key and issues it through ``self._client``.

    Returns:
        ``(data, file_name)`` on success, ``(None, None)`` when the API
        reports failure or any exception is raised (both cases are logged).
    """
    try:
        builder = GetMessageResourceRequest.builder()
        request = builder.message_id(message_id).file_key(image_key).type("image").build()
        response = self._client.im.v1.message_resource.get(request)

        if not response.success():
            logger.error("Failed to download image: code={}, msg={}", response.code, response.msg)
            return None, None

        payload = response.file
        # The response body may be a file-like object; drain it into bytes.
        if hasattr(payload, 'read'):
            payload = payload.read()
        return payload, response.file_name
    except Exception as e:
        logger.error("Error downloading image {}: {}", image_key, e)
        return None, None
def _download_file_sync(self, file_key: str) -> tuple[bytes | None, str | None]:
    """Download a file from Feishu by file_key.

    Builds a ``GetFileRequest`` for ``file_key`` and issues it through
    ``self._client``.

    Returns:
        ``(data, file_name)`` on success, ``(None, None)`` when the API
        reports failure or any exception is raised (both cases are logged).
    """
    try:
        request = GetFileRequest.builder().file_key(file_key).build()
        response = self._client.im.v1.file.get(request)
        if response.success():
            file_data = response.file
            # Normalise like _download_image_sync: a file-like payload is read
            # into bytes so the caller can pass it straight to write_bytes().
            if hasattr(file_data, 'read'):
                file_data = file_data.read()
            return file_data, response.file_name
        else:
            logger.error("Failed to download file: code={}, msg={}", response.code, response.msg)
            return None, None
    except Exception as e:
        logger.error("Error downloading file {}: {}", file_key, e)
        return None, None
async def _download_and_save_media(
    self,
    msg_type: str,
    content_json: dict,
    message_id: str | None = None
) -> tuple[str | None, str]:
    """
    Download media from Feishu and save to local disk.

    Images are fetched with ``_download_image_sync`` (needs ``message_id``
    and ``image_key``); audio and files with ``_download_file_sync`` (needs
    ``file_key``). The blocking downloads run in the default executor. The
    result is written under ``~/.nanobot/media``.

    Args:
        msg_type: "image", "audio" or "file"; other values download nothing.
        content_json: Parsed message content holding the media key.
        message_id: ID of the message carrying the media (images only).

    Returns:
        (file_path, content_text) - file_path is None if download failed
    """
    loop = asyncio.get_running_loop()
    media_dir = Path.home() / ".nanobot" / "media"
    media_dir.mkdir(parents=True, exist_ok=True)

    data, filename = None, None

    if msg_type == "image":
        image_key = content_json.get("image_key")
        if image_key and message_id:
            data, filename = await loop.run_in_executor(
                None, self._download_image_sync, message_id, image_key
            )
            if not filename:
                filename = f"{image_key[:16]}.jpg"

    elif msg_type in ("audio", "file"):
        file_key = content_json.get("file_key")
        if file_key:
            data, filename = await loop.run_in_executor(
                None, self._download_file_sync, file_key
            )
            if not filename:
                ext = ".opus" if msg_type == "audio" else ""
                filename = f"{file_key[:16]}{ext}"

    if data and filename:
        # The name comes from the remote side: keep only its final path
        # component so it cannot escape media_dir (e.g. "../../x" or "/etc/x").
        safe_name = Path(str(filename)).name
        if safe_name in ("", ".", ".."):
            safe_name = f"{msg_type}_download"
        file_path = media_dir / safe_name
        file_path.write_bytes(data)
        logger.debug("Downloaded {} to {}", msg_type, file_path)
        return str(file_path), f"[{msg_type}: {safe_name}]"

    return None, f"[{msg_type}: download failed]"
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool:
"""Send a single message (text/image/file/interactive) synchronously.""" """Send a single message (text/image/file/interactive) synchronously."""
try: try:
@@ -425,60 +642,81 @@ class FeishuChannel(BaseChannel):
event = data.event event = data.event
message = event.message message = event.message
sender = event.sender sender = event.sender
# Deduplication check # Deduplication check
message_id = message.message_id message_id = message.message_id
if message_id in self._processed_message_ids: if message_id in self._processed_message_ids:
return return
self._processed_message_ids[message_id] = None self._processed_message_ids[message_id] = None
# Trim cache: keep most recent 500 when exceeds 1000 # Trim cache
while len(self._processed_message_ids) > 1000: while len(self._processed_message_ids) > 1000:
self._processed_message_ids.popitem(last=False) self._processed_message_ids.popitem(last=False)
# Skip bot messages # Skip bot messages
sender_type = sender.sender_type if sender.sender_type == "bot":
if sender_type == "bot":
return return
sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
chat_id = message.chat_id chat_id = message.chat_id
chat_type = message.chat_type # "p2p" or "group" chat_type = message.chat_type
msg_type = message.message_type msg_type = message.message_type
# Add reaction to indicate "seen" # Add reaction
await self._add_reaction(message_id, "THUMBSUP") await self._add_reaction(message_id, "THUMBSUP")
# Parse message content # Parse content
content_parts = []
media_paths = []
try:
content_json = json.loads(message.content) if message.content else {}
except json.JSONDecodeError:
content_json = {}
if msg_type == "text": if msg_type == "text":
try: text = content_json.get("text", "")
content = json.loads(message.content).get("text", "") if text:
except json.JSONDecodeError: content_parts.append(text)
content = message.content or ""
elif msg_type == "post": elif msg_type == "post":
try: text = _extract_post_text(content_json)
content_json = json.loads(message.content) if text:
content = _extract_post_text(content_json) content_parts.append(text)
except (json.JSONDecodeError, TypeError):
content = message.content or "" elif msg_type in ("image", "audio", "file"):
file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id)
if file_path:
media_paths.append(file_path)
content_parts.append(content_text)
elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"):
# Handle share cards and interactive messages
text = _extract_share_card_content(content_json, msg_type)
if text:
content_parts.append(text)
else: else:
content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]"))
if not content: content = "\n".join(content_parts) if content_parts else ""
if not content and not media_paths:
return return
# Forward to message bus # Forward to message bus
reply_to = chat_id if chat_type == "group" else sender_id reply_to = chat_id if chat_type == "group" else sender_id
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,
chat_id=reply_to, chat_id=reply_to,
content=content, content=content,
media=media_paths,
metadata={ metadata={
"message_id": message_id, "message_id": message_id,
"chat_type": chat_type, "chat_type": chat_type,
"msg_type": msg_type, "msg_type": msg_type,
} }
) )
except Exception as e: except Exception as e:
logger.error("Error processing Feishu message: {}", e) logger.error("Error processing Feishu message: {}", e)

View File

@@ -84,11 +84,24 @@ class SlackChannel(BaseChannel):
channel_type = slack_meta.get("channel_type") channel_type = slack_meta.get("channel_type")
# Only reply in thread for channel/group messages; DMs don't use threads # Only reply in thread for channel/group messages; DMs don't use threads
use_thread = thread_ts and channel_type != "im" use_thread = thread_ts and channel_type != "im"
await self._web_client.chat_postMessage( thread_ts_param = thread_ts if use_thread else None
channel=msg.chat_id,
text=self._to_mrkdwn(msg.content), if msg.content:
thread_ts=thread_ts if use_thread else None, await self._web_client.chat_postMessage(
) channel=msg.chat_id,
text=self._to_mrkdwn(msg.content),
thread_ts=thread_ts_param,
)
for media_path in msg.media or []:
try:
await self._web_client.files_upload_v2(
channel=msg.chat_id,
file=media_path,
thread_ts=thread_ts_param,
)
except Exception as e:
logger.error("Failed to upload file {}: {}", media_path, e)
except Exception as e: except Exception as e:
logger.error("Error sending Slack message: {}", e) logger.error("Error sending Slack message: {}", e)
@@ -166,18 +179,21 @@ class SlackChannel(BaseChannel):
except Exception as e: except Exception as e:
logger.debug("Slack reactions_add failed: {}", e) logger.debug("Slack reactions_add failed: {}", e)
await self._handle_message( try:
sender_id=sender_id, await self._handle_message(
chat_id=chat_id, sender_id=sender_id,
content=text, chat_id=chat_id,
metadata={ content=text,
"slack": { metadata={
"event": event, "slack": {
"thread_ts": thread_ts, "event": event,
"channel_type": channel_type, "thread_ts": thread_ts,
} "channel_type": channel_type,
}, }
) },
)
except Exception:
logger.exception("Error handling Slack message from {}", sender_id)
def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool:
if channel_type == "im": if channel_type == "im":

View File

@@ -498,27 +498,58 @@ def agent(
console.print(f" [dim]↳ {content}[/dim]") console.print(f" [dim]↳ {content}[/dim]")
if message: if message:
# Single message mode # Single message mode — direct call, no bus needed
async def run_once(): async def run_once():
with _thinking_ctx(): with _thinking_ctx():
response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress) response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown) _print_agent_response(response, render_markdown=markdown)
await agent_loop.close_mcp() await agent_loop.close_mcp()
asyncio.run(run_once()) asyncio.run(run_once())
else: else:
# Interactive mode # Interactive mode — route through bus like other channels
from nanobot.bus.events import InboundMessage
_init_prompt_session() _init_prompt_session()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
if ":" in session_id:
cli_channel, cli_chat_id = session_id.split(":", 1)
else:
cli_channel, cli_chat_id = "cli", session_id
def _exit_on_sigint(signum, frame): def _exit_on_sigint(signum, frame):
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
os._exit(0) os._exit(0)
signal.signal(signal.SIGINT, _exit_on_sigint) signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive(): async def run_interactive():
bus_task = asyncio.create_task(agent_loop.run())
turn_done = asyncio.Event()
turn_done.set()
turn_response: list[str] = []
async def _consume_outbound():
while True:
try:
msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
if msg.metadata.get("_progress"):
console.print(f" [dim]↳ {msg.content}[/dim]")
elif not turn_done.is_set():
if msg.content:
turn_response.append(msg.content)
turn_done.set()
elif msg.content:
console.print()
_print_agent_response(msg.content, render_markdown=markdown)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
outbound_task = asyncio.create_task(_consume_outbound())
try: try:
while True: while True:
try: try:
@@ -532,10 +563,22 @@ def agent(
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break
turn_done.clear()
turn_response.clear()
await bus.publish_inbound(InboundMessage(
channel=cli_channel,
sender_id="user",
chat_id=cli_chat_id,
content=user_input,
))
with _thinking_ctx(): with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress) await turn_done.wait()
_print_agent_response(response, render_markdown=markdown)
if turn_response:
_print_agent_response(turn_response[0], render_markdown=markdown)
except KeyboardInterrupt: except KeyboardInterrupt:
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
@@ -545,8 +588,11 @@ def agent(
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break
finally: finally:
agent_loop.stop()
outbound_task.cancel()
await asyncio.gather(bus_task, outbound_task, return_exceptions=True)
await agent_loop.close_mcp() await agent_loop.close_mcp()
asyncio.run(run_interactive()) asyncio.run(run_interactive())
@@ -622,6 +668,33 @@ def channels_status():
slack_config slack_config
) )
# DingTalk
dt = config.channels.dingtalk
dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]"
table.add_row(
"DingTalk",
"" if dt.enabled else "",
dt_config
)
# QQ
qq = config.channels.qq
qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]"
table.add_row(
"QQ",
"" if qq.enabled else "",
qq_config
)
# Email
em = config.channels.email
em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]"
table.add_row(
"Email",
"" if em.enabled else "",
em_config
)
console.print(table) console.print(table)

View File

@@ -111,7 +111,7 @@ class LiteLLMProvider(LLMProvider):
def _supports_cache_control(self, model: str) -> bool: def _supports_cache_control(self, model: str) -> bool:
"""Return True when the provider supports cache_control on content blocks.""" """Return True when the provider supports cache_control on content blocks."""
if self._gateway is not None: if self._gateway is not None:
return False return self._gateway.supports_prompt_caching
spec = find_by_model(model) spec = find_by_model(model)
return spec is not None and spec.supports_prompt_caching return spec is not None and spec.supports_prompt_caching

View File

@@ -100,6 +100,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = (
default_api_base="https://openrouter.ai/api/v1", default_api_base="https://openrouter.ai/api/v1",
strip_model_prefix=False, strip_model_prefix=False,
model_overrides=(), model_overrides=(),
supports_prompt_caching=True,
), ),
# AiHubMix: global gateway, OpenAI-compatible interface. # AiHubMix: global gateway, OpenAI-compatible interface.
@@ -146,7 +147,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = (
keywords=("volcengine", "volces", "ark"), keywords=("volcengine", "volces", "ark"),
env_key="OPENAI_API_KEY", env_key="OPENAI_API_KEY",
display_name="VolcEngine", display_name="VolcEngine",
litellm_prefix="openai", litellm_prefix="volcengine",
skip_prefixes=(), skip_prefixes=(),
env_extras=(), env_extras=(),
is_gateway=True, is_gateway=True,

View File

@@ -1,6 +1,7 @@
"""Session management for conversation history.""" """Session management for conversation history."""
import json import json
import shutil
from pathlib import Path from pathlib import Path
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
@@ -108,9 +109,11 @@ class SessionManager:
if not path.exists(): if not path.exists():
legacy_path = self._get_legacy_session_path(key) legacy_path = self._get_legacy_session_path(key)
if legacy_path.exists(): if legacy_path.exists():
import shutil try:
shutil.move(str(legacy_path), str(path)) shutil.move(str(legacy_path), str(path))
logger.info("Migrated session {} from legacy path", key) logger.info("Migrated session {} from legacy path", key)
except Exception:
logger.exception("Failed to migrate session {}", key)
if not path.exists(): if not path.exists():
return None return None
@@ -154,6 +157,7 @@ class SessionManager:
with open(path, "w", encoding="utf-8") as f: with open(path, "w", encoding="utf-8") as f:
metadata_line = { metadata_line = {
"_type": "metadata", "_type": "metadata",
"key": session.key,
"created_at": session.created_at.isoformat(), "created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(), "updated_at": session.updated_at.isoformat(),
"metadata": session.metadata, "metadata": session.metadata,
@@ -186,8 +190,9 @@ class SessionManager:
if first_line: if first_line:
data = json.loads(first_line) data = json.loads(first_line)
if data.get("_type") == "metadata": if data.get("_type") == "metadata":
key = data.get("key") or path.stem.replace("_", ":", 1)
sessions.append({ sessions.append({
"key": path.stem.replace("_", ":"), "key": key,
"created_at": data.get("created_at"), "created_at": data.get("created_at"),
"updated_at": data.get("updated_at"), "updated_at": data.get("updated_at"),
"path": str(path) "path": str(path)

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "nanobot-ai" name = "nanobot-ai"
version = "0.1.4" version = "0.1.4.post1"
description = "A lightweight personal AI assistant framework" description = "A lightweight personal AI assistant framework"
requires-python = ">=3.11" requires-python = ">=3.11"
license = {text = "MIT"} license = {text = "MIT"}