Merge branch 'main' into pr-892

This commit is contained in:
Re-bin
2026-02-20 16:06:17 +00:00
9 changed files with 174 additions and 99 deletions

View File

@@ -1,30 +1,36 @@
"""Agent loop: the core processing engine."""
import asyncio
from contextlib import AsyncExitStack
import json
import json_repair
from pathlib import Path
import re
from typing import Any, Awaitable, Callable
from __future__ import annotations
import asyncio
import json
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Awaitable, Callable
import json_repair
from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider
from nanobot.agent.context import ContextBuilder
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.session.manager import Session, SessionManager
if TYPE_CHECKING:
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
class AgentLoop:
"""
@@ -49,14 +55,13 @@ class AgentLoop:
max_tokens: int = 4096,
memory_window: int = 50,
brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None,
cron_service: "CronService | None" = None,
exec_config: ExecToolConfig | None = None,
cron_service: CronService | None = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
mcp_servers: dict | None = None,
):
from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
self.bus = bus
self.provider = provider
self.workspace = workspace
@@ -358,6 +363,10 @@ class AgentLoop:
asyncio.create_task(_consolidate_and_unlock())
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool):
message_tool.start_turn()
initial_messages = self.context.build_messages(
history=session.get_history(max_messages=self.memory_window),
current_message=msg.content,
@@ -387,6 +396,10 @@ class AgentLoop:
tools_used=tools_used if tools_used else None)
self.sessions.save(session)
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
return None
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,

View File

@@ -1,6 +1,6 @@
"""Message tool for sending messages to users."""
from typing import Any, Callable, Awaitable
from typing import Any, Awaitable, Callable
from nanobot.agent.tools.base import Tool
from nanobot.bus.events import OutboundMessage
@@ -14,12 +14,13 @@ class MessageTool(Tool):
send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None,
default_channel: str = "",
default_chat_id: str = "",
default_message_id: str | None = None
default_message_id: str | None = None,
):
self._send_callback = send_callback
self._default_channel = default_channel
self._default_chat_id = default_chat_id
self._default_message_id = default_message_id
self._sent_in_turn: bool = False
def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Set the current message context."""
@@ -31,6 +32,10 @@ class MessageTool(Tool):
"""Set the callback for sending messages."""
self._send_callback = callback
def start_turn(self) -> None:
"""Reset per-turn send tracking."""
self._sent_in_turn = False
@property
def name(self) -> str:
return "message"
@@ -96,6 +101,7 @@ class MessageTool(Tool):
try:
await self._send_callback(msg)
self._sent_in_turn = True
media_info = f" with {len(media)} attachments" if media else ""
return f"Message sent to {channel}:{chat_id}{media_info}"
except Exception as e:

View File

