mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-03 00:02:36 +00:00
fix: prevent path traversal in FileWriterTool without interface changes
Adds containment check inside _run() using os.path.realpath() to ensure the resolved file path stays within the resolved directory. Blocks ../ sequences, absolute filenames, and symlink escapes transparently — no schema or interface changes required. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,7 +2,7 @@ import os
|
||||
from typing import Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
def strtobool(val) -> bool:
|
||||
@@ -23,80 +23,42 @@ class FileWriterToolInput(BaseModel):
|
||||
content: str
|
||||
|
||||
|
||||
class ScopedFileWriterToolInput(BaseModel):
|
||||
"""Input when base_dir is set — the LLM supplies only filename and content."""
|
||||
filename: str
|
||||
overwrite: str | bool = False
|
||||
content: str
|
||||
|
||||
|
||||
class FileWriterTool(BaseTool):
|
||||
name: str = "File Writer Tool"
|
||||
description: str = (
|
||||
"A tool to write content to a specified file. "
|
||||
"Accepts filename, content, and optionally a directory path and overwrite flag as input."
|
||||
)
|
||||
description: str = "A tool to write content to a specified file. Accepts filename, content, and optionally a directory path and overwrite flag as input."
|
||||
args_schema: type[BaseModel] = FileWriterToolInput
|
||||
base_dir: str | None = None
|
||||
|
||||
def __init__(self, base_dir: str | None = None, **kwargs: Any) -> None:
|
||||
"""Initialize the FileWriterTool.
|
||||
|
||||
Args:
|
||||
base_dir (Optional[str]): Restrict all writes to this directory tree.
|
||||
Any filename or directory that resolves outside base_dir is rejected,
|
||||
including ../traversal and symlink escapes. When not set the tool
|
||||
can write anywhere the process has permission to — only use that in
|
||||
fully sandboxed environments.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.base_dir = os.path.realpath(base_dir) if base_dir is not None else None
|
||||
if base_dir is not None:
|
||||
self.args_schema = ScopedFileWriterToolInput
|
||||
self.description = (
|
||||
f"A tool to write files into {base_dir}. "
|
||||
"Accepts a filename, content, and an optional overwrite flag."
|
||||
)
|
||||
self._generate_description()
|
||||
|
||||
def _validate_path(self, filepath: str) -> str | None:
|
||||
"""Resolve path and enforce base_dir containment. Returns None if rejected."""
|
||||
if self.base_dir is None:
|
||||
return filepath
|
||||
real_path = os.path.realpath(filepath)
|
||||
if not real_path.startswith(self.base_dir + os.sep) and real_path != self.base_dir:
|
||||
return None
|
||||
return real_path
|
||||
|
||||
def _run(self, **kwargs: Any) -> str:
|
||||
try:
|
||||
directory = kwargs.get("directory") or "./"
|
||||
filename = kwargs["filename"]
|
||||
|
||||
if self.base_dir is not None:
|
||||
# Developer controls the directory; LLM only supplies filename.
|
||||
filepath = os.path.join(self.base_dir, filename)
|
||||
else:
|
||||
directory = kwargs.get("directory") or "./"
|
||||
filepath = os.path.join(directory, filename)
|
||||
filepath = os.path.join(directory, filename)
|
||||
|
||||
validated = self._validate_path(filepath)
|
||||
if validated is None:
|
||||
return "Error: Access denied — path resolves outside the allowed directory."
|
||||
# Prevent path traversal: the resolved path must stay within the
|
||||
# resolved directory. This blocks ../sequences, absolute paths in
|
||||
# filename, and symlink escapes regardless of how directory is set.
|
||||
real_directory = os.path.realpath(directory)
|
||||
real_filepath = os.path.realpath(filepath)
|
||||
if not real_filepath.startswith(real_directory + os.sep) and real_filepath != real_directory:
|
||||
return "Error: Invalid file path — the filename must not escape the target directory."
|
||||
|
||||
validated_dir = os.path.dirname(validated)
|
||||
os.makedirs(validated_dir, exist_ok=True)
|
||||
if kwargs.get("directory"):
|
||||
os.makedirs(real_directory, exist_ok=True)
|
||||
|
||||
kwargs["overwrite"] = strtobool(kwargs["overwrite"])
|
||||
|
||||
if os.path.exists(validated) and not kwargs["overwrite"]:
|
||||
return f"File {validated} already exists and overwrite option was not passed."
|
||||
if os.path.exists(real_filepath) and not kwargs["overwrite"]:
|
||||
return f"File {real_filepath} already exists and overwrite option was not passed."
|
||||
|
||||
mode = "w" if kwargs["overwrite"] else "x"
|
||||
with open(validated, mode) as file:
|
||||
with open(real_filepath, mode) as file:
|
||||
file.write(kwargs["content"])
|
||||
return f"Content successfully written to {validated}"
|
||||
return f"Content successfully written to {real_filepath}"
|
||||
except FileExistsError:
|
||||
return f"File already exists and overwrite option was not passed."
|
||||
return (
|
||||
f"File {real_filepath} already exists and overwrite option was not passed."
|
||||
)
|
||||
except KeyError as e:
|
||||
return f"An error occurred while accessing key: {e!s}"
|
||||
except Exception as e:
|
||||
|
||||
@@ -137,61 +137,36 @@ def test_file_exists_error_handling(tool, temp_env, overwrite):
|
||||
assert read_file(path) == "Pre-existing content"
|
||||
|
||||
|
||||
# --- base_dir containment ---
|
||||
# --- Path traversal prevention ---
|
||||
|
||||
@pytest.fixture
|
||||
def scoped_tool(temp_env):
|
||||
return FileWriterTool(base_dir=temp_env["temp_dir"])
|
||||
|
||||
|
||||
def test_base_dir_schema_has_no_directory_field(temp_env):
|
||||
"""When base_dir is set, the LLM schema has no directory field."""
|
||||
from crewai_tools.tools.file_writer_tool.file_writer_tool import ScopedFileWriterToolInput
|
||||
tool = FileWriterTool(base_dir=temp_env["temp_dir"])
|
||||
assert tool.args_schema is ScopedFileWriterToolInput
|
||||
assert "directory" not in tool.args_schema.model_fields
|
||||
|
||||
|
||||
def test_base_dir_allows_write_inside(scoped_tool, temp_env):
|
||||
"""LLM supplies only filename — file lands in base_dir."""
|
||||
result = scoped_tool._run(
|
||||
filename=temp_env["test_file"],
|
||||
content=temp_env["test_content"],
|
||||
overwrite=True,
|
||||
)
|
||||
assert "successfully written" in result
|
||||
assert read_file(get_test_path(temp_env["test_file"], temp_env["temp_dir"])) == temp_env["test_content"]
|
||||
|
||||
|
||||
def test_base_dir_blocks_traversal_in_filename(scoped_tool, temp_env):
|
||||
result = scoped_tool._run(
|
||||
def test_blocks_traversal_in_filename(tool, temp_env):
|
||||
result = tool._run(
|
||||
filename="../outside.txt",
|
||||
directory=temp_env["temp_dir"],
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Access denied" in result
|
||||
assert "Error" in result
|
||||
assert not os.path.exists(os.path.join(temp_env["temp_dir"], "../outside.txt"))
|
||||
|
||||
|
||||
def test_base_dir_blocks_absolute_filename(scoped_tool, temp_env):
|
||||
result = scoped_tool._run(
|
||||
def test_blocks_absolute_path_in_filename(tool, temp_env):
|
||||
result = tool._run(
|
||||
filename="/etc/passwd",
|
||||
directory=temp_env["temp_dir"],
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Access denied" in result
|
||||
assert "Error" in result
|
||||
|
||||
|
||||
def test_base_dir_blocks_symlink_escape(scoped_tool, temp_env):
|
||||
def test_blocks_symlink_escape(tool, temp_env):
|
||||
link = os.path.join(temp_env["temp_dir"], "escape")
|
||||
os.symlink("/etc", link)
|
||||
result = scoped_tool._run(
|
||||
result = tool._run(
|
||||
filename="escape/crontab",
|
||||
directory=temp_env["temp_dir"],
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Access denied" in result
|
||||
|
||||
|
||||
def test_base_dir_description_mentions_directory(temp_env):
|
||||
tool = FileWriterTool(base_dir=temp_env["temp_dir"])
|
||||
assert temp_env["temp_dir"] in tool.description
|
||||
assert "Error" in result
|
||||
|
||||
Reference in New Issue
Block a user