refactor(agent): split slash commands and harden skill sync
Some checks failed
Test Suite / test (3.11) (push) Failing after 1m11s
Test Suite / test (3.12) (push) Failing after 1m16s
Test Suite / test (3.13) (push) Failing after 1m14s

This commit is contained in:
Hua
2026-03-24 13:56:02 +08:00
parent 270dff3d7f
commit 15f7d15108
16 changed files with 1140 additions and 422 deletions

View File

@@ -40,8 +40,10 @@ Do not commit real API keys, tokens, chat logs, or workspace data. Keep local se
- `channels.voiceReply` currently adds TTS attachments on supported outbound channels such as Telegram, and QQ when the configured TTS endpoint returns `silk`. Preserve plain-text fallback when QQ voice requirements are not met. - `channels.voiceReply` currently adds TTS attachments on supported outbound channels such as Telegram, and QQ when the configured TTS endpoint returns `silk`. Preserve plain-text fallback when QQ voice requirements are not met.
- Voice replies should follow the active session persona. Build TTS style instructions from the resolved persona's prompt files, and allow optional persona-local overrides from `VOICE.json` under the persona workspace (`<workspace>/VOICE.json` for default, `<workspace>/personas/<name>/VOICE.json` for custom personas). - Voice replies should follow the active session persona. Build TTS style instructions from the resolved persona's prompt files, and allow optional persona-local overrides from `VOICE.json` under the persona workspace (`<workspace>/VOICE.json` for default, `<workspace>/personas/<name>/VOICE.json` for custom personas).
- `channels.voiceReply.url` may override the TTS endpoint independently of the chat model provider. When omitted, fall back to the active conversation provider URL. Keep `apiBase` accepted as a compatibility alias. - `channels.voiceReply.url` may override the TTS endpoint independently of the chat model provider. When omitted, fall back to the active conversation provider URL. Keep `apiBase` accepted as a compatibility alias.
- `/skill` shells out to `npx clawhub@latest`; it requires Node.js/`npx` at runtime. - `/skill search` queries `https://lightmake.site/api/skills` directly with SkillHub-compatible query params (`page`, `pageSize`, `sortBy`, `order`, `keyword`) and does not require Node.js.
- `/skill uninstall` runs in a non-interactive context, so keep passing `--yes` when shelling out to ClawHub. - `/skill` shells out to `npx clawhub@latest` for `install`, `list`, and `update`; those subcommands still require Node.js/`npx` at runtime.
- Keep ClawHub global options first when shelling out: `--workdir <workspace> --no-input ...`.
- `/skill uninstall` is local workspace cleanup, not a ClawHub subprocess call. Remove `<workspace>/skills/<slug>` and best-effort prune `<workspace>/.clawhub/lock.json`.
- Treat empty `/skill search` output as a user-visible "no results" case rather than a silent success. Surface npm/registry failures directly to the user. - Treat empty `/skill search` output as a user-visible "no results" case rather than a silent success. Surface npm/registry failures directly to the user.
- Never hardcode `~/.nanobot/workspace` for skill installation or lookup. Use the active runtime workspace from config or `--workspace`. - Never hardcode `~/.nanobot/workspace` for skill installation or lookup. Use the active runtime workspace from config or `--workspace`.
- Workspace skills in `<workspace>/skills/` take precedence over built-in skills with the same directory name. - Workspace skills in `<workspace>/skills/` take precedence over built-in skills with the same directory name.

View File

@@ -1603,7 +1603,7 @@ These commands are available inside chats handled by `nanobot agent` or `nanobot
| `/persona set <name>` | Switch persona and start a new session | | `/persona set <name>` | Switch persona and start a new session |
| `/skill search <query>` | Search public skills on ClawHub | | `/skill search <query>` | Search public skills on ClawHub |
| `/skill install <slug>` | Install a ClawHub skill into the active workspace | | `/skill install <slug>` | Install a ClawHub skill into the active workspace |
| `/skill uninstall <slug>` | Remove a ClawHub-managed skill from the active workspace | | `/skill uninstall <slug>` | Remove a locally installed workspace skill from the active workspace |
| `/skill list` | List ClawHub-managed skills in the active workspace | | `/skill list` | List ClawHub-managed skills in the active workspace |
| `/skill update` | Update all ClawHub-managed skills in the active workspace | | `/skill update` | Update all ClawHub-managed skills in the active workspace |
| `/mcp [list]` | List configured MCP servers and registered MCP tools | | `/mcp [list]` | List configured MCP servers and registered MCP tools |
@@ -1616,10 +1616,20 @@ These commands are available inside chats handled by `nanobot agent` or `nanobot
`~/.nanobot/workspace` path. If you start nanobot with `--workspace`, skill install/uninstall/list/update `~/.nanobot/workspace` path. If you start nanobot with `--workspace`, skill install/uninstall/list/update
operate on that workspace's `skills/` directory. operate on that workspace's `skills/` directory.
`/skill search` queries the live ClawHub registry API directly at
`https://lightmake.site/api/skills` using the same sort order as the SkillHub web UI, so search
does not depend on `npm` or `npx`.
For `install`, `list`, and `update`, nanobot still shells out to `npx clawhub@latest`
using ClawHub global options first: `--workdir <workspace> --no-input ...`. `/skill uninstall`
removes the local `<workspace>/skills/<slug>` directory directly and best-effort prunes
`<workspace>/.clawhub/lock.json`, because current ClawHub docs do not document an uninstall
subcommand.
`/skill search` can legitimately return no matches. In that case nanobot now replies with a `/skill search` can legitimately return no matches. In that case nanobot now replies with a
clear "no skills found" message instead of leaving the channel on a transient searching state. clear "no skills found" message instead of leaving the channel on a transient searching state.
If `npx clawhub@latest` cannot reach the npm registry, nanobot also surfaces the registry/network If the ClawHub registry API or `npx clawhub@latest` cannot be reached, nanobot also surfaces the
error directly so the failure is visible to the user. underlying network or HTTP error directly so the failure is visible to the user.
<details> <details>
<summary><b>Heartbeat (Periodic Tasks)</b></summary> <summary><b>Heartbeat (Periodic Tasks)</b></summary>

View File

@@ -0,0 +1,17 @@
"""Command handlers for AgentLoop slash commands."""
from nanobot.agent.commands.language import LanguageCommandHandler
from nanobot.agent.commands.mcp import MCPCommandHandler
from nanobot.agent.commands.persona import PersonaCommandHandler
from nanobot.agent.commands.router import build_agent_command_router
from nanobot.agent.commands.skill import SkillCommandHandler
from nanobot.agent.commands.system import SystemCommandHandler
__all__ = [
"LanguageCommandHandler",
"MCPCommandHandler",
"PersonaCommandHandler",
"SkillCommandHandler",
"SystemCommandHandler",
"build_agent_command_router",
]

View File

@@ -0,0 +1,62 @@
"""Language command helpers for AgentLoop."""
from __future__ import annotations
from typing import TYPE_CHECKING
from nanobot.agent.i18n import language_label, list_languages, normalize_language_code, text
from nanobot.bus.events import InboundMessage, OutboundMessage
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
from nanobot.session.manager import Session
class LanguageCommandHandler:
"""Encapsulates `/lang` subcommand behavior for AgentLoop."""
def __init__(self, loop: AgentLoop) -> None:
self.loop = loop
@staticmethod
def _response(msg: InboundMessage, content: str) -> OutboundMessage:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
def current(self, msg: InboundMessage, session: Session) -> OutboundMessage:
current = self.loop._get_session_language(session)
return self._response(
msg,
text(current, "current_language", language_name=language_label(current, current)),
)
def list(self, msg: InboundMessage, session: Session) -> OutboundMessage:
current = self.loop._get_session_language(session)
items = "\n".join(
f"- {language_label(code, current)}"
+ (f" ({text(current, 'current_marker')})" if code == current else "")
for code in list_languages()
)
return self._response(msg, text(current, "available_languages", items=items))
def set(self, msg: InboundMessage, session: Session, target_raw: str) -> OutboundMessage:
current = self.loop._get_session_language(session)
target = normalize_language_code(target_raw)
if target is None:
languages = ", ".join(language_label(code, current) for code in list_languages())
return self._response(
msg,
text(current, "unknown_language", name=target_raw, languages=languages),
)
if target == current:
return self._response(
msg,
text(current, "language_already_active", language_name=language_label(target, current)),
)
self.loop._set_session_language(session, target)
self.loop.sessions.save(session)
return self._response(
msg,
text(target, "switched_language", language_name=language_label(target, target)),
)

View File