@@ -105,8 +105,9 @@ class BaseChannel(ABC):
"""
if not self.is_allowed(sender_id):
logger.warning(
f"Access denied for sender {sender_id} on channel {self.name}. "
f"Add them to allowFrom list in config to grant access."
"Access denied for sender {} on channel {}. "
"Add them to allowFrom list in config to grant access.",
sender_id, self.name,
)
return

View File

@@ -58,7 +58,8 @@ class NanobotDingTalkHandler(CallbackHandler):
if not content:
logger.warning(
f"Received empty or unsupported message type: {chatbot_msg.message_type}"
"Received empty or unsupported message type: {}",
chatbot_msg.message_type,
)
return AckMessage.STATUS_OK, "OK"
@@ -126,7 +127,8 @@ class DingTalkChannel(BaseChannel):
self._http = httpx.AsyncClient()
logger.info(
f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..."
"Initializing DingTalk Stream Client with Client ID: {}...",
self.config.client_id,
)
credential = Credential(self.config.client_id, self.config.client_secret)
self._client = DingTalkStreamClient(credential)

View File

@@ -17,6 +17,29 @@ from nanobot.config.schema import DiscordConfig
DISCORD_API_BASE = "https://discord.com/api/v10"
MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB
MAX_MESSAGE_LEN = 2000 # Discord message character limit
def _split_message(content: str, max_len: int = MAX_MESSAGE_LEN) -> list[str]:
"""Split content into chunks within max_len, preferring line breaks."""
if not content:
return []
if len(content) <= max_len:
return [content]
chunks: list[str] = []
while content:
if len(content) <= max_len:
chunks.append(content)
break
cut = content[:max_len]
pos = cut.rfind('\n')
if pos <= 0:
pos = cut.rfind(' ')
if pos <= 0:
pos = max_len
chunks.append(content[:pos])
content = content[pos:].lstrip()
return chunks
class DiscordChannel(BaseChannel):
@@ -79,34 +102,48 @@ class DiscordChannel(BaseChannel):
return
url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages"
payload: dict[str, Any] = {"content": msg.content}
if msg.reply_to:
payload["message_reference"] = {"message_id": msg.reply_to}
payload["allowed_mentions"] = {"replied_user": False}
headers = {"Authorization": f"Bot {self.config.token}"}
try:
for attempt in range(3):
try:
response = await self._http.post(url, headers=headers, json=payload)
if response.status_code == 429:
data = response.json()
retry_after = float(data.get("retry_after", 1.0))
logger.warning("Discord rate limited, retrying in {}s", retry_after)
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
return
except Exception as e:
if attempt == 2:
logger.error("Error sending Discord message: {}", e)
else:
await asyncio.sleep(1)
chunks = _split_message(msg.content or "")
if not chunks:
return
for i, chunk in enumerate(chunks):
payload: dict[str, Any] = {"content": chunk}
# Only set reply reference on the first chunk
if i == 0 and msg.reply_to:
payload["message_reference"] = {"message_id": msg.reply_to}
payload["allowed_mentions"] = {"replied_user": False}
if not await self._send_payload(url, headers, payload):
break # Abort remaining chunks on failure
finally:
await self._stop_typing(msg.chat_id)
async def _send_payload(
self, url: str, headers: dict[str, str], payload: dict[str, Any]
) -> bool:
"""Send a single Discord API payload with retry on rate-limit. Returns True on success."""
for attempt in range(3):
try:
response = await self._http.post(url, headers=headers, json=payload)
if response.status_code == 429:
data = response.json()
retry_after = float(data.get("retry_after", 1.0))
logger.warning("Discord rate limited, retrying in {}s", retry_after)
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
return True
except Exception as e:
if attempt == 2:
logger.error("Error sending Discord message: {}", e)
else:
await asyncio.sleep(1)
return False
async def _gateway_loop(self) -> None:
"""Main gateway loop: identify, heartbeat, dispatch events."""
if not self._ws:

View File

@@ -84,11 +84,24 @@ class SlackChannel(BaseChannel):
channel_type = slack_meta.get("channel_type")
# Only reply in thread for channel/group messages; DMs don't use threads
use_thread = thread_ts and channel_type != "im"
await self._web_client.chat_postMessage(
channel=msg.chat_id,
text=self._to_mrkdwn(msg.content),
thread_ts=thread_ts if use_thread else None,
)
thread_ts_param = thread_ts if use_thread else None
if msg.content:
await self._web_client.chat_postMessage(
channel=msg.chat_id,
text=self._to_mrkdwn(msg.content),
thread_ts=thread_ts_param,
)
for media_path in msg.media or []:
try:
await self._web_client.files_upload_v2(
channel=msg.chat_id,
file=media_path,
thread_ts=thread_ts_param,
)
except Exception as e:
logger.error("Failed to upload file {}: {}", media_path, e)
except Exception as e:
logger.error("Error sending Slack message: {}", e)

View File

@@ -111,7 +111,7 @@ class LiteLLMProvider(LLMProvider):
def _supports_cache_control(self, model: str) -> bool:
"""Return True when the provider supports cache_control on content blocks."""
if self._gateway is not None:
return False
return self._gateway.supports_prompt_caching
spec = find_by_model(model)
return spec is not None and spec.supports_prompt_caching

View File

@@ -100,6 +100,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = (
default_api_base="https://openrouter.ai/api/v1",
strip_model_prefix=False,
model_overrides=(),
supports_prompt_caching=True,
),
# AiHubMix: global gateway, OpenAI-compatible interface.

View File

@@ -154,6 +154,7 @@ class SessionManager:
with open(path, "w", encoding="utf-8") as f:
metadata_line = {
"_type": "metadata",
"key": session.key,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"metadata": session.metadata,
@@ -186,8 +187,9 @@ class SessionManager:
if first_line:
data = json.loads(first_line)
if data.get("_type") == "metadata":
key = data.get("key") or path.stem.replace("_", ":", 1)
sessions.append({
"key": path.stem.replace("_", ":"),
"key": key,
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"path": str(path)