Merge PR #653: resolve relative file paths against workspace

This commit is contained in:
Re-bin
2026-02-20 08:03:27 +00:00
3 changed files with 42 additions and 35 deletions

View File

@@ -93,12 +93,12 @@ class AgentLoop:
def _register_default_tools(self) -> None:
"""Register the default set of tools."""
# File tools (restrict to workspace if configured)
# File tools (workspace for relative paths, restrict if configured)
allowed_dir = self.workspace if self.restrict_to_workspace else None
self.tools.register(ReadFileTool(allowed_dir=allowed_dir))
self.tools.register(WriteFileTool(allowed_dir=allowed_dir))
self.tools.register(EditFileTool(allowed_dir=allowed_dir))
self.tools.register(ListDirTool(allowed_dir=allowed_dir))
self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
# Shell tool
self.tools.register(ExecTool(

View File

@@ -103,10 +103,10 @@ class SubagentManager:
# Build subagent tools (no message tool, no spawn tool)
tools = ToolRegistry()
allowed_dir = self.workspace if self.restrict_to_workspace else None
tools.register(ReadFileTool(allowed_dir=allowed_dir))
tools.register(WriteFileTool(allowed_dir=allowed_dir))
tools.register(EditFileTool(allowed_dir=allowed_dir))
tools.register(ListDirTool(allowed_dir=allowed_dir))
tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,

View File

@@ -6,9 +6,12 @@ from typing import Any
from nanobot.agent.tools.base import Tool
def _resolve_path(path: str, allowed_dir: Path | None = None) -> Path:
"""Resolve path and optionally enforce directory restriction."""
resolved = Path(path).expanduser().resolve()
def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | None = None) -> Path:
"""Resolve path against workspace (if relative) and enforce directory restriction."""
p = Path(path).expanduser()
if not p.is_absolute() and workspace:
p = workspace / p
resolved = p.resolve()
if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())):
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
return resolved
@@ -16,8 +19,9 @@ def _resolve_path(path: str, allowed_dir: Path | None = None) -> Path:
class ReadFileTool(Tool):
"""Tool to read file contents."""
def __init__(self, allowed_dir: Path | None = None):
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
@@ -43,12 +47,12 @@ class ReadFileTool(Tool):
async def execute(self, path: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._allowed_dir)
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not file_path.exists():
return f"Error: File not found: {path}"
if not file_path.is_file():
return f"Error: Not a file: {path}"
content = file_path.read_text(encoding="utf-8")
return content
except PermissionError as e:
@@ -59,8 +63,9 @@ class ReadFileTool(Tool):
class WriteFileTool(Tool):
"""Tool to write content to a file."""
def __init__(self, allowed_dir: Path | None = None):
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
@@ -90,10 +95,10 @@ class WriteFileTool(Tool):
async def execute(self, path: str, content: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._allowed_dir)
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
return f"Successfully wrote {len(content)} bytes to {path}"
return f"Successfully wrote {len(content)} bytes to {file_path}"
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
@@ -102,8 +107,9 @@ class WriteFileTool(Tool):
class EditFileTool(Tool):
"""Tool to edit a file by replacing text."""
def __init__(self, allowed_dir: Path | None = None):
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
@@ -137,24 +143,24 @@ class EditFileTool(Tool):
async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._allowed_dir)
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not file_path.exists():
return f"Error: File not found: {path}"
content = file_path.read_text(encoding="utf-8")
if old_text not in content:
return f"Error: old_text not found in file. Make sure it matches exactly."
# Count occurrences
count = content.count(old_text)
if count > 1:
return f"Warning: old_text appears {count} times. Please provide more context to make it unique."
new_content = content.replace(old_text, new_text, 1)
file_path.write_text(new_content, encoding="utf-8")
return f"Successfully edited {path}"
return f"Successfully edited {file_path}"
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
@@ -163,8 +169,9 @@ class EditFileTool(Tool):
class ListDirTool(Tool):
"""Tool to list directory contents."""
def __init__(self, allowed_dir: Path | None = None):
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
@@ -190,20 +197,20 @@ class ListDirTool(Tool):
async def execute(self, path: str, **kwargs: Any) -> str:
try:
dir_path = _resolve_path(path, self._allowed_dir)
dir_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
items = []
for item in sorted(dir_path.iterdir()):
prefix = "📁 " if item.is_dir() else "📄 "
items.append(f"{prefix}{item.name}")
if not items:
return f"Directory {path} is empty"
return "\n".join(items)
except PermissionError as e:
return f"Error: {e}"