refactor: unify workspace restriction for file tools, remove redundant checks, fix SECURITY.md
This commit is contained in:
@@ -6,37 +6,20 @@ from typing import Any
|
||||
from nanobot.agent.tools.base import Tool
|
||||
|
||||
|
||||
def _validate_path(path: str, base_dir: Path | None = None) -> tuple[bool, Path | str]:
|
||||
"""
|
||||
Validate path to prevent directory traversal attacks.
|
||||
|
||||
Args:
|
||||
path: The path to validate
|
||||
base_dir: Optional base directory to restrict operations to
|
||||
|
||||
Returns:
|
||||
Tuple of (is_valid, resolved_path_or_error_message)
|
||||
"""
|
||||
try:
|
||||
file_path = Path(path).expanduser().resolve()
|
||||
|
||||
# If base_dir is specified, ensure the path is within it
|
||||
if base_dir is not None:
|
||||
base_resolved = base_dir.resolve()
|
||||
try:
|
||||
# Check if file_path is relative to base_dir
|
||||
file_path.relative_to(base_resolved)
|
||||
except ValueError:
|
||||
return False, f"Error: Path {path} is outside allowed directory"
|
||||
|
||||
return True, file_path
|
||||
except Exception as e:
|
||||
return False, f"Error: Invalid path: {str(e)}"
|
||||
def _resolve_path(path: str, allowed_dir: Path | None = None) -> Path:
|
||||
"""Resolve path and optionally enforce directory restriction."""
|
||||
resolved = Path(path).expanduser().resolve()
|
||||
if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())):
|
||||
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
|
||||
return resolved
|
||||
|
||||
|
||||
class ReadFileTool(Tool):
|
||||
"""Tool to read file contents."""
|
||||
|
||||
def __init__(self, allowed_dir: Path | None = None):
|
||||
self._allowed_dir = allowed_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "read_file"
|
||||
@@ -60,11 +43,7 @@ class ReadFileTool(Tool):
|
||||
|
||||
async def execute(self, path: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
file_path = result
|
||||
file_path = _resolve_path(path, self._allowed_dir)
|
||||
if not file_path.exists():
|
||||
return f"Error: File not found: {path}"
|
||||
if not file_path.is_file():
|
||||
@@ -72,8 +51,8 @@ class ReadFileTool(Tool):
|
||||
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
return content
|
||||
except PermissionError:
|
||||
return f"Error: Permission denied: {path}"
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error reading file: {str(e)}"
|
||||
|
||||
@@ -81,6 +60,9 @@ class ReadFileTool(Tool):
|
||||
class WriteFileTool(Tool):
|
||||
"""Tool to write content to a file."""
|
||||
|
||||
def __init__(self, allowed_dir: Path | None = None):
|
||||
self._allowed_dir = allowed_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "write_file"
|
||||
@@ -108,16 +90,12 @@ class WriteFileTool(Tool):
|
||||
|
||||
async def execute(self, path: str, content: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
file_path = result
|
||||
file_path = _resolve_path(path, self._allowed_dir)
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
file_path.write_text(content, encoding="utf-8")
|
||||
return f"Successfully wrote {len(content)} bytes to {path}"
|
||||
except PermissionError:
|
||||
return f"Error: Permission denied: {path}"
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error writing file: {str(e)}"
|
||||
|
||||
@@ -125,6 +103,9 @@ class WriteFileTool(Tool):
|
||||
class EditFileTool(Tool):
|
||||
"""Tool to edit a file by replacing text."""
|
||||
|
||||
def __init__(self, allowed_dir: Path | None = None):
|
||||
self._allowed_dir = allowed_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "edit_file"
|
||||
@@ -156,11 +137,7 @@ class EditFileTool(Tool):
|
||||
|
||||
async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
file_path = result
|
||||
file_path = _resolve_path(path, self._allowed_dir)
|
||||
if not file_path.exists():
|
||||
return f"Error: File not found: {path}"
|
||||
|
||||
@@ -178,8 +155,8 @@ class EditFileTool(Tool):
|
||||
file_path.write_text(new_content, encoding="utf-8")
|
||||
|
||||
return f"Successfully edited {path}"
|
||||
except PermissionError:
|
||||
return f"Error: Permission denied: {path}"
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error editing file: {str(e)}"
|
||||
|
||||
@@ -187,6 +164,9 @@ class EditFileTool(Tool):
|
||||
class ListDirTool(Tool):
|
||||
"""Tool to list directory contents."""
|
||||
|
||||
def __init__(self, allowed_dir: Path | None = None):
|
||||
self._allowed_dir = allowed_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "list_dir"
|
||||
@@ -210,11 +190,7 @@ class ListDirTool(Tool):
|
||||
|
||||
async def execute(self, path: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
dir_path = result
|
||||
dir_path = _resolve_path(path, self._allowed_dir)
|
||||
if not dir_path.exists():
|
||||
return f"Error: Directory not found: {path}"
|
||||
if not dir_path.is_dir():
|
||||
@@ -229,7 +205,7 @@ class ListDirTool(Tool):
|
||||
return f"Directory {path} is empty"
|
||||
|
||||
return "\n".join(items)
|
||||
except PermissionError:
|
||||
return f"Error: Permission denied: {path}"
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error listing directory: {str(e)}"
|
||||
|
||||
Reference in New Issue
Block a user