fix: validate compressor output_path at the write sink (defense in depth)

Corridor scan flagged output_path reaching the archive writer. _run does
validate output_path before dispatch, but _compress_zip/_compress_tar are
staticmethods reachable independently of _run — called directly they would
write unvalidated. Validate at the sink so every call site is confined to
the allow-list, not just the _run entrypoint.

Also decouple the symlink tests from the configurable-allow-list feature
(PR #6248): chdir into the working dir so the allowed root is cwd, instead
of setting CREWAI_TOOLS_ALLOWED_DIRS (which this branch, off main, does not
yet support). Adds direct-call tests asserting the sink rejects an
out-of-root output path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Rip&Tear
2026-06-20 01:54:50 +08:00
parent 8e6d5fb38b
commit 034b119d34
2 changed files with 35 additions and 10 deletions

View File

@@ -125,6 +125,9 @@ class FileCompressorTool(BaseTool):
validated to stop a symlink inside the tree from copying the contents
of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive.
"""
# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
skipped: list[str] = []
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
if os.path.isfile(input_path):
@@ -171,6 +174,9 @@ class FileCompressorTool(BaseTool):
return None
return tarinfo
# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload]
arcname = os.path.basename(input_path)
tarf.add(input_path, arcname=arcname, filter=_drop_links)

View File

@@ -143,10 +143,16 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs):
@pytest.fixture
def symlink_env():
"""A working dir (allow-listed) containing a normal file and a symlink that
points to a secret file OUTSIDE the allow-list."""
work_dir = tempfile.mkdtemp()
secret_dir = tempfile.mkdtemp() # deliberately NOT allow-listed
"""A working dir (the allowed root, via cwd) containing a normal file and a
symlink pointing at a secret file OUTSIDE that root.
The working directory is the allowed root for path validation, so we chdir
into ``work_dir`` rather than relying on CREWAI_TOOLS_ALLOWED_DIRS. This
keeps the test independent of the configurable-allow-list feature and
exercises the default cwd confinement.
"""
work_dir = os.path.realpath(tempfile.mkdtemp())
secret_dir = os.path.realpath(tempfile.mkdtemp()) # outside the allowed root
secret_file = os.path.join(secret_dir, "secret.txt")
with open(secret_file, "w") as f:
f.write("TOP_SECRET_PRIVATE_KEY")
@@ -157,13 +163,10 @@ def symlink_env():
f.write("safe content")
os.symlink(secret_file, os.path.join(src, "leak.txt"))
prev = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS")
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = work_dir
prev_cwd = os.getcwd()
os.chdir(work_dir)
yield {"work_dir": work_dir, "src": src, "secret_dir": secret_dir}
if prev is None:
os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None)
else:
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev
os.chdir(prev_cwd)
shutil.rmtree(work_dir, ignore_errors=True)
shutil.rmtree(secret_dir, ignore_errors=True)
@@ -200,3 +203,19 @@ def test_tar_excludes_symlink_to_outside_file(tool, symlink_env):
assert any(m.endswith("normal.txt") for m in members)
assert not any(m.endswith("leak.txt") for m in members)
assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members)
def test_compress_zip_validates_output_path_at_sink(symlink_env):
# Calling the compressor directly (bypassing _run) must still refuse to
# write outside the allow-list.
outside = os.path.join(symlink_env["secret_dir"], "evil.zip")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_zip(symlink_env["src"], outside)
assert not os.path.exists(outside)
def test_compress_tar_validates_output_path_at_sink(symlink_env):
outside = os.path.join(symlink_env["secret_dir"], "evil.tar.gz")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_tar(symlink_env["src"], outside, "tar.gz")
assert not os.path.exists(outside)