From 96383abd8b05dd6cae962f3ca18942055f66c450 Mon Sep 17 00:00:00 2001 From: Rip&Tear <84775494+theCyberTech@users.noreply.github.com> Date: Mon, 16 Mar 2026 00:16:13 +0800 Subject: [PATCH] fix: prevent path traversal in FileWriterTool without interface changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds containment check inside _run() using os.path.realpath() to ensure the resolved file path stays within the resolved directory. Blocks ../ sequences, absolute filenames, and symlink escapes transparently — no schema or interface changes required. Co-Authored-By: Claude Sonnet 4.6 --- .../file_writer_tool/file_writer_tool.py | 78 +++++-------------- .../tests/tools/test_file_writer_tool.py | 53 ++++--------- 2 files changed, 34 insertions(+), 97 deletions(-) diff --git a/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py b/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py index eb2d8f0f9..16e7c261f 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py @@ -2,7 +2,7 @@ import os from typing import Any from crewai.tools import BaseTool -from pydantic import BaseModel, Field +from pydantic import BaseModel def strtobool(val) -> bool: @@ -23,80 +23,42 @@ class FileWriterToolInput(BaseModel): content: str -class ScopedFileWriterToolInput(BaseModel): - """Input when base_dir is set — the LLM supplies only filename and content.""" - filename: str - overwrite: str | bool = False - content: str - - class FileWriterTool(BaseTool): name: str = "File Writer Tool" - description: str = ( - "A tool to write content to a specified file. " - "Accepts filename, content, and optionally a directory path and overwrite flag as input." - ) + description: str = "A tool to write content to a specified file. Accepts filename, content, and optionally a directory path and overwrite flag as input." args_schema: type[BaseModel] = FileWriterToolInput - base_dir: str | None = None - - def __init__(self, base_dir: str | None = None, **kwargs: Any) -> None: - """Initialize the FileWriterTool. - - Args: - base_dir (Optional[str]): Restrict all writes to this directory tree. - Any filename or directory that resolves outside base_dir is rejected, - including ../traversal and symlink escapes. When not set the tool - can write anywhere the process has permission to — only use that in - fully sandboxed environments. - """ - super().__init__(**kwargs) - self.base_dir = os.path.realpath(base_dir) if base_dir is not None else None - if base_dir is not None: - self.args_schema = ScopedFileWriterToolInput - self.description = ( - f"A tool to write files into {base_dir}. " - "Accepts a filename, content, and an optional overwrite flag." - ) - self._generate_description() - - def _validate_path(self, filepath: str) -> str | None: - """Resolve path and enforce base_dir containment. Returns None if rejected.""" - if self.base_dir is None: - return filepath - real_path = os.path.realpath(filepath) - if not real_path.startswith(self.base_dir + os.sep) and real_path != self.base_dir: - return None - return real_path def _run(self, **kwargs: Any) -> str: try: + directory = kwargs.get("directory") or "./" filename = kwargs["filename"] - if self.base_dir is not None: - # Developer controls the directory; LLM only supplies filename. - filepath = os.path.join(self.base_dir, filename) - else: - directory = kwargs.get("directory") or "./" - filepath = os.path.join(directory, filename) + filepath = os.path.join(directory, filename) - validated = self._validate_path(filepath) - if validated is None: - return "Error: Access denied — path resolves outside the allowed directory." + # Prevent path traversal: the resolved path must stay within the + # resolved directory. This blocks ../sequences, absolute paths in + # filename, and symlink escapes regardless of how directory is set. + real_directory = os.path.realpath(directory) + real_filepath = os.path.realpath(filepath) + if not real_filepath.startswith(real_directory + os.sep) and real_filepath != real_directory: + return "Error: Invalid file path — the filename must not escape the target directory." - validated_dir = os.path.dirname(validated) - os.makedirs(validated_dir, exist_ok=True) + if kwargs.get("directory"): + os.makedirs(real_directory, exist_ok=True) kwargs["overwrite"] = strtobool(kwargs["overwrite"]) - if os.path.exists(validated) and not kwargs["overwrite"]: - return f"File {validated} already exists and overwrite option was not passed." + if os.path.exists(real_filepath) and not kwargs["overwrite"]: + return f"File {real_filepath} already exists and overwrite option was not passed." mode = "w" if kwargs["overwrite"] else "x" - with open(validated, mode) as file: + with open(real_filepath, mode) as file: file.write(kwargs["content"]) - return f"Content successfully written to {validated}" + return f"Content successfully written to {real_filepath}" except FileExistsError: - return f"File already exists and overwrite option was not passed." + return ( + f"File {real_filepath} already exists and overwrite option was not passed." + ) except KeyError as e: return f"An error occurred while accessing key: {e!s}" except Exception as e: diff --git a/lib/crewai-tools/tests/tools/test_file_writer_tool.py b/lib/crewai-tools/tests/tools/test_file_writer_tool.py index af3e81016..38e23db9f 100644 --- a/lib/crewai-tools/tests/tools/test_file_writer_tool.py +++ b/lib/crewai-tools/tests/tools/test_file_writer_tool.py @@ -137,61 +137,36 @@ def test_file_exists_error_handling(tool, temp_env, overwrite): assert read_file(path) == "Pre-existing content" -# --- base_dir containment --- +# --- Path traversal prevention --- -@pytest.fixture -def scoped_tool(temp_env): - return FileWriterTool(base_dir=temp_env["temp_dir"]) - - -def test_base_dir_schema_has_no_directory_field(temp_env): - """When base_dir is set, the LLM schema has no directory field.""" - from crewai_tools.tools.file_writer_tool.file_writer_tool import ScopedFileWriterToolInput - tool = FileWriterTool(base_dir=temp_env["temp_dir"]) - assert tool.args_schema is ScopedFileWriterToolInput - assert "directory" not in tool.args_schema.model_fields - - -def test_base_dir_allows_write_inside(scoped_tool, temp_env): - """LLM supplies only filename — file lands in base_dir.""" - result = scoped_tool._run( - filename=temp_env["test_file"], - content=temp_env["test_content"], - overwrite=True, - ) - assert "successfully written" in result - assert read_file(get_test_path(temp_env["test_file"], temp_env["temp_dir"])) == temp_env["test_content"] - - -def test_base_dir_blocks_traversal_in_filename(scoped_tool, temp_env): - result = scoped_tool._run( +def test_blocks_traversal_in_filename(tool, temp_env): + result = tool._run( filename="../outside.txt", + directory=temp_env["temp_dir"], content="should not be written", overwrite=True, ) - assert "Access denied" in result + assert "Error" in result + assert not os.path.exists(os.path.join(temp_env["temp_dir"], "../outside.txt")) -def test_base_dir_blocks_absolute_filename(scoped_tool, temp_env): - result = scoped_tool._run( +def test_blocks_absolute_path_in_filename(tool, temp_env): + result = tool._run( filename="/etc/passwd", + directory=temp_env["temp_dir"], content="should not be written", overwrite=True, ) - assert "Access denied" in result + assert "Error" in result -def test_base_dir_blocks_symlink_escape(scoped_tool, temp_env): +def test_blocks_symlink_escape(tool, temp_env): link = os.path.join(temp_env["temp_dir"], "escape") os.symlink("/etc", link) - result = scoped_tool._run( + result = tool._run( filename="escape/crontab", + directory=temp_env["temp_dir"], content="should not be written", overwrite=True, ) - assert "Access denied" in result - - -def test_base_dir_description_mentions_directory(temp_env): - tool = FileWriterTool(base_dir=temp_env["temp_dir"]) - assert temp_env["temp_dir"] in tool.description + assert "Error" in result