Compare commits

...

2 Commits

Author SHA1 Message Date
Rip&Tear
034b119d34 fix: validate compressor output_path at the write sink (defense in depth)
Corridor scan flagged output_path reaching the archive writer. _run does
validate output_path before dispatch, but _compress_zip/_compress_tar are
staticmethods reachable independently of _run — called directly they would
write unvalidated. Validate at the sink so every call site is confined to
the allow-list, not just the _run entrypoint.

Also decouple the symlink tests from the configurable-allow-list feature
(PR #6248): chdir into the working dir so the allowed root is cwd, instead
of setting CREWAI_TOOLS_ALLOWED_DIRS (which this branch, off main, does not
yet support). Adds direct-call tests asserting the sink rejects an
out-of-root output path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-20 01:54:50 +08:00
Rip&Tear
8e6d5fb38b fix: stop FileCompressorTool from archiving out-of-tree symlink targets
input_path/output_path were already allow-list validated, but when
compressing a directory the per-file members were not. zipfile.write()
dereferences symlinks, so a symlink inside an allowed directory pointing
at an out-of-tree secret (e.g. ~/.ssh/id_rsa, /etc/passwd) had its
contents copied into the archive — a confinement bypass / info-disclosure.

- _compress_zip: validate each walked file against the allow-list; skip
  and record any whose resolved path escapes it.
- _compress_tar: filter drops symlink/hardlink members (tar stores links
  rather than dereferencing, so this prevents shipping an out-of-tree
  link that resolves at extraction time).
- _run: surface a "N item(s) skipped for safety" note rather than
  swallowing the exclusions.
- tests: regression tests asserting the secret content never lands in the
  zip or tar archive. Also fix a pre-existing broken import that made the
  whole compressor test module error at collection (empty package
  __init__; aligned to the module-path import used by sibling tools).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-20 01:45:49 +08:00
2 changed files with 141 additions and 9 deletions

View File

@@ -77,11 +77,19 @@ class FileCompressorTool(BaseTool):
"tar.xz": self._compress_tar,
}
if format == "zip":
format_compression[format](input_path, output_path) # type: ignore[operator]
skipped = format_compression[format](input_path, output_path) # type: ignore[operator]
else:
format_compression[format](input_path, output_path, format) # type: ignore[operator]
skipped = format_compression[format]( # type: ignore[operator]
input_path, output_path, format
)
return f"Successfully compressed '{input_path}' into '{output_path}'"
message = f"Successfully compressed '{input_path}' into '{output_path}'"
if isinstance(skipped, list) and skipped:
message += (
f" ({len(skipped)} item(s) skipped for safety: resolved "
f"outside the allowed directory)"
)
return message
except FileNotFoundError:
return f"Error: File not found at path: {input_path}"
except PermissionError:
@@ -109,8 +117,18 @@ class FileCompressorTool(BaseTool):
return True
@staticmethod
def _compress_zip(input_path: str, output_path: str) -> None:
"""Compresses input into a zip archive."""
def _compress_zip(input_path: str, output_path: str) -> list[str]:
"""Compresses input into a zip archive.
Returns the files skipped because they resolved outside the allow-list.
``zipfile.write`` dereferences symlinks, so each walked file is
validated to stop a symlink inside the tree from copying the contents
of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive.
"""
# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
skipped: list[str] = []
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
if os.path.isfile(input_path):
zipf.write(input_path, os.path.basename(input_path))
@@ -118,12 +136,25 @@ class FileCompressorTool(BaseTool):
for root, _, files in os.walk(input_path):
for file in files:
full_path = os.path.join(root, file)
try:
validate_file_path(full_path)
except ValueError:
skipped.append(full_path)
continue
arcname = os.path.relpath(full_path, start=input_path)
zipf.write(full_path, arcname)
return skipped
@staticmethod
def _compress_tar(input_path: str, output_path: str, format: str) -> None:
"""Compresses input into a tar archive with the given format."""
def _compress_tar(input_path: str, output_path: str, format: str) -> list[str]:
"""Compresses input into a tar archive with the given format.
Returns the members skipped for safety. ``tarfile`` stores symlinks and
hardlinks as link entries rather than dereferencing them, so the
compress-time content leak that affects zip does not apply here. The
filter drops link members so an out-of-tree symlink target cannot be
shipped inside the archive and resolved at extraction time.
"""
format_mode = {
"tar": "w",
"tar.gz": "w:gz",
@@ -135,7 +166,18 @@ class FileCompressorTool(BaseTool):
raise ValueError(f"Unsupported tar format: {format}")
mode = format_mode[format]
skipped: list[str] = []
def _drop_links(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None:
if tarinfo.issym() or tarinfo.islnk():
skipped.append(tarinfo.name)
return None
return tarinfo
# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload]
arcname = os.path.basename(input_path)
tarf.add(input_path, arcname=arcname)
tarf.add(input_path, arcname=arcname, filter=_drop_links)
return skipped

View File

@@ -1,6 +1,13 @@
import os
import shutil
import tarfile
import tempfile
import zipfile
from unittest.mock import patch
from crewai_tools.tools.files_compressor_tool import FileCompressorTool
from crewai_tools.tools.files_compressor_tool.files_compressor_tool import (
FileCompressorTool,
)
import pytest
@@ -129,3 +136,86 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs):
result = tool._prepare_output("some/missing/path/file.zip", overwrite=True)
assert result is True
mock_makedirs.assert_called_once()
# --- Security: symlink content must not leak out of the allow-list ---
@pytest.fixture
def symlink_env():
"""A working dir (the allowed root, via cwd) containing a normal file and a
symlink pointing at a secret file OUTSIDE that root.
The working directory is the allowed root for path validation, so we chdir
into ``work_dir`` rather than relying on CREWAI_TOOLS_ALLOWED_DIRS. This
keeps the test independent of the configurable-allow-list feature and
exercises the default cwd confinement.
"""
work_dir = os.path.realpath(tempfile.mkdtemp())
secret_dir = os.path.realpath(tempfile.mkdtemp()) # outside the allowed root
secret_file = os.path.join(secret_dir, "secret.txt")
with open(secret_file, "w") as f:
f.write("TOP_SECRET_PRIVATE_KEY")
src = os.path.join(work_dir, "src")
os.makedirs(src)
with open(os.path.join(src, "normal.txt"), "w") as f:
f.write("safe content")
os.symlink(secret_file, os.path.join(src, "leak.txt"))
prev_cwd = os.getcwd()
os.chdir(work_dir)
yield {"work_dir": work_dir, "src": src, "secret_dir": secret_dir}
os.chdir(prev_cwd)
shutil.rmtree(work_dir, ignore_errors=True)
shutil.rmtree(secret_dir, ignore_errors=True)
def test_zip_excludes_symlink_to_outside_file(tool, symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.zip")
result = tool._run(
input_path=symlink_env["src"], output_path=out, format="zip", overwrite=True
)
assert "Successfully compressed" in result
assert "skipped for safety" in result
with zipfile.ZipFile(out) as zf:
names = zf.namelist()
assert "normal.txt" in names
assert "leak.txt" not in names
blob = b"".join(zf.read(n) for n in names)
assert b"TOP_SECRET_PRIVATE_KEY" not in blob
def test_tar_excludes_symlink_to_outside_file(tool, symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.tar.gz")
result = tool._run(
input_path=symlink_env["src"],
output_path=out,
format="tar.gz",
overwrite=True,
)
assert "Successfully compressed" in result
assert "skipped for safety" in result
with tarfile.open(out) as tf:
members = tf.getnames()
assert any(m.endswith("normal.txt") for m in members)
assert not any(m.endswith("leak.txt") for m in members)
assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members)
def test_compress_zip_validates_output_path_at_sink(symlink_env):
# Calling the compressor directly (bypassing _run) must still refuse to
# write outside the allow-list.
outside = os.path.join(symlink_env["secret_dir"], "evil.zip")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_zip(symlink_env["src"], outside)
assert not os.path.exists(outside)
def test_compress_tar_validates_output_path_at_sink(symlink_env):
outside = os.path.join(symlink_env["secret_dir"], "evil.tar.gz")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_tar(symlink_env["src"], outside, "tar.gz")
assert not os.path.exists(outside)