merge origin/main into pr-1704

This commit is contained in:
Re-bin
2026-03-10 18:09:15 +00:00
32 changed files with 1712 additions and 40 deletions

View File

@@ -20,6 +20,7 @@
## 📢 News ## 📢 News
- **2026-03-08** 🚀 Released **v0.1.4.post4** — a reliability-packed release with safer defaults, better multi-instance support, sturdier MCP, and major channel and provider improvements. Please see [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post4) for details.
- **2026-03-07** 🚀 Azure OpenAI provider, WhatsApp media, QQ group chats, and more Telegram/Feishu polish. - **2026-03-07** 🚀 Azure OpenAI provider, WhatsApp media, QQ group chats, and more Telegram/Feishu polish.
- **2026-03-06** 🪄 Lighter providers, smarter media handling, and sturdier memory and CLI compatibility. - **2026-03-06** 🪄 Lighter providers, smarter media handling, and sturdier memory and CLI compatibility.
- **2026-03-05** ⚡️ Telegram draft streaming, MCP SSE support, and broader channel reliability fixes. - **2026-03-05** ⚡️ Telegram draft streaming, MCP SSE support, and broader channel reliability fixes.
@@ -122,6 +123,29 @@ uv tool install nanobot-ai
pip install nanobot-ai pip install nanobot-ai
``` ```
### Update to latest version
**PyPI / pip**
```bash
pip install -U nanobot-ai
nanobot --version
```
**uv**
```bash
uv tool upgrade nanobot-ai
nanobot --version
```
**Using WhatsApp?** Rebuild the local bridge after upgrading:
```bash
rm -rf ~/.nanobot/bridge
nanobot channels login
```
## 🚀 Quick Start ## 🚀 Quick Start
> [!TIP] > [!TIP]
@@ -374,7 +398,7 @@ pip install nanobot-ai[matrix]
| Option | Description | | Option | Description |
|--------|-------------| |--------|-------------|
| `allowFrom` | User IDs allowed to interact. Empty = all senders. | | `allowFrom` | User IDs allowed to interact. Empty denies all; use `["*"]` to allow everyone. |
| `groupPolicy` | `open` (default), `mention`, or `allowlist`. | | `groupPolicy` | `open` (default), `mention`, or `allowlist`. |
| `groupAllowFrom` | Room allowlist (used when policy is `allowlist`). | | `groupAllowFrom` | Room allowlist (used when policy is `allowlist`). |
| `allowRoomMentions` | Accept `@room` mentions in mention mode. | | `allowRoomMentions` | Accept `@room` mentions in mention mode. |
@@ -428,7 +452,7 @@ nanobot gateway
``` ```
> WhatsApp bridge updates are not applied automatically for existing installations. > WhatsApp bridge updates are not applied automatically for existing installations.
> If you upgrade nanobot and need the latest WhatsApp bridge, run: > After upgrading nanobot, rebuild the local bridge with:
> `rm -rf ~/.nanobot/bridge && nanobot channels login` > `rm -rf ~/.nanobot/bridge && nanobot channels login`
</details> </details>
@@ -900,13 +924,13 @@ MCP tools are automatically discovered and registered on startup. The LLM can us
> [!TIP] > [!TIP]
> For production deployments, set `"restrictToWorkspace": true` in your config to sandbox the agent. > For production deployments, set `"restrictToWorkspace": true` in your config to sandbox the agent.
> **Change in source / post-`v0.1.4.post3`:** In `v0.1.4.post3` and earlier, an empty `allowFrom` means "allow all senders". In newer versions (including building from source), **empty `allowFrom` denies all access by default**. To allow all senders, set `"allowFrom": ["*"]`. > In `v0.1.4.post3` and earlier, an empty `allowFrom` allowed all senders. Since `v0.1.4.post4`, empty `allowFrom` denies all access by default. To allow all senders, set `"allowFrom": ["*"]`.
| Option | Default | Description | | Option | Default | Description |
|--------|---------|-------------| |--------|---------|-------------|
| `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. | | `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. |
| `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). | | `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). |
| `channels.*.allowFrom` | `[]` (allow all) | Whitelist of user IDs. Empty = allow everyone; non-empty = only listed users can interact. | | `channels.*.allowFrom` | `[]` (deny all) | Whitelist of user IDs. Empty denies all; use `["*"]` to allow everyone. |
## 🧩 Multiple Instances ## 🧩 Multiple Instances

View File

@@ -55,7 +55,7 @@ chmod 600 ~/.nanobot/config.json
``` ```
**Security Notes:** **Security Notes:**
- In `v0.1.4.post3` and earlier, an empty `allowFrom` allows all users. In newer versions (including source builds), **empty `allowFrom` denies all access** — set `["*"]` to explicitly allow everyone. - In `v0.1.4.post3` and earlier, an empty `allowFrom` allowed all users. Since `v0.1.4.post4`, empty `allowFrom` denies all access by default — set `["*"]` to explicitly allow everyone.
- Get your Telegram user ID from `@userinfobot` - Get your Telegram user ID from `@userinfobot`
- Use full phone numbers with country code for WhatsApp - Use full phone numbers with country code for WhatsApp
- Review access logs regularly for unauthorized access attempts - Review access logs regularly for unauthorized access attempts
@@ -212,7 +212,7 @@ If you suspect a security breach:
- Input length limits on HTTP requests - Input length limits on HTTP requests
✅ **Authentication** ✅ **Authentication**
- Allow-list based access control — in `v0.1.4.post3` and earlier empty means allow all; in newer versions empty means deny all (`["*"]` to explicitly allow all) - Allow-list based access control — in `v0.1.4.post3` and earlier empty `allowFrom` allowed all; since `v0.1.4.post4` it denies all (`["*"]` explicitly allows all)
- Failed authentication attempt logging - Failed authentication attempt logging
✅ **Resource Protection** ✅ **Resource Protection**

View File

