Merge branch 'main' into main

This commit is contained in:
Dontrail Cotlage
2026-02-04 09:59:33 -05:00
committed by GitHub
14 changed files with 276 additions and 43 deletions

View File

@@ -40,14 +40,17 @@ class AgentLoop:
workspace: Path,
model: str | None = None,
max_iterations: int = 20,
brave_api_key: str | None = None
brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None,
):
from nanobot.config.schema import ExecToolConfig
self.bus = bus
self.provider = provider
self.workspace = workspace
self.model = model or provider.get_default_model()
self.max_iterations = max_iterations
self.brave_api_key = brave_api_key
self.exec_config = exec_config or ExecToolConfig()
self.context = ContextBuilder(workspace)
self.sessions = SessionManager(workspace)
@@ -58,6 +61,7 @@ class AgentLoop:
bus=bus,
model=self.model,
brave_api_key=brave_api_key,
exec_config=self.exec_config,
)
self._running = False
@@ -72,7 +76,11 @@ class AgentLoop:
self.tools.register(ListDirTool())
# Shell tool
self.tools.register(ExecTool(working_dir=str(self.workspace)))
self.tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.exec_config.restrict_to_workspace,
))
# Web tools
self.tools.register(WebSearchTool(api_key=self.brave_api_key))

View File

@@ -33,12 +33,15 @@ class SubagentManager:
bus: MessageBus,
model: str | None = None,
brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None,
):
from nanobot.config.schema import ExecToolConfig
self.provider = provider
self.workspace = workspace
self.bus = bus
self.model = model or provider.get_default_model()
self.brave_api_key = brave_api_key
self.exec_config = exec_config or ExecToolConfig()
self._running_tasks: dict[str, asyncio.Task[None]] = {}
async def spawn(
@@ -96,7 +99,11 @@ class SubagentManager:
tools.register(ReadFileTool())
tools.register(WriteFileTool())
tools.register(ListDirTool())
tools.register(ExecTool(working_dir=str(self.workspace)))
tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.exec_config.restrict_to_workspace,
))
tools.register(WebSearchTool(api_key=self.brave_api_key))
tools.register(WebFetchTool())

View File

@@ -12,6 +12,15 @@ class Tool(ABC):
the environment, such as reading files, executing commands, etc.
"""
_TYPE_MAP = {
"string": str,
"integer": int,
"number": (int, float),
"boolean": bool,
"array": list,
"object": dict,
}
@property
@abstractmethod
def name(self) -> str:
@@ -42,6 +51,44 @@ class Tool(ABC):
String result of the tool execution.
"""
pass
def validate_params(self, params: dict[str, Any]) -> list[str]:
"""Validate tool parameters against JSON schema. Returns error list (empty if valid)."""
schema = self.parameters or {}
if schema.get("type", "object") != "object":
raise ValueError(f"Schema must be object type, got {schema.get('type')!r}")
return self._validate(params, {**schema, "type": "object"}, "")
def _validate(self, val: Any, schema: dict[str, Any], path: str) -> list[str]:
t, label = schema.get("type"), path or "parameter"
if t in self._TYPE_MAP and not isinstance(val, self._TYPE_MAP[t]):
return [f"{label} should be {t}"]
errors = []
if "enum" in schema and val not in schema["enum"]:
errors.append(f"{label} must be one of {schema['enum']}")
if t in ("integer", "number"):
if "minimum" in schema and val < schema["minimum"]:
errors.append(f"{label} must be >= {schema['minimum']}")
if "maximum" in schema and val > schema["maximum"]:
errors.append(f"{label} must be <= {schema['maximum']}")
if t == "string":
if "minLength" in schema and len(val) < schema["minLength"]:
errors.append(f"{label} must be at least {schema['minLength']} chars")
if "maxLength" in schema and len(val) > schema["maxLength"]:
errors.append(f"{label} must be at most {schema['maxLength']} chars")
if t == "object":
props = schema.get("properties", {})
for k in schema.get("required", []):
if k not in val:
errors.append(f"missing required {path + '.' + k if path else k}")
for k, v in val.items():
if k in props:
errors.extend(self._validate(v, props[k], path + '.' + k if path else k))
if t == "array" and "items" in schema:
for i, item in enumerate(val):
errors.extend(self._validate(item, schema["items"], f"{path}[{i}]" if path else f"[{i}]"))
return errors
def to_schema(self) -> dict[str, Any]:
"""Convert tool to OpenAI function schema format."""

View File

@@ -52,8 +52,11 @@ class ToolRegistry:
tool = self._tools.get(name)
if not tool:
return f"Error: Tool '{name}' not found"
try:
errors = tool.validate_params(params)
if errors:
return f"Error: Invalid parameters for tool '{name}': " + "; ".join(errors)
return await tool.execute(**params)
except Exception as e:
return f"Error executing {name}: {str(e)}"

View File

@@ -34,9 +34,28 @@ def validate_command_safety(command: str) -> tuple[bool, str | None]:
class ExecTool(Tool):
"""Tool to execute shell commands."""
def __init__(self, timeout: int = 60, working_dir: str | None = None):
def __init__(
self,
timeout: int = 60,
working_dir: str | None = None,
deny_patterns: list[str] | None = None,
allow_patterns: list[str] | None = None,
restrict_to_workspace: bool = False,
):
self.timeout = timeout
self.working_dir = working_dir
self.deny_patterns = deny_patterns or [
r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr
r"\bdel\s+/[fq]\b", # del /f, del /q
r"\brmdir\s+/s\b", # rmdir /s
r"\b(format|mkfs|diskpart)\b", # disk operations
r"\bdd\s+if=", # dd
r">\s*/dev/sd", # write to disk
r"\b(shutdown|reboot|poweroff)\b", # system power
r":\(\)\s*\{.*\};\s*:", # fork bomb
]
self.allow_patterns = allow_patterns or []
self.restrict_to_workspace = restrict_to_workspace
@property
def name(self) -> str:
@@ -70,6 +89,9 @@ class ExecTool(Tool):
return f"Error: Refusing to execute dangerous command. {warning}"
cwd = working_dir or self.working_dir or os.getcwd()
guard_error = self._guard_command(command, cwd)
if guard_error:
return guard_error
try:
process = await asyncio.create_subprocess_shell(
@@ -112,3 +134,35 @@ class ExecTool(Tool):
except Exception as e:
return f"Error executing command: {str(e)}"
def _guard_command(self, command: str, cwd: str) -> str | None:
"""Best-effort safety guard for potentially destructive commands."""
cmd = command.strip()
lower = cmd.lower()
for pattern in self.deny_patterns:
if re.search(pattern, lower):
return "Error: Command blocked by safety guard (dangerous pattern detected)"
if self.allow_patterns:
if not any(re.search(p, lower) for p in self.allow_patterns):
return "Error: Command blocked by safety guard (not in allowlist)"
if self.restrict_to_workspace:
if "..\\" in cmd or "../" in cmd:
return "Error: Command blocked by safety guard (path traversal detected)"
cwd_path = Path(cwd).resolve()
win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd)
posix_paths = re.findall(r"/[^\s\"']+", cmd)
for raw in win_paths + posix_paths:
try:
p = Path(raw).resolve()
except Exception:
continue
if cwd_path not in p.parents and p != cwd_path:
return "Error: Command blocked by safety guard (path outside working dir)"
return None