@@ -0,0 +1,64 @@
"""MCP command helpers for AgentLoop."""
from __future__ import annotations
from typing import TYPE_CHECKING
from nanobot.agent.i18n import text
from nanobot.bus.events import InboundMessage, OutboundMessage
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
class MCPCommandHandler:
"""Encapsulates `/mcp` subcommand behavior for AgentLoop."""
def __init__(self, loop: AgentLoop) -> None:
self.loop = loop
@staticmethod
def _response(msg: InboundMessage, content: str) -> OutboundMessage:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
def _group_mcp_tool_names(self) -> dict[str, list[str]]:
"""Group registered MCP tool names by configured server name."""
grouped = {name: [] for name in self.loop._mcp_servers}
server_names = sorted(self.loop._mcp_servers, key=len, reverse=True)
for tool_name in self.loop.tools.tool_names:
if not tool_name.startswith("mcp_"):
continue
for server_name in server_names:
prefix = f"mcp_{server_name}_"
if tool_name.startswith(prefix):
grouped[server_name].append(tool_name.removeprefix(prefix))
break
return {name: sorted(tools) for name, tools in grouped.items()}
async def list(self, msg: InboundMessage, language: str) -> OutboundMessage:
await self.loop._reload_mcp_servers_if_needed()
if not self.loop._mcp_servers:
return self._response(msg, text(language, "mcp_no_servers"))
await self.loop._connect_mcp()
server_lines = "\n".join(f"- {name}" for name in self.loop._mcp_servers)
sections = [text(language, "mcp_servers_list", items=server_lines)]
grouped_tools = self._group_mcp_tool_names()
tool_lines = "\n".join(
f"- {server}: {', '.join(tools)}"
for server, tools in grouped_tools.items()
if tools
)
sections.append(
text(language, "mcp_tools_list", items=tool_lines)
if tool_lines
else text(language, "mcp_no_tools")
)
return self._response(msg, "\n\n".join(sections))

View File

@@ -0,0 +1,76 @@
"""Persona command helpers for AgentLoop."""
from __future__ import annotations
from typing import TYPE_CHECKING
from loguru import logger
from nanobot.agent.i18n import text
from nanobot.bus.events import InboundMessage, OutboundMessage
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
from nanobot.session.manager import Session
class PersonaCommandHandler:
"""Encapsulates `/persona` subcommand behavior for AgentLoop."""
def __init__(self, loop: AgentLoop) -> None:
self.loop = loop
@staticmethod
def _response(msg: InboundMessage, content: str) -> OutboundMessage:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
def current(self, msg: InboundMessage, session: Session) -> OutboundMessage:
language = self.loop._get_session_language(session)
current = self.loop._get_session_persona(session)
return self._response(msg, text(language, "current_persona", persona=current))
def list(self, msg: InboundMessage, session: Session) -> OutboundMessage:
language = self.loop._get_session_language(session)
current = self.loop._get_session_persona(session)
marker = text(language, "current_marker")
personas = [
f"{name} ({marker})" if name == current else name
for name in self.loop.context.list_personas()
]
return self._response(
msg,
text(language, "available_personas", items="\n".join(f"- {name}" for name in personas)),
)
async def set(self, msg: InboundMessage, session: Session, target_raw: str) -> OutboundMessage:
language = self.loop._get_session_language(session)
target = self.loop.context.find_persona(target_raw)
if target is None:
personas = ", ".join(self.loop.context.list_personas())
return self._response(
msg,
text(
language,
"unknown_persona",
name=target_raw,
personas=personas,
path=self.loop.workspace / "personas" / target_raw,
),
)
current = self.loop._get_session_persona(session)
if target == current:
return self._response(msg, text(language, "persona_already_active", persona=target))
try:
if not await self.loop.memory_consolidator.archive_unconsolidated(session):
return self._response(msg, text(language, "memory_archival_failed_persona"))
except Exception:
logger.exception("/persona archival failed for {}", session.key)
return self._response(msg, text(language, "memory_archival_failed_persona"))
session.clear()
self.loop._set_session_persona(session, target)
self.loop.sessions.save(session)
self.loop.sessions.invalidate(session.key)
return self._response(msg, text(language, "switched_persona", persona=target))

View File

@@ -0,0 +1,86 @@
"""AgentLoop slash-command router registration."""
from __future__ import annotations
from typing import TYPE_CHECKING
from nanobot.command.router import CommandContext, CommandRouter
if TYPE_CHECKING:
from nanobot.bus.events import OutboundMessage
def _session(ctx: CommandContext):
return ctx.session or ctx.loop.sessions.get_or_create(ctx.key)
async def _cmd_status(ctx: CommandContext) -> OutboundMessage:
session = _session(ctx)
return ctx.loop._system_commands.status(ctx.msg, session)
async def _cmd_new(ctx: CommandContext) -> OutboundMessage:
session = _session(ctx)
language = ctx.loop._get_session_language(session)
return ctx.loop._system_commands.new_session(ctx.msg, session, language)
async def _cmd_help(ctx: CommandContext) -> OutboundMessage:
session = _session(ctx)
language = ctx.loop._get_session_language(session)
return ctx.loop._system_commands.help(ctx.msg, language)
async def _cmd_lang(ctx: CommandContext):
return await ctx.loop._handle_language_command(ctx.msg, _session(ctx))
async def _cmd_persona(ctx: CommandContext):
return await ctx.loop._handle_persona_command(ctx.msg, _session(ctx))
async def _cmd_skill(ctx: CommandContext):
return await ctx.loop._handle_skill_command(ctx.msg, _session(ctx))
async def _cmd_mcp(ctx: CommandContext):
return await ctx.loop._handle_mcp_command(ctx.msg, _session(ctx))
async def _cmd_stop_priority(ctx: CommandContext):
await ctx.loop._handle_stop(ctx.msg)
return None
async def _cmd_restart_priority(ctx: CommandContext):
await ctx.loop._handle_restart(ctx.msg)
return None
def build_agent_command_router() -> CommandRouter:
"""Create the slash-command router used by AgentLoop."""
router = CommandRouter()
router.priority("/stop", _cmd_stop_priority)
router.priority("/restart", _cmd_restart_priority)
router.priority("/status", _cmd_status)
router.exact("/new", _cmd_new)
router.exact("/status", _cmd_status)
router.exact("/help", _cmd_help)
router.exact("/lang", _cmd_lang)
router.exact("/language", _cmd_lang)
router.prefix("/lang ", _cmd_lang)
router.prefix("/language ", _cmd_lang)
router.exact("/persona", _cmd_persona)
router.prefix("/persona ", _cmd_persona)
router.exact("/skill", _cmd_skill)
router.prefix("/skill ", _cmd_skill)
router.exact("/mcp", _cmd_mcp)
router.prefix("/mcp ", _cmd_mcp)
return router

View File