@@ -15,7 +15,7 @@ root=$(cat nanobot/__init__.py nanobot/__main__.py | wc -l)
printf " %-16s %5s lines\n" "(root)" "$root" printf " %-16s %5s lines\n" "(root)" "$root"
echo "" echo ""
total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/providers/*" | xargs cat | wc -l) total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/providers/*" ! -path "*/skills/*" | xargs cat | wc -l)
echo " Core total: $total lines" echo " Core total: $total lines"
echo "" echo ""
echo " (excludes: channels/, cli/, providers/)" echo " (excludes: channels/, cli/, providers/, skills/)"

View File

@@ -2,5 +2,5 @@
nanobot - A lightweight AI agent framework nanobot - A lightweight AI agent framework
""" """
__version__ = "0.1.4.post3" __version__ = "0.1.4.post4"
__logo__ = "🐈" __logo__ = "🐈"

View File

@@ -16,7 +16,7 @@ from nanobot.utils.helpers import detect_image_mime
class ContextBuilder: class ContextBuilder:
"""Builds the context (system prompt + messages) for the agent.""" """Builds the context (system prompt + messages) for the agent."""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"] BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
def __init__(self, workspace: Path): def __init__(self, workspace: Path):
@@ -59,6 +59,19 @@ Skills with available="false" need dependencies installed first - you can try in
system = platform.system() system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}" runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
platform_policy = ""
if system == "Windows":
platform_policy = """## Platform Policy (Windows)
- You are running on Windows. Do not assume GNU tools like `grep`, `sed`, or `awk` exist.
- Prefer Windows-native commands or file tools when they are more reliable.
- If terminal output is garbled, retry with UTF-8 output enabled.
"""
else:
platform_policy = """## Platform Policy (POSIX)
- You are running on a POSIX system. Prefer UTF-8 and standard shell tools.
- Use file tools when they are simpler or more reliable than shell commands.
"""
return f"""# nanobot 🐈 return f"""# nanobot 🐈
You are nanobot, a helpful AI assistant. You are nanobot, a helpful AI assistant.
@@ -72,6 +85,8 @@ Your workspace is at: {workspace_path}
- History log: {workspace_path}/memory/HISTORY.md (grep-searchable). Each entry starts with [YYYY-MM-DD HH:MM]. - History log: {workspace_path}/memory/HISTORY.md (grep-searchable). Each entry starts with [YYYY-MM-DD HH:MM].
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
{platform_policy}
## nanobot Guidelines ## nanobot Guidelines
- State intent before tool calls, but NEVER predict or claim results before receiving them. - State intent before tool calls, but NEVER predict or claim results before receiving them.
- Before modifying a file, read it first. Do not assume files or directories exist. - Before modifying a file, read it first. Do not assume files or directories exist.

View File

@@ -587,7 +587,7 @@ class AgentLoop:
tool_defs = self.tools.get_definitions() tool_defs = self.tools.get_definitions()
response = await self.provider.chat( response = await self.provider.chat_with_retry(
messages=messages, messages=messages,
tools=tool_defs, tools=tool_defs,
model=self.model, model=self.model,

View File

@@ -100,7 +100,7 @@ class MemoryStore:
{chr(10).join(lines)}""" {chr(10).join(lines)}"""
try: try:
response = await provider.chat( response = await provider.chat_with_retry(
messages=[ messages=[
{ {
"role": "system", "role": "system",

View File

@@ -123,7 +123,7 @@ class SubagentManager:
while iteration < max_iterations: while iteration < max_iterations:
iteration += 1 iteration += 1
response = await self.provider.chat( response = await self.provider.chat_with_retry(
messages=messages, messages=messages,
tools=tools.get_definitions(), tools=tools.get_definitions(),
model=self.model, model=self.model,

View File

@@ -36,6 +36,7 @@ class MCPToolWrapper(Tool):
async def execute(self, **kwargs: Any) -> str: async def execute(self, **kwargs: Any) -> str:
from mcp import types from mcp import types
try: try:
result = await asyncio.wait_for( result = await asyncio.wait_for(
self._session.call_tool(self._original_name, arguments=kwargs), self._session.call_tool(self._original_name, arguments=kwargs),
@@ -44,6 +45,23 @@ class MCPToolWrapper(Tool):
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("MCP tool '{}' timed out after {}s", self._name, self._tool_timeout) logger.warning("MCP tool '{}' timed out after {}s", self._name, self._tool_timeout)
return f"(MCP tool call timed out after {self._tool_timeout}s)" return f"(MCP tool call timed out after {self._tool_timeout}s)"
except asyncio.CancelledError:
# MCP SDK's anyio cancel scopes can leak CancelledError on timeout/failure.
# Re-raise only if our task was externally cancelled (e.g. /stop).
task = asyncio.current_task()
if task is not None and task.cancelling() > 0:
raise
logger.warning("MCP tool '{}' was cancelled by server/SDK", self._name)
return "(MCP tool call was cancelled)"
except Exception as exc:
logger.exception(
"MCP tool '{}' failed: {}: {}",
self._name,
type(exc).__name__,
exc,
)
return f"(MCP tool call failed: {type(exc).__name__})"
parts = [] parts = []
for block in result.content: for block in result.content:
if isinstance(block, types.TextContent): if isinstance(block, types.TextContent):

View File

@@ -753,8 +753,9 @@ class FeishuChannel(BaseChannel):
None, self._download_file_sync, message_id, file_key, msg_type None, self._download_file_sync, message_id, file_key, msg_type
) )
if not filename: if not filename:
ext = {"audio": ".opus", "media": ".mp4"}.get(msg_type, "") filename = file_key[:16]
filename = f"{file_key[:16]}{ext}" if msg_type == "audio" and not filename.endswith(".opus"):
filename = f"{filename}.opus"
if data and filename: if data and filename:
file_path = media_dir / filename file_path = media_dir / filename

View File

@@ -113,16 +113,16 @@ class QQChannel(BaseChannel):
if msg_type == "group": if msg_type == "group":
await self._client.api.post_group_message( await self._client.api.post_group_message(
group_openid=msg.chat_id, group_openid=msg.chat_id,
msg_type=0, msg_type=2,
content=msg.content, markdown={"content": msg.content},
msg_id=msg_id, msg_id=msg_id,
msg_seq=self._msg_seq, msg_seq=self._msg_seq,
) )
else: else:
await self._client.api.post_c2c_message( await self._client.api.post_c2c_message(
openid=msg.chat_id, openid=msg.chat_id,
msg_type=0, msg_type=2,
content=msg.content, markdown={"content": msg.content},
msg_id=msg_id, msg_id=msg_id,
msg_seq=self._msg_seq, msg_seq=self._msg_seq,
) )

View File

@@ -81,8 +81,8 @@ class SlackChannel(BaseChannel):
slack_meta = msg.metadata.get("slack", {}) if msg.metadata else {} slack_meta = msg.metadata.get("slack", {}) if msg.metadata else {}
thread_ts = slack_meta.get("thread_ts") thread_ts = slack_meta.get("thread_ts")
channel_type = slack_meta.get("channel_type") channel_type = slack_meta.get("channel_type")
# Only reply in thread for channel/group messages; DMs don't use threads # Slack DMs don't use threads; channel/group replies may keep thread_ts.
thread_ts_param = thread_ts if use_thread else None thread_ts_param = thread_ts if thread_ts and channel_type != "im" else None
# Slack rejects empty text payloads. Keep media-only messages media-only, # Slack rejects empty text payloads. Keep media-only messages media-only,
# but send a single blank message when the bot has no text or files to send. # but send a single blank message when the bot has no text or files to send.
@@ -278,4 +278,3 @@ class SlackChannel(BaseChannel):
if parts: if parts:
rows.append(" · ".join(parts)) rows.append(" · ".join(parts))
return "\n".join(rows) return "\n".join(rows)

View File

@@ -179,6 +179,8 @@ class TelegramChannel(BaseChannel):
self._media_group_buffers: dict[str, dict] = {} self._media_group_buffers: dict[str, dict] = {}
self._media_group_tasks: dict[str, asyncio.Task] = {} self._media_group_tasks: dict[str, asyncio.Task] = {}
self._message_threads: dict[tuple[str, int], int] = {} self._message_threads: dict[tuple[str, int], int] = {}
self._bot_user_id: int | None = None
self._bot_username: str | None = None
def is_allowed(self, sender_id: str) -> bool: def is_allowed(self, sender_id: str) -> bool:
"""Preserve Telegram's legacy id|username allowlist matching.""" """Preserve Telegram's legacy id|username allowlist matching."""
@@ -242,6 +244,8 @@ class TelegramChannel(BaseChannel):
# Get bot info and register command menu # Get bot info and register command menu
bot_info = await self._app.bot.get_me() bot_info = await self._app.bot.get_me()
self._bot_user_id = getattr(bot_info, "id", None)
self._bot_username = getattr(bot_info, "username", None)
logger.info("Telegram bot @{} connected", bot_info.username) logger.info("Telegram bot @{} connected", bot_info.username)
try: try:
@@ -462,6 +466,70 @@ class TelegramChannel(BaseChannel):
"is_forum": bool(getattr(message.chat, "is_forum", False)), "is_forum": bool(getattr(message.chat, "is_forum", False)),
} }
async def _ensure_bot_identity(self) -> tuple[int | None, str | None]:
"""Load bot identity once and reuse it for mention/reply checks."""
if self._bot_user_id is not None or self._bot_username is not None:
return self._bot_user_id, self._bot_username
if not self._app:
return None, None
bot_info = await self._app.bot.get_me()
self._bot_user_id = getattr(bot_info, "id", None)
self._bot_username = getattr(bot_info, "username", None)
return self._bot_user_id, self._bot_username
@staticmethod
def _has_mention_entity(
text: str,
entities,
bot_username: str,
bot_id: int | None,
) -> bool:
"""Check Telegram mention entities against the bot username."""
handle = f"@{bot_username}".lower()
for entity in entities or []:
entity_type = getattr(entity, "type", None)
if entity_type == "text_mention":
user = getattr(entity, "user", None)
if user is not None and bot_id is not None and getattr(user, "id", None) == bot_id:
return True
continue
if entity_type != "mention":
continue
offset = getattr(entity, "offset", None)
length = getattr(entity, "length", None)
if offset is None or length is None:
continue
if text[offset : offset + length].lower() == handle:
return True
return handle in text.lower()
async def _is_group_message_for_bot(self, message) -> bool:
"""Allow group messages when policy is open, @mentioned, or replying to the bot."""
if message.chat.type == "private" or self.config.group_policy == "open":
return True
bot_id, bot_username = await self._ensure_bot_identity()
if bot_username:
text = message.text or ""
caption = message.caption or ""
if self._has_mention_entity(
text,
getattr(message, "entities", None),
bot_username,
bot_id,
):
return True
if self._has_mention_entity(
caption,
getattr(message, "caption_entities", None),
bot_username,
bot_id,
):
return True
reply_user = getattr(getattr(message, "reply_to_message", None), "from_user", None)
return bool(bot_id and reply_user and reply_user.id == bot_id)
def _remember_thread_context(self, message) -> None: def _remember_thread_context(self, message) -> None:
"""Cache topic thread id by chat/message id for follow-up replies.""" """Cache topic thread id by chat/message id for follow-up replies."""
message_thread_id = getattr(message, "message_thread_id", None) message_thread_id = getattr(message, "message_thread_id", None)
@@ -501,6 +569,9 @@ class TelegramChannel(BaseChannel):
# Store chat_id for replies # Store chat_id for replies
self._chat_ids[sender_id] = chat_id self._chat_ids[sender_id] = chat_id
if not await self._is_group_message_for_bot(message):
return
# Build content from text and/or media # Build content from text and/or media
content_parts = [] content_parts = []
media_paths = [] media_paths = []

View File

@@ -290,7 +290,7 @@ def _load_runtime_config(config: str | None = None, workspace: str | None = None
@app.command() @app.command()
def gateway( def gateway(
port: int = typer.Option(18790, "--port", "-p", help="Gateway port"), port: int | None = typer.Option(None, "--port", "-p", help="Gateway port"),
workspace: str | None = typer.Option(None, "--workspace", "-w", help="Workspace directory"), workspace: str | None = typer.Option(None, "--workspace", "-w", help="Workspace directory"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"), verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
config: str | None = typer.Option(None, "--config", "-c", help="Path to config file"), config: str | None = typer.Option(None, "--config", "-c", help="Path to config file"),
@@ -310,6 +310,7 @@ def gateway(
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
config = _load_runtime_config(config, workspace) config = _load_runtime_config(config, workspace)
port = port if port is not None else config.gateway.port
console.print(f"{__logo__} Starting nanobot gateway on port {port}...") console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
sync_workspace_templates(config.workspace_path) sync_workspace_templates(config.workspace_path)

View File

@@ -33,6 +33,7 @@ class TelegramConfig(Base):
None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080" None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
) )
reply_to_message: bool = False # If true, bot replies quote the original message reply_to_message: bool = False # If true, bot replies quote the original message
group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all
class FeishuConfig(Base): class FeishuConfig(Base):

View File

@@ -87,7 +87,7 @@ class HeartbeatService:
Returns (action, tasks) where action is 'skip' or 'run'. Returns (action, tasks) where action is 'skip' or 'run'.
""" """
response = await self.provider.chat( response = await self.provider.chat_with_retry(
messages=[ messages=[
{"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."}, {"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."},
{"role": "user", "content": ( {"role": "user", "content": (

View File

@@ -1,9 +1,12 @@
"""Base LLM provider interface.""" """Base LLM provider interface."""
import asyncio
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from typing import Any
from loguru import logger
@dataclass @dataclass
class ToolCallRequest: class ToolCallRequest:
@@ -37,6 +40,22 @@ class LLMProvider(ABC):
while maintaining a consistent interface. while maintaining a consistent interface.
""" """
_CHAT_RETRY_DELAYS = (1, 2, 4)
_TRANSIENT_ERROR_MARKERS = (
"429",
"rate limit",
"500",
"502",
"503",
"504",
"overloaded",
"timeout",
"timed out",
"connection",
"server error",
"temporarily unavailable",
)
def __init__(self, api_key: str | None = None, api_base: str | None = None): def __init__(self, api_key: str | None = None, api_base: str | None = None):
self.api_key = api_key self.api_key = api_key
self.api_base = api_base self.api_base = api_base
@@ -126,6 +145,71 @@ class LLMProvider(ABC):
""" """
pass pass
@classmethod
def _is_transient_error(cls, content: str | None) -> bool:
err = (content or "").lower()
return any(marker in err for marker in cls._TRANSIENT_ERROR_MARKERS)
async def chat_with_retry(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
reasoning_effort: str | None = None,
) -> LLMResponse:
"""Call chat() with retry on transient provider failures."""
for attempt, delay in enumerate(self._CHAT_RETRY_DELAYS, start=1):
try:
response = await self.chat(
messages=messages,
tools=tools,
model=model,
max_tokens=max_tokens,
temperature=temperature,
reasoning_effort=reasoning_effort,
)
except asyncio.CancelledError:
raise
except Exception as exc:
response = LLMResponse(
content=f"Error calling LLM: {exc}",
finish_reason="error",
)
if response.finish_reason != "error":
return response
if not self._is_transient_error(response.content):
return response
err = (response.content or "").lower()
logger.warning(
"LLM transient error (attempt {}/{}), retrying in {}s: {}",
attempt,
len(self._CHAT_RETRY_DELAYS),
delay,
err[:120],
)
await asyncio.sleep(delay)
try:
return await self.chat(
messages=messages,
tools=tools,
model=model,
max_tokens=max_tokens,
temperature=temperature,
reasoning_effort=reasoning_effort,
)
except asyncio.CancelledError:
raise
except Exception as exc:
return LLMResponse(
content=f"Error calling LLM: {exc}",
finish_reason="error",
)
@abstractmethod @abstractmethod
def get_default_model(self) -> str: def get_default_model(self) -> str:
"""Get the default model for this provider.""" """Get the default model for this provider."""

View File

@@ -9,15 +9,21 @@ always: true
## Structure ## Structure
- `memory/MEMORY.md` — Long-term facts (preferences, project context, relationships). Always loaded into your context. - `memory/MEMORY.md` — Long-term facts (preferences, project context, relationships). Always loaded into your context.
- `memory/HISTORY.md` — Append-only event log. NOT loaded into context. Search it with grep. Each entry starts with [YYYY-MM-DD HH:MM]. - `memory/HISTORY.md` — Append-only event log. NOT loaded into context. Search it with grep-style tools or in-memory filters. Each entry starts with [YYYY-MM-DD HH:MM].
## Search Past Events ## Search Past Events
```bash Choose the search method based on file size:
grep -i "keyword" memory/HISTORY.md
```
Use the `exec` tool to run grep. Combine patterns: `grep -iE "meeting|deadline" memory/HISTORY.md` - Small `memory/HISTORY.md`: use `read_file`, then search in-memory
- Large or long-lived `memory/HISTORY.md`: use the `exec` tool for targeted search
Examples:
- **Linux/macOS:** `grep -i "keyword" memory/HISTORY.md`
- **Windows:** `findstr /i "keyword" memory\HISTORY.md`
- **Cross-platform Python:** `python -c "from pathlib import Path; text = Path('memory/HISTORY.md').read_text(encoding='utf-8'); print('\n'.join([l for l in text.splitlines() if 'keyword' in l.lower()][-20:]))"`
Prefer targeted command-line search for large history files.
## When to Update MEMORY.md ## When to Update MEMORY.md

View File

@@ -268,6 +268,8 @@ Skip this step only if the skill being developed already exists, and iteration o
When creating a new skill from scratch, always run the `init_skill.py` script. The script conveniently generates a new template skill directory that automatically includes everything a skill requires, making the skill creation process much more efficient and reliable. When creating a new skill from scratch, always run the `init_skill.py` script. The script conveniently generates a new template skill directory that automatically includes everything a skill requires, making the skill creation process much more efficient and reliable.
For `nanobot`, custom skills should live under the active workspace `skills/` directory so they can be discovered automatically at runtime (for example, `<workspace>/skills/my-skill/SKILL.md`).
Usage: Usage:
```bash ```bash
@@ -277,9 +279,9 @@ scripts/init_skill.py <skill-name> --path <output-directory> [--resources script
Examples: Examples:
```bash ```bash
scripts/init_skill.py my-skill --path skills/public scripts/init_skill.py my-skill --path ./workspace/skills
scripts/init_skill.py my-skill --path skills/public --resources scripts,references scripts/init_skill.py my-skill --path ./workspace/skills --resources scripts,references
scripts/init_skill.py my-skill --path skills/public --resources scripts --examples scripts/init_skill.py my-skill --path ./workspace/skills --resources scripts --examples
``` ```
The script: The script:
@@ -326,7 +328,7 @@ Write the YAML frontmatter with `name` and `description`:
- Include all "when to use" information here - Not in the body. The body is only loaded after triggering, so "When to Use This Skill" sections in the body are not helpful to the agent. - Include all "when to use" information here - Not in the body. The body is only loaded after triggering, so "When to Use This Skill" sections in the body are not helpful to the agent.
- Example description for a `docx` skill: "Comprehensive document creation, editing, and analysis with support for tracked changes, comments, formatting preservation, and text extraction. Use when the agent needs to work with professional documents (.docx files) for: (1) Creating new documents, (2) Modifying or editing content, (3) Working with tracked changes, (4) Adding comments, or any other document tasks" - Example description for a `docx` skill: "Comprehensive document creation, editing, and analysis with support for tracked changes, comments, formatting preservation, and text extraction. Use when the agent needs to work with professional documents (.docx files) for: (1) Creating new documents, (2) Modifying or editing content, (3) Working with tracked changes, (4) Adding comments, or any other document tasks"
Do not include any other fields in YAML frontmatter. Keep frontmatter minimal. In `nanobot`, `metadata` and `always` are also supported when needed, but avoid adding extra fields unless they are actually required.
##### Body ##### Body
@@ -349,7 +351,6 @@ scripts/package_skill.py <path/to/skill-folder> ./dist
The packaging script will: The packaging script will:
1. **Validate** the skill automatically, checking: 1. **Validate** the skill automatically, checking:
- YAML frontmatter format and required fields - YAML frontmatter format and required fields
- Skill naming conventions and directory structure - Skill naming conventions and directory structure
- Description completeness and quality - Description completeness and quality
@@ -357,6 +358,8 @@ The packaging script will:
2. **Package** the skill if validation passes, creating a .skill file named after the skill (e.g., `my-skill.skill`) that includes all files and maintains the proper directory structure for distribution. The .skill file is a zip file with a .skill extension. 2. **Package** the skill if validation passes, creating a .skill file named after the skill (e.g., `my-skill.skill`) that includes all files and maintains the proper directory structure for distribution. The .skill file is a zip file with a .skill extension.
Security restriction: symlinks are rejected and packaging fails when any symlink is present.
If validation fails, the script will report the errors and exit without creating a package. Fix any validation errors and run the packaging command again. If validation fails, the script will report the errors and exit without creating a package. Fix any validation errors and run the packaging command again.
### Step 6: Iterate ### Step 6: Iterate

View File

@@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""
Skill Initializer - Creates a new skill from template
Usage:
init_skill.py <skill-name> --path <path> [--resources scripts,references,assets] [--examples]
Examples:
init_skill.py my-new-skill --path skills/public
init_skill.py my-new-skill --path skills/public --resources scripts,references
init_skill.py my-api-helper --path skills/private --resources scripts --examples
init_skill.py custom-skill --path /custom/location
"""
import argparse
import re
import sys
from pathlib import Path
MAX_SKILL_NAME_LENGTH = 64
ALLOWED_RESOURCES = {"scripts", "references", "assets"}
SKILL_TEMPLATE = """---
name: {skill_name}
description: [TODO: Complete and informative explanation of what the skill does and when to use it. Include WHEN to use this skill - specific scenarios, file types, or tasks that trigger it.]
---
# {skill_title}
## Overview
[TODO: 1-2 sentences explaining what this skill enables]
## Structuring This Skill
[TODO: Choose the structure that best fits this skill's purpose. Common patterns:
**1. Workflow-Based** (best for sequential processes)
- Works well when there are clear step-by-step procedures
- Example: DOCX skill with "Workflow Decision Tree" -> "Reading" -> "Creating" -> "Editing"
- Structure: ## Overview -> ## Workflow Decision Tree -> ## Step 1 -> ## Step 2...
**2. Task-Based** (best for tool collections)
- Works well when the skill offers different operations/capabilities
- Example: PDF skill with "Quick Start" -> "Merge PDFs" -> "Split PDFs" -> "Extract Text"
- Structure: ## Overview -> ## Quick Start -> ## Task Category 1 -> ## Task Category 2...
**3. Reference/Guidelines** (best for standards or specifications)
- Works well for brand guidelines, coding standards, or requirements
- Example: Brand styling with "Brand Guidelines" -> "Colors" -> "Typography" -> "Features"
- Structure: ## Overview -> ## Guidelines -> ## Specifications -> ## Usage...
**4. Capabilities-Based** (best for integrated systems)
- Works well when the skill provides multiple interrelated features
- Example: Product Management with "Core Capabilities" -> numbered capability list
- Structure: ## Overview -> ## Core Capabilities -> ### 1. Feature -> ### 2. Feature...
Patterns can be mixed and matched as needed. Most skills combine patterns (e.g., start with task-based, add workflow for complex operations).
Delete this entire "Structuring This Skill" section when done - it's just guidance.]
## [TODO: Replace with the first main section based on chosen structure]
[TODO: Add content here. See examples in existing skills:
- Code samples for technical skills
- Decision trees for complex workflows
- Concrete examples with realistic user requests
- References to scripts/templates/references as needed]
## Resources (optional)
Create only the resource directories this skill actually needs. Delete this section if no resources are required.
### scripts/
Executable code (Python/Bash/etc.) that can be run directly to perform specific operations.
**Examples from other skills:**
- PDF skill: `fill_fillable_fields.py`, `extract_form_field_info.py` - utilities for PDF manipulation
- DOCX skill: `document.py`, `utilities.py` - Python modules for document processing
**Appropriate for:** Python scripts, shell scripts, or any executable code that performs automation, data processing, or specific operations.
**Note:** Scripts may be executed without loading into context, but can still be read by Codex for patching or environment adjustments.
### references/
Documentation and reference material intended to be loaded into context to inform Codex's process and thinking.
**Examples from other skills:**
- Product management: `communication.md`, `context_building.md` - detailed workflow guides
- BigQuery: API reference documentation and query examples
- Finance: Schema documentation, company policies
**Appropriate for:** In-depth documentation, API references, database schemas, comprehensive guides, or any detailed information that Codex should reference while working.
### assets/
Files not intended to be loaded into context, but rather used within the output Codex produces.
**Examples from other skills:**
- Brand styling: PowerPoint template files (.pptx), logo files
- Frontend builder: HTML/React boilerplate project directories
- Typography: Font files (.ttf, .woff2)
**Appropriate for:** Templates, boilerplate code, document templates, images, icons, fonts, or any files meant to be copied or used in the final output.
---
**Not every skill requires all three types of resources.**
"""
EXAMPLE_SCRIPT = '''#!/usr/bin/env python3
"""
Example helper script for {skill_name}
This is a placeholder script that can be executed directly.
Replace with actual implementation or delete if not needed.
Example real scripts from other skills:
- pdf/scripts/fill_fillable_fields.py - Fills PDF form fields
- pdf/scripts/convert_pdf_to_images.py - Converts PDF pages to images
"""
def main():
print("This is an example script for {skill_name}")
# TODO: Add actual script logic here
# This could be data processing, file conversion, API calls, etc.
if __name__ == "__main__":
main()
'''
EXAMPLE_REFERENCE = """# Reference Documentation for {skill_title}
This is a placeholder for detailed reference documentation.
Replace with actual reference content or delete if not needed.
Example real reference docs from other skills:
- product-management/references/communication.md - Comprehensive guide for status updates
- product-management/references/context_building.md - Deep-dive on gathering context
- bigquery/references/ - API references and query examples
## When Reference Docs Are Useful
Reference docs are ideal for:
- Comprehensive API documentation
- Detailed workflow guides
- Complex multi-step processes
- Information too lengthy for main SKILL.md
- Content that's only needed for specific use cases
## Structure Suggestions
### API Reference Example
- Overview
- Authentication
- Endpoints with examples
- Error codes
- Rate limits
### Workflow Guide Example
- Prerequisites
- Step-by-step instructions
- Common patterns
- Troubleshooting
- Best practices
"""
EXAMPLE_ASSET = """# Example Asset File
This placeholder represents where asset files would be stored.
Replace with actual asset files (templates, images, fonts, etc.) or delete if not needed.
Asset files are NOT intended to be loaded into context, but rather used within
the output Codex produces.
Example asset files from other skills:
- Brand guidelines: logo.png, slides_template.pptx
- Frontend builder: hello-world/ directory with HTML/React boilerplate
- Typography: custom-font.ttf, font-family.woff2
- Data: sample_data.csv, test_dataset.json
## Common Asset Types
- Templates: .pptx, .docx, boilerplate directories
- Images: .png, .jpg, .svg, .gif
- Fonts: .ttf, .otf, .woff, .woff2
- Boilerplate code: Project directories, starter files
- Icons: .ico, .svg
- Data files: .csv, .json, .xml, .yaml
Note: This is a text placeholder. Actual assets can be any file type.
"""
def normalize_skill_name(skill_name):
"""Normalize a skill name to lowercase hyphen-case."""
normalized = skill_name.strip().lower()
normalized = re.sub(r"[^a-z0-9]+", "-", normalized)
normalized = normalized.strip("-")
normalized = re.sub(r"-{2,}", "-", normalized)
return normalized
def title_case_skill_name(skill_name):
"""Convert hyphenated skill name to Title Case for display."""
return " ".join(word.capitalize() for word in skill_name.split("-"))
def parse_resources(raw_resources):
if not raw_resources:
return []
resources = [item.strip() for item in raw_resources.split(",") if item.strip()]
invalid = sorted({item for item in resources if item not in ALLOWED_RESOURCES})
if invalid:
allowed = ", ".join(sorted(ALLOWED_RESOURCES))
print(f"[ERROR] Unknown resource type(s): {', '.join(invalid)}")
print(f" Allowed: {allowed}")
sys.exit(1)
deduped = []
seen = set()
for resource in resources:
if resource not in seen:
deduped.append(resource)
seen.add(resource)
return deduped
def create_resource_dirs(skill_dir, skill_name, skill_title, resources, include_examples):
for resource in resources:
resource_dir = skill_dir / resource
resource_dir.mkdir(exist_ok=True)
if resource == "scripts":
if include_examples:
example_script = resource_dir / "example.py"
example_script.write_text(EXAMPLE_SCRIPT.format(skill_name=skill_name))
example_script.chmod(0o755)
print("[OK] Created scripts/example.py")
else:
print("[OK] Created scripts/")
elif resource == "references":
if include_examples:
example_reference = resource_dir / "api_reference.md"
example_reference.write_text(EXAMPLE_REFERENCE.format(skill_title=skill_title))
print("[OK] Created references/api_reference.md")
else:
print("[OK] Created references/")
elif resource == "assets":
if include_examples:
example_asset = resource_dir / "example_asset.txt"
example_asset.write_text(EXAMPLE_ASSET)
print("[OK] Created assets/example_asset.txt")
else:
print("[OK] Created assets/")
def init_skill(skill_name, path, resources, include_examples):
"""
Initialize a new skill directory with template SKILL.md.
Args:
skill_name: Name of the skill
path: Path where the skill directory should be created
resources: Resource directories to create
include_examples: Whether to create example files in resource directories
Returns:
Path to created skill directory, or None if error
"""
# Determine skill directory path
skill_dir = Path(path).resolve() / skill_name
# Check if directory already exists
if skill_dir.exists():
print(f"[ERROR] Skill directory already exists: {skill_dir}")
return None
# Create skill directory
try:
skill_dir.mkdir(parents=True, exist_ok=False)
print(f"[OK] Created skill directory: {skill_dir}")
except Exception as e:
print(f"[ERROR] Error creating directory: {e}")
return None
# Create SKILL.md from template
skill_title = title_case_skill_name(skill_name)
skill_content = SKILL_TEMPLATE.format(skill_name=skill_name, skill_title=skill_title)
skill_md_path = skill_dir / "SKILL.md"
try:
skill_md_path.write_text(skill_content)
print("[OK] Created SKILL.md")
except Exception as e:
print(f"[ERROR] Error creating SKILL.md: {e}")
return None
# Create resource directories if requested
if resources:
try:
create_resource_dirs(skill_dir, skill_name, skill_title, resources, include_examples)
except Exception as e:
print(f"[ERROR] Error creating resource directories: {e}")
return None
# Print next steps
print(f"\n[OK] Skill '{skill_name}' initialized successfully at {skill_dir}")
print("\nNext steps:")
print("1. Edit SKILL.md to complete the TODO items and update the description")
if resources:
if include_examples:
print("2. Customize or delete the example files in scripts/, references/, and assets/")
else:
print("2. Add resources to scripts/, references/, and assets/ as needed")
else:
print("2. Create resource directories only if needed (scripts/, references/, assets/)")
print("3. Run the validator when ready to check the skill structure")
return skill_dir
def main():
parser = argparse.ArgumentParser(
description="Create a new skill directory with a SKILL.md template.",
)
parser.add_argument("skill_name", help="Skill name (normalized to hyphen-case)")
parser.add_argument("--path", required=True, help="Output directory for the skill")
parser.add_argument(
"--resources",
default="",
help="Comma-separated list: scripts,references,assets",
)
parser.add_argument(
"--examples",
action="store_true",
help="Create example files inside the selected resource directories",
)
args = parser.parse_args()
raw_skill_name = args.skill_name
skill_name = normalize_skill_name(raw_skill_name)
if not skill_name:
print("[ERROR] Skill name must include at least one letter or digit.")
sys.exit(1)
if len(skill_name) > MAX_SKILL_NAME_LENGTH:
print(
f"[ERROR] Skill name '{skill_name}' is too long ({len(skill_name)} characters). "
f"Maximum is {MAX_SKILL_NAME_LENGTH} characters."
)
sys.exit(1)
if skill_name != raw_skill_name:
print(f"Note: Normalized skill name from '{raw_skill_name}' to '{skill_name}'.")
resources = parse_resources(args.resources)
if args.examples and not resources:
print("[ERROR] --examples requires --resources to be set.")
sys.exit(1)
path = args.path
print(f"Initializing skill: {skill_name}")
print(f" Location: {path}")
if resources:
print(f" Resources: {', '.join(resources)}")
if args.examples:
print(" Examples: enabled")
else:
print(" Resources: none (create as needed)")
print()
result = init_skill(skill_name, path, resources, args.examples)
if result:
sys.exit(0)
else:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
Skill Packager - Creates a distributable .skill file of a skill folder
Usage:
python package_skill.py <path/to/skill-folder> [output-directory]
Example:
python package_skill.py skills/public/my-skill
python package_skill.py skills/public/my-skill ./dist
"""
import sys
import zipfile
from pathlib import Path
from quick_validate import validate_skill
def _is_within(path: Path, root: Path) -> bool:
try:
path.relative_to(root)
return True
except ValueError:
return False
def _cleanup_partial_archive(skill_filename: Path) -> None:
try:
if skill_filename.exists():
skill_filename.unlink()
except OSError:
pass
def package_skill(skill_path, output_dir=None):
"""
Package a skill folder into a .skill file.
Args:
skill_path: Path to the skill folder
output_dir: Optional output directory for the .skill file (defaults to current directory)
Returns:
Path to the created .skill file, or None if error
"""
skill_path = Path(skill_path).resolve()
# Validate skill folder exists
if not skill_path.exists():
print(f"[ERROR] Skill folder not found: {skill_path}")
return None
if not skill_path.is_dir():
print(f"[ERROR] Path is not a directory: {skill_path}")
return None
# Validate SKILL.md exists
skill_md = skill_path / "SKILL.md"
if not skill_md.exists():
print(f"[ERROR] SKILL.md not found in {skill_path}")
return None
# Run validation before packaging
print("Validating skill...")
valid, message = validate_skill(skill_path)
if not valid:
print(f"[ERROR] Validation failed: {message}")
print(" Please fix the validation errors before packaging.")
return None
print(f"[OK] {message}\n")
# Determine output location
skill_name = skill_path.name
if output_dir:
output_path = Path(output_dir).resolve()
output_path.mkdir(parents=True, exist_ok=True)
else:
output_path = Path.cwd()
skill_filename = output_path / f"{skill_name}.skill"
EXCLUDED_DIRS = {".git", ".svn", ".hg", "__pycache__", "node_modules"}
files_to_package = []
resolved_archive = skill_filename.resolve()
for file_path in skill_path.rglob("*"):
# Fail closed on symlinks so the packaged contents are explicit and predictable.
if file_path.is_symlink():
print(f"[ERROR] Symlink not allowed in packaged skill: {file_path}")
_cleanup_partial_archive(skill_filename)
return None
rel_parts = file_path.relative_to(skill_path).parts
if any(part in EXCLUDED_DIRS for part in rel_parts):
continue
if file_path.is_file():
resolved_file = file_path.resolve()
if not _is_within(resolved_file, skill_path):
print(f"[ERROR] File escapes skill root: {file_path}")
_cleanup_partial_archive(skill_filename)
return None
# If output lives under skill_path, avoid writing archive into itself.
if resolved_file == resolved_archive:
print(f"[WARN] Skipping output archive: {file_path}")
continue
files_to_package.append(file_path)
# Create the .skill file (zip format)
try:
with zipfile.ZipFile(skill_filename, "w", zipfile.ZIP_DEFLATED) as zipf:
for file_path in files_to_package:
# Calculate the relative path within the zip.
arcname = Path(skill_name) / file_path.relative_to(skill_path)
zipf.write(file_path, arcname)
print(f" Added: {arcname}")
print(f"\n[OK] Successfully packaged skill to: {skill_filename}")
return skill_filename
except Exception as e:
_cleanup_partial_archive(skill_filename)
print(f"[ERROR] Error creating .skill file: {e}")
return None
def main():
if len(sys.argv) < 2:
print("Usage: python package_skill.py <path/to/skill-folder> [output-directory]")
print("\nExample:")
print(" python package_skill.py skills/public/my-skill")
print(" python package_skill.py skills/public/my-skill ./dist")
sys.exit(1)
skill_path = sys.argv[1]
output_dir = sys.argv[2] if len(sys.argv) > 2 else None
print(f"Packaging skill: {skill_path}")
if output_dir:
print(f" Output directory: {output_dir}")
print()
result = package_skill(skill_path, output_dir)
if result:
sys.exit(0)
else:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Minimal validator for nanobot skill folders.
"""
import re
import sys
from pathlib import Path
from typing import Optional
try:
import yaml
except ModuleNotFoundError:
yaml = None
MAX_SKILL_NAME_LENGTH = 64
ALLOWED_FRONTMATTER_KEYS = {
"name",
"description",
"metadata",
"always",
"license",
"allowed-tools",
}
ALLOWED_RESOURCE_DIRS = {"scripts", "references", "assets"}
PLACEHOLDER_MARKERS = ("[todo", "todo:")
def _extract_frontmatter(content: str) -> Optional[str]:
lines = content.splitlines()
if not lines or lines[0].strip() != "---":
return None
for i in range(1, len(lines)):
if lines[i].strip() == "---":
return "\n".join(lines[1:i])
return None
def _parse_simple_frontmatter(frontmatter_text: str) -> Optional[dict[str, str]]:
"""Fallback parser for simple frontmatter when PyYAML is unavailable."""
parsed: dict[str, str] = {}
current_key: Optional[str] = None
multiline_key: Optional[str] = None
for raw_line in frontmatter_text.splitlines():
stripped = raw_line.strip()
if not stripped or stripped.startswith("#"):
continue
is_indented = raw_line[:1].isspace()
if is_indented:
if current_key is None:
return None
current_value = parsed[current_key]
parsed[current_key] = f"{current_value}\n{stripped}" if current_value else stripped
continue
if ":" not in stripped:
return None
key, value = stripped.split(":", 1)
key = key.strip()
value = value.strip()
if not key:
return None
if value in {"|", ">"}:
parsed[key] = ""
current_key = key
multiline_key = key
continue
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
parsed[key] = value
current_key = key
multiline_key = None
if multiline_key is not None and multiline_key not in parsed:
return None
return parsed
def _load_frontmatter(frontmatter_text: str) -> tuple[Optional[dict], Optional[str]]:
if yaml is not None:
try:
frontmatter = yaml.safe_load(frontmatter_text)
except yaml.YAMLError as exc:
return None, f"Invalid YAML in frontmatter: {exc}"
if not isinstance(frontmatter, dict):
return None, "Frontmatter must be a YAML dictionary"
return frontmatter, None
frontmatter = _parse_simple_frontmatter(frontmatter_text)
if frontmatter is None:
return None, "Invalid YAML in frontmatter: unsupported syntax without PyYAML installed"
return frontmatter, None
def _validate_skill_name(name: str, folder_name: str) -> Optional[str]:
if not re.fullmatch(r"[a-z0-9]+(?:-[a-z0-9]+)*", name):
return (
f"Name '{name}' should be hyphen-case "
"(lowercase letters, digits, and single hyphens only)"
)
if len(name) > MAX_SKILL_NAME_LENGTH:
return (
f"Name is too long ({len(name)} characters). "
f"Maximum is {MAX_SKILL_NAME_LENGTH} characters."
)
if name != folder_name:
return f"Skill name '{name}' must match directory name '{folder_name}'"
return None
def _validate_description(description: str) -> Optional[str]:
trimmed = description.strip()
if not trimmed:
return "Description cannot be empty"
lowered = trimmed.lower()
if any(marker in lowered for marker in PLACEHOLDER_MARKERS):
return "Description still contains TODO placeholder text"
if "<" in trimmed or ">" in trimmed:
return "Description cannot contain angle brackets (< or >)"
if len(trimmed) > 1024:
return f"Description is too long ({len(trimmed)} characters). Maximum is 1024 characters."
return None
def validate_skill(skill_path):
"""Validate a skill folder structure and required frontmatter."""
skill_path = Path(skill_path).resolve()
if not skill_path.exists():
return False, f"Skill folder not found: {skill_path}"
if not skill_path.is_dir():
return False, f"Path is not a directory: {skill_path}"
skill_md = skill_path / "SKILL.md"
if not skill_md.exists():
return False, "SKILL.md not found"
try:
content = skill_md.read_text(encoding="utf-8")
except OSError as exc:
return False, f"Could not read SKILL.md: {exc}"
frontmatter_text = _extract_frontmatter(content)
if frontmatter_text is None:
return False, "Invalid frontmatter format"
frontmatter, error = _load_frontmatter(frontmatter_text)
if error:
return False, error
unexpected_keys = sorted(set(frontmatter.keys()) - ALLOWED_FRONTMATTER_KEYS)
if unexpected_keys:
allowed = ", ".join(sorted(ALLOWED_FRONTMATTER_KEYS))
unexpected = ", ".join(unexpected_keys)
return (
False,
f"Unexpected key(s) in SKILL.md frontmatter: {unexpected}. Allowed properties are: {allowed}",
)
if "name" not in frontmatter:
return False, "Missing 'name' in frontmatter"
if "description" not in frontmatter:
return False, "Missing 'description' in frontmatter"
name = frontmatter["name"]
if not isinstance(name, str):
return False, f"Name must be a string, got {type(name).__name__}"
name_error = _validate_skill_name(name.strip(), skill_path.name)
if name_error:
return False, name_error
description = frontmatter["description"]
if not isinstance(description, str):
return False, f"Description must be a string, got {type(description).__name__}"
description_error = _validate_description(description)
if description_error:
return False, description_error
always = frontmatter.get("always")
if always is not None and not isinstance(always, bool):
return False, f"'always' must be a boolean, got {type(always).__name__}"
for child in skill_path.iterdir():
if child.name == "SKILL.md":
continue
if child.is_dir() and child.name in ALLOWED_RESOURCE_DIRS:
continue
if child.is_symlink():
continue
return (
False,
f"Unexpected file or directory in skill root: {child.name}. "
"Only SKILL.md, scripts/, references/, and assets/ are allowed.",
)
return True, "Skill is valid!"
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python quick_validate.py <skill_directory>")
sys.exit(1)
valid, message = validate_skill(sys.argv[1])
print(message)
sys.exit(0 if valid else 1)

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "nanobot-ai" name = "nanobot-ai"
version = "0.1.4.post3" version = "0.1.4.post4"
description = "A lightweight personal AI assistant framework" description = "A lightweight personal AI assistant framework"
requires-python = ">=3.11" requires-python = ">=3.11"
license = {text = "MIT"} license = {text = "MIT"}

View File

@@ -327,7 +327,6 @@ def test_gateway_workspace_option_overrides_config(monkeypatch, tmp_path: Path)
assert seen["workspace"] == override assert seen["workspace"] == override
assert config.workspace_path == override assert config.workspace_path == override
def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None: def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json" config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True) config_file.parent.mkdir(parents=True)
@@ -356,3 +355,47 @@ def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Pat
assert isinstance(result.exception, _StopGateway) assert isinstance(result.exception, _StopGateway)
assert seen["cron_store"] == config_file.parent / "cron" / "jobs.json" assert seen["cron_store"] == config_file.parent / "cron" / "jobs.json"
def test_gateway_uses_configured_port_when_cli_flag_is_missing(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
config = Config()
config.gateway.port = 18791
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr(
"nanobot.cli.commands._make_provider",
lambda _config: (_ for _ in ()).throw(_StopGateway("stop")),
)
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
assert isinstance(result.exception, _StopGateway)
assert "port 18791" in result.stdout
def test_gateway_cli_port_overrides_configured_port(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
config = Config()
config.gateway.port = 18791
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr(
"nanobot.cli.commands._make_provider",
lambda _config: (_ for _ in ()).throw(_StopGateway("stop")),
)
result = runner.invoke(app, ["gateway", "--config", str(config_file), "--port", "18792"])
assert isinstance(result.exception, _StopGateway)
assert "port 18792" in result.stdout

View File

@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime as real_datetime from datetime import datetime as real_datetime
from importlib.resources import files as pkg_files
from pathlib import Path from pathlib import Path
import datetime as datetime_module import datetime as datetime_module
@@ -23,6 +24,13 @@ def _make_workspace(tmp_path: Path) -> Path:
return workspace return workspace
def test_bootstrap_files_are_backed_by_templates() -> None:
template_dir = pkg_files("nanobot") / "templates"
for filename in ContextBuilder.BOOTSTRAP_FILES:
assert (template_dir / filename).is_file(), f"missing bootstrap template: {filename}"
def test_system_prompt_stays_stable_when_clock_changes(tmp_path, monkeypatch) -> None: def test_system_prompt_stays_stable_when_clock_changes(tmp_path, monkeypatch) -> None:
"""System prompt should not change just because wall clock minute changes.""" """System prompt should not change just because wall clock minute changes."""
monkeypatch.setattr(datetime_module, "datetime", _FakeDatetime) monkeypatch.setattr(datetime_module, "datetime", _FakeDatetime)

View File

@@ -3,18 +3,24 @@ import asyncio
import pytest import pytest
from nanobot.heartbeat.service import HeartbeatService from nanobot.heartbeat.service import HeartbeatService
from nanobot.providers.base import LLMResponse, ToolCallRequest from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
class DummyProvider: class DummyProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]): def __init__(self, responses: list[LLMResponse]):
super().__init__()
self._responses = list(responses) self._responses = list(responses)
self.calls = 0
async def chat(self, *args, **kwargs) -> LLMResponse: async def chat(self, *args, **kwargs) -> LLMResponse:
self.calls += 1
if self._responses: if self._responses:
return self._responses.pop(0) return self._responses.pop(0)
return LLMResponse(content="", tool_calls=[]) return LLMResponse(content="", tool_calls=[])
def get_default_model(self) -> str:
return "test-model"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_is_idempotent(tmp_path) -> None: async def test_start_is_idempotent(tmp_path) -> None:
@@ -115,3 +121,40 @@ async def test_trigger_now_returns_none_when_decision_is_skip(tmp_path) -> None:
) )
assert await service.trigger_now() is None assert await service.trigger_now() is None
@pytest.mark.asyncio
async def test_decide_retries_transient_error_then_succeeds(tmp_path, monkeypatch) -> None:
provider = DummyProvider([
LLMResponse(content="429 rate limit", finish_reason="error"),
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "run", "tasks": "check open tasks"},
)
],
),
])
delays: list[int] = []
async def _fake_sleep(delay: int) -> None:
delays.append(delay)
monkeypatch.setattr(asyncio, "sleep", _fake_sleep)
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
)
action, tasks = await service._decide("heartbeat content")
assert action == "run"
assert tasks == "check open tasks"
assert provider.calls == 2
assert delays == [1]

99
tests/test_mcp_tool.py Normal file
View File

@@ -0,0 +1,99 @@
from __future__ import annotations
import asyncio
import sys
from types import ModuleType, SimpleNamespace
import pytest
from nanobot.agent.tools.mcp import MCPToolWrapper
class _FakeTextContent:
def __init__(self, text: str) -> None:
self.text = text
@pytest.fixture(autouse=True)
def _fake_mcp_module(monkeypatch: pytest.MonkeyPatch) -> None:
mod = ModuleType("mcp")
mod.types = SimpleNamespace(TextContent=_FakeTextContent)
monkeypatch.setitem(sys.modules, "mcp", mod)
def _make_wrapper(session: object, *, timeout: float = 0.1) -> MCPToolWrapper:
tool_def = SimpleNamespace(
name="demo",
description="demo tool",
inputSchema={"type": "object", "properties": {}},
)
return MCPToolWrapper(session, "test", tool_def, tool_timeout=timeout)
@pytest.mark.asyncio
async def test_execute_returns_text_blocks() -> None:
async def call_tool(_name: str, arguments: dict) -> object:
assert arguments == {"value": 1}
return SimpleNamespace(content=[_FakeTextContent("hello"), 42])
wrapper = _make_wrapper(SimpleNamespace(call_tool=call_tool))
result = await wrapper.execute(value=1)
assert result == "hello\n42"
@pytest.mark.asyncio
async def test_execute_returns_timeout_message() -> None:
async def call_tool(_name: str, arguments: dict) -> object:
await asyncio.sleep(1)
return SimpleNamespace(content=[])
wrapper = _make_wrapper(SimpleNamespace(call_tool=call_tool), timeout=0.01)
result = await wrapper.execute()
assert result == "(MCP tool call timed out after 0.01s)"
@pytest.mark.asyncio
async def test_execute_handles_server_cancelled_error() -> None:
async def call_tool(_name: str, arguments: dict) -> object:
raise asyncio.CancelledError()
wrapper = _make_wrapper(SimpleNamespace(call_tool=call_tool))
result = await wrapper.execute()
assert result == "(MCP tool call was cancelled)"
@pytest.mark.asyncio
async def test_execute_re_raises_external_cancellation() -> None:
started = asyncio.Event()
async def call_tool(_name: str, arguments: dict) -> object:
started.set()
await asyncio.sleep(60)
return SimpleNamespace(content=[])
wrapper = _make_wrapper(SimpleNamespace(call_tool=call_tool), timeout=10)
task = asyncio.create_task(wrapper.execute())
await started.wait()
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
@pytest.mark.asyncio
async def test_execute_handles_generic_exception() -> None:
async def call_tool(_name: str, arguments: dict) -> object:
raise RuntimeError("boom")
wrapper = _make_wrapper(SimpleNamespace(call_tool=call_tool))
result = await wrapper.execute()
assert result == "(MCP tool call failed: RuntimeError)"

View File

@@ -12,7 +12,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from nanobot.agent.memory import MemoryStore from nanobot.agent.memory import MemoryStore
from nanobot.providers.base import LLMResponse, ToolCallRequest from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
def _make_session(message_count: int = 30, memory_window: int = 50): def _make_session(message_count: int = 30, memory_window: int = 50):
@@ -43,6 +43,22 @@ def _make_tool_response(history_entry, memory_update):
) )
class ScriptedProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]):
super().__init__()
self._responses = list(responses)
self.calls = 0
async def chat(self, *args, **kwargs) -> LLMResponse:
self.calls += 1
if self._responses:
return self._responses.pop(0)
return LLMResponse(content="", tool_calls=[])
def get_default_model(self) -> str:
return "test-model"
class TestMemoryConsolidationTypeHandling: class TestMemoryConsolidationTypeHandling:
"""Test that consolidation handles various argument types correctly.""" """Test that consolidation handles various argument types correctly."""
@@ -57,6 +73,7 @@ class TestMemoryConsolidationTypeHandling:
memory_update="# Memory\nUser likes testing.", memory_update="# Memory\nUser likes testing.",
) )
) )
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -77,6 +94,7 @@ class TestMemoryConsolidationTypeHandling:
memory_update={"facts": ["User likes testing"], "topics": ["testing"]}, memory_update={"facts": ["User likes testing"], "topics": ["testing"]},
) )
) )
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -112,6 +130,7 @@ class TestMemoryConsolidationTypeHandling:
], ],
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -127,6 +146,7 @@ class TestMemoryConsolidationTypeHandling:
provider.chat = AsyncMock( provider.chat = AsyncMock(
return_value=LLMResponse(content="I summarized the conversation.", tool_calls=[]) return_value=LLMResponse(content="I summarized the conversation.", tool_calls=[])
) )
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -139,6 +159,7 @@ class TestMemoryConsolidationTypeHandling:
"""Consolidation should be a no-op when messages < keep_count.""" """Consolidation should be a no-op when messages < keep_count."""
store = MemoryStore(tmp_path) store = MemoryStore(tmp_path)
provider = AsyncMock() provider = AsyncMock()
provider.chat_with_retry = provider.chat
session = _make_session(message_count=10) session = _make_session(message_count=10)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -167,6 +188,7 @@ class TestMemoryConsolidationTypeHandling:
], ],
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -192,6 +214,7 @@ class TestMemoryConsolidationTypeHandling:
], ],
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
@@ -215,8 +238,33 @@ class TestMemoryConsolidationTypeHandling:
], ],
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) session = _make_session(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(session, provider, "test-model", memory_window=50)
assert result is False assert result is False
@pytest.mark.asyncio
async def test_retries_transient_error_then_succeeds(self, tmp_path: Path, monkeypatch) -> None:
store = MemoryStore(tmp_path)
provider = ScriptedProvider([
LLMResponse(content="503 server error", finish_reason="error"),
_make_tool_response(
history_entry="[2026-01-01] User discussed testing.",
memory_update="# Memory\nUser likes testing.",
),
])
session = _make_session(message_count=60)
delays: list[int] = []
async def _fake_sleep(delay: int) -> None:
delays.append(delay)
monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep)
result = await store.consolidate(session, provider, "test-model", memory_window=50)
assert result is True
assert provider.calls == 2
assert delays == [1]

View File

@@ -0,0 +1,92 @@
import asyncio
import pytest
from nanobot.providers.base import LLMProvider, LLMResponse
class ScriptedProvider(LLMProvider):
def __init__(self, responses):
super().__init__()
self._responses = list(responses)
self.calls = 0
async def chat(self, *args, **kwargs) -> LLMResponse:
self.calls += 1
response = self._responses.pop(0)
if isinstance(response, BaseException):
raise response
return response
def get_default_model(self) -> str:
return "test-model"
@pytest.mark.asyncio
async def test_chat_with_retry_retries_transient_error_then_succeeds(monkeypatch) -> None:
provider = ScriptedProvider([
LLMResponse(content="429 rate limit", finish_reason="error"),
LLMResponse(content="ok"),
])
delays: list[int] = []
async def _fake_sleep(delay: int) -> None:
delays.append(delay)
monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep)
response = await provider.chat_with_retry(messages=[{"role": "user", "content": "hello"}])
assert response.finish_reason == "stop"
assert response.content == "ok"
assert provider.calls == 2
assert delays == [1]
@pytest.mark.asyncio
async def test_chat_with_retry_does_not_retry_non_transient_error(monkeypatch) -> None:
provider = ScriptedProvider([
LLMResponse(content="401 unauthorized", finish_reason="error"),
])
delays: list[int] = []
async def _fake_sleep(delay: int) -> None:
delays.append(delay)
monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep)
response = await provider.chat_with_retry(messages=[{"role": "user", "content": "hello"}])
assert response.content == "401 unauthorized"
assert provider.calls == 1
assert delays == []
@pytest.mark.asyncio
async def test_chat_with_retry_returns_final_error_after_retries(monkeypatch) -> None:
provider = ScriptedProvider([
LLMResponse(content="429 rate limit a", finish_reason="error"),
LLMResponse(content="429 rate limit b", finish_reason="error"),
LLMResponse(content="429 rate limit c", finish_reason="error"),
LLMResponse(content="503 final server error", finish_reason="error"),
])
delays: list[int] = []
async def _fake_sleep(delay: int) -> None:
delays.append(delay)
monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep)
response = await provider.chat_with_retry(messages=[{"role": "user", "content": "hello"}])
assert response.content == "503 final server error"
assert provider.calls == 4
assert delays == [1, 2, 4]
@pytest.mark.asyncio
async def test_chat_with_retry_preserves_cancelled_error() -> None:
provider = ScriptedProvider([asyncio.CancelledError()])
with pytest.raises(asyncio.CancelledError):
await provider.chat_with_retry(messages=[{"role": "user", "content": "hello"}])

View File

@@ -0,0 +1,127 @@
import importlib
import shutil
import sys
import zipfile
from pathlib import Path
SCRIPT_DIR = Path("nanobot/skills/skill-creator/scripts").resolve()
if str(SCRIPT_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPT_DIR))
init_skill = importlib.import_module("init_skill")
package_skill = importlib.import_module("package_skill")
quick_validate = importlib.import_module("quick_validate")
def test_init_skill_creates_expected_files(tmp_path: Path) -> None:
skill_dir = init_skill.init_skill(
"demo-skill",
tmp_path,
["scripts", "references", "assets"],
include_examples=True,
)
assert skill_dir == tmp_path / "demo-skill"
assert (skill_dir / "SKILL.md").exists()
assert (skill_dir / "scripts" / "example.py").exists()
assert (skill_dir / "references" / "api_reference.md").exists()
assert (skill_dir / "assets" / "example_asset.txt").exists()
def test_validate_skill_accepts_existing_skill_creator() -> None:
valid, message = quick_validate.validate_skill(
Path("nanobot/skills/skill-creator").resolve()
)
assert valid, message
def test_validate_skill_rejects_placeholder_description(tmp_path: Path) -> None:
skill_dir = tmp_path / "placeholder-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text(
"---\n"
"name: placeholder-skill\n"
'description: "[TODO: fill me in]"\n'
"---\n"
"# Placeholder\n",
encoding="utf-8",
)
valid, message = quick_validate.validate_skill(skill_dir)
assert not valid
assert "TODO placeholder" in message
def test_validate_skill_rejects_root_files_outside_allowed_dirs(tmp_path: Path) -> None:
skill_dir = tmp_path / "bad-root-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text(
"---\n"
"name: bad-root-skill\n"
"description: Valid description\n"
"---\n"
"# Skill\n",
encoding="utf-8",
)
(skill_dir / "README.md").write_text("extra\n", encoding="utf-8")
valid, message = quick_validate.validate_skill(skill_dir)
assert not valid
assert "Unexpected file or directory in skill root" in message
def test_package_skill_creates_archive(tmp_path: Path) -> None:
skill_dir = tmp_path / "package-me"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text(
"---\n"
"name: package-me\n"
"description: Package this skill.\n"
"---\n"
"# Skill\n",
encoding="utf-8",
)
scripts_dir = skill_dir / "scripts"
scripts_dir.mkdir()
(scripts_dir / "helper.py").write_text("print('ok')\n", encoding="utf-8")
archive_path = package_skill.package_skill(skill_dir, tmp_path / "dist")
assert archive_path == (tmp_path / "dist" / "package-me.skill")
assert archive_path.exists()
with zipfile.ZipFile(archive_path, "r") as archive:
names = set(archive.namelist())
assert "package-me/SKILL.md" in names
assert "package-me/scripts/helper.py" in names
def test_package_skill_rejects_symlink(tmp_path: Path) -> None:
skill_dir = tmp_path / "symlink-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text(
"---\n"
"name: symlink-skill\n"
"description: Reject symlinks during packaging.\n"
"---\n"
"# Skill\n",
encoding="utf-8",
)
scripts_dir = skill_dir / "scripts"
scripts_dir.mkdir()
target = tmp_path / "outside.txt"
target.write_text("secret\n", encoding="utf-8")
link = scripts_dir / "outside.txt"
try:
link.symlink_to(target)
except (OSError, NotImplementedError):
return
archive_path = package_skill.package_skill(skill_dir, tmp_path / "dist")
assert archive_path is None
assert not (tmp_path / "dist" / "symlink-skill.skill").exists()

View File

@@ -0,0 +1,90 @@
from __future__ import annotations
import pytest
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.slack import SlackChannel
from nanobot.config.schema import SlackConfig
class _FakeAsyncWebClient:
def __init__(self) -> None:
self.chat_post_calls: list[dict[str, object | None]] = []
self.file_upload_calls: list[dict[str, object | None]] = []
async def chat_postMessage(
self,
*,
channel: str,
text: str,
thread_ts: str | None = None,
) -> None:
self.chat_post_calls.append(
{
"channel": channel,
"text": text,
"thread_ts": thread_ts,
}
)
async def files_upload_v2(
self,
*,
channel: str,
file: str,
thread_ts: str | None = None,
) -> None:
self.file_upload_calls.append(
{
"channel": channel,
"file": file,
"thread_ts": thread_ts,
}
)
@pytest.mark.asyncio
async def test_send_uses_thread_for_channel_messages() -> None:
channel = SlackChannel(SlackConfig(enabled=True), MessageBus())
fake_web = _FakeAsyncWebClient()
channel._web_client = fake_web
await channel.send(
OutboundMessage(
channel="slack",
chat_id="C123",
content="hello",
media=["/tmp/demo.txt"],
metadata={"slack": {"thread_ts": "1700000000.000100", "channel_type": "channel"}},
)
)
assert len(fake_web.chat_post_calls) == 1
assert fake_web.chat_post_calls[0]["text"] == "hello\n"
assert fake_web.chat_post_calls[0]["thread_ts"] == "1700000000.000100"
assert len(fake_web.file_upload_calls) == 1
assert fake_web.file_upload_calls[0]["thread_ts"] == "1700000000.000100"
@pytest.mark.asyncio
async def test_send_omits_thread_for_dm_messages() -> None:
channel = SlackChannel(SlackConfig(enabled=True), MessageBus())
fake_web = _FakeAsyncWebClient()
channel._web_client = fake_web
await channel.send(
OutboundMessage(
channel="slack",
chat_id="D123",
content="hello",
media=["/tmp/demo.txt"],
metadata={"slack": {"thread_ts": "1700000000.000100", "channel_type": "im"}},
)
)
assert len(fake_web.chat_post_calls) == 1
assert fake_web.chat_post_calls[0]["text"] == "hello\n"
assert fake_web.chat_post_calls[0]["thread_ts"] is None
assert len(fake_web.file_upload_calls) == 1
assert fake_web.file_upload_calls[0]["thread_ts"] is None

View File

@@ -27,9 +27,11 @@ class _FakeUpdater:
class _FakeBot: class _FakeBot:
def __init__(self) -> None: def __init__(self) -> None:
self.sent_messages: list[dict] = [] self.sent_messages: list[dict] = []
self.get_me_calls = 0
async def get_me(self): async def get_me(self):
return SimpleNamespace(username="nanobot_test") self.get_me_calls += 1
return SimpleNamespace(id=999, username="nanobot_test")
async def set_my_commands(self, commands) -> None: async def set_my_commands(self, commands) -> None:
self.commands = commands self.commands = commands
@@ -37,6 +39,9 @@ class _FakeBot:
async def send_message(self, **kwargs) -> None: async def send_message(self, **kwargs) -> None:
self.sent_messages.append(kwargs) self.sent_messages.append(kwargs)
async def send_chat_action(self, **kwargs) -> None:
pass
class _FakeApp: class _FakeApp:
def __init__(self, on_start_polling) -> None: def __init__(self, on_start_polling) -> None:
@@ -87,6 +92,35 @@ class _FakeBuilder:
return self.app return self.app
def _make_telegram_update(
*,
chat_type: str = "group",
text: str | None = None,
caption: str | None = None,
entities=None,
caption_entities=None,
reply_to_message=None,
):
user = SimpleNamespace(id=12345, username="alice", first_name="Alice")
message = SimpleNamespace(
chat=SimpleNamespace(type=chat_type, is_forum=False),
chat_id=-100123,
text=text,
caption=caption,
entities=entities or [],
caption_entities=caption_entities or [],
reply_to_message=reply_to_message,
photo=None,
voice=None,
audio=None,
document=None,
media_group_id=None,
message_thread_id=None,
message_id=1,
)
return SimpleNamespace(message=message, effective_user=user)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None: async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None:
config = TelegramConfig( config = TelegramConfig(
@@ -131,6 +165,10 @@ def test_get_extension_falls_back_to_original_filename() -> None:
assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz" assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz"
def test_telegram_group_policy_defaults_to_mention() -> None:
assert TelegramConfig().group_policy == "mention"
def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None: def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None:
channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus()) channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus())
@@ -182,3 +220,119 @@ async def test_send_reply_infers_topic_from_message_id_cache() -> None:
assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42 assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42
assert channel._app.bot.sent_messages[0]["reply_parameters"].message_id == 10 assert channel._app.bot.sent_messages[0]["reply_parameters"].message_id == 10
@pytest.mark.asyncio
async def test_group_policy_mention_ignores_unmentioned_group_message() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
handled = []
async def capture_handle(**kwargs) -> None:
handled.append(kwargs)
channel._handle_message = capture_handle
channel._start_typing = lambda _chat_id: None
await channel._on_message(_make_telegram_update(text="hello everyone"), None)
assert handled == []
assert channel._app.bot.get_me_calls == 1
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_text_mention_and_caches_bot_identity() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
handled = []
async def capture_handle(**kwargs) -> None:
handled.append(kwargs)
channel._handle_message = capture_handle
channel._start_typing = lambda _chat_id: None
mention = SimpleNamespace(type="mention", offset=0, length=13)
await channel._on_message(_make_telegram_update(text="@nanobot_test hi", entities=[mention]), None)
await channel._on_message(_make_telegram_update(text="@nanobot_test again", entities=[mention]), None)
assert len(handled) == 2
assert channel._app.bot.get_me_calls == 1
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_caption_mention() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
handled = []
async def capture_handle(**kwargs) -> None:
handled.append(kwargs)
channel._handle_message = capture_handle
channel._start_typing = lambda _chat_id: None
mention = SimpleNamespace(type="mention", offset=0, length=13)
await channel._on_message(
_make_telegram_update(caption="@nanobot_test photo", caption_entities=[mention]),
None,
)
assert len(handled) == 1
assert handled[0]["content"] == "@nanobot_test photo"
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_reply_to_bot() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
handled = []
async def capture_handle(**kwargs) -> None:
handled.append(kwargs)
channel._handle_message = capture_handle
channel._start_typing = lambda _chat_id: None
reply = SimpleNamespace(from_user=SimpleNamespace(id=999))
await channel._on_message(_make_telegram_update(text="reply", reply_to_message=reply), None)
assert len(handled) == 1
@pytest.mark.asyncio
async def test_group_policy_open_accepts_plain_group_message() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
handled = []
async def capture_handle(**kwargs) -> None:
handled.append(kwargs)
channel._handle_message = capture_handle
channel._start_typing = lambda _chat_id: None
await channel._on_message(_make_telegram_update(text="hello group"), None)
assert len(handled) == 1
assert channel._app.bot.get_me_calls == 0