Compare commits

...

1 Commits

Author SHA1 Message Date
Rip&Tear
e0df891bdd fix: confine file tools to an allow-listed root to block path traversal
LLM/prompt-injection-controlled file paths could escape the working
directory. The RAG search tools and FileReadTool already routed through
validate_file_path, but FileWriterTool only checked that `filename` did
not escape the caller-supplied `directory` — and `directory` is itself
LLM-controlled, so an agent fed untrusted content could be steered into
writing anywhere on disk (e.g. ~/.ssh/authorized_keys).

- safe_path: replace the single base_dir cwd jail with a deny-by-default
  allow-list of roots, sourced from cwd + CREWAI_TOOLS_ALLOWED_DIRS +
  a caller-passed allowed_dirs. Backward compatible for existing callers.
- FileWriterTool: route the resolved write target through
  validate_file_path so writes are confined to an allow-listed root
  regardless of the directory argument.
- Tests: allow-list extension via env/param, deny-by-default, multi-root,
  and a regression test for the unbounded-directory write.

BREAKING: FileWriterTool no longer writes to arbitrary absolute
directories by default. Set CREWAI_TOOLS_ALLOWED_DIRS to permit
out-of-cwd writes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-20 01:29:19 +08:00
4 changed files with 202 additions and 45 deletions

View File

@@ -20,6 +20,55 @@ from urllib.parse import urlparse
logger = logging.getLogger(__name__)
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
_ALLOWED_DIRS_ENV = "CREWAI_TOOLS_ALLOWED_DIRS"
def _get_allowed_roots(
base_dir: str | None = None,
allowed_dirs: list[str] | None = None,
) -> list[str]:
"""Build the deny-by-default set of allowed root directories.
Roots are drawn from, in order:
1. ``base_dir`` (defaults to the current working directory),
2. the ``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, split on
``os.pathsep``,
3. the caller-supplied ``allowed_dirs`` list.
Every root is resolved with :func:`os.path.realpath` so a symlinked root
is compared by its real location. Empty entries are ignored and duplicates
are collapsed while preserving order. The first element is always the
primary root used to resolve relative candidate paths.
"""
raw_roots: list[str] = [base_dir if base_dir is not None else os.getcwd()]
env_dirs = os.environ.get(_ALLOWED_DIRS_ENV, "")
if env_dirs:
raw_roots.extend(d for d in env_dirs.split(os.pathsep) if d)
if allowed_dirs:
raw_roots.extend(d for d in allowed_dirs if d)
resolved: list[str] = []
seen: set[str] = set()
for root in raw_roots:
real = os.path.realpath(root)
if real not in seen:
seen.add(real)
resolved.append(real)
return resolved
def _is_within_root(resolved_path: str, resolved_root: str) -> bool:
"""Return True if *resolved_path* equals *resolved_root* or lives beneath it.
When ``resolved_root`` already ends with a separator (e.g. the filesystem
root ``"/"``), appending ``os.sep`` would double it, so the root is used
as-is for the prefix in that case.
"""
prefix = resolved_root if resolved_root.endswith(os.sep) else resolved_root + os.sep
return resolved_path == resolved_root or resolved_path.startswith(prefix)
def format_path_for_display(path: str, base_dir: str | None = None) -> str:
@@ -52,21 +101,32 @@ def _is_escape_hatch_enabled() -> bool:
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
def validate_file_path(path: str, base_dir: str | None = None) -> str:
def validate_file_path(
path: str,
base_dir: str | None = None,
*,
allowed_dirs: list[str] | None = None,
) -> str:
"""Validate that a file path is safe to read.
Resolves symlinks and ``..`` components, then checks that the resolved
path falls within *base_dir* (defaults to the current working directory).
path falls within at least one allowed root directory. The allow-list is
built from *base_dir* (defaults to the current working directory), the
``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, and *allowed_dirs* —
see :func:`_get_allowed_roots`. Access is denied by default for anything
outside that set.
Args:
path: The file path to validate.
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
base_dir: Primary allowed root. Defaults to ``os.getcwd()`` and is
used to resolve relative ``path`` values.
allowed_dirs: Additional allowed root directories.
Returns:
The resolved, validated absolute path.
Raises:
ValueError: If the path escapes the allowed directory.
ValueError: If the path escapes every allowed directory.
"""
if _is_escape_hatch_enabled():
logger.warning(
@@ -76,30 +136,30 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str:
)
return os.path.realpath(path)
if base_dir is None:
base_dir = os.getcwd()
allowed_roots = _get_allowed_roots(base_dir, allowed_dirs)
primary_root = allowed_roots[0]
resolved_base = os.path.realpath(base_dir)
resolved_path = os.path.realpath(
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
path if os.path.isabs(path) else os.path.join(primary_root, path)
)
# Ensure the resolved path is within the base directory.
# When resolved_base already ends with a separator (e.g. the filesystem
# root "/"), appending os.sep would double it ("//"), so use the base
# as-is in that case.
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
raise ValueError(
f"Path '{format_path_for_display(resolved_path, resolved_base)}' is "
f"outside the allowed directory. "
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
)
if any(_is_within_root(resolved_path, root) for root in allowed_roots):
return resolved_path
return resolved_path
raise ValueError(
f"Path '{format_path_for_display(resolved_path, primary_root)}' is "
f"outside the allowed directories. "
f"Add the directory via {_ALLOWED_DIRS_ENV}, or set "
f"{_UNSAFE_PATHS_ENV}=true to bypass this check."
)
def validate_directory_path(path: str, base_dir: str | None = None) -> str:
def validate_directory_path(
path: str,
base_dir: str | None = None,
*,
allowed_dirs: list[str] | None = None,
) -> str:
"""Validate that a directory path is safe to read.
Same as :func:`validate_file_path` but also checks that the path
@@ -107,15 +167,16 @@ def validate_directory_path(path: str, base_dir: str | None = None) -> str:
Args:
path: The directory path to validate.
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
base_dir: Primary allowed root. Defaults to ``os.getcwd()``.
allowed_dirs: Additional allowed root directories.
Returns:
The resolved, validated absolute path.
Raises:
ValueError: If the path escapes the allowed directory or is not a directory.
ValueError: If the path escapes every allowed directory or is not a directory.
"""
validated = validate_file_path(path, base_dir)
validated = validate_file_path(path, base_dir, allowed_dirs=allowed_dirs)
if not os.path.isdir(validated):
raise ValueError(f"Path '{validated}' is not a directory.")
return validated

