Files
crewAI/lib/crewai-tools/tests/tools/test_file_writer_tool.py
Rip&Tear 713fa7d01b fix: prevent path traversal in FileWriterTool (#4895)
* fix: add base_dir path containment to FileWriterTool

os.path.join does not prevent traversal — joining "./" with "../../../etc/cron.d/pwned"
resolves cleanly outside any intended scope. The tool also called os.makedirs on
the unvalidated path, meaning it would create arbitrary directory structures.

Adds a base_dir parameter that uses os.path.realpath() to resolve the final path
(including symlinks) before checking containment. Any filename or directory argument
that resolves outside base_dir is rejected before any filesystem operation occurs.

When base_dir is not set, the tool behaves as before — only leave it unset in
fully sandboxed environments.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: make directory relative to base_dir for better UX

When base_dir is set, the directory arg is now treated as a subdirectory
of base_dir rather than an absolute path. This means the LLM only needs
to specify a filename (and optionally a relative subdirectory) — it does
not need to repeat the base_dir path.

  FileWriterTool(base_dir="./output")
  → filename="report.txt"            writes to ./output/report.txt
  → filename="f.txt", directory="sub" writes to ./output/sub/f.txt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove directory field from LLM schema when base_dir is set

When a developer sets base_dir, they control where files are written.
The LLM should only supply filename and content — not a directory path.

Adds ScopedFileWriterToolInput (no directory field) which is used when
base_dir is provided at construction, following the same pattern as
FileReadTool/ScrapeWebsiteTool.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: prevent path traversal in FileWriterTool without interface changes

Adds containment check inside _run() using os.path.realpath() to ensure
the resolved file path stays within the resolved directory. Blocks ../
sequences, absolute filenames, and symlink escapes transparently —
no schema or interface changes required.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: use Path.is_relative_to() for path containment check

Replaces startswith(real_directory + os.sep) with Path.is_relative_to(),
which does a proper path-component comparison. This avoids the edge case
where real_directory == "/" produces a "//" prefix, and is safe on
case-insensitive filesystems. Also explicitly rejects the case where
the filepath resolves to the directory itself (not a valid file target).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test: fix portability issues in path traversal tests

- test_blocks_traversal_in_filename: use a sibling temp dir instead of
  asserting against a potentially pre-existing ../outside.txt
- test_blocks_absolute_path_in_filename: use a temp-dir-derived absolute
  path instead of hardcoding /etc/passwd
- test_blocks_symlink_escape: symlink to a temp "outside" dir instead of
  /etc, assert target file was not created

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-03-19 20:11:45 +08:00

194 lines
5.4 KiB
Python

import os
import shutil
import tempfile
from crewai_tools.tools.file_writer_tool.file_writer_tool import FileWriterTool
import pytest
@pytest.fixture
def tool():
    """Provide a ``FileWriterTool`` constructed with no arguments."""
    writer = FileWriterTool()
    return writer
@pytest.fixture
def temp_env():
    """Yield a scratch directory together with a sample filename and content.

    The directory comes from ``tempfile.mkdtemp()`` and is removed (errors
    ignored) after the yield, i.e. once the consuming test is done with it.
    """
    scratch_dir = tempfile.mkdtemp()
    env = {
        "temp_dir": scratch_dir,
        "test_file": "test.txt",
        "test_content": "Hello, World!",
    }
    yield env
    # Teardown: drop the scratch directory and everything written into it.
    shutil.rmtree(scratch_dir, ignore_errors=True)
def get_test_path(filename, directory):
    """Return the path formed by joining *directory* and *filename*."""
    joined = os.path.join(directory, filename)
    return joined
def read_file(path):
    """Return the entire text content of the file at *path*."""
    with open(path, "r") as handle:
        contents = handle.read()
    return contents
def test_basic_file_write(tool, temp_env):
    """Writing into an existing directory creates the file with the content."""
    target_dir = temp_env["temp_dir"]
    name = temp_env["test_file"]
    expected = temp_env["test_content"]

    outcome = tool._run(
        filename=name,
        directory=target_dir,
        content=expected,
        overwrite=True,
    )

    written = get_test_path(name, target_dir)
    assert os.path.exists(written)
    assert read_file(written) == expected
    assert "successfully written" in outcome
def test_directory_creation(tool, temp_env):
    """A not-yet-existing target directory is created as part of the write."""
    name = temp_env["test_file"]
    # This subdirectory does not exist before the call.
    nested = os.path.join(temp_env["temp_dir"], "nested_dir")

    outcome = tool._run(
        filename=name,
        directory=nested,
        content=temp_env["test_content"],
        overwrite=True,
    )

    written = get_test_path(name, nested)
    assert os.path.exists(nested)
    assert os.path.exists(written)
    assert "successfully written" in outcome
@pytest.mark.parametrize(
    "overwrite",
    ["y", "yes", "t", "true", "on", "1", True],
)
def test_overwrite_true(tool, temp_env, overwrite):
    """Each truthy ``overwrite`` spelling replaces an existing file's content."""
    target_dir = temp_env["temp_dir"]
    name = temp_env["test_file"]
    existing = get_test_path(name, target_dir)

    # Seed the file so the call below has something to overwrite.
    with open(existing, "w") as seed:
        seed.write("Original content")

    outcome = tool._run(
        filename=name,
        directory=target_dir,
        content="New content",
        overwrite=overwrite,
    )

    assert read_file(existing) == "New content"
    assert "successfully written" in outcome
def test_invalid_overwrite_value(tool, temp_env):
    """An unrecognised ``overwrite`` string is reported in the result message."""
    call_args = {
        "filename": temp_env["test_file"],
        "directory": temp_env["temp_dir"],
        "content": temp_env["test_content"],
        "overwrite": "invalid",
    }
    outcome = tool._run(**call_args)
    assert "invalid value" in outcome
def test_missing_required_fields(tool, temp_env):
    """Calling ``_run`` without ``filename`` returns a key-access error message."""
    # "filename" is deliberately left out of the arguments.
    call_args = {
        "directory": temp_env["temp_dir"],
        "content": temp_env["test_content"],
        "overwrite": True,
    }
    outcome = tool._run(**call_args)
    assert "An error occurred while accessing key: 'filename'" in outcome
def test_empty_content(tool, temp_env):
    """Writing an empty string still creates the (empty) file."""
    target_dir = temp_env["temp_dir"]
    name = temp_env["test_file"]

    outcome = tool._run(
        filename=name,
        directory=target_dir,
        content="",
        overwrite=True,
    )

    written = get_test_path(name, target_dir)
    assert os.path.exists(written)
    assert read_file(written) == ""
    assert "successfully written" in outcome
@pytest.mark.parametrize(
    "overwrite",
    ["n", "no", "f", "false", "off", "0", False],
)
def test_file_exists_error_handling(tool, temp_env, overwrite):
    """Each falsy ``overwrite`` spelling leaves an existing file untouched."""
    target_dir = temp_env["temp_dir"]
    name = temp_env["test_file"]
    existing = get_test_path(name, target_dir)

    # Seed the file; its content must survive the call below.
    with open(existing, "w") as seed:
        seed.write("Pre-existing content")

    outcome = tool._run(
        filename=name,
        directory=target_dir,
        content="Should not be written",
        overwrite=overwrite,
    )

    assert "already exists and overwrite option was not passed" in outcome
    assert read_file(existing) == "Pre-existing content"
# --- Path traversal prevention ---
def test_blocks_traversal_in_filename(tool, temp_env):
    """A ``../`` filename aimed at another temp directory is rejected."""
    # A second mkdtemp() directory plays the "outside" location; the test
    # asserts that nothing gets written into it.
    sibling_dir = tempfile.mkdtemp()
    escaped_target = os.path.join(sibling_dir, "outside.txt")
    traversal_name = f"../{os.path.basename(sibling_dir)}/outside.txt"
    try:
        outcome = tool._run(
            filename=traversal_name,
            directory=temp_env["temp_dir"],
            content="should not be written",
            overwrite=True,
        )
        assert "Error" in outcome
        assert not os.path.exists(escaped_target)
    finally:
        shutil.rmtree(sibling_dir, ignore_errors=True)
def test_blocks_absolute_path_in_filename(tool, temp_env):
    """An absolute filename located outside ``directory`` is rejected."""
    # The absolute target lives in its own temp dir, so the test does not rely
    # on any fixed system path existing or being writable.
    foreign_dir = tempfile.mkdtemp()
    absolute_target = os.path.join(foreign_dir, "target.txt")
    try:
        outcome = tool._run(
            filename=absolute_target,
            directory=temp_env["temp_dir"],
            content="should not be written",
            overwrite=True,
        )
        assert "Error" in outcome
        assert not os.path.exists(absolute_target)
    finally:
        shutil.rmtree(foreign_dir, ignore_errors=True)
def test_blocks_symlink_escape(tool, temp_env):
    """A filename routed through a symlink that leaves ``directory`` is rejected.

    A symlink named ``escape`` is created inside the temp directory and points
    at a separate "outside" temp directory. Writing ``escape/target.txt`` must
    return an error message and must not create the file in the outside
    directory. The test is skipped when the platform refuses to create the
    symlink.
    """
    outside_dir = tempfile.mkdtemp()
    outside_file = os.path.join(outside_dir, "target.txt")
    link = os.path.join(temp_env["temp_dir"], "escape")
    try:
        # Create the link inside the try block so outside_dir is still removed
        # when symlink creation fails (e.g. an unprivileged Windows account).
        try:
            os.symlink(outside_dir, link)
        except (OSError, NotImplementedError) as exc:
            pytest.skip(f"symlinks are not available here: {exc}")
        result = tool._run(
            filename="escape/target.txt",
            directory=temp_env["temp_dir"],
            content="should not be written",
            overwrite=True,
        )
        assert "Error" in result
        assert not os.path.exists(outside_file)
    finally:
        shutil.rmtree(outside_dir, ignore_errors=True)