mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 23:02:50 +00:00
fix: prevent path traversal in FileWriterTool (#4895)
* fix: add base_dir path containment to FileWriterTool os.path.join does not prevent traversal — joining "./" with "../../../etc/cron.d/pwned" resolves cleanly outside any intended scope. The tool also called os.makedirs on the unvalidated path, meaning it would create arbitrary directory structures. Adds a base_dir parameter that uses os.path.realpath() to resolve the final path (including symlinks) before checking containment. Any filename or directory argument that resolves outside base_dir is rejected before any filesystem operation occurs. When base_dir is not set the tool behaves as before — only use that in fully sandboxed environments. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: make directory relative to base_dir for better UX When base_dir is set, the directory arg is now treated as a subdirectory of base_dir rather than an absolute path. This means the LLM only needs to specify a filename (and optionally a relative subdirectory) — it does not need to repeat the base_dir path. FileWriterTool(base_dir="./output") → filename="report.txt" writes to ./output/report.txt → filename="f.txt", directory="sub" writes to ./output/sub/f.txt Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: remove directory field from LLM schema when base_dir is set When a developer sets base_dir, they control where files are written. The LLM should only supply filename and content — not a directory path. Adds ScopedFileWriterToolInput (no directory field) which is used when base_dir is provided at construction, following the same pattern as FileReadTool/ScrapeWebsiteTool. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: prevent path traversal in FileWriterTool without interface changes Adds containment check inside _run() using os.path.realpath() to ensure the resolved file path stays within the resolved directory. Blocks ../ sequences, absolute filenames, and symlink escapes transparently — no schema or interface changes required. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: use Path.is_relative_to() for path containment check Replaces startswith(real_directory + os.sep) with Path.is_relative_to(), which does a proper path-component comparison. This avoids the edge case where real_directory == "/" produces a "//" prefix, and is safe on case-insensitive filesystems. Also explicitly rejects the case where the filepath resolves to the directory itself (not a valid file target). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test: fix portability issues in path traversal tests - test_blocks_traversal_in_filename: use a sibling temp dir instead of asserting against a potentially pre-existing ../outside.txt - test_blocks_absolute_path_in_filename: use a temp-dir-derived absolute path instead of hardcoding /etc/passwd - test_blocks_symlink_escape: symlink to a temp "outside" dir instead of /etc, assert target file was not created Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
@@ -30,27 +31,39 @@ class FileWriterTool(BaseTool):
|
||||
|
||||
def _run(self, **kwargs: Any) -> str:
|
||||
try:
|
||||
directory = kwargs.get("directory") or "./"
|
||||
filename = kwargs["filename"]
|
||||
|
||||
filepath = os.path.join(directory, filename)
|
||||
|
||||
# Prevent path traversal: the resolved path must be strictly inside
|
||||
# the resolved directory. This blocks ../sequences, absolute paths in
|
||||
# filename, and symlink escapes regardless of how directory is set.
|
||||
# is_relative_to() does a proper path-component comparison that is
|
||||
# safe on case-insensitive filesystems and avoids the "// " edge case
|
||||
# that plagues startswith(real_directory + os.sep).
|
||||
# We also reject the case where filepath resolves to the directory
|
||||
# itself, since that is not a valid file target.
|
||||
real_directory = Path(directory).resolve()
|
||||
real_filepath = Path(filepath).resolve()
|
||||
if not real_filepath.is_relative_to(real_directory) or real_filepath == real_directory:
|
||||
return "Error: Invalid file path — the filename must not escape the target directory."
|
||||
|
||||
if kwargs.get("directory"):
|
||||
os.makedirs(kwargs["directory"], exist_ok=True)
|
||||
os.makedirs(real_directory, exist_ok=True)
|
||||
|
||||
# Construct the full path
|
||||
filepath = os.path.join(kwargs.get("directory") or "", kwargs["filename"])
|
||||
|
||||
# Convert overwrite to boolean
|
||||
kwargs["overwrite"] = strtobool(kwargs["overwrite"])
|
||||
|
||||
# Check if file exists and overwrite is not allowed
|
||||
if os.path.exists(filepath) and not kwargs["overwrite"]:
|
||||
return f"File {filepath} already exists and overwrite option was not passed."
|
||||
if os.path.exists(real_filepath) and not kwargs["overwrite"]:
|
||||
return f"File {real_filepath} already exists and overwrite option was not passed."
|
||||
|
||||
# Write content to the file
|
||||
mode = "w" if kwargs["overwrite"] else "x"
|
||||
with open(filepath, mode) as file:
|
||||
with open(real_filepath, mode) as file:
|
||||
file.write(kwargs["content"])
|
||||
return f"Content successfully written to {filepath}"
|
||||
return f"Content successfully written to {real_filepath}"
|
||||
except FileExistsError:
|
||||
return (
|
||||
f"File {filepath} already exists and overwrite option was not passed."
|
||||
f"File {real_filepath} already exists and overwrite option was not passed."
|
||||
)
|
||||
except KeyError as e:
|
||||
return f"An error occurred while accessing key: {e!s}"
|
||||
|
||||
@@ -135,3 +135,59 @@ def test_file_exists_error_handling(tool, temp_env, overwrite):
|
||||
|
||||
assert "already exists and overwrite option was not passed" in result
|
||||
assert read_file(path) == "Pre-existing content"
|
||||
|
||||
|
||||
# --- Path traversal prevention ---
|
||||
|
||||
def test_blocks_traversal_in_filename(tool, temp_env):
|
||||
# Create a sibling "outside" directory so we can assert nothing was written there.
|
||||
outside_dir = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside_dir, "outside.txt")
|
||||
try:
|
||||
result = tool._run(
|
||||
filename=f"../{os.path.basename(outside_dir)}/outside.txt",
|
||||
directory=temp_env["temp_dir"],
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Error" in result
|
||||
assert not os.path.exists(outside_file)
|
||||
finally:
|
||||
shutil.rmtree(outside_dir, ignore_errors=True)
|
||||
|
||||
|
||||
def test_blocks_absolute_path_in_filename(tool, temp_env):
|
||||
# Use a temp file outside temp_dir as the absolute target so we don't
|
||||
# depend on /etc/passwd existing or being writable on the host.
|
||||
outside_dir = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside_dir, "target.txt")
|
||||
try:
|
||||
result = tool._run(
|
||||
filename=outside_file,
|
||||
directory=temp_env["temp_dir"],
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Error" in result
|
||||
assert not os.path.exists(outside_file)
|
||||
finally:
|
||||
shutil.rmtree(outside_dir, ignore_errors=True)
|
||||
|
||||
|
||||
def test_blocks_symlink_escape(tool, temp_env):
|
||||
# Symlink inside temp_dir pointing to a separate temp "outside" directory.
|
||||
outside_dir = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside_dir, "target.txt")
|
||||
link = os.path.join(temp_env["temp_dir"], "escape")
|
||||
os.symlink(outside_dir, link)
|
||||
try:
|
||||
result = tool._run(
|
||||
filename="escape/target.txt",
|
||||
directory=temp_env["temp_dir"],
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Error" in result
|
||||
assert not os.path.exists(outside_file)
|
||||
finally:
|
||||
shutil.rmtree(outside_dir, ignore_errors=True)
|
||||
|
||||
Reference in New Issue
Block a user