"""Shell execution tool.""" import asyncio import os import re from pathlib import Path from typing import Any from nanobot.agent.tools.base import Tool class ExecTool(Tool): """Tool to execute shell commands.""" def __init__( self, timeout: int = 60, working_dir: str | None = None, deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, restrict_to_workspace: bool = False, ): self.timeout = timeout self.working_dir = working_dir self.deny_patterns = deny_patterns or [ r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr r"\bdel\s+/[fq]\b", # del /f, del /q r"\brmdir\s+/s\b", # rmdir /s r"\b(format|mkfs|diskpart)\b", # disk operations r"\bdd\s+if=", # dd r">\s*/dev/sd", # write to disk r"\b(shutdown|reboot|poweroff)\b", # system power r":\(\)\s*\{.*\};\s*:", # fork bomb ] self.allow_patterns = allow_patterns or [] self.restrict_to_workspace = restrict_to_workspace @property def name(self) -> str: return "exec" @property def description(self) -> str: return "Execute a shell command and return its output. Use with caution." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to execute" }, "working_dir": { "type": "string", "description": "Optional working directory for the command" } }, "required": ["command"] } async def execute(self, command: str, working_dir: str | None = None, **kwargs: Any) -> str: cwd = working_dir or self.working_dir or os.getcwd() guard_error = self._guard_command(command, cwd) if guard_error: return guard_error try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout ) except asyncio.TimeoutError: process.kill() return f"Error: Command timed out after {self.timeout} seconds" output_parts = [] if stdout: output_parts.append(stdout.decode("utf-8", errors="replace")) if stderr: stderr_text = stderr.decode("utf-8", errors="replace") if stderr_text.strip(): output_parts.append(f"STDERR:\n{stderr_text}") if process.returncode != 0: output_parts.append(f"\nExit code: {process.returncode}") result = "\n".join(output_parts) if output_parts else "(no output)" # Truncate very long output max_len = 10000 if len(result) > max_len: result = result[:max_len] + f"\n... (truncated, {len(result) - max_len} more chars)" return result except Exception as e: return f"Error executing command: {str(e)}" def _guard_command(self, command: str, cwd: str) -> str | None: """Best-effort safety guard for potentially destructive commands.""" cmd = command.strip() lower = cmd.lower() for pattern in self.deny_patterns: if re.search(pattern, lower): return "Error: Command blocked by safety guard (dangerous pattern detected)" if self.allow_patterns: if not any(re.search(p, lower) for p in self.allow_patterns): return "Error: Command blocked by safety guard (not in allowlist)" if self.restrict_to_workspace: if "..\\" in cmd or "../" in cmd: return "Error: Command blocked by safety guard (path traversal detected)" cwd_path = Path(cwd).resolve() win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd) posix_paths = re.findall(r"/[^\s\"']+", cmd) for raw in win_paths + posix_paths: try: p = Path(raw).resolve() except Exception: continue if cwd_path not in p.parents and p != cwd_path: return "Error: Command blocked by safety guard (path outside working dir)" return None