merge origin/main into pr-1327

Made-with: Cursor
This commit is contained in:
Re-bin
2026-03-11 07:30:38 +00:00
33 changed files with 2335 additions and 445 deletions

View File

@@ -10,7 +10,7 @@ from typing import Any
from nanobot.agent.memory import MemoryStore
from nanobot.agent.skills import SkillsLoader
from nanobot.utils.helpers import detect_image_mime
from nanobot.utils.helpers import build_assistant_message, detect_image_mime
class ContextBuilder:
@@ -182,12 +182,10 @@ Reply directly with text for conversations. Only use the 'message' tool to send
thinking_blocks: list[dict] | None = None,
) -> list[dict[str, Any]]:
"""Add an assistant message to the message list."""
msg: dict[str, Any] = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content
if thinking_blocks:
msg["thinking_blocks"] = thinking_blocks
messages.append(msg)
messages.append(build_assistant_message(
content,
tool_calls=tool_calls,
reasoning_content=reasoning_content,
thinking_blocks=thinking_blocks,
))
return messages

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import asyncio
import json
import re
import weakref
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
@@ -13,7 +12,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.memory import MemoryConsolidator
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
@@ -55,8 +54,8 @@ class AgentLoop:
max_iterations: int = 40,
temperature: float = 0.1,
max_tokens: int = 4096,
memory_window: int = 100,
reasoning_effort: str | None = None,
context_window_tokens: int = 65_536,
brave_api_key: str | None = None,
web_proxy: str | None = None,
exec_config: ExecToolConfig | None = None,
@@ -75,8 +74,8 @@ class AgentLoop:
self.max_iterations = max_iterations
self.temperature = temperature
self.max_tokens = max_tokens
self.memory_window = memory_window
self.reasoning_effort = reasoning_effort
self.context_window_tokens = context_window_tokens
self.brave_api_key = brave_api_key
self.web_proxy = web_proxy
self.exec_config = exec_config or ExecToolConfig()
@@ -105,11 +104,17 @@ class AgentLoop:
self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False
self._mcp_connecting = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._processing_lock = asyncio.Lock()
self.memory_consolidator = MemoryConsolidator(
workspace=workspace,
provider=provider,
model=self.model,
sessions=self.sessions,
context_window_tokens=context_window_tokens,
build_messages=self.context.build_messages,
get_tool_definitions=self.tools.get_definitions,
)
self._register_default_tools()
def _register_default_tools(self) -> None:
@@ -182,7 +187,7 @@ class AgentLoop:
initial_messages: list[dict],
on_progress: Callable[..., Awaitable[None]] | None = None,
) -> tuple[str | None, list[str], list[dict]]:
"""Run the agent iteration loop. Returns (final_content, tools_used, messages)."""
"""Run the agent iteration loop."""
messages = initial_messages
iteration = 0
final_content = None
@@ -191,9 +196,11 @@ class AgentLoop:
while iteration < self.max_iterations:
iteration += 1
response = await self.provider.chat(
tool_defs = self.tools.get_definitions()
response = await self.provider.chat_with_retry(
messages=messages,
tools=self.tools.get_definitions(),
tools=tool_defs,
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
@@ -341,8 +348,9 @@ class AgentLoop:
logger.info("Processing system message from {}", msg.sender_id)
key = f"{channel}:{chat_id}"
session = self.sessions.get_or_create(key)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
history = session.get_history(max_messages=self.memory_window)
history = session.get_history(max_messages=0)
messages = self.context.build_messages(
history=history,
current_message=msg.content, channel=channel, chat_id=chat_id,
@@ -350,6 +358,7 @@ class AgentLoop:
final_content, _, all_msgs = await self._run_agent_loop(messages)
self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
return OutboundMessage(channel=channel, chat_id=chat_id,
content=final_content or "Background task completed.")
@@ -362,27 +371,20 @@ class AgentLoop:
# Slash commands
cmd = msg.content.strip().lower()
if cmd == "/new":
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
self._consolidating.add(session.key)
try:
async with lock:
snapshot = session.messages[session.last_consolidated:]
if snapshot:
temp = Session(key=session.key)
temp.messages = list(snapshot)
if not await self._consolidate_memory(temp, archive_all=True):
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
if not await self.memory_consolidator.archive_unconsolidated(session):
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
except Exception:
logger.exception("/new archival failed for {}", session.key)
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
channel=msg.channel,
chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
finally:
self._consolidating.discard(session.key)
session.clear()
self.sessions.save(session)
@@ -393,30 +395,14 @@ class AgentLoop:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands")
unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and session.key not in self._consolidating):
self._consolidating.add(session.key)
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
async def _consolidate_and_unlock():
try:
async with lock:
await self._consolidate_memory(session)
finally:
self._consolidating.discard(session.key)
_task = asyncio.current_task()
if _task is not None:
self._consolidation_tasks.discard(_task)
_task = asyncio.create_task(_consolidate_and_unlock())
self._consolidation_tasks.add(_task)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool):
message_tool.start_turn()
history = session.get_history(max_messages=self.memory_window)
history = session.get_history(max_messages=0)
initial_messages = self.context.build_messages(
history=history,
current_message=msg.content,
@@ -441,6 +427,7 @@ class AgentLoop:
self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
return None
@@ -487,13 +474,6 @@ class AgentLoop:
session.messages.append(entry)
session.updated_at = datetime.now()
async def _consolidate_memory(self, session, archive_all: bool = False) -> bool:
"""Delegate to MemoryStore.consolidate(). Returns True on success."""
return await MemoryStore(self.workspace).consolidate(
session, self.provider, self.model,
archive_all=archive_all, memory_window=self.memory_window,
)
async def process_direct(
self,
content: str,

View File

@@ -2,17 +2,19 @@
from __future__ import annotations
import asyncio
import json
import weakref
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Callable
from loguru import logger
from nanobot.utils.helpers import ensure_dir
from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain
if TYPE_CHECKING:
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session
from nanobot.session.manager import Session, SessionManager
_SAVE_MEMORY_TOOL = [
@@ -26,7 +28,7 @@ _SAVE_MEMORY_TOOL = [
"properties": {
"history_entry": {
"type": "string",
"description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. "
"description": "A paragraph summarizing key events/decisions/topics. "
"Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.",
},
"memory_update": {
@@ -42,6 +44,20 @@ _SAVE_MEMORY_TOOL = [
]
def _ensure_text(value: Any) -> str:
"""Normalize tool-call payload values to text for file storage."""
return value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)
def _normalize_save_memory_args(args: Any) -> dict[str, Any] | None:
"""Normalize provider tool-call arguments to the expected dict shape."""
if isinstance(args, str):
args = json.loads(args)
if isinstance(args, list):
return args[0] if args and isinstance(args[0], dict) else None
return args if isinstance(args, dict) else None
class MemoryStore:
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
@@ -66,40 +82,27 @@ class MemoryStore:
long_term = self.read_long_term()
return f"## Long-term Memory\n{long_term}" if long_term else ""
@staticmethod
def _format_messages(messages: list[dict]) -> str:
lines = []
for message in messages:
if not message.get("content"):
continue
tools = f" [tools: {', '.join(message['tools_used'])}]" if message.get("tools_used") else ""
lines.append(
f"[{message.get('timestamp', '?')[:16]}] {message['role'].upper()}{tools}: {message['content']}"
)
return "\n".join(lines)
async def consolidate(
self,
session: Session,
messages: list[dict],
provider: LLMProvider,
model: str,
*,
archive_all: bool = False,
memory_window: int = 50,
) -> bool:
"""Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call.
Returns True on success (including no-op), False on failure.
"""
if archive_all:
old_messages = session.messages
keep_count = 0
logger.info("Memory consolidation (archive_all): {} messages", len(session.messages))
else:
keep_count = memory_window // 2
if len(session.messages) <= keep_count:
return True
if len(session.messages) - session.last_consolidated <= 0:
return True
old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages:
return True
logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count)
lines = []
for m in old_messages:
if not m.get("content"):
continue
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
"""Consolidate the provided message chunk into MEMORY.md + HISTORY.md."""
if not messages:
return True
current_memory = self.read_long_term()
prompt = f"""Process this conversation and call the save_memory tool with your consolidation.
@@ -108,10 +111,10 @@ class MemoryStore:
{current_memory or "(empty)"}
## Conversation to Process
{chr(10).join(lines)}"""
{self._format_messages(messages)}"""
try:
response = await provider.chat(
response = await provider.chat_with_retry(
messages=[
{"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."},
{"role": "user", "content": prompt},
@@ -124,34 +127,158 @@ class MemoryStore:
logger.warning("Memory consolidation: LLM did not call save_memory, skipping")
return False
args = response.tool_calls[0].arguments
# Some providers return arguments as a JSON string instead of dict
if isinstance(args, str):
args = json.loads(args)
# Some providers return arguments as a list (handle edge case)
if isinstance(args, list):
if args and isinstance(args[0], dict):
args = args[0]
else:
logger.warning("Memory consolidation: unexpected arguments as empty or non-dict list")
return False
if not isinstance(args, dict):
logger.warning("Memory consolidation: unexpected arguments type {}", type(args).__name__)
args = _normalize_save_memory_args(response.tool_calls[0].arguments)
if args is None:
logger.warning("Memory consolidation: unexpected save_memory arguments")
return False
if entry := args.get("history_entry"):
if not isinstance(entry, str):
entry = json.dumps(entry, ensure_ascii=False)
self.append_history(entry)
self.append_history(_ensure_text(entry))
if update := args.get("memory_update"):
if not isinstance(update, str):
update = json.dumps(update, ensure_ascii=False)
update = _ensure_text(update)
if update != current_memory:
self.write_long_term(update)
session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
logger.info("Memory consolidation done for {} messages", len(messages))
return True
except Exception:
logger.exception("Memory consolidation failed")
return False
class MemoryConsolidator:
"""Owns consolidation policy, locking, and session offset updates."""
_MAX_CONSOLIDATION_ROUNDS = 5
def __init__(
self,
workspace: Path,
provider: LLMProvider,
model: str,
sessions: SessionManager,
context_window_tokens: int,
build_messages: Callable[..., list[dict[str, Any]]],
get_tool_definitions: Callable[[], list[dict[str, Any]]],
):
self.store = MemoryStore(workspace)
self.provider = provider
self.model = model
self.sessions = sessions
self.context_window_tokens = context_window_tokens
self._build_messages = build_messages
self._get_tool_definitions = get_tool_definitions
self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
async def consolidate_messages(self, messages: list[dict[str, object]]) -> bool:
"""Archive a selected message chunk into persistent memory."""
return await self.store.consolidate(messages, self.provider, self.model)
def pick_consolidation_boundary(
self,
session: Session,
tokens_to_remove: int,
) -> tuple[int, int] | None:
"""Pick a user-turn boundary that removes enough old prompt tokens."""
start = session.last_consolidated
if start >= len(session.messages) or tokens_to_remove <= 0:
return None
removed_tokens = 0
last_boundary: tuple[int, int] | None = None
for idx in range(start, len(session.messages)):
message = session.messages[idx]
if idx > start and message.get("role") == "user":
last_boundary = (idx, removed_tokens)
if removed_tokens >= tokens_to_remove:
return last_boundary
removed_tokens += estimate_message_tokens(message)
return last_boundary
def estimate_session_prompt_tokens(self, session: Session) -> tuple[int, str]:
"""Estimate current prompt size for the normal session history view."""
history = session.get_history(max_messages=0)
channel, chat_id = (session.key.split(":", 1) if ":" in session.key else (None, None))
probe_messages = self._build_messages(
history=history,
current_message="[token-probe]",
channel=channel,
chat_id=chat_id,
)
return estimate_prompt_tokens_chain(
self.provider,
self.model,
probe_messages,
self._get_tool_definitions(),
)
async def archive_unconsolidated(self, session: Session) -> bool:
"""Archive the full unconsolidated tail for /new-style session rollover."""
lock = self.get_lock(session.key)
async with lock:
snapshot = session.messages[session.last_consolidated:]
if not snapshot:
return True
return await self.consolidate_messages(snapshot)
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
"""Loop: archive old messages until prompt fits within half the context window."""
if not session.messages or self.context_window_tokens <= 0:
return
lock = self.get_lock(session.key)
async with lock:
target = self.context_window_tokens // 2
estimated, source = self.estimate_session_prompt_tokens(session)
if estimated <= 0:
return
if estimated < self.context_window_tokens:
logger.debug(
"Token consolidation idle {}: {}/{} via {}",
session.key,
estimated,
self.context_window_tokens,
source,
)
return
for round_num in range(self._MAX_CONSOLIDATION_ROUNDS):
if estimated <= target:
return
boundary = self.pick_consolidation_boundary(session, max(1, estimated - target))
if boundary is None:
logger.debug(
"Token consolidation: no safe boundary for {} (round {})",
session.key,
round_num,
)
return
end_idx = boundary[0]
chunk = session.messages[session.last_consolidated:end_idx]
if not chunk:
return
logger.info(
"Token consolidation round {} for {}: {}/{} via {}, chunk={} msgs",
round_num,
session.key,
estimated,
self.context_window_tokens,
source,
len(chunk),
)
if not await self.consolidate_messages(chunk):
return
session.last_consolidated = end_idx
self.sessions.save(session)
estimated, source = self.estimate_session_prompt_tokens(session)
if estimated <= 0:
return

View File

@@ -16,6 +16,7 @@ from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.config.schema import ExecToolConfig
from nanobot.providers.base import LLMProvider
from nanobot.utils.helpers import build_assistant_message
class SubagentManager:
@@ -123,7 +124,7 @@ class SubagentManager:
while iteration < max_iterations:
iteration += 1
response = await self.provider.chat(
response = await self.provider.chat_with_retry(
messages=messages,
tools=tools.get_definitions(),
model=self.model,
@@ -133,7 +134,6 @@ class SubagentManager:
)
if response.has_tool_calls:
# Add assistant message with tool calls
tool_call_dicts = [
{
"id": tc.id,
@@ -145,11 +145,12 @@ class SubagentManager:
}
for tc in response.tool_calls
]
messages.append({
"role": "assistant",
"content": response.content or "",
"tool_calls": tool_call_dicts,
})
messages.append(build_assistant_message(
response.content or "",
tool_calls=tool_call_dicts,
reasoning_content=response.reasoning_content,
thinking_blocks=response.thinking_blocks,
))
# Execute tools
for tool_call in response.tool_calls:

View File

@@ -57,6 +57,8 @@ class NanobotDingTalkHandler(CallbackHandler):
content = ""
if chatbot_msg.text:
content = chatbot_msg.text.content.strip()
elif chatbot_msg.extensions.get("content", {}).get("recognition"):
content = chatbot_msg.extensions["content"]["recognition"].strip()
if not content:
content = message.data.get("text", {}).get("content", "").strip()

View File

@@ -753,8 +753,9 @@ class FeishuChannel(BaseChannel):
None, self._download_file_sync, message_id, file_key, msg_type
)
if not filename:
ext = {"audio": ".opus", "media": ".mp4"}.get(msg_type, "")
filename = f"{file_key[:16]}{ext}"
filename = file_key[:16]
if msg_type == "audio" and not filename.endswith(".opus"):
filename = f"{filename}.opus"
if data and filename:
file_path = media_dir / filename

View File

@@ -81,8 +81,8 @@ class SlackChannel(BaseChannel):
slack_meta = msg.metadata.get("slack", {}) if msg.metadata else {}
thread_ts = slack_meta.get("thread_ts")
channel_type = slack_meta.get("channel_type")
# Only reply in thread for channel/group messages; DMs don't use threads
thread_ts_param = thread_ts if use_thread else None
# Slack DMs don't use threads; channel/group replies may keep thread_ts.
thread_ts_param = thread_ts if thread_ts and channel_type != "im" else None
# Slack rejects empty text payloads. Keep media-only messages media-only,
# but send a single blank message when the bot has no text or files to send.
@@ -278,4 +278,3 @@ class SlackChannel(BaseChannel):
if parts:
rows.append(" · ".join(parts))
return "\n".join(rows)

View File

@@ -179,6 +179,8 @@ class TelegramChannel(BaseChannel):
self._media_group_buffers: dict[str, dict] = {}
self._media_group_tasks: dict[str, asyncio.Task] = {}
self._message_threads: dict[tuple[str, int], int] = {}
self._bot_user_id: int | None = None
self._bot_username: str | None = None
def is_allowed(self, sender_id: str) -> bool:
"""Preserve Telegram's legacy id|username allowlist matching."""
@@ -242,6 +244,8 @@ class TelegramChannel(BaseChannel):
# Get bot info and register command menu
bot_info = await self._app.bot.get_me()
self._bot_user_id = getattr(bot_info, "id", None)
self._bot_username = getattr(bot_info, "username", None)
logger.info("Telegram bot @{} connected", bot_info.username)
try:
@@ -462,6 +466,70 @@ class TelegramChannel(BaseChannel):
"is_forum": bool(getattr(message.chat, "is_forum", False)),
}
async def _ensure_bot_identity(self) -> tuple[int | None, str | None]:
"""Load bot identity once and reuse it for mention/reply checks."""
if self._bot_user_id is not None or self._bot_username is not None:
return self._bot_user_id, self._bot_username
if not self._app:
return None, None
bot_info = await self._app.bot.get_me()
self._bot_user_id = getattr(bot_info, "id", None)
self._bot_username = getattr(bot_info, "username", None)
return self._bot_user_id, self._bot_username
@staticmethod
def _has_mention_entity(
text: str,
entities,
bot_username: str,
bot_id: int | None,
) -> bool:
"""Check Telegram mention entities against the bot username."""
handle = f"@{bot_username}".lower()
for entity in entities or []:
entity_type = getattr(entity, "type", None)
if entity_type == "text_mention":
user = getattr(entity, "user", None)
if user is not None and bot_id is not None and getattr(user, "id", None) == bot_id:
return True
continue
if entity_type != "mention":
continue
offset = getattr(entity, "offset", None)
length = getattr(entity, "length", None)
if offset is None or length is None:
continue
if text[offset : offset + length].lower() == handle:
return True
return handle in text.lower()
async def _is_group_message_for_bot(self, message) -> bool:
"""Allow group messages when policy is open, @mentioned, or replying to the bot."""
if message.chat.type == "private" or self.config.group_policy == "open":
return True
bot_id, bot_username = await self._ensure_bot_identity()
if bot_username:
text = message.text or ""
caption = message.caption or ""
if self._has_mention_entity(
text,
getattr(message, "entities", None),
bot_username,
bot_id,
):
return True
if self._has_mention_entity(
caption,
getattr(message, "caption_entities", None),
bot_username,
bot_id,
):
return True
reply_user = getattr(getattr(message, "reply_to_message", None), "from_user", None)
return bool(bot_id and reply_user and reply_user.id == bot_id)
def _remember_thread_context(self, message) -> None:
"""Cache topic thread id by chat/message id for follow-up replies."""
message_thread_id = getattr(message, "message_thread_id", None)
@@ -501,6 +569,9 @@ class TelegramChannel(BaseChannel):
# Store chat_id for replies
self._chat_ids[sender_id] = chat_id
if not await self._is_group_message_for_bot(message):
return
# Build content from text and/or media
content_parts = []
media_paths = []

View File

@@ -191,6 +191,8 @@ def onboard():
save_config(Config())
console.print(f"[green]✓[/green] Created config at {config_path}")
console.print("[dim]Config template now uses `maxTokens` + `contextWindowTokens`; `memoryWindow` is no longer a runtime setting.[/dim]")
# Create workspace
workspace = get_workspace_path()
@@ -283,6 +285,16 @@ def _load_runtime_config(config: str | None = None, workspace: str | None = None
return loaded
def _print_deprecated_memory_window_notice(config: Config) -> None:
"""Warn when running with old memoryWindow-only config."""
if config.agents.defaults.should_warn_deprecated_memory_window:
console.print(
"[yellow]Hint:[/yellow] Detected deprecated `memoryWindow` without "
"`contextWindowTokens`. `memoryWindow` is ignored; run "
"[cyan]nanobot onboard[/cyan] to refresh your config template."
)
# ============================================================================
# Gateway / Server
# ============================================================================
@@ -290,7 +302,7 @@ def _load_runtime_config(config: str | None = None, workspace: str | None = None
@app.command()
def gateway(
port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
port: int | None = typer.Option(None, "--port", "-p", help="Gateway port"),
workspace: str | None = typer.Option(None, "--workspace", "-w", help="Workspace directory"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
config: str | None = typer.Option(None, "--config", "-c", help="Path to config file"),
@@ -310,6 +322,8 @@ def gateway(
logging.basicConfig(level=logging.DEBUG)
config = _load_runtime_config(config, workspace)
_print_deprecated_memory_window_notice(config)
port = port if port is not None else config.gateway.port
console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
sync_workspace_templates(config.workspace_path)
@@ -330,8 +344,8 @@ def gateway(
temperature=config.agents.defaults.temperature,
max_tokens=config.agents.defaults.max_tokens,
max_iterations=config.agents.defaults.max_tool_iterations,
memory_window=config.agents.defaults.memory_window,
reasoning_effort=config.agents.defaults.reasoning_effort,
context_window_tokens=config.agents.defaults.context_window_tokens,
brave_api_key=config.tools.web.search.api_key or None,
web_proxy=config.tools.web.proxy or None,
exec_config=config.tools.exec,
@@ -493,6 +507,7 @@ def agent(
from nanobot.cron.service import CronService
config = _load_runtime_config(config, workspace)
_print_deprecated_memory_window_notice(config)
sync_workspace_templates(config.workspace_path)
bus = MessageBus()
@@ -515,8 +530,8 @@ def agent(
temperature=config.agents.defaults.temperature,
max_tokens=config.agents.defaults.max_tokens,
max_iterations=config.agents.defaults.max_tool_iterations,
memory_window=config.agents.defaults.memory_window,
reasoning_effort=config.agents.defaults.reasoning_effort,
context_window_tokens=config.agents.defaults.context_window_tokens,
brave_api_key=config.tools.web.search.api_key or None,
web_proxy=config.tools.web.proxy or None,
exec_config=config.tools.exec,

View File

@@ -33,6 +33,7 @@ class TelegramConfig(Base):
None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
)
reply_to_message: bool = False # If true, bot replies quote the original message
group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all
class FeishuConfig(Base):
@@ -236,11 +237,18 @@ class AgentDefaults(Base):
"auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection
)
max_tokens: int = 8192
context_window_tokens: int = 65_536
temperature: float = 0.1
max_tool_iterations: int = 40
memory_window: int = 100
# Deprecated compatibility field: accepted from old configs but ignored at runtime.
memory_window: int | None = Field(default=None, exclude=True)
reasoning_effort: str | None = None # low / medium / high — enables LLM thinking mode
@property
def should_warn_deprecated_memory_window(self) -> bool:
"""Return True when old memoryWindow is present without contextWindowTokens."""
return self.memory_window is not None and "context_window_tokens" not in self.model_fields_set
class AgentsConfig(Base):
"""Agent configuration."""

View File

@@ -87,7 +87,7 @@ class HeartbeatService:
Returns (action, tasks) where action is 'skip' or 'run'.
"""
response = await self.provider.chat(
response = await self.provider.chat_with_retry(
messages=[
{"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."},
{"role": "user", "content": (

View File

@@ -1,9 +1,12 @@
"""Base LLM provider interface."""
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
from loguru import logger
@dataclass
class ToolCallRequest:
@@ -37,6 +40,22 @@ class LLMProvider(ABC):
while maintaining a consistent interface.
"""
_CHAT_RETRY_DELAYS = (1, 2, 4)
_TRANSIENT_ERROR_MARKERS = (
"429",
"rate limit",
"500",
"502",
"503",
"504",
"overloaded",
"timeout",
"timed out",
"connection",
"server error",
"temporarily unavailable",
)
def __init__(self, api_key: str | None = None, api_base: str | None = None):
self.api_key = api_key
self.api_base = api_base
@@ -126,6 +145,71 @@ class LLMProvider(ABC):
"""
pass
@classmethod
def _is_transient_error(cls, content: str | None) -> bool:
err = (content or "").lower()
return any(marker in err for marker in cls._TRANSIENT_ERROR_MARKERS)
async def chat_with_retry(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
reasoning_effort: str | None = None,
) -> LLMResponse:
"""Call chat() with retry on transient provider failures."""
for attempt, delay in enumerate(self._CHAT_RETRY_DELAYS, start=1):
try:
response = await self.chat(
messages=messages,
tools=tools,
model=model,
max_tokens=max_tokens,
temperature=temperature,
reasoning_effort=reasoning_effort,
)
except asyncio.CancelledError:
raise
except Exception as exc:
response = LLMResponse(
content=f"Error calling LLM: {exc}",
finish_reason="error",
)
if response.finish_reason != "error":
return response
if not self._is_transient_error(response.content):
return response
err = (response.content or "").lower()
logger.warning(
"LLM transient error (attempt {}/{}), retrying in {}s: {}",
attempt,
len(self._CHAT_RETRY_DELAYS),
delay,
err[:120],
)
await asyncio.sleep(delay)
try:
return await self.chat(
messages=messages,
tools=tools,
model=model,
max_tokens=max_tokens,
temperature=temperature,
reasoning_effort=reasoning_effort,
)
except asyncio.CancelledError:
raise
except Exception as exc:
return LLMResponse(
content=f"Error calling LLM: {exc}",
finish_reason="error",
)
@abstractmethod
def get_default_model(self) -> str:
"""Get the default model for this provider."""

View File

@@ -268,6 +268,8 @@ Skip this step only if the skill being developed already exists, and iteration o
When creating a new skill from scratch, always run the `init_skill.py` script. The script conveniently generates a new template skill directory that automatically includes everything a skill requires, making the skill creation process much more efficient and reliable.
For `nanobot`, custom skills should live under the active workspace `skills/` directory so they can be discovered automatically at runtime (for example, `<workspace>/skills/my-skill/SKILL.md`).
Usage:
```bash
@@ -277,9 +279,9 @@ scripts/init_skill.py <skill-name> --path <output-directory> [--resources script
Examples:
```bash
scripts/init_skill.py my-skill --path skills/public
scripts/init_skill.py my-skill --path skills/public --resources scripts,references
scripts/init_skill.py my-skill --path skills/public --resources scripts --examples
scripts/init_skill.py my-skill --path ./workspace/skills
scripts/init_skill.py my-skill --path ./workspace/skills --resources scripts,references
scripts/init_skill.py my-skill --path ./workspace/skills --resources scripts --examples
```
The script:
@@ -326,7 +328,7 @@ Write the YAML frontmatter with `name` and `description`:
- Include all "when to use" information here - Not in the body. The body is only loaded after triggering, so "When to Use This Skill" sections in the body are not helpful to the agent.
- Example description for a `docx` skill: "Comprehensive document creation, editing, and analysis with support for tracked changes, comments, formatting preservation, and text extraction. Use when the agent needs to work with professional documents (.docx files) for: (1) Creating new documents, (2) Modifying or editing content, (3) Working with tracked changes, (4) Adding comments, or any other document tasks"
Do not include any other fields in YAML frontmatter.
Keep frontmatter minimal. In `nanobot`, `metadata` and `always` are also supported when needed, but avoid adding extra fields unless they are actually required.
##### Body
@@ -349,7 +351,6 @@ scripts/package_skill.py <path/to/skill-folder> ./dist
The packaging script will:
1. **Validate** the skill automatically, checking:
- YAML frontmatter format and required fields
- Skill naming conventions and directory structure
- Description completeness and quality
@@ -357,6 +358,8 @@ The packaging script will:
2. **Package** the skill if validation passes, creating a .skill file named after the skill (e.g., `my-skill.skill`) that includes all files and maintains the proper directory structure for distribution. The .skill file is a zip file with a .skill extension.
Security restriction: symlinks are rejected and packaging fails when any symlink is present.
If validation fails, the script will report the errors and exit without creating a package. Fix any validation errors and run the packaging command again.
### Step 6: Iterate

View File

@@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""
Skill Initializer - Creates a new skill from template
Usage:
init_skill.py <skill-name> --path <path> [--resources scripts,references,assets] [--examples]
Examples:
init_skill.py my-new-skill --path skills/public
init_skill.py my-new-skill --path skills/public --resources scripts,references
init_skill.py my-api-helper --path skills/private --resources scripts --examples
init_skill.py custom-skill --path /custom/location
"""
import argparse
import re
import sys
from pathlib import Path
MAX_SKILL_NAME_LENGTH = 64
ALLOWED_RESOURCES = {"scripts", "references", "assets"}
SKILL_TEMPLATE = """---
name: {skill_name}
description: [TODO: Complete and informative explanation of what the skill does and when to use it. Include WHEN to use this skill - specific scenarios, file types, or tasks that trigger it.]
---
# {skill_title}
## Overview
[TODO: 1-2 sentences explaining what this skill enables]
## Structuring This Skill
[TODO: Choose the structure that best fits this skill's purpose. Common patterns:
**1. Workflow-Based** (best for sequential processes)
- Works well when there are clear step-by-step procedures
- Example: DOCX skill with "Workflow Decision Tree" -> "Reading" -> "Creating" -> "Editing"
- Structure: ## Overview -> ## Workflow Decision Tree -> ## Step 1 -> ## Step 2...
**2. Task-Based** (best for tool collections)
- Works well when the skill offers different operations/capabilities
- Example: PDF skill with "Quick Start" -> "Merge PDFs" -> "Split PDFs" -> "Extract Text"
- Structure: ## Overview -> ## Quick Start -> ## Task Category 1 -> ## Task Category 2...
**3. Reference/Guidelines** (best for standards or specifications)
- Works well for brand guidelines, coding standards, or requirements
- Example: Brand styling with "Brand Guidelines" -> "Colors" -> "Typography" -> "Features"
- Structure: ## Overview -> ## Guidelines -> ## Specifications -> ## Usage...
**4. Capabilities-Based** (best for integrated systems)
- Works well when the skill provides multiple interrelated features
- Example: Product Management with "Core Capabilities" -> numbered capability list
- Structure: ## Overview -> ## Core Capabilities -> ### 1. Feature -> ### 2. Feature...
Patterns can be mixed and matched as needed. Most skills combine patterns (e.g., start with task-based, add workflow for complex operations).
Delete this entire "Structuring This Skill" section when done - it's just guidance.]
## [TODO: Replace with the first main section based on chosen structure]
[TODO: Add content here. See examples in existing skills:
- Code samples for technical skills
- Decision trees for complex workflows
- Concrete examples with realistic user requests
- References to scripts/templates/references as needed]
## Resources (optional)
Create only the resource directories this skill actually needs. Delete this section if no resources are required.
### scripts/
Executable code (Python/Bash/etc.) that can be run directly to perform specific operations.
**Examples from other skills:**
- PDF skill: `fill_fillable_fields.py`, `extract_form_field_info.py` - utilities for PDF manipulation
- DOCX skill: `document.py`, `utilities.py` - Python modules for document processing
**Appropriate for:** Python scripts, shell scripts, or any executable code that performs automation, data processing, or specific operations.
**Note:** Scripts may be executed without loading into context, but can still be read by Codex for patching or environment adjustments.
### references/
Documentation and reference material intended to be loaded into context to inform Codex's process and thinking.
**Examples from other skills:**
- Product management: `communication.md`, `context_building.md` - detailed workflow guides
- BigQuery: API reference documentation and query examples
- Finance: Schema documentation, company policies
**Appropriate for:** In-depth documentation, API references, database schemas, comprehensive guides, or any detailed information that Codex should reference while working.
### assets/
Files not intended to be loaded into context, but rather used within the output Codex produces.
**Examples from other skills:**
- Brand styling: PowerPoint template files (.pptx), logo files
- Frontend builder: HTML/React boilerplate project directories
- Typography: Font files (.ttf, .woff2)
**Appropriate for:** Templates, boilerplate code, document templates, images, icons, fonts, or any files meant to be copied or used in the final output.
---
**Not every skill requires all three types of resources.**
"""
EXAMPLE_SCRIPT = '''#!/usr/bin/env python3
"""
Example helper script for {skill_name}
This is a placeholder script that can be executed directly.
Replace with actual implementation or delete if not needed.
Example real scripts from other skills:
- pdf/scripts/fill_fillable_fields.py - Fills PDF form fields
- pdf/scripts/convert_pdf_to_images.py - Converts PDF pages to images
"""
def main():
print("This is an example script for {skill_name}")
# TODO: Add actual script logic here
# This could be data processing, file conversion, API calls, etc.
if __name__ == "__main__":
main()
'''
EXAMPLE_REFERENCE = """# Reference Documentation for {skill_title}
This is a placeholder for detailed reference documentation.
Replace with actual reference content or delete if not needed.
Example real reference docs from other skills:
- product-management/references/communication.md - Comprehensive guide for status updates
- product-management/references/context_building.md - Deep-dive on gathering context
- bigquery/references/ - API references and query examples
## When Reference Docs Are Useful
Reference docs are ideal for:
- Comprehensive API documentation
- Detailed workflow guides
- Complex multi-step processes
- Information too lengthy for main SKILL.md
- Content that's only needed for specific use cases
## Structure Suggestions
### API Reference Example
- Overview
- Authentication
- Endpoints with examples
- Error codes
- Rate limits
### Workflow Guide Example
- Prerequisites
- Step-by-step instructions
- Common patterns
- Troubleshooting
- Best practices
"""
EXAMPLE_ASSET = """# Example Asset File
This placeholder represents where asset files would be stored.
Replace with actual asset files (templates, images, fonts, etc.) or delete if not needed.
Asset files are NOT intended to be loaded into context, but rather used within
the output Codex produces.
Example asset files from other skills:
- Brand guidelines: logo.png, slides_template.pptx
- Frontend builder: hello-world/ directory with HTML/React boilerplate
- Typography: custom-font.ttf, font-family.woff2
- Data: sample_data.csv, test_dataset.json
## Common Asset Types
- Templates: .pptx, .docx, boilerplate directories
- Images: .png, .jpg, .svg, .gif
- Fonts: .ttf, .otf, .woff, .woff2
- Boilerplate code: Project directories, starter files
- Icons: .ico, .svg
- Data files: .csv, .json, .xml, .yaml
Note: This is a text placeholder. Actual assets can be any file type.
"""
def normalize_skill_name(skill_name):
"""Normalize a skill name to lowercase hyphen-case."""
normalized = skill_name.strip().lower()
normalized = re.sub(r"[^a-z0-9]+", "-", normalized)
normalized = normalized.strip("-")
normalized = re.sub(r"-{2,}", "-", normalized)
return normalized
def title_case_skill_name(skill_name):
"""Convert hyphenated skill name to Title Case for display."""
return " ".join(word.capitalize() for word in skill_name.split("-"))
def parse_resources(raw_resources):
if not raw_resources:
return []
resources = [item.strip() for item in raw_resources.split(",") if item.strip()]
invalid = sorted({item for item in resources if item not in ALLOWED_RESOURCES})
if invalid:
allowed = ", ".join(sorted(ALLOWED_RESOURCES))
print(f"[ERROR] Unknown resource type(s): {', '.join(invalid)}")
print(f" Allowed: {allowed}")
sys.exit(1)
deduped = []
seen = set()
for resource in resources:
if resource not in seen:
deduped.append(resource)
seen.add(resource)
return deduped
def create_resource_dirs(skill_dir, skill_name, skill_title, resources, include_examples):
for resource in resources:
resource_dir = skill_dir / resource
resource_dir.mkdir(exist_ok=True)
if resource == "scripts":
if include_examples:
example_script = resource_dir / "example.py"
example_script.write_text(EXAMPLE_SCRIPT.format(skill_name=skill_name))
example_script.chmod(0o755)
print("[OK] Created scripts/example.py")
else:
print("[OK] Created scripts/")
elif resource == "references":
if include_examples:
example_reference = resource_dir / "api_reference.md"
example_reference.write_text(EXAMPLE_REFERENCE.format(skill_title=skill_title))
print("[OK] Created references/api_reference.md")
else:
print("[OK] Created references/")
elif resource == "assets":
if include_examples:
example_asset = resource_dir / "example_asset.txt"
example_asset.write_text(EXAMPLE_ASSET)
print("[OK] Created assets/example_asset.txt")
else:
print("[OK] Created assets/")
def init_skill(skill_name, path, resources, include_examples):
"""
Initialize a new skill directory with template SKILL.md.
Args:
skill_name: Name of the skill
path: Path where the skill directory should be created
resources: Resource directories to create
include_examples: Whether to create example files in resource directories
Returns:
Path to created skill directory, or None if error
"""
# Determine skill directory path
skill_dir = Path(path).resolve() / skill_name
# Check if directory already exists
if skill_dir.exists():
print(f"[ERROR] Skill directory already exists: {skill_dir}")
return None
# Create skill directory
try:
skill_dir.mkdir(parents=True, exist_ok=False)
print(f"[OK] Created skill directory: {skill_dir}")
except Exception as e:
print(f"[ERROR] Error creating directory: {e}")
return None
# Create SKILL.md from template
skill_title = title_case_skill_name(skill_name)
skill_content = SKILL_TEMPLATE.format(skill_name=skill_name, skill_title=skill_title)
skill_md_path = skill_dir / "SKILL.md"
try:
skill_md_path.write_text(skill_content)
print("[OK] Created SKILL.md")
except Exception as e:
print(f"[ERROR] Error creating SKILL.md: {e}")
return None
# Create resource directories if requested
if resources:
try:
create_resource_dirs(skill_dir, skill_name, skill_title, resources, include_examples)
except Exception as e:
print(f"[ERROR] Error creating resource directories: {e}")
return None
# Print next steps
print(f"\n[OK] Skill '{skill_name}' initialized successfully at {skill_dir}")
print("\nNext steps:")
print("1. Edit SKILL.md to complete the TODO items and update the description")
if resources:
if include_examples:
print("2. Customize or delete the example files in scripts/, references/, and assets/")
else:
print("2. Add resources to scripts/, references/, and assets/ as needed")
else:
print("2. Create resource directories only if needed (scripts/, references/, assets/)")
print("3. Run the validator when ready to check the skill structure")
return skill_dir
def main():
parser = argparse.ArgumentParser(
description="Create a new skill directory with a SKILL.md template.",
)
parser.add_argument("skill_name", help="Skill name (normalized to hyphen-case)")
parser.add_argument("--path", required=True, help="Output directory for the skill")
parser.add_argument(
"--resources",
default="",
help="Comma-separated list: scripts,references,assets",
)
parser.add_argument(
"--examples",
action="store_true",
help="Create example files inside the selected resource directories",
)
args = parser.parse_args()
raw_skill_name = args.skill_name
skill_name = normalize_skill_name(raw_skill_name)
if not skill_name:
print("[ERROR] Skill name must include at least one letter or digit.")
sys.exit(1)
if len(skill_name) > MAX_SKILL_NAME_LENGTH:
print(
f"[ERROR] Skill name '{skill_name}' is too long ({len(skill_name)} characters). "
f"Maximum is {MAX_SKILL_NAME_LENGTH} characters."
)
sys.exit(1)
if skill_name != raw_skill_name:
print(f"Note: Normalized skill name from '{raw_skill_name}' to '{skill_name}'.")
resources = parse_resources(args.resources)
if args.examples and not resources:
print("[ERROR] --examples requires --resources to be set.")
sys.exit(1)
path = args.path
print(f"Initializing skill: {skill_name}")
print(f" Location: {path}")
if resources:
print(f" Resources: {', '.join(resources)}")
if args.examples:
print(" Examples: enabled")
else:
print(" Resources: none (create as needed)")
print()
result = init_skill(skill_name, path, resources, args.examples)
if result:
sys.exit(0)
else:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
Skill Packager - Creates a distributable .skill file of a skill folder
Usage:
python package_skill.py <path/to/skill-folder> [output-directory]
Example:
python package_skill.py skills/public/my-skill
python package_skill.py skills/public/my-skill ./dist
"""
import sys
import zipfile
from pathlib import Path
from quick_validate import validate_skill
def _is_within(path: Path, root: Path) -> bool:
try:
path.relative_to(root)
return True
except ValueError:
return False
def _cleanup_partial_archive(skill_filename: Path) -> None:
try:
if skill_filename.exists():
skill_filename.unlink()
except OSError:
pass
def package_skill(skill_path, output_dir=None):
"""
Package a skill folder into a .skill file.
Args:
skill_path: Path to the skill folder
output_dir: Optional output directory for the .skill file (defaults to current directory)
Returns:
Path to the created .skill file, or None if error
"""
skill_path = Path(skill_path).resolve()
# Validate skill folder exists
if not skill_path.exists():
print(f"[ERROR] Skill folder not found: {skill_path}")
return None
if not skill_path.is_dir():
print(f"[ERROR] Path is not a directory: {skill_path}")
return None
# Validate SKILL.md exists
skill_md = skill_path / "SKILL.md"
if not skill_md.exists():
print(f"[ERROR] SKILL.md not found in {skill_path}")
return None
# Run validation before packaging
print("Validating skill...")
valid, message = validate_skill(skill_path)
if not valid:
print(f"[ERROR] Validation failed: {message}")
print(" Please fix the validation errors before packaging.")
return None
print(f"[OK] {message}\n")
# Determine output location
skill_name = skill_path.name
if output_dir:
output_path = Path(output_dir).resolve()
output_path.mkdir(parents=True, exist_ok=True)
else:
output_path = Path.cwd()
skill_filename = output_path / f"{skill_name}.skill"
EXCLUDED_DIRS = {".git", ".svn", ".hg", "__pycache__", "node_modules"}
files_to_package = []
resolved_archive = skill_filename.resolve()
for file_path in skill_path.rglob("*"):
# Fail closed on symlinks so the packaged contents are explicit and predictable.
if file_path.is_symlink():
print(f"[ERROR] Symlink not allowed in packaged skill: {file_path}")
_cleanup_partial_archive(skill_filename)
return None
rel_parts = file_path.relative_to(skill_path).parts
if any(part in EXCLUDED_DIRS for part in rel_parts):
continue
if file_path.is_file():
resolved_file = file_path.resolve()
if not _is_within(resolved_file, skill_path):
print(f"[ERROR] File escapes skill root: {file_path}")
_cleanup_partial_archive(skill_filename)
return None
# If output lives under skill_path, avoid writing archive into itself.
if resolved_file == resolved_archive:
print(f"[WARN] Skipping output archive: {file_path}")
continue
files_to_package.append(file_path)
# Create the .skill file (zip format)
try:
with zipfile.ZipFile(skill_filename, "w", zipfile.ZIP_DEFLATED) as zipf:
for file_path in files_to_package:
# Calculate the relative path within the zip.
arcname = Path(skill_name) / file_path.relative_to(skill_path)
zipf.write(file_path, arcname)
print(f" Added: {arcname}")
print(f"\n[OK] Successfully packaged skill to: {skill_filename}")
return skill_filename
except Exception as e:
_cleanup_partial_archive(skill_filename)
print(f"[ERROR] Error creating .skill file: {e}")
return None
def main():
if len(sys.argv) < 2:
print("Usage: python package_skill.py <path/to/skill-folder> [output-directory]")
print("\nExample:")
print(" python package_skill.py skills/public/my-skill")
print(" python package_skill.py skills/public/my-skill ./dist")
sys.exit(1)
skill_path = sys.argv[1]
output_dir = sys.argv[2] if len(sys.argv) > 2 else None
print(f"Packaging skill: {skill_path}")
if output_dir:
print(f" Output directory: {output_dir}")
print()
result = package_skill(skill_path, output_dir)
if result:
sys.exit(0)
else:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Minimal validator for nanobot skill folders.
"""
import re
import sys
from pathlib import Path
from typing import Optional
try:
import yaml
except ModuleNotFoundError:
yaml = None
MAX_SKILL_NAME_LENGTH = 64
ALLOWED_FRONTMATTER_KEYS = {
"name",
"description",
"metadata",
"always",
"license",
"allowed-tools",
}
ALLOWED_RESOURCE_DIRS = {"scripts", "references", "assets"}
PLACEHOLDER_MARKERS = ("[todo", "todo:")
def _extract_frontmatter(content: str) -> Optional[str]:
lines = content.splitlines()
if not lines or lines[0].strip() != "---":
return None
for i in range(1, len(lines)):
if lines[i].strip() == "---":
return "\n".join(lines[1:i])
return None
def _parse_simple_frontmatter(frontmatter_text: str) -> Optional[dict[str, str]]:
"""Fallback parser for simple frontmatter when PyYAML is unavailable."""
parsed: dict[str, str] = {}
current_key: Optional[str] = None
multiline_key: Optional[str] = None
for raw_line in frontmatter_text.splitlines():
stripped = raw_line.strip()
if not stripped or stripped.startswith("#"):
continue
is_indented = raw_line[:1].isspace()
if is_indented:
if current_key is None:
return None
current_value = parsed[current_key]
parsed[current_key] = f"{current_value}\n{stripped}" if current_value else stripped
continue
if ":" not in stripped:
return None
key, value = stripped.split(":", 1)
key = key.strip()
value = value.strip()
if not key:
return None
if value in {"|", ">"}:
parsed[key] = ""
current_key = key
multiline_key = key
continue
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
parsed[key] = value
current_key = key
multiline_key = None
if multiline_key is not None and multiline_key not in parsed:
return None
return parsed
def _load_frontmatter(frontmatter_text: str) -> tuple[Optional[dict], Optional[str]]:
if yaml is not None:
try:
frontmatter = yaml.safe_load(frontmatter_text)
except yaml.YAMLError as exc:
return None, f"Invalid YAML in frontmatter: {exc}"
if not isinstance(frontmatter, dict):
return None, "Frontmatter must be a YAML dictionary"
return frontmatter, None
frontmatter = _parse_simple_frontmatter(frontmatter_text)
if frontmatter is None:
return None, "Invalid YAML in frontmatter: unsupported syntax without PyYAML installed"
return frontmatter, None
def _validate_skill_name(name: str, folder_name: str) -> Optional[str]:
if not re.fullmatch(r"[a-z0-9]+(?:-[a-z0-9]+)*", name):
return (
f"Name '{name}' should be hyphen-case "
"(lowercase letters, digits, and single hyphens only)"
)
if len(name) > MAX_SKILL_NAME_LENGTH:
return (
f"Name is too long ({len(name)} characters). "
f"Maximum is {MAX_SKILL_NAME_LENGTH} characters."
)
if name != folder_name:
return f"Skill name '{name}' must match directory name '{folder_name}'"
return None
def _validate_description(description: str) -> Optional[str]:
trimmed = description.strip()
if not trimmed:
return "Description cannot be empty"
lowered = trimmed.lower()
if any(marker in lowered for marker in PLACEHOLDER_MARKERS):
return "Description still contains TODO placeholder text"
if "<" in trimmed or ">" in trimmed:
return "Description cannot contain angle brackets (< or >)"
if len(trimmed) > 1024:
return f"Description is too long ({len(trimmed)} characters). Maximum is 1024 characters."
return None
def validate_skill(skill_path):
"""Validate a skill folder structure and required frontmatter."""
skill_path = Path(skill_path).resolve()
if not skill_path.exists():
return False, f"Skill folder not found: {skill_path}"
if not skill_path.is_dir():
return False, f"Path is not a directory: {skill_path}"
skill_md = skill_path / "SKILL.md"
if not skill_md.exists():
return False, "SKILL.md not found"
try:
content = skill_md.read_text(encoding="utf-8")
except OSError as exc:
return False, f"Could not read SKILL.md: {exc}"
frontmatter_text = _extract_frontmatter(content)
if frontmatter_text is None:
return False, "Invalid frontmatter format"
frontmatter, error = _load_frontmatter(frontmatter_text)
if error:
return False, error
unexpected_keys = sorted(set(frontmatter.keys()) - ALLOWED_FRONTMATTER_KEYS)
if unexpected_keys:
allowed = ", ".join(sorted(ALLOWED_FRONTMATTER_KEYS))
unexpected = ", ".join(unexpected_keys)
return (
False,
f"Unexpected key(s) in SKILL.md frontmatter: {unexpected}. Allowed properties are: {allowed}",
)
if "name" not in frontmatter:
return False, "Missing 'name' in frontmatter"
if "description" not in frontmatter:
return False, "Missing 'description' in frontmatter"
name = frontmatter["name"]
if not isinstance(name, str):
return False, f"Name must be a string, got {type(name).__name__}"
name_error = _validate_skill_name(name.strip(), skill_path.name)
if name_error:
return False, name_error
description = frontmatter["description"]
if not isinstance(description, str):
return False, f"Description must be a string, got {type(description).__name__}"
description_error = _validate_description(description)
if description_error:
return False, description_error
always = frontmatter.get("always")
if always is not None and not isinstance(always, bool):
return False, f"'always' must be a boolean, got {type(always).__name__}"
for child in skill_path.iterdir():
if child.name == "SKILL.md":
continue
if child.is_dir() and child.name in ALLOWED_RESOURCE_DIRS:
continue
if child.is_symlink():
continue
return (
False,
f"Unexpected file or directory in skill root: {child.name}. "
"Only SKILL.md, scripts/, references/, and assets/ are allowed.",
)
return True, "Skill is valid!"
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python quick_validate.py <skill_directory>")
sys.exit(1)
valid, message = validate_skill(sys.argv[1])
print(message)
sys.exit(0 if valid else 1)

View File

@@ -1,8 +1,12 @@
"""Utility functions for nanobot."""
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any
import tiktoken
def detect_image_mime(data: bytes) -> str | None:
@@ -68,6 +72,104 @@ def split_message(content: str, max_len: int = 2000) -> list[str]:
return chunks
def build_assistant_message(
content: str | None,
tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
thinking_blocks: list[dict] | None = None,
) -> dict[str, Any]:
"""Build a provider-safe assistant message with optional reasoning fields."""
msg: dict[str, Any] = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content
if thinking_blocks:
msg["thinking_blocks"] = thinking_blocks
return msg
def estimate_prompt_tokens(
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
) -> int:
"""Estimate prompt tokens with tiktoken."""
try:
enc = tiktoken.get_encoding("cl100k_base")
parts: list[str] = []
for msg in messages:
content = msg.get("content")
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
txt = part.get("text", "")
if txt:
parts.append(txt)
if tools:
parts.append(json.dumps(tools, ensure_ascii=False))
return len(enc.encode("\n".join(parts)))
except Exception:
return 0
def estimate_message_tokens(message: dict[str, Any]) -> int:
"""Estimate prompt tokens contributed by one persisted message."""
content = message.get("content")
parts: list[str] = []
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text = part.get("text", "")
if text:
parts.append(text)
else:
parts.append(json.dumps(part, ensure_ascii=False))
elif content is not None:
parts.append(json.dumps(content, ensure_ascii=False))
for key in ("name", "tool_call_id"):
value = message.get(key)
if isinstance(value, str) and value:
parts.append(value)
if message.get("tool_calls"):
parts.append(json.dumps(message["tool_calls"], ensure_ascii=False))
payload = "\n".join(parts)
if not payload:
return 1
try:
enc = tiktoken.get_encoding("cl100k_base")
return max(1, len(enc.encode(payload)))
except Exception:
return max(1, len(payload) // 4)
def estimate_prompt_tokens_chain(
provider: Any,
model: str | None,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
) -> tuple[int, str]:
"""Estimate prompt tokens via provider counter first, then tiktoken fallback."""
provider_counter = getattr(provider, "estimate_prompt_tokens", None)
if callable(provider_counter):
try:
tokens, source = provider_counter(messages, tools, model)
if isinstance(tokens, (int, float)) and tokens > 0:
return int(tokens), str(source or "provider_counter")
except Exception:
pass
estimated = estimate_prompt_tokens(messages, tools)
if estimated > 0:
return int(estimated), "tiktoken"
return 0, "none"
def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]:
"""Sync bundled templates to workspace. Only creates missing files."""
from importlib.resources import files as pkg_files
@@ -88,7 +190,7 @@ def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]
added.append(str(dest.relative_to(workspace)))
for item in tpl.iterdir():
if item.name.endswith(".md"):
if item.name.endswith(".md") and not item.name.startswith("."):
_write(item, workspace / item.name)
_write(tpl / "memory" / "MEMORY.md", workspace / "memory" / "MEMORY.md")
_write(None, workspace / "memory" / "HISTORY.md")