Security audit: Fix critical dependency vulnerabilities and add security controls
Co-authored-by: kingassune <6126851+kingassune@users.noreply.github.com>
This commit is contained in:
@@ -6,6 +6,34 @@ from typing import Any
|
||||
from nanobot.agent.tools.base import Tool
|
||||
|
||||
|
||||
def _validate_path(path: str, base_dir: Path | None = None) -> tuple[bool, Path | str]:
|
||||
"""
|
||||
Validate path to prevent directory traversal attacks.
|
||||
|
||||
Args:
|
||||
path: The path to validate
|
||||
base_dir: Optional base directory to restrict operations to
|
||||
|
||||
Returns:
|
||||
Tuple of (is_valid, resolved_path_or_error_message)
|
||||
"""
|
||||
try:
|
||||
file_path = Path(path).expanduser().resolve()
|
||||
|
||||
# If base_dir is specified, ensure the path is within it
|
||||
if base_dir is not None:
|
||||
base_resolved = base_dir.resolve()
|
||||
try:
|
||||
# Check if file_path is relative to base_dir
|
||||
file_path.relative_to(base_resolved)
|
||||
except ValueError:
|
||||
return False, f"Error: Path {path} is outside allowed directory"
|
||||
|
||||
return True, file_path
|
||||
except Exception as e:
|
||||
return False, f"Error: Invalid path: {str(e)}"
|
||||
|
||||
|
||||
class ReadFileTool(Tool):
|
||||
"""Tool to read file contents."""
|
||||
|
||||
@@ -32,7 +60,11 @@ class ReadFileTool(Tool):
|
||||
|
||||
async def execute(self, path: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
file_path = Path(path).expanduser()
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
file_path = result
|
||||
if not file_path.exists():
|
||||
return f"Error: File not found: {path}"
|
||||
if not file_path.is_file():
|
||||
@@ -76,7 +108,11 @@ class WriteFileTool(Tool):
|
||||
|
||||
async def execute(self, path: str, content: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
file_path = Path(path).expanduser()
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
file_path = result
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
file_path.write_text(content, encoding="utf-8")
|
||||
return f"Successfully wrote {len(content)} bytes to {path}"
|
||||
@@ -120,7 +156,11 @@ class EditFileTool(Tool):
|
||||
|
||||
async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
file_path = Path(path).expanduser()
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
file_path = result
|
||||
if not file_path.exists():
|
||||
return f"Error: File not found: {path}"
|
||||
|
||||
@@ -170,7 +210,11 @@ class ListDirTool(Tool):
|
||||
|
||||
async def execute(self, path: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
dir_path = Path(path).expanduser()
|
||||
is_valid, result = _validate_path(path)
|
||||
if not is_valid:
|
||||
return str(result)
|
||||
|
||||
dir_path = result
|
||||
if not dir_path.exists():
|
||||
return f"Error: Directory not found: {path}"
|
||||
if not dir_path.is_dir():
|
||||
|
||||
@@ -2,11 +2,35 @@
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from nanobot.agent.tools.base import Tool
|
||||
|
||||
|
||||
# List of potentially dangerous command patterns
|
||||
DANGEROUS_PATTERNS = [
|
||||
r'rm\s+-rf\s+/', # rm -rf /
|
||||
r':\(\)\{\s*:\|:&\s*\};:', # fork bomb
|
||||
r'mkfs\.', # format filesystem
|
||||
r'dd\s+if=.*\s+of=/dev/(sd|hd)', # overwrite disk
|
||||
r'>\s*/dev/(sd|hd)', # write to raw disk device
|
||||
]
|
||||
|
||||
|
||||
def _is_dangerous_command(command: str) -> tuple[bool, str | None]:
|
||||
"""
|
||||
Check if a command contains dangerous patterns.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_dangerous, warning_message)
|
||||
"""
|
||||
for pattern in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, command, re.IGNORECASE):
|
||||
return True, f"Warning: Command contains potentially dangerous pattern: {pattern}"
|
||||
return False, None
|
||||
|
||||
|
||||
class ExecTool(Tool):
|
||||
"""Tool to execute shell commands."""
|
||||
|
||||
@@ -40,6 +64,11 @@ class ExecTool(Tool):
|
||||
}
|
||||
|
||||
async def execute(self, command: str, working_dir: str | None = None, **kwargs: Any) -> str:
|
||||
# Check for dangerous command patterns
|
||||
is_dangerous, warning = _is_dangerous_command(command)
|
||||
if is_dangerous:
|
||||
return f"Error: Refusing to execute dangerous command. {warning}"
|
||||
|
||||
cwd = working_dir or self.working_dir or os.getcwd()
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user