@@ -0,0 +1,434 @@
"""Skill command helpers for AgentLoop."""
from __future__ import annotations
import asyncio
import json
import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any
import httpx
from loguru import logger
from nanobot.agent.i18n import text
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.utils.helpers import ensure_dir
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
class SkillCommandHandler:
"""Encapsulates `/skill` subcommand behavior for AgentLoop."""
def __init__(self, loop: AgentLoop) -> None:
self.loop = loop
@staticmethod
def _response(msg: InboundMessage, content: str) -> OutboundMessage:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
@staticmethod
def _decode_subprocess_output(data: bytes) -> str:
"""Decode subprocess output conservatively for CLI surfacing."""
return data.decode("utf-8", errors="replace").strip()
def _is_clawhub_network_error(self, output: str) -> bool:
lowered = output.lower()
return any(marker in lowered for marker in self.loop._CLAWHUB_NETWORK_ERROR_MARKERS)
def _format_clawhub_error(self, language: str, code: int, output: str) -> str:
if output and self._is_clawhub_network_error(output):
return "\n\n".join([text(language, "skill_command_network_failed"), output])
return output or text(language, "skill_command_failed", code=code)
@staticmethod
def _clawhub_search_headers(language: str) -> dict[str, str]:
accept_language = "zh-CN,zh;q=0.9,en;q=0.8" if language.startswith("zh") else "en-US,en;q=0.9"
return {
"accept": "*/*",
"accept-language": accept_language,
"origin": "https://skillhub.tencent.com",
"referer": "https://skillhub.tencent.com/",
}
def _format_clawhub_search_results(
self,
language: str,
query: str,
skills: list[dict[str, Any]],
total: int,
) -> str:
blocks = [
text(
language,
"skill_search_results_header",
query=query,
total=total,
count=len(skills),
)
]
description_key = "description_zh" if language.startswith("zh") else "description"
for index, skill in enumerate(skills, start=1):
name = str(skill.get("name") or skill.get("slug") or f"skill-{index}")
slug = str(skill.get("slug") or "-")
owner = str(skill.get("ownerName") or "-")
installs = str(skill.get("installs") or 0)
stars = str(skill.get("stars") or 0)
version = str(skill.get("version") or "-")
description = str(
skill.get(description_key) or skill.get("description") or skill.get("description_zh") or ""
).strip()
homepage = str(skill.get("homepage") or "").strip()
lines = [
f"{index}. {name}",
text(
language,
"skill_search_result_meta",
slug=slug,
owner=owner,
installs=installs,
stars=stars,
version=version,
),
]
if description:
lines.append(description)
if homepage:
lines.append(homepage)
blocks.append("\n".join(lines))
return "\n\n".join(blocks)
async def _search_clawhub(
self,
language: str,
query: str,
) -> tuple[int, str]:
params = {
"page": "1",
"pageSize": str(self.loop._CLAWHUB_SEARCH_LIMIT),
"sortBy": "score",
"order": "desc",
"keyword": query,
}
try:
async with httpx.AsyncClient(
proxy=self.loop.web_proxy,
follow_redirects=True,
timeout=self.loop._CLAWHUB_SEARCH_TIMEOUT_SECONDS,
) as client:
response = await client.get(
self.loop._CLAWHUB_SEARCH_API_URL,
params=params,
headers=self._clawhub_search_headers(language),
)
response.raise_for_status()
except httpx.TimeoutException:
return 124, text(language, "skill_search_timeout")
except httpx.HTTPStatusError as exc:
details = exc.response.text.strip()
message = text(language, "skill_search_failed_status", status=exc.response.status_code)
return exc.response.status_code, "\n\n".join(part for part in [message, details] if part)
except httpx.RequestError as exc:
return 1, "\n\n".join([text(language, "skill_search_request_failed"), str(exc)])
try:
payload = response.json()
except ValueError:
return 1, text(language, "skill_search_invalid_response")
if not isinstance(payload, dict):
return 1, text(language, "skill_search_invalid_response")
if payload.get("code") != 0:
details = str(payload.get("message") or "").strip()
return 1, "\n\n".join(
part for part in [text(language, "skill_search_failed"), details] if part
)
data = payload.get("data")
if not isinstance(data, dict):
return 1, text(language, "skill_search_invalid_response")
skills = data.get("skills")
if not isinstance(skills, list):
return 1, text(language, "skill_search_invalid_response")
total = data.get("total")
if not isinstance(total, int):
total = len(skills)
if not skills:
return 0, ""
return 0, self._format_clawhub_search_results(language, query, skills, total)
def _clawhub_env(self) -> dict[str, str]:
"""Configure npm so ClawHub fails fast and uses a writable cache directory."""
env = os.environ.copy()
env.setdefault("NO_COLOR", "1")
env.setdefault("FORCE_COLOR", "0")
env.setdefault("npm_config_cache", str(ensure_dir(self.loop._clawhub_npm_cache_dir)))
env.setdefault("npm_config_update_notifier", "false")
env.setdefault("npm_config_audit", "false")
env.setdefault("npm_config_fund", "false")
env.setdefault("npm_config_fetch_retries", "0")
env.setdefault("npm_config_fetch_timeout", "5000")
env.setdefault("npm_config_fetch_retry_mintimeout", "1000")
env.setdefault("npm_config_fetch_retry_maxtimeout", "5000")
return env
def _is_clawhub_cache_error(self, output: str) -> bool:
lowered = output.lower()
return any(marker in lowered for marker in self.loop._CLAWHUB_CACHE_ERROR_MARKERS) and (
"_npx/" in lowered or "_npx\\" in lowered
)
@staticmethod
def _clear_clawhub_exec_cache(env: dict[str, str]) -> None:
"""Clear npm's temporary exec installs without wiping the shared tarball cache."""
cache_root = env.get("npm_config_cache")
if not cache_root:
return
shutil.rmtree(Path(cache_root) / "_npx", ignore_errors=True)
async def _run_clawhub_once(
self,
npx: str,
env: dict[str, str],
*args: str,
timeout_seconds: int | None = None,
) -> tuple[int, str]:
"""Run one ClawHub subprocess attempt and return (exit_code, combined_output)."""
proc = None
try:
proc = await asyncio.create_subprocess_exec(
npx,
"--yes",
"clawhub@latest",
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=timeout_seconds or self.loop._CLAWHUB_TIMEOUT_SECONDS,
)
except FileNotFoundError:
raise
except asyncio.TimeoutError:
if proc is not None and proc.returncode is None:
proc.kill()
await proc.communicate()
raise
except asyncio.CancelledError:
if proc is not None and proc.returncode is None:
proc.kill()
await proc.communicate()
raise
output_parts = [
self._decode_subprocess_output(stdout),
self._decode_subprocess_output(stderr),
]
output = "\n".join(part for part in output_parts if part).strip()
return proc.returncode or 0, output
@staticmethod
def _clawhub_args(workspace: str, *args: str) -> tuple[str, ...]:
"""Build ClawHub CLI args with global options first for consistent parsing."""
return ("--workdir", workspace, "--no-input", *args)
@staticmethod
def _is_valid_skill_slug(slug: str) -> bool:
"""Validate a workspace skill slug for local install/remove operations."""
return bool(slug) and slug not in {".", ".."} and "/" not in slug and "\\" not in slug
def _prune_clawhub_lockfile(self, slug: str) -> bool:
"""Best-effort removal of a skill entry from the local ClawHub lockfile."""
lock_path = self.loop.workspace / ".clawhub" / "lock.json"
if not lock_path.exists():
return False
data = json.loads(lock_path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return False
changed = False
skills = data.get("skills")
if isinstance(skills, dict) and slug in skills:
del skills[slug]
changed = True
elif isinstance(skills, list):
filtered = [
item
for item in skills
if not (
item == slug
or (isinstance(item, dict) and (item.get("slug") == slug or item.get("name") == slug))
)
]
if len(filtered) != len(skills):
data["skills"] = filtered
changed = True
if changed:
lock_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
return changed
async def _run_clawhub(
self, language: str, *args: str, timeout_seconds: int | None = None,
) -> tuple[int, str]:
"""Run the ClawHub CLI and return (exit_code, combined_output)."""
npx = shutil.which("npx")
if not npx:
return 127, text(language, "skill_npx_missing")
env = self._clawhub_env()
try:
async with self.loop._clawhub_lock:
code, output = await self._run_clawhub_once(
npx,
env,
*args,
timeout_seconds=timeout_seconds,
)
if code != 0 and self._is_clawhub_cache_error(output):
logger.warning(
"Retrying ClawHub command after clearing npm exec cache at {}",
env["npm_config_cache"],
)
self._clear_clawhub_exec_cache(env)
code, output = await self._run_clawhub_once(
npx,
env,
*args,
timeout_seconds=timeout_seconds,
)
except FileNotFoundError:
return 127, text(language, "skill_npx_missing")
except asyncio.TimeoutError:
return 124, text(language, "skill_command_timeout")
except asyncio.CancelledError:
raise
return code, output
def _format_skill_command_success(
self,
language: str,
subcommand: str,
output: str,
*,
include_workspace_note: bool = False,
) -> str:
notes: list[str] = []
if output:
notes.append(output)
if include_workspace_note:
notes.append(text(language, "skill_applied_to_workspace", workspace=self.loop.workspace))
return "\n\n".join(notes) if notes else text(language, "skill_command_completed", command=subcommand)
async def _run_skill_clawhub_command(
self,
msg: InboundMessage,
language: str,
subcommand: str,
*args: str,
timeout_seconds: int | None = None,
include_workspace_note: bool = False,
) -> OutboundMessage:
code, output = await self._run_clawhub(
language,
*self._clawhub_args(str(self.loop.workspace), *args),
timeout_seconds=timeout_seconds,
)
if code != 0:
return self._response(msg, self._format_clawhub_error(language, code, output))
return self._response(
msg,
self._format_skill_command_success(
language,
subcommand,
output,
include_workspace_note=include_workspace_note,
),
)
async def search(self, msg: InboundMessage, language: str, query: str) -> OutboundMessage:
code, output = await self._search_clawhub(language, query)
if code != 0:
return self._response(msg, output or text(language, "skill_search_failed"))
if not output:
return self._response(msg, text(language, "skill_search_no_results", query=query))
return self._response(msg, output)
async def install(self, msg: InboundMessage, language: str, slug: str) -> OutboundMessage:
return await self._run_skill_clawhub_command(
msg,
language,
"install",
"install",
slug,
timeout_seconds=self.loop._CLAWHUB_INSTALL_TIMEOUT_SECONDS,
include_workspace_note=True,
)
async def uninstall(self, msg: InboundMessage, language: str, slug: str) -> OutboundMessage:
if not self._is_valid_skill_slug(slug):
return self._response(msg, text(language, "skill_invalid_slug", slug=slug))
skill_dir = self.loop.workspace / "skills" / slug
if not skill_dir.is_dir():
return self._response(
msg,
text(language, "skill_uninstall_not_found", slug=slug, path=skill_dir),
)
try:
shutil.rmtree(skill_dir)
except OSError:
logger.exception("Failed to remove workspace skill {}", skill_dir)
return self._response(
msg,
text(language, "skill_uninstall_failed", slug=slug, path=skill_dir),
)
notes = [text(language, "skill_uninstalled_local", slug=slug, path=skill_dir)]
try:
if self._prune_clawhub_lockfile(slug):
notes.append(
text(
language,
"skill_lockfile_pruned",
path=self.loop.workspace / ".clawhub" / "lock.json",
)
)
except (OSError, ValueError, TypeError):
logger.exception("Failed to prune ClawHub lockfile for {}", slug)
notes.append(
text(
language,
"skill_lockfile_cleanup_failed",
path=self.loop.workspace / ".clawhub" / "lock.json",
)
)
return self._response(msg, "\n\n".join(notes))
async def list(self, msg: InboundMessage, language: str) -> OutboundMessage:
return await self._run_skill_clawhub_command(msg, language, "list", "list")
async def update(self, msg: InboundMessage, language: str) -> OutboundMessage:
return await self._run_skill_clawhub_command(
msg,
language,
"update",
"update",
"--all",
timeout_seconds=self.loop._CLAWHUB_INSTALL_TIMEOUT_SECONDS,
include_workspace_note=True,
)

View File

@@ -0,0 +1,75 @@
"""Lightweight system command helpers for AgentLoop."""
from __future__ import annotations
from typing import TYPE_CHECKING
from nanobot import __version__
from nanobot.agent.i18n import help_lines, text
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.utils.helpers import build_status_content
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
from nanobot.session.manager import Session
class SystemCommandHandler:
"""Encapsulates lightweight `/new`, `/help`, and `/status` behavior for AgentLoop."""
def __init__(self, loop: AgentLoop) -> None:
self.loop = loop
@staticmethod
def _response(
msg: InboundMessage,
content: str,
*,
metadata: dict[str, str] | None = None,
) -> OutboundMessage:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=content,
metadata=metadata,
)
def help(self, msg: InboundMessage, language: str) -> OutboundMessage:
return self._response(
msg,
"\n".join(help_lines(language)),
metadata={"render_as": "text"},
)
def new_session(self, msg: InboundMessage, session: Session, language: str) -> OutboundMessage:
snapshot = session.messages[session.last_consolidated:]
session.clear()
self.loop.sessions.save(session)
self.loop.sessions.invalidate(session.key)
if snapshot:
self.loop._schedule_background(self.loop.memory_consolidator.archive_messages(session, snapshot))
return self._response(msg, text(language, "new_session_started"))
def status(self, msg: InboundMessage, session: Session) -> OutboundMessage:
ctx_est = 0
try:
ctx_est, _ = self.loop.memory_consolidator.estimate_session_prompt_tokens(session)
except Exception:
pass
if ctx_est <= 0:
ctx_est = self.loop._last_usage.get("prompt_tokens", 0)
return self._response(
msg,
build_status_content(
version=__version__,
model=self.loop.model,
start_time=self.loop._start_time,
last_usage=self.loop._last_usage,
context_window_tokens=self.loop.context_window_tokens,
session_msg_count=len(session.get_history(max_messages=0)),
context_tokens_estimate=ctx_est,
),
metadata={"render_as": "text"},
)

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import os import os
import shutil
import sys import sys
import tempfile import tempfile
import time import time
@@ -15,14 +14,17 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger from loguru import logger
from nanobot import __version__ from nanobot.agent.commands import (
LanguageCommandHandler,
MCPCommandHandler,
PersonaCommandHandler,
SkillCommandHandler,
SystemCommandHandler,
build_agent_command_router,
)
from nanobot.agent.context import ContextBuilder from nanobot.agent.context import ContextBuilder
from nanobot.agent.i18n import ( from nanobot.agent.i18n import (
DEFAULT_LANGUAGE, DEFAULT_LANGUAGE,
help_lines,
language_label,
list_languages,
normalize_language_code,
resolve_language, resolve_language,
text, text,
) )
@@ -39,10 +41,11 @@ from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.command.router import CommandContext
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
from nanobot.providers.speech import OpenAISpeechProvider from nanobot.providers.speech import OpenAISpeechProvider
from nanobot.session.manager import Session, SessionManager from nanobot.session.manager import Session, SessionManager
from nanobot.utils.helpers import build_status_content, ensure_dir, safe_filename from nanobot.utils.helpers import ensure_dir, safe_filename
if TYPE_CHECKING: if TYPE_CHECKING:
from nanobot.config.schema import ChannelsConfig, ExecToolConfig from nanobot.config.schema import ChannelsConfig, ExecToolConfig
@@ -74,6 +77,14 @@ class AgentLoop:
"network request failed", "network request failed",
"registry.npmjs.org", "registry.npmjs.org",
) )
_CLAWHUB_CACHE_ERROR_MARKERS = (
"err_module_not_found",
"cannot find module",
"cannot find package",
)
_CLAWHUB_SEARCH_API_URL = "https://lightmake.site/api/skills"
_CLAWHUB_SEARCH_TIMEOUT_SECONDS = 15.0
_CLAWHUB_SEARCH_LIMIT = 5
_CLAWHUB_NPM_CACHE_DIR = Path(tempfile.gettempdir()) / "nanobot-npm-cache" _CLAWHUB_NPM_CACHE_DIR = Path(tempfile.gettempdir()) / "nanobot-npm-cache"
_PREFLIGHT_CONSOLIDATION_BUDGET_SECONDS = 1.5 _PREFLIGHT_CONSOLIDATION_BUDGET_SECONDS = 1.5
@@ -117,6 +128,14 @@ class AgentLoop:
self.restrict_to_workspace = restrict_to_workspace self.restrict_to_workspace = restrict_to_workspace
self._start_time = time.time() self._start_time = time.time()
self._last_usage: dict[str, int] = {} self._last_usage: dict[str, int] = {}
self._clawhub_lock = asyncio.Lock()
self._clawhub_npm_cache_dir = self._CLAWHUB_NPM_CACHE_DIR / str(os.getpid())
self._language_commands = LanguageCommandHandler(self)
self._mcp_commands = MCPCommandHandler(self)
self._persona_commands = PersonaCommandHandler(self)
self._skill_commands = SkillCommandHandler(self)
self._system_commands = SystemCommandHandler(self)
self._command_router = build_agent_command_router()
self.context = ContextBuilder(workspace) self.context = ContextBuilder(workspace)
self.sessions = session_manager or SessionManager(workspace) self.sessions = session_manager or SessionManager(workspace)
@@ -164,12 +183,6 @@ class AgentLoop:
) )
self._register_default_tools() self._register_default_tools()
@staticmethod
def _command_name(content: str) -> str:
"""Return the normalized slash command name."""
parts = content.strip().split(None, 1)
return parts[0].lower() if parts else ""
def _get_session_persona(self, session: Session) -> str: def _get_session_persona(self, session: Session) -> str:
"""Return the active persona name for a session.""" """Return the active persona name for a session."""
return self.context.resolve_persona(session.metadata.get("persona")) return self.context.resolve_persona(session.metadata.get("persona"))
@@ -214,23 +227,6 @@ class AgentLoop:
"""Return MCP command help text.""" """Return MCP command help text."""
return text(language, "mcp_usage") return text(language, "mcp_usage")
def _group_mcp_tool_names(self) -> dict[str, list[str]]:
"""Group registered MCP tool names by configured server name."""
grouped = {name: [] for name in self._mcp_servers}
server_names = sorted(self._mcp_servers, key=len, reverse=True)
for tool_name in self.tools.tool_names:
if not tool_name.startswith("mcp_"):
continue
for server_name in server_names:
prefix = f"mcp_{server_name}_"
if tool_name.startswith(prefix):
grouped[server_name].append(tool_name.removeprefix(prefix))
break
return {name: sorted(tools) for name, tools in grouped.items()}
def _remove_registered_mcp_tools(self) -> None: def _remove_registered_mcp_tools(self) -> None:
"""Remove all dynamically registered MCP tools from the registry.""" """Remove all dynamically registered MCP tools from the registry."""
for tool_name in list(self.tools.tool_names): for tool_name in list(self.tools.tool_names):
@@ -368,176 +364,86 @@ class AgentLoop:
await self._reload_runtime_config_if_needed(force=force) await self._reload_runtime_config_if_needed(force=force)
@staticmethod @staticmethod
def _decode_subprocess_output(data: bytes) -> str: def _skill_subcommand(parts: list[str]) -> str | None:
"""Decode subprocess output conservatively for CLI surfacing.""" if len(parts) < 2:
return data.decode("utf-8", errors="replace").strip() return None
return parts[1].lower()
@classmethod @staticmethod
def _is_clawhub_network_error(cls, output: str) -> bool: def _skill_search_query(content: str) -> str | None:
lowered = output.lower() query_parts = content.strip().split(None, 2)
return any(marker in lowered for marker in cls._CLAWHUB_NETWORK_ERROR_MARKERS) if len(query_parts) < 3:
return None
query = query_parts[2].strip()
return query or None
def _format_clawhub_error(self, language: str, code: int, output: str) -> str: @staticmethod
if output and self._is_clawhub_network_error(output): def _skill_argument(parts: list[str]) -> str | None:
return "\n\n".join([text(language, "skill_command_network_failed"), output]) if len(parts) < 3:
return output or text(language, "skill_command_failed", code=code) return None
value = parts[2].strip()
return value or None
def _clawhub_env(self) -> dict[str, str]: def _command_context(
"""Configure npm so ClawHub fails fast and uses a writable cache directory.""" self,
env = os.environ.copy() msg: InboundMessage,
env.setdefault("NO_COLOR", "1") *,
env.setdefault("FORCE_COLOR", "0") session: Session | None = None,
env.setdefault("npm_config_cache", str(self._CLAWHUB_NPM_CACHE_DIR)) key: str | None = None,
env.setdefault("npm_config_update_notifier", "false") ) -> CommandContext:
env.setdefault("npm_config_audit", "false") return CommandContext(
env.setdefault("npm_config_fund", "false") msg=msg,
env.setdefault("npm_config_fetch_retries", "0") session=session,
env.setdefault("npm_config_fetch_timeout", "5000") key=key or msg.session_key,
env.setdefault("npm_config_fetch_retry_mintimeout", "1000") raw=msg.content.strip(),
env.setdefault("npm_config_fetch_retry_maxtimeout", "5000") loop=self,
return env )
async def _run_clawhub(
self, language: str, *args: str, timeout_seconds: int | None = None,
) -> tuple[int, str]:
"""Run the ClawHub CLI and return (exit_code, combined_output)."""
npx = shutil.which("npx")
if not npx:
return 127, text(language, "skill_npx_missing")
env = self._clawhub_env()
proc = None
try:
proc = await asyncio.create_subprocess_exec(
npx,
"--yes",
"clawhub@latest",
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout_seconds or self._CLAWHUB_TIMEOUT_SECONDS,
)
except FileNotFoundError:
return 127, text(language, "skill_npx_missing")
except asyncio.TimeoutError:
if proc is not None and proc.returncode is None:
proc.kill()
await proc.communicate()
return 124, text(language, "skill_command_timeout")
except asyncio.CancelledError:
if proc is not None and proc.returncode is None:
proc.kill()
await proc.communicate()
raise
output_parts = [
self._decode_subprocess_output(stdout),
self._decode_subprocess_output(stderr),
]
output = "\n".join(part for part in output_parts if part).strip()
return proc.returncode or 0, output
async def _handle_skill_command(self, msg: InboundMessage, session: Session) -> OutboundMessage: async def _handle_skill_command(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Handle ClawHub skill management commands for the active workspace.""" """Handle ClawHub skill management commands for the active workspace."""
language = self._get_session_language(session) language = self._get_session_language(session)
parts = msg.content.strip().split() parts = msg.content.strip().split()
search_query: str | None = None subcommand = self._skill_subcommand(parts)
if len(parts) == 1: if not subcommand:
return OutboundMessage( return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=text(language, "skill_usage"))
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "skill_usage"),
)
subcommand = parts[1].lower()
workspace = str(self.workspace)
if subcommand == "search": if subcommand == "search":
query_parts = msg.content.strip().split(None, 2) query = self._skill_search_query(msg.content)
if len(query_parts) < 3 or not query_parts[2].strip(): if not query:
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=text(language, "skill_search_missing_query"), content=text(language, "skill_search_missing_query"),
) )
search_query = query_parts[2].strip() return await self._skill_commands.search(msg, language, query)
code, output = await self._run_clawhub(
language, if subcommand == "install":
"search", slug = self._skill_argument(parts)
search_query, if not slug:
"--limit",
"5",
)
elif subcommand == "install":
if len(parts) < 3:
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=text(language, "skill_install_missing_slug"), content=text(language, "skill_install_missing_slug"),
) )
code, output = await self._run_clawhub( return await self._skill_commands.install(msg, language, slug)
language,
"install", if subcommand == "uninstall":
parts[2], slug = self._skill_argument(parts)
"--workdir", if not slug:
workspace,
timeout_seconds=self._CLAWHUB_INSTALL_TIMEOUT_SECONDS,
)
elif subcommand == "uninstall":
if len(parts) < 3:
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=text(language, "skill_uninstall_missing_slug"), content=text(language, "skill_uninstall_missing_slug"),
) )
code, output = await self._run_clawhub( return await self._skill_commands.uninstall(msg, language, slug)
language,
"uninstall",
parts[2],
"--yes",
"--workdir",
workspace,
)
elif subcommand == "list":
code, output = await self._run_clawhub(language, "list", "--workdir", workspace)
elif subcommand == "update":
code, output = await self._run_clawhub(
language,
"update",
"--all",
"--workdir",
workspace,
timeout_seconds=self._CLAWHUB_INSTALL_TIMEOUT_SECONDS,
)
else:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "skill_usage"),
)
if code != 0: if subcommand == "list":
content = self._format_clawhub_error(language, code, output) return await self._skill_commands.list(msg, language)
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
if subcommand == "search" and not output: if subcommand == "update":
return OutboundMessage( return await self._skill_commands.update(msg, language)
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "skill_search_no_results", query=search_query or ""),
)
notes: list[str] = [] return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=text(language, "skill_usage"))
if output:
notes.append(output)
if subcommand in {"install", "uninstall", "update"}:
notes.append(text(language, "skill_applied_to_workspace", workspace=workspace))
content = "\n\n".join(notes) if notes else text(language, "skill_command_completed", command=subcommand)
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
async def _handle_mcp_command(self, msg: InboundMessage, session: Session) -> OutboundMessage: async def _handle_mcp_command(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Handle MCP inspection commands.""" """Handle MCP inspection commands."""
@@ -551,37 +457,7 @@ class AgentLoop:
content=self._mcp_usage(language), content=self._mcp_usage(language),
) )
await self._reload_mcp_servers_if_needed() return await self._mcp_commands.list(msg, language)
if not self._mcp_servers:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "mcp_no_servers"),
)
await self._connect_mcp()
server_lines = "\n".join(f"- {name}" for name in self._mcp_servers)
sections = [text(language, "mcp_servers_list", items=server_lines)]
grouped_tools = self._group_mcp_tool_names()
tool_lines = "\n".join(
f"- {server}: {', '.join(tools)}"
for server, tools in grouped_tools.items()
if tools
)
sections.append(
text(language, "mcp_tools_list", items=tool_lines)
if tool_lines
else text(language, "mcp_no_tools")
)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="\n\n".join(sections),
)
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
"""Register the default set of tools.""" """Register the default set of tools."""
@@ -661,28 +537,6 @@ class AgentLoop:
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")' return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls) return ", ".join(_fmt(tc) for tc in tool_calls)
def _status_response(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Build an outbound status message for a session."""
ctx_est = 0
try:
ctx_est, _ = self.memory_consolidator.estimate_session_prompt_tokens(session)
except Exception:
pass
if ctx_est <= 0:
ctx_est = self._last_usage.get("prompt_tokens", 0)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=build_status_content(
version=__version__, model=self.model,
start_time=self._start_time, last_usage=self._last_usage,
context_window_tokens=self.context_window_tokens,
session_msg_count=len(session.get_history(max_messages=0)),
context_tokens_estimate=ctx_est,
),
metadata={"render_as": "text"},
)
@staticmethod @staticmethod
def _voice_reply_extension(response_format: str) -> str: def _voice_reply_extension(response_format: str) -> str:
"""Map TTS response formats to delivery file extensions.""" """Map TTS response formats to delivery file extensions."""
@@ -972,14 +826,14 @@ class AgentLoop:
logger.warning("Error consuming inbound message: {}, continuing...", e) logger.warning("Error consuming inbound message: {}, continuing...", e)
continue continue
cmd = self._command_name(msg.content) ctx = self._command_context(
if cmd == "/stop": msg,
await self._handle_stop(msg) session=self.sessions.get_or_create(msg.session_key),
elif cmd == "/restart": )
await self._handle_restart(msg) if self._command_router.is_priority(ctx.raw):
elif cmd == "/status": result = await self._command_router.dispatch_priority(ctx)
session = self.sessions.get_or_create(msg.session_key) if result is not None:
await self.bus.publish_outbound(self._status_response(msg, session)) await self.bus.publish_outbound(result)
else: else:
task = asyncio.create_task(self._dispatch(msg)) task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task) self._active_tasks.setdefault(msg.session_key, []).append(task)
@@ -1069,27 +923,14 @@ class AgentLoop:
async def _handle_language_command(self, msg: InboundMessage, session: Session) -> OutboundMessage: async def _handle_language_command(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Handle session-scoped language switching commands.""" """Handle session-scoped language switching commands."""
current = self._get_session_language(session)
parts = msg.content.strip().split() parts = msg.content.strip().split()
current = self._get_session_language(session)
if len(parts) == 1 or parts[1].lower() == "current": if len(parts) == 1 or parts[1].lower() == "current":
return OutboundMessage( return self._language_commands.current(msg, session)
channel=msg.channel,
chat_id=msg.chat_id,
content=text(current, "current_language", language_name=language_label(current, current)),
)
subcommand = parts[1].lower() subcommand = parts[1].lower()
if subcommand == "list": if subcommand == "list":
items = "\n".join( return self._language_commands.list(msg, session)
f"- {language_label(code, current)}"
+ (f" ({text(current, 'current_marker')})" if code == current else "")
for code in list_languages()
)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(current, "available_languages", items=items),
)
if subcommand != "set" or len(parts) < 3: if subcommand != "set" or len(parts) < 3:
return OutboundMessage( return OutboundMessage(
@@ -1098,55 +939,18 @@ class AgentLoop:
content=self._language_usage(current), content=self._language_usage(current),
) )
target = normalize_language_code(parts[2]) return self._language_commands.set(msg, session, parts[2])
if target is None:
languages = ", ".join(language_label(code, current) for code in list_languages())
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(current, "unknown_language", name=parts[2], languages=languages),
)
if target == current:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(current, "language_already_active", language_name=language_label(target, current)),
)
self._set_session_language(session, target)
self.sessions.save(session)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(target, "switched_language", language_name=language_label(target, target)),
)
async def _handle_persona_command(self, msg: InboundMessage, session: Session) -> OutboundMessage: async def _handle_persona_command(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Handle session-scoped persona management commands.""" """Handle session-scoped persona management commands."""
language = self._get_session_language(session) language = self._get_session_language(session)
parts = msg.content.strip().split() parts = msg.content.strip().split()
if len(parts) == 1 or parts[1].lower() == "current": if len(parts) == 1 or parts[1].lower() == "current":
current = self._get_session_persona(session) return self._persona_commands.current(msg, session)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "current_persona", persona=current),
)
subcommand = parts[1].lower() subcommand = parts[1].lower()
if subcommand == "list": if subcommand == "list":
current = self._get_session_persona(session) return self._persona_commands.list(msg, session)
marker = text(language, "current_marker")
personas = [
f"{name} ({marker})" if name == current else name
for name in self.context.list_personas()
]
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "available_personas", items="\n".join(f"- {name}" for name in personas)),
)
if subcommand != "set" or len(parts) < 3: if subcommand != "set" or len(parts) < 3:
return OutboundMessage( return OutboundMessage(
@@ -1155,53 +959,7 @@ class AgentLoop:
content=self._persona_usage(language), content=self._persona_usage(language),
) )
target = self.context.find_persona(parts[2]) return await self._persona_commands.set(msg, session, parts[2])
if target is None:
personas = ", ".join(self.context.list_personas())
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(
language,
"unknown_persona",
name=parts[2],
personas=personas,
path=self.workspace / "personas" / parts[2],
),
)
current = self._get_session_persona(session)
if target == current:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "persona_already_active", persona=target),
)
try:
if not await self.memory_consolidator.archive_unconsolidated(session):
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "memory_archival_failed_persona"),
)
except Exception:
logger.exception("/persona archival failed for {}", session.key)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "memory_archival_failed_persona"),
)
session.clear()
self._set_session_persona(session, target)
self.sessions.save(session)
self.sessions.invalidate(session.key)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=text(language, "switched_persona", persona=target),
)
async def close_mcp(self) -> None: async def close_mcp(self) -> None:
"""Drain pending background archives, then close MCP connections.""" """Drain pending background archives, then close MCP connections."""
@@ -1320,35 +1078,11 @@ class AgentLoop:
language = self._get_session_language(session) language = self._get_session_language(session)
# Slash commands # Slash commands
cmd = self._command_name(msg.content) slash_response = await self._command_router.dispatch(
if cmd == "/new": self._command_context(msg, session=session, key=key)
snapshot = session.messages[session.last_consolidated:] )
session.clear() if slash_response is not None:
self.sessions.save(session) return slash_response
self.sessions.invalidate(session.key)
if snapshot:
self._schedule_background(self.memory_consolidator.archive_messages(session, snapshot))
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content=text(language, "new_session_started"))
if cmd == "/status":
return self._status_response(msg, session)
if cmd in {"/lang", "/language"}:
return await self._handle_language_command(msg, session)
if cmd == "/persona":
return await self._handle_persona_command(msg, session)
if cmd == "/skill":
return await self._handle_skill_command(msg, session)
if cmd == "/mcp":
return await self._handle_mcp_command(msg, session)
if cmd == "/help":
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="\n".join(help_lines(language)),
metadata={"render_as": "text"},
)
await self._connect_mcp() await self._connect_mcp()
await self._run_preflight_token_consolidation(session) await self._run_preflight_token_consolidation(session)

View File

@@ -21,8 +21,21 @@
"skill_usage": "Usage:\n/skill search <query>\n/skill install <slug>\n/skill uninstall <slug>\n/skill list\n/skill update", "skill_usage": "Usage:\n/skill search <query>\n/skill install <slug>\n/skill uninstall <slug>\n/skill list\n/skill update",
"skill_search_missing_query": "Missing query.\n\nUsage:\n/skill search <query>", "skill_search_missing_query": "Missing query.\n\nUsage:\n/skill search <query>",
"skill_search_no_results": "No skills found for \"{query}\". Try broader keywords, or use /skill install <slug> if you know the exact slug.", "skill_search_no_results": "No skills found for \"{query}\". Try broader keywords, or use /skill install <slug> if you know the exact slug.",
"skill_search_results_header": "Found {total} skills for \"{query}\". Showing top {count}:",
"skill_search_result_meta": "slug: {slug} | owner: {owner} | installs: {installs} | stars: {stars} | version: {version}",
"skill_search_timeout": "ClawHub search timed out. Check network, proxy, or registry connectivity and retry.",
"skill_search_failed": "ClawHub search failed.",
"skill_search_failed_status": "ClawHub search failed with HTTP {status}.",
"skill_search_request_failed": "ClawHub search request failed. Check network, proxy, or registry connectivity and retry.",
"skill_search_invalid_response": "ClawHub search returned an unexpected response.",
"skill_install_missing_slug": "Missing skill slug.\n\nUsage:\n/skill install <slug>", "skill_install_missing_slug": "Missing skill slug.\n\nUsage:\n/skill install <slug>",
"skill_uninstall_missing_slug": "Missing skill slug.\n\nUsage:\n/skill uninstall <slug>", "skill_uninstall_missing_slug": "Missing skill slug.\n\nUsage:\n/skill uninstall <slug>",
"skill_invalid_slug": "Invalid skill slug: {slug}",
"skill_uninstall_not_found": "Skill {slug} is not installed at {path}.",
"skill_uninstall_failed": "Failed to remove local skill {slug} at {path}.",
"skill_uninstalled_local": "Removed local skill {slug} from {path}.",
"skill_lockfile_pruned": "Updated ClawHub lockfile: {path}",
"skill_lockfile_cleanup_failed": "Removed the local skill, but could not update the ClawHub lockfile at {path}.",
"skill_npx_missing": "npx is not installed. Install Node.js first, then retry /skill.", "skill_npx_missing": "npx is not installed. Install Node.js first, then retry /skill.",
"skill_command_timeout": "The ClawHub command timed out. Check npm connectivity or proxy settings and try again.", "skill_command_timeout": "The ClawHub command timed out. Check npm connectivity or proxy settings and try again.",
"skill_command_failed": "ClawHub command failed with exit code {code}.", "skill_command_failed": "ClawHub command failed with exit code {code}.",

View File

@@ -21,8 +21,21 @@
"skill_usage": "用法:\n/skill search <query>\n/skill install <slug>\n/skill uninstall <slug>\n/skill list\n/skill update", "skill_usage": "用法:\n/skill search <query>\n/skill install <slug>\n/skill uninstall <slug>\n/skill list\n/skill update",
"skill_search_missing_query": "缺少搜索关键词。\n\n用法\n/skill search <query>", "skill_search_missing_query": "缺少搜索关键词。\n\n用法\n/skill search <query>",
"skill_search_no_results": "没有找到与“{query}”相关的 skill。请尝试更宽泛的关键词如果你知道精确 slug也可以直接用 /skill install <slug>。", "skill_search_no_results": "没有找到与“{query}”相关的 skill。请尝试更宽泛的关键词如果你知道精确 slug也可以直接用 /skill install <slug>。",
"skill_search_results_header": "找到 {total} 个与“{query}”相关的 skill显示前 {count} 个:",
"skill_search_result_meta": "slug{slug} | 作者:{owner} | 安装:{installs} | 星标:{stars} | 版本:{version}",
"skill_search_timeout": "ClawHub 搜索超时。请检查网络、代理或 registry 连通性后重试。",
"skill_search_failed": "ClawHub 搜索失败。",
"skill_search_failed_status": "ClawHub 搜索失败HTTP 状态码 {status}。",
"skill_search_request_failed": "ClawHub 搜索请求失败。请检查网络、代理或 registry 连通性后重试。",
"skill_search_invalid_response": "ClawHub 搜索返回了无法解析的响应。",
"skill_install_missing_slug": "缺少 skill slug。\n\n用法\n/skill install <slug>", "skill_install_missing_slug": "缺少 skill slug。\n\n用法\n/skill install <slug>",
"skill_uninstall_missing_slug": "缺少 skill slug。\n\n用法\n/skill uninstall <slug>", "skill_uninstall_missing_slug": "缺少 skill slug。\n\n用法\n/skill uninstall <slug>",
"skill_invalid_slug": "无效的 skill slug{slug}",
"skill_uninstall_not_found": "在 {path} 没有找到已安装的 skill{slug}。",
"skill_uninstall_failed": "删除本地 skill 失败:{slug}{path})。",
"skill_uninstalled_local": "已删除本地 skill{slug}{path})。",
"skill_lockfile_pruned": "已更新 ClawHub lockfile{path}",
"skill_lockfile_cleanup_failed": "本地 skill 已删除,但无法更新 ClawHub lockfile{path}。",
"skill_npx_missing": "未安装 npx。请先安装 Node.js然后再重试 /skill。", "skill_npx_missing": "未安装 npx。请先安装 Node.js然后再重试 /skill。",
"skill_command_timeout": "ClawHub 命令执行超时。请检查 npm 网络、代理或 registry 配置后重试。", "skill_command_timeout": "ClawHub 命令执行超时。请检查 npm 网络、代理或 registry 配置后重试。",
"skill_command_failed": "ClawHub 命令执行失败,退出码 {code}。", "skill_command_failed": "ClawHub 命令执行失败,退出码 {code}。",

View File

@@ -20,14 +20,19 @@ Use this skill when the user asks any of:
## Search ## Search
Query the live registry API directly:
```bash ```bash
npx --yes clawhub@latest search "web scraping" --limit 5 curl 'https://lightmake.site/api/skills?page=1&pageSize=5&sortBy=score&order=desc&keyword=web%20scraping' \
-H 'accept: */*' \
-H 'origin: https://skillhub.tencent.com' \
-H 'referer: https://skillhub.tencent.com/'
``` ```
## Install ## Install
```bash ```bash
npx --yes clawhub@latest install <slug> --workdir <nanobot-workspace> npx --yes clawhub@latest --workdir <nanobot-workspace> --no-input install <slug>
``` ```
Replace `<slug>` with the skill name from search results. Replace `<nanobot-workspace>` with the Replace `<slug>` with the skill name from search results. Replace `<nanobot-workspace>` with the
@@ -38,20 +43,34 @@ active workspace for the current nanobot process. This places the skill into
## Update ## Update
```bash ```bash
npx --yes clawhub@latest update --all --workdir <nanobot-workspace> npx --yes clawhub@latest --workdir <nanobot-workspace> --no-input update --all
``` ```
## List installed ## List installed
```bash ```bash
npx --yes clawhub@latest list --workdir <nanobot-workspace> npx --yes clawhub@latest --workdir <nanobot-workspace> --no-input list
``` ```
## Uninstall from nanobot workspace
Current ClawHub docs do not document a local uninstall subcommand. In nanobot, remove a
workspace-installed skill with:
```text
/skill uninstall <slug>
```
This deletes `<nanobot-workspace>/skills/<slug>` and best-effort prunes
`<nanobot-workspace>/.clawhub/lock.json`.
## Notes ## Notes
- Requires Node.js (`npx` comes with it). - Search uses the public registry API directly and does not require Node.js.
- Install/list/update require Node.js (`npx` comes with it).
- No API key needed for search and install. - No API key needed for search and install.
- Login (`npx --yes clawhub@latest login`) is only required for publishing. - Login (`npx --yes clawhub@latest login`) is only required for publishing.
- `--workdir <nanobot-workspace>` is critical — without it, skills install to the current directory - `--workdir <nanobot-workspace>` is critical — without it, skills install to the current directory
instead of the active nanobot workspace. instead of the active nanobot workspace.
- Keep global options before the subcommand: `--workdir ... --no-input install ...`.
- After install, remind the user to start a new session to load the skill. - After install, remind the user to start a new session to load the skill.

View File

@@ -1,10 +1,11 @@
"""Test session management with cache-friendly message handling.""" """Test session management with cache-friendly message handling."""
import asyncio import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from pathlib import Path
from nanobot.session.manager import Session, SessionManager from nanobot.session.manager import Session, SessionManager
# Test constants # Test constants

View File

@@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage
from nanobot.providers.base import LLMResponse from nanobot.providers.base import LLMResponse

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest import pytest
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage
@@ -38,68 +39,157 @@ class _FakeProcess:
self.killed = True self.killed = True
@pytest.mark.asyncio class _FakeAsyncClient:
async def test_skill_search_runs_clawhub_search(tmp_path: Path) -> None: def __init__(self, *, response: httpx.Response | None = None, error: Exception | None = None) -> None:
loop = _make_loop(tmp_path) self.response = response
proc = _FakeProcess(stdout="skill-a\nskill-b") self.error = error
create_proc = AsyncMock(return_value=proc) self.calls: list[dict[str, object]] = []
with patch("nanobot.agent.loop.shutil.which", return_value="/usr/bin/npx"), \ async def __aenter__(self) -> _FakeAsyncClient:
patch("nanobot.agent.loop.asyncio.create_subprocess_exec", create_proc): return self
async def __aexit__(self, exc_type, exc, tb) -> bool:
return False
async def get(self, url: str, *, params: dict[str, str] | None = None, headers: dict[str, str] | None = None):
self.calls.append({"url": url, "params": params, "headers": headers})
if self.error is not None:
raise self.error
assert self.response is not None
return self.response
@pytest.mark.asyncio
async def test_skill_search_uses_registry_api(tmp_path: Path) -> None:
loop = _make_loop(tmp_path)
request = httpx.Request("GET", "https://lightmake.site/api/skills")
response = httpx.Response(
200,
request=request,
json={
"code": 0,
"data": {
"skills": [
{
"name": "News Aggregator Skill",
"slug": "news-aggregator-skill",
"ownerName": "cclank",
"installs": 667,
"stars": 19,
"version": "0.1.0",
"description": "Fetches and analyzes real-time news.",
"description_zh": "抓取并分析实时新闻。",
"homepage": "https://clawhub.ai/cclank/news-aggregator-skill",
}
],
"total": 42,
},
"message": "success",
},
)
client = _FakeAsyncClient(response=response)
create_proc = AsyncMock()
with patch("nanobot.agent.commands.skill.httpx.AsyncClient", return_value=client), \
patch("nanobot.agent.commands.skill.asyncio.create_subprocess_exec", create_proc):
response = await loop._process_message( response = await loop._process_message(
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill search web scraping") InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill search web scraping")
) )
assert response is not None assert response is not None
assert response.content == "skill-a\nskill-b" assert 'Found 42 skills for "web scraping"' in response.content
assert create_proc.await_count == 1 assert "slug: news-aggregator-skill | owner: cclank | installs: 667 | stars: 19 | version: 0.1.0" in response.content
args = create_proc.await_args.args assert "https://clawhub.ai/cclank/news-aggregator-skill" in response.content
assert args == ( assert create_proc.await_count == 0
"/usr/bin/npx", assert client.calls == [
"--yes", {
"clawhub@latest", "url": "https://lightmake.site/api/skills",
"search", "params": {
"web scraping", "page": "1",
"--limit", "pageSize": "5",
"5", "sortBy": "score",
) "order": "desc",
env = create_proc.await_args.kwargs["env"] "keyword": "web scraping",
assert env["npm_config_cache"].endswith("nanobot-npm-cache") },
assert env["npm_config_fetch_retries"] == "0" "headers": {
assert env["npm_config_fetch_timeout"] == "5000" "accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"origin": "https://skillhub.tencent.com",
"referer": "https://skillhub.tencent.com/",
},
}
]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_skill_search_surfaces_npm_network_errors(tmp_path: Path) -> None: async def test_skill_retries_after_clearing_corrupted_npx_cache(tmp_path: Path) -> None:
loop = _make_loop(tmp_path) loop = _make_loop(tmp_path)
proc = _FakeProcess( broken_proc = _FakeProcess(
returncode=1, returncode=1,
stderr=( stderr=(
"npm error code EAI_AGAIN\n" "node:internal/modules/esm/resolve:201\n"
"npm error request to https://registry.npmjs.org/clawhub failed" "Error: Cannot find package "
"'/tmp/nanobot-npm-cache/_npx/a92a6dbcf543fba6/node_modules/log-symbols/index.js' "
"imported from "
"'/tmp/nanobot-npm-cache/_npx/a92a6dbcf543fba6/node_modules/ora/index.js'\n"
"code: 'ERR_MODULE_NOT_FOUND'"
), ),
) )
create_proc = AsyncMock(return_value=proc) recovered_proc = _FakeProcess(stdout="demo-skill")
create_proc = AsyncMock(side_effect=[broken_proc, recovered_proc])
with patch("nanobot.agent.loop.shutil.which", return_value="/usr/bin/npx"), \ with patch("nanobot.agent.commands.skill.shutil.which", return_value="/usr/bin/npx"), \
patch("nanobot.agent.loop.asyncio.create_subprocess_exec", create_proc): patch("nanobot.agent.commands.skill.asyncio.create_subprocess_exec", create_proc), \
patch("nanobot.agent.commands.skill.shutil.rmtree") as remove_tree:
response = await loop._process_message(
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill list")
)
assert response is not None
assert response.content == "demo-skill"
assert create_proc.await_count == 2
env = create_proc.await_args_list[0].kwargs["env"]
remove_tree.assert_called_once_with(Path(env["npm_config_cache"]) / "_npx", ignore_errors=True)
@pytest.mark.asyncio
async def test_skill_search_surfaces_registry_request_errors(tmp_path: Path) -> None:
loop = _make_loop(tmp_path)
request = httpx.Request("GET", "https://lightmake.site/api/skills")
client = _FakeAsyncClient(
error=httpx.ConnectError(
"temporary failure in name resolution",
request=request,
)
)
create_proc = AsyncMock()
with patch("nanobot.agent.commands.skill.httpx.AsyncClient", return_value=client), \
patch("nanobot.agent.commands.skill.asyncio.create_subprocess_exec", create_proc):
response = await loop._process_message( response = await loop._process_message(
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill search test") InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill search test")
) )
assert response is not None assert response is not None
assert "could not reach the npm registry" in response.content assert "ClawHub search request failed" in response.content
assert "EAI_AGAIN" in response.content assert "temporary failure in name resolution" in response.content
assert create_proc.await_count == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_skill_search_empty_output_returns_no_results(tmp_path: Path) -> None: async def test_skill_search_empty_output_returns_no_results(tmp_path: Path) -> None:
loop = _make_loop(tmp_path) loop = _make_loop(tmp_path)
proc = _FakeProcess(stdout="") request = httpx.Request("GET", "https://lightmake.site/api/skills")
create_proc = AsyncMock(return_value=proc) response = httpx.Response(
200,
request=request,
json={"code": 0, "data": {"skills": [], "total": 0}, "message": "success"},
)
client = _FakeAsyncClient(response=response)
create_proc = AsyncMock()
with patch("nanobot.agent.loop.shutil.which", return_value="/usr/bin/npx"), \ with patch("nanobot.agent.commands.skill.httpx.AsyncClient", return_value=client), \
patch("nanobot.agent.loop.asyncio.create_subprocess_exec", create_proc): patch("nanobot.agent.commands.skill.asyncio.create_subprocess_exec", create_proc):
response = await loop._process_message( response = await loop._process_message(
InboundMessage( InboundMessage(
channel="cli", channel="cli",
@@ -111,6 +201,7 @@ async def test_skill_search_empty_output_returns_no_results(tmp_path: Path) -> N
assert response is not None assert response is not None
assert 'No skills found for "selfimprovingagent"' in response.content assert 'No skills found for "selfimprovingagent"' in response.content
assert create_proc.await_count == 0
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -122,11 +213,6 @@ async def test_skill_search_empty_output_returns_no_results(tmp_path: Path) -> N
("install", "demo-skill"), ("install", "demo-skill"),
"Installed demo-skill", "Installed demo-skill",
), ),
(
"/skill uninstall demo-skill",
("uninstall", "demo-skill", "--yes"),
"Uninstalled demo-skill",
),
( (
"/skill list", "/skill list",
("list",), ("list",),
@@ -146,8 +232,8 @@ async def test_skill_commands_use_active_workspace(
proc = _FakeProcess(stdout=expected_output) proc = _FakeProcess(stdout=expected_output)
create_proc = AsyncMock(return_value=proc) create_proc = AsyncMock(return_value=proc)
with patch("nanobot.agent.loop.shutil.which", return_value="/usr/bin/npx"), \ with patch("nanobot.agent.commands.skill.shutil.which", return_value="/usr/bin/npx"), \
patch("nanobot.agent.loop.asyncio.create_subprocess_exec", create_proc): patch("nanobot.agent.commands.skill.asyncio.create_subprocess_exec", create_proc):
response = await loop._process_message( response = await loop._process_message(
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content=command) InboundMessage(channel="cli", sender_id="user", chat_id="direct", content=command)
) )
@@ -156,11 +242,37 @@ async def test_skill_commands_use_active_workspace(
assert expected_output in response.content assert expected_output in response.content
args = create_proc.await_args.args args = create_proc.await_args.args
assert args[:3] == ("/usr/bin/npx", "--yes", "clawhub@latest") assert args[:3] == ("/usr/bin/npx", "--yes", "clawhub@latest")
assert args[3:] == (*expected_args, "--workdir", str(tmp_path)) assert args[3:] == ("--workdir", str(tmp_path), "--no-input", *expected_args)
if command != "/skill list": if command != "/skill list":
assert f"Applied to workspace: {tmp_path}" in response.content assert f"Applied to workspace: {tmp_path}" in response.content
@pytest.mark.asyncio
async def test_skill_uninstall_removes_local_workspace_skill_and_prunes_lockfile(tmp_path: Path) -> None:
loop = _make_loop(tmp_path)
skill_dir = tmp_path / "skills" / "demo-skill"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text("# demo", encoding="utf-8")
lock_dir = tmp_path / ".clawhub"
lock_dir.mkdir()
lock_path = lock_dir / "lock.json"
lock_path.write_text(
'{"skills":{"demo-skill":{"version":"1.0.0"},"other-skill":{"version":"2.0.0"}}}',
encoding="utf-8",
)
response = await loop._process_message(
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill uninstall demo-skill")
)
assert response is not None
assert "Removed local skill demo-skill" in response.content
assert "Updated ClawHub lockfile" in response.content
assert not skill_dir.exists()
assert '"demo-skill"' not in lock_path.read_text(encoding="utf-8")
assert '"other-skill"' in lock_path.read_text(encoding="utf-8")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_skill_help_includes_skill_command(tmp_path: Path) -> None: async def test_skill_help_includes_skill_command(tmp_path: Path) -> None:
loop = _make_loop(tmp_path) loop = _make_loop(tmp_path)
@@ -177,7 +289,7 @@ async def test_skill_help_includes_skill_command(tmp_path: Path) -> None:
async def test_skill_missing_npx_returns_guidance(tmp_path: Path) -> None: async def test_skill_missing_npx_returns_guidance(tmp_path: Path) -> None:
loop = _make_loop(tmp_path) loop = _make_loop(tmp_path)
with patch("nanobot.agent.loop.shutil.which", return_value=None): with patch("nanobot.agent.commands.skill.shutil.which", return_value=None):
response = await loop._process_message( response = await loop._process_message(
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill list") InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/skill list")
) )