341 lines
11 KiB
Python
341 lines
11 KiB
Python
"""Tests for /mcp slash command integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
|
|
class _FakeTool:
|
|
def __init__(self, name: str) -> None:
|
|
self._name = name
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self._name
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return self._name
|
|
|
|
@property
|
|
def parameters(self) -> dict:
|
|
return {"type": "object", "properties": {}}
|
|
|
|
async def execute(self, **kwargs) -> str:
|
|
return ""
|
|
|
|
|
|
def _make_loop(workspace: Path, *, mcp_servers: dict | None = None, config_path: Path | None = None):
|
|
"""Create an AgentLoop with a real workspace and lightweight mocks."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
with patch("nanobot.agent.loop.SubagentManager"):
|
|
loop = AgentLoop(
|
|
bus=bus,
|
|
provider=provider,
|
|
workspace=workspace,
|
|
config_path=config_path,
|
|
mcp_servers=mcp_servers,
|
|
)
|
|
return loop
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_mcp_lists_configured_servers_and_tools(tmp_path: Path) -> None:
|
|
loop = _make_loop(tmp_path, mcp_servers={"docs": object(), "search": object()})
|
|
loop.tools.register(_FakeTool("mcp_docs_lookup"))
|
|
loop.tools.register(_FakeTool("mcp_search_web"))
|
|
loop.tools.register(_FakeTool("read_file"))
|
|
|
|
with patch.object(loop, "_connect_mcp", AsyncMock()) as connect_mcp:
|
|
response = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/mcp")
|
|
)
|
|
|
|
assert response is not None
|
|
assert "Configured MCP servers:" in response.content
|
|
assert "- docs" in response.content
|
|
assert "- search" in response.content
|
|
assert "docs: lookup" in response.content
|
|
assert "search: web" in response.content
|
|
connect_mcp.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_mcp_without_servers_returns_guidance(tmp_path: Path) -> None:
|
|
loop = _make_loop(tmp_path)
|
|
|
|
response = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/mcp list")
|
|
)
|
|
|
|
assert response is not None
|
|
assert response.content == "No MCP servers are configured for this agent."
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_help_includes_mcp_command(tmp_path: Path) -> None:
|
|
loop = _make_loop(tmp_path)
|
|
|
|
response = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/help")
|
|
)
|
|
|
|
assert response is not None
|
|
assert "/mcp [list]" in response.content
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_mcp_command_hot_reloads_servers_from_config(tmp_path: Path) -> None:
|
|
config_path = tmp_path / "config.json"
|
|
config_path.write_text(json.dumps({"tools": {}}), encoding="utf-8")
|
|
loop = _make_loop(tmp_path, mcp_servers={}, config_path=config_path)
|
|
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"tools": {
|
|
"mcpServers": {
|
|
"docs": {
|
|
"command": "npx",
|
|
"args": ["-y", "@demo/docs"],
|
|
}
|
|
}
|
|
}
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
with patch.object(loop, "_connect_mcp", AsyncMock()) as connect_mcp:
|
|
response = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/mcp")
|
|
)
|
|
|
|
assert response is not None
|
|
assert "Configured MCP servers:" in response.content
|
|
assert "- docs" in response.content
|
|
connect_mcp.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_mcp_config_reload_resets_connections_and_tools(tmp_path: Path) -> None:
|
|
config_path = tmp_path / "config.json"
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"tools": {
|
|
"mcpServers": {
|
|
"old": {
|
|
"command": "npx",
|
|
"args": ["-y", "@demo/old"],
|
|
}
|
|
}
|
|
}
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
loop = _make_loop(
|
|
tmp_path,
|
|
mcp_servers={"old": SimpleNamespace(model_dump=lambda: {"command": "npx", "args": ["-y", "@demo/old"]})},
|
|
config_path=config_path,
|
|
)
|
|
stack = SimpleNamespace(aclose=AsyncMock())
|
|
loop._mcp_stack = stack
|
|
loop._mcp_connected = True
|
|
loop.tools.register(_FakeTool("mcp_old_lookup"))
|
|
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"tools": {
|
|
"mcpServers": {
|
|
"new": {
|
|
"command": "npx",
|
|
"args": ["-y", "@demo/new"],
|
|
}
|
|
}
|
|
}
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
await loop._reload_mcp_servers_if_needed(force=True)
|
|
|
|
assert list(loop._mcp_servers) == ["new"]
|
|
assert loop._mcp_connected is False
|
|
assert loop.tools.get("mcp_old_lookup") is None
|
|
stack.aclose.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_regular_messages_pick_up_reloaded_mcp_config(tmp_path: Path, monkeypatch) -> None:
|
|
config_path = tmp_path / "config.json"
|
|
config_path.write_text(json.dumps({"tools": {}}), encoding="utf-8")
|
|
loop = _make_loop(tmp_path, mcp_servers={}, config_path=config_path)
|
|
|
|
loop.provider.chat_with_retry = AsyncMock(
|
|
return_value=SimpleNamespace(
|
|
has_tool_calls=False,
|
|
content="ok",
|
|
finish_reason="stop",
|
|
reasoning_content=None,
|
|
thinking_blocks=None,
|
|
)
|
|
)
|
|
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"tools": {
|
|
"mcpServers": {
|
|
"docs": {
|
|
"command": "npx",
|
|
"args": ["-y", "@demo/docs"],
|
|
}
|
|
}
|
|
}
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
connect_mcp_servers = AsyncMock()
|
|
monkeypatch.setattr("nanobot.agent.tools.mcp.connect_mcp_servers", connect_mcp_servers)
|
|
|
|
response = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="hello")
|
|
)
|
|
|
|
assert response is not None
|
|
assert response.content == "ok"
|
|
assert list(loop._mcp_servers) == ["docs"]
|
|
connect_mcp_servers.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runtime_config_reload_updates_agent_and_tool_settings(tmp_path: Path) -> None:
|
|
config_path = tmp_path / "config.json"
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"agents": {
|
|
"defaults": {
|
|
"model": "initial-model",
|
|
"maxToolIterations": 4,
|
|
"contextWindowTokens": 4096,
|
|
"maxTokens": 1000,
|
|
"temperature": 0.2,
|
|
"reasoningEffort": "low",
|
|
}
|
|
},
|
|
"tools": {
|
|
"restrictToWorkspace": False,
|
|
"exec": {"timeout": 20, "pathAppend": ""},
|
|
"web": {
|
|
"proxy": "",
|
|
"search": {
|
|
"provider": "brave",
|
|
"apiKey": "",
|
|
"baseUrl": "",
|
|
"maxResults": 3,
|
|
}
|
|
},
|
|
},
|
|
"channels": {
|
|
"sendProgress": True,
|
|
"sendToolHints": False,
|
|
},
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
loop = _make_loop(tmp_path, mcp_servers={}, config_path=config_path)
|
|
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"agents": {
|
|
"defaults": {
|
|
"model": "reloaded-model",
|
|
"maxToolIterations": 9,
|
|
"contextWindowTokens": 8192,
|
|
"maxTokens": 2222,
|
|
"temperature": 0.7,
|
|
"reasoningEffort": "high",
|
|
}
|
|
},
|
|
"tools": {
|
|
"restrictToWorkspace": True,
|
|
"exec": {"timeout": 45, "pathAppend": "/usr/local/bin"},
|
|
"web": {
|
|
"proxy": "http://127.0.0.1:7890",
|
|
"search": {
|
|
"provider": "searxng",
|
|
"apiKey": "demo-key",
|
|
"baseUrl": "https://search.example.com",
|
|
"maxResults": 7,
|
|
}
|
|
},
|
|
},
|
|
"channels": {
|
|
"sendProgress": False,
|
|
"sendToolHints": True,
|
|
},
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
await loop._reload_runtime_config_if_needed(force=True)
|
|
|
|
exec_tool = loop.tools.get("exec")
|
|
web_search_tool = loop.tools.get("web_search")
|
|
web_fetch_tool = loop.tools.get("web_fetch")
|
|
read_tool = loop.tools.get("read_file")
|
|
|
|
assert loop.model == "reloaded-model"
|
|
assert loop.max_iterations == 9
|
|
assert loop.context_window_tokens == 8192
|
|
assert loop.provider.generation.max_tokens == 2222
|
|
assert loop.provider.generation.temperature == 0.7
|
|
assert loop.provider.generation.reasoning_effort == "high"
|
|
assert loop.memory_consolidator.model == "reloaded-model"
|
|
assert loop.memory_consolidator.context_window_tokens == 8192
|
|
assert loop.channels_config.send_progress is False
|
|
assert loop.channels_config.send_tool_hints is True
|
|
loop.subagents.apply_runtime_config.assert_called_once_with(
|
|
model="reloaded-model",
|
|
brave_api_key="demo-key",
|
|
web_proxy="http://127.0.0.1:7890",
|
|
web_search_provider="searxng",
|
|
web_search_base_url="https://search.example.com",
|
|
web_search_max_results=7,
|
|
exec_config=loop.exec_config,
|
|
restrict_to_workspace=True,
|
|
)
|
|
assert exec_tool.timeout == 45
|
|
assert exec_tool.path_append == "/usr/local/bin"
|
|
assert exec_tool.restrict_to_workspace is True
|
|
assert web_search_tool._init_provider == "searxng"
|
|
assert web_search_tool._init_api_key == "demo-key"
|
|
assert web_search_tool._init_base_url == "https://search.example.com"
|
|
assert web_search_tool.max_results == 7
|
|
assert web_search_tool.proxy == "http://127.0.0.1:7890"
|
|
assert web_fetch_tool.proxy == "http://127.0.0.1:7890"
|
|
assert read_tool._allowed_dir == tmp_path
|