View File

@@ -1,5 +1,4 @@
import os
from pathlib import Path
from typing import Any
from crewai.tools import BaseTool
@@ -8,6 +7,7 @@ from pydantic import BaseModel
from crewai_tools.security.safe_path import (
format_error_for_display,
format_path_for_display,
validate_file_path,
)
@@ -41,22 +41,27 @@ class FileWriterTool(BaseTool):
filepath = os.path.join(directory, filename)
# Prevent path traversal: the resolved path must be strictly inside
# filename, and symlink escapes regardless of how directory is set.
# is_relative_to() does a proper path-component comparison that is
# safe on case-insensitive filesystems and avoids the "// " edge case
# We also reject the case where filepath resolves to the directory
# itself, since that is not a valid file target.
real_directory = Path(directory).resolve()
real_filepath = Path(filepath).resolve()
display_filepath = format_path_for_display(
str(real_filepath), str(real_directory)
)
if (
not real_filepath.is_relative_to(real_directory)
or real_filepath == real_directory
):
return "Error: Invalid file path — the filename must not escape the target directory."
# Confine the resolved write target to an allow-listed root
# (cwd + CREWAI_TOOLS_ALLOWED_DIRS), NOT merely inside the
# caller-supplied `directory`. That value is itself untrusted when
# an LLM tool call chooses it, so checking containment against it
# would let an agent write anywhere (e.g. ~/.ssh/authorized_keys).
# validate_file_path resolves symlinks and ".." before checking.
try:
real_filepath = validate_file_path(filepath)
except ValueError as e:
return f"Error: {format_error_for_display(e)}"
real_directory = os.path.dirname(real_filepath)
display_filepath = format_path_for_display(real_filepath, real_directory)
# A target that resolves to an existing directory is not a valid
# file destination.
if os.path.isdir(real_filepath):
return (
"Error: Invalid file path — the target must be a file, "
"not a directory."
)
if kwargs.get("directory"):
os.makedirs(real_directory, exist_ok=True)

View File

@@ -17,12 +17,23 @@ def temp_env():
test_file = "test.txt"
test_content = "Hello, World!"
# FileWriterTool confines writes to an allow-listed root (cwd plus
# CREWAI_TOOLS_ALLOWED_DIRS). Explicitly permit this temp dir — this is the
# supported way for a developer to widen the write scope to an external
# directory, and lets the happy-path tests below write into it.
prev_allowed = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS")
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = temp_dir
yield {
"temp_dir": temp_dir,
"test_file": test_file,
"test_content": test_content,
}
if prev_allowed is None:
os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None)
else:
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev_allowed
shutil.rmtree(temp_dir, ignore_errors=True)
@@ -196,3 +207,24 @@ def test_blocks_symlink_escape(tool, temp_env):
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)
def test_blocks_unbounded_directory_arg(tool, temp_env):
# The core fix: the `directory` argument is itself untrusted (LLM-chosen).
# A directory outside the allow-list must be rejected even when filename
# is benign — previously this let an agent write anywhere on disk
# (e.g. ~/.ssh/authorized_keys).
outside_dir = tempfile.mkdtemp() # NOT added to CREWAI_TOOLS_ALLOWED_DIRS
outside_file = os.path.join(outside_dir, "test.txt")
try:
result = tool._run(
filename="test.txt",
directory=outside_dir,
content="should not be written",
overwrite=True,
)
assert "Error" in result
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)

View File

@@ -32,12 +32,12 @@ class TestValidateFilePath:
def test_rejects_dotdot_traversal(self, tmp_path):
"""Reject ../ traversal that escapes base_dir."""
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed director"):
validate_file_path("../../etc/passwd", str(tmp_path))
def test_rejects_absolute_path_outside_base(self, tmp_path):
"""Reject absolute path outside base_dir."""
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed director"):
validate_file_path("/etc/passwd", str(tmp_path))
def test_allows_absolute_path_inside_base(self, tmp_path):
@@ -50,7 +50,7 @@ class TestValidateFilePath:
"""Reject symlinks that point outside base_dir."""
link = tmp_path / "sneaky_link"
os.symlink("/etc/passwd", str(link))
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed director"):
validate_file_path("sneaky_link", str(tmp_path))
def test_defaults_to_cwd(self):
@@ -113,7 +113,7 @@ class TestValidateDirectoryPath:
validate_directory_path("file.txt", str(tmp_path))
def test_rejects_traversal(self, tmp_path):
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed director"):
validate_directory_path("../../", str(tmp_path))
@@ -191,3 +191,62 @@ class TestValidateUrl:
# file:// would normally be blocked
result = validate_url("file:///etc/passwd")
assert result == "file:///etc/passwd"
class TestAllowList:
"""Tests for the configurable deny-by-default allow-list of roots."""
def test_param_extends_allowed_roots(self, tmp_path):
"""A directory passed via allowed_dirs is permitted."""
extra = tmp_path / "extra"
extra.mkdir()
(extra / "data.txt").touch()
result = validate_file_path(
str(extra / "data.txt"),
base_dir=str(tmp_path / "base"),
allowed_dirs=[str(extra)],
)
assert result == str(extra / "data.txt")
def test_env_extends_allowed_roots(self, tmp_path, monkeypatch):
"""A directory listed in CREWAI_TOOLS_ALLOWED_DIRS is permitted."""
base = tmp_path / "base"
base.mkdir()
extra = tmp_path / "extra"
extra.mkdir()
(extra / "data.txt").touch()
monkeypatch.setenv("CREWAI_TOOLS_ALLOWED_DIRS", str(extra))
result = validate_file_path(str(extra / "data.txt"), base_dir=str(base))
assert result == str(extra / "data.txt")
def test_denied_without_allow_listing(self, tmp_path, monkeypatch):
"""The same external dir is rejected when not allow-listed."""
base = tmp_path / "base"
base.mkdir()
extra = tmp_path / "extra"
extra.mkdir()
(extra / "data.txt").touch()
monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False)
with pytest.raises(ValueError, match="outside the allowed director"):
validate_file_path(str(extra / "data.txt"), base_dir=str(base))
def test_multiple_env_roots(self, tmp_path, monkeypatch):
"""Multiple os.pathsep-separated roots are each honored."""
base = tmp_path / "base"
base.mkdir()
a = tmp_path / "a"
a.mkdir()
b = tmp_path / "b"
b.mkdir()
(a / "fa.txt").touch()
(b / "fb.txt").touch()
monkeypatch.setenv(
"CREWAI_TOOLS_ALLOWED_DIRS", os.pathsep.join([str(a), str(b)])
)
assert validate_file_path(str(a / "fa.txt"), base_dir=str(base)) == str(
a / "fa.txt"
)
assert validate_file_path(str(b / "fb.txt"), base_dir=str(base)) == str(
b / "fb.txt"
)