Compare commits

...

8 Commits

Author SHA1 Message Date
Rip&Tear
ee6e54233a docs: document FileWriterTool path confinement and CREWAI_TOOLS_ALLOWED_DIRS
Document the deny-by-default allow-list behavior, the new
CREWAI_TOOLS_ALLOWED_DIRS env var for extending allowed roots, the
fail-closed behavior when cwd is the filesystem root, and the
CREWAI_TOOLS_ALLOW_UNSAFE_PATHS escape hatch.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-20 11:45:36 +08:00
Rip&Tear
3bce3cceed fix: never default the path allow-list to the filesystem root
_get_allowed_roots defaulted its primary root to os.getcwd(). In a
container started without a WORKDIR, cwd is "/", and since "/" is a
parent of every absolute path the deny-by-default allow-list then
permitted the entire filesystem -- silently disabling confinement and
re-opening arbitrary LLM-controlled file read/write (the exact hole this
PR closes).

Distinguish an implicitly defaulted primary root (base_dir is None ->
os.getcwd()) from operator-provided roots (base_dir, allowed_dirs,
CREWAI_TOOLS_ALLOWED_DIRS). When the implicit cwd default resolves to
os.sep it is dropped; an explicit "/" is still honored as a deliberate
opt-in. If no usable root remains, raise a clear ValueError instead of
allowing everything.

Addresses the corridor-security review finding on #6248.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-20 11:22:29 +08:00
Rip&Tear
bdb763bfde Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-20 11:11:54 +08:00
Rip&Tear
5c9436d368 Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-20 11:11:37 +08:00
Rip&Tear
4877828264 Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-20 11:11:24 +08:00
Rip&Tear
685ea13c3b Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-20 11:11:01 +08:00
Rip&Tear
b70c74e17b Merge branch 'main' into fix/file-tools-path-allowlist 2026-06-20 11:00:47 +08:00
Rip&Tear
e0df891bdd fix: confine file tools to an allow-listed root to block path traversal
LLM/prompt-injection-controlled file paths could escape the working
directory. The RAG search tools and FileReadTool already routed through
validate_file_path, but FileWriterTool only checked that `filename` did
not escape the caller-supplied `directory` — and `directory` is itself
LLM-controlled, so an agent fed untrusted content could be steered into
writing anywhere on disk (e.g. ~/.ssh/authorized_keys).

- safe_path: replace the single base_dir cwd jail with a deny-by-default
  allow-list of roots, sourced from cwd + CREWAI_TOOLS_ALLOWED_DIRS +
  a caller-passed allowed_dirs. Backward compatible for existing callers.
- FileWriterTool: route the resolved write target through
  validate_file_path so writes are confined to an allow-listed root
  regardless of the directory argument.
- Tests: allow-list extension via env/param, deny-by-default, multi-root,
  and a regression test for the unbounded-directory write.

BREAKING: FileWriterTool no longer writes to arbitrary absolute
directories by default. Set CREWAI_TOOLS_ALLOWED_DIRS to permit
out-of-cwd writes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-20 01:29:19 +08:00
5 changed files with 281 additions and 45 deletions

View File

@@ -42,6 +42,25 @@ print(result)
- `content`: The content to write into the file.
- `directory` (optional): The path to the directory where the file will be created. Defaults to the current directory (`.`). If the directory does not exist, it will be created.
## Path confinement
Because `filename` and `directory` may be supplied at runtime by an agent acting on untrusted content, `FileWriterTool` confines writes to an **allow-listed set of root directories**. The resolved target (after expanding symlinks and `..`) must fall inside one of these roots or the write is rejected — a `directory` argument pointing outside them (e.g. `~/.ssh`, `/etc`) no longer grants write access.
The allow-list is, by default, the current working directory. You can extend it for deployments that legitimately write elsewhere:
- `CREWAI_TOOLS_ALLOWED_DIRS` — one or more additional root directories, separated by the OS path separator (`:` on Linux/macOS, `;` on Windows).
```shell
# Allow writes under /data and /workspace in addition to the cwd
export CREWAI_TOOLS_ALLOWED_DIRS="/data:/workspace"
```
<Warning>
If the process runs with its working directory set to the filesystem root (`/`) — common in containers started without a `WORKDIR` — the tool will **not** fall back to allow-listing the entire filesystem. Writes fail with a `ValueError` until you set `CREWAI_TOOLS_ALLOWED_DIRS` to an explicit directory. Set a `WORKDIR` (or the env var) in such deployments.
</Warning>
The `CREWAI_TOOLS_ALLOW_UNSAFE_PATHS=true` escape hatch disables path validation entirely. It is intended only for trusted local development and should not be set in any environment that runs agent-generated or otherwise untrusted instructions.
## Conclusion
By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.

View File

@@ -20,6 +20,82 @@ from urllib.parse import urlparse
logger = logging.getLogger(__name__)
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
_ALLOWED_DIRS_ENV = "CREWAI_TOOLS_ALLOWED_DIRS"
def _get_allowed_roots(
base_dir: str | None = None,
allowed_dirs: list[str] | None = None,
) -> list[str]:
"""Build the deny-by-default set of allowed root directories.
Roots are drawn from, in order:
1. ``base_dir`` (defaults to the current working directory),
2. the ``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, split on
``os.pathsep``,
3. the caller-supplied ``allowed_dirs`` list.
Every root is resolved with :func:`os.path.realpath` so a symlinked root
is compared by its real location. Empty entries are ignored and duplicates
are collapsed while preserving order. The first element is always the
primary root used to resolve relative candidate paths.
The filesystem root (``os.sep``, e.g. ``"/"``) is never accepted as an
*implicitly defaulted* root. When ``base_dir`` is not supplied and the
current working directory is ``/`` -- common in containers started without
a ``WORKDIR`` -- defaulting to it would make every absolute path "within"
the allow-list and disable confinement entirely. In that case the cwd
default is dropped; an operator who genuinely wants the whole filesystem
must opt in explicitly via ``base_dir``, ``allowed_dirs``, or
``CREWAI_TOOLS_ALLOWED_DIRS``. If no usable root remains, a ``ValueError``
is raised rather than silently allowing everything.
"""
primary_explicit = base_dir is not None
primary = base_dir if base_dir is not None else os.getcwd()
# (root, is_explicit) -- explicit roots are operator-provided and may
# legitimately include the filesystem root as an opt-in.
raw_roots: list[tuple[str, bool]] = [(primary, primary_explicit)]
env_dirs = os.environ.get(_ALLOWED_DIRS_ENV, "")
if env_dirs:
raw_roots.extend((d, True) for d in env_dirs.split(os.pathsep) if d)
if allowed_dirs:
raw_roots.extend((d, True) for d in allowed_dirs if d)
resolved: list[str] = []
seen: set[str] = set()
for root, is_explicit in raw_roots:
real = os.path.realpath(root)
if real == os.sep and not is_explicit:
# Refuse to let an unconfigured cwd of "/" open the whole filesystem.
continue
if real not in seen:
seen.add(real)
resolved.append(real)
if not resolved:
raise ValueError(
"No safe allowed directory could be determined: the current working "
f"directory is the filesystem root ('{os.sep}'). Set "
f"{_ALLOWED_DIRS_ENV} to an explicit directory, pass "
f"base_dir/allowed_dirs, or set {_UNSAFE_PATHS_ENV}=true to bypass "
"path validation."
)
return resolved
def _is_within_root(resolved_path: str, resolved_root: str) -> bool:
"""Return True if *resolved_path* equals *resolved_root* or lives beneath it.
When ``resolved_root`` already ends with a separator (e.g. the filesystem
root ``"/"``), appending ``os.sep`` would double it, so the root is used
as-is for the prefix in that case.
"""
prefix = resolved_root if resolved_root.endswith(os.sep) else resolved_root + os.sep
return resolved_path == resolved_root or resolved_path.startswith(prefix)
def format_path_for_display(path: str, base_dir: str | None = None) -> str:
@@ -52,21 +128,32 @@ def _is_escape_hatch_enabled() -> bool:
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
def validate_file_path(path: str, base_dir: str | None = None) -> str:
def validate_file_path(
path: str,
base_dir: str | None = None,
*,
allowed_dirs: list[str] | None = None,
) -> str:
"""Validate that a file path is safe to read.
Resolves symlinks and ``..`` components, then checks that the resolved
path falls within *base_dir* (defaults to the current working directory).
path falls within at least one allowed root directory. The allow-list is
built from *base_dir* (defaults to the current working directory), the
``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, and *allowed_dirs* —
see :func:`_get_allowed_roots`. Access is denied by default for anything
outside that set.
Args:
path: The file path to validate.
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
base_dir: Primary allowed root. Defaults to ``os.getcwd()`` and is
used to resolve relative ``path`` values.
allowed_dirs: Additional allowed root directories.
Returns:
The resolved, validated absolute path.
Raises:
ValueError: If the path escapes the allowed directory.
ValueError: If the path escapes every allowed directory.
"""
if _is_escape_hatch_enabled():
logger.warning(
@@ -76,30 +163,30 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str:
)
return os.path.realpath(path)
if base_dir is None:
base_dir = os.getcwd()
allowed_roots = _get_allowed_roots(base_dir, allowed_dirs)
primary_root = allowed_roots[0]
resolved_base = os.path.realpath(base_dir)
resolved_path = os.path.realpath(
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
path if os.path.isabs(path) else os.path.join(primary_root, path)
)
# Ensure the resolved path is within the base directory.
# When resolved_base already ends with a separator (e.g. the filesystem
# root "/"), appending os.sep would double it ("//"), so use the base
# as-is in that case.
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
raise ValueError(
f"Path '{format_path_for_display(resolved_path, resolved_base)}' is "
f"outside the allowed directory. "
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
)
if any(_is_within_root(resolved_path, root) for root in allowed_roots):
return resolved_path
return resolved_path
raise ValueError(
f"Path '{format_path_for_display(resolved_path, primary_root)}' is "
f"outside the allowed directories. "
f"Add the directory via {_ALLOWED_DIRS_ENV}, or set "
f"{_UNSAFE_PATHS_ENV}=true to bypass this check."
)
def validate_directory_path(path: str, base_dir: str | None = None) -> str:
def validate_directory_path(
path: str,
base_dir: str | None = None,
*,
allowed_dirs: list[str] | None = None,
) -> str:
"""Validate that a directory path is safe to read.
Same as :func:`validate_file_path` but also checks that the path
@@ -107,15 +194,16 @@ def validate_directory_path(path: str, base_dir: str | None = None) -> str:
Args:
path: The directory path to validate.
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
base_dir: Primary allowed root. Defaults to ``os.getcwd()``.
allowed_dirs: Additional allowed root directories.
Returns:
The resolved, validated absolute path.
Raises:
ValueError: If the path escapes the allowed directory or is not a directory.
ValueError: If the path escapes every allowed directory or is not a directory.
"""
validated = validate_file_path(path, base_dir)
validated = validate_file_path(path, base_dir, allowed_dirs=allowed_dirs)
if not os.path.isdir(validated):
raise ValueError(f"Path '{validated}' is not a directory.")
return validated

View File

@@ -1,5 +1,4 @@
import os
from pathlib import Path
from typing import Any
from crewai.tools import BaseTool
@@ -8,6 +7,7 @@ from pydantic import BaseModel
from crewai_tools.security.safe_path import (
format_error_for_display,
format_path_for_display,
validate_file_path,
)
@@ -41,22 +41,27 @@ class FileWriterTool(BaseTool):
filepath = os.path.join(directory, filename)
# Prevent path traversal: the resolved path must be strictly inside
# filename, and symlink escapes regardless of how directory is set.
# is_relative_to() does a proper path-component comparison that is
# safe on case-insensitive filesystems and avoids the "// " edge case
# We also reject the case where filepath resolves to the directory
# itself, since that is not a valid file target.
real_directory = Path(directory).resolve()
real_filepath = Path(filepath).resolve()
display_filepath = format_path_for_display(
str(real_filepath), str(real_directory)
)
if (
not real_filepath.is_relative_to(real_directory)
or real_filepath == real_directory
):
return "Error: Invalid file path — the filename must not escape the target directory."
# Confine the resolved write target to an allow-listed root
# (cwd + CREWAI_TOOLS_ALLOWED_DIRS), NOT merely inside the
# caller-supplied `directory`. That value is itself untrusted when
# an LLM tool call chooses it, so checking containment against it
# would let an agent write anywhere (e.g. ~/.ssh/authorized_keys).
# validate_file_path resolves symlinks and ".." before checking.
try:
real_filepath = validate_file_path(filepath)
except ValueError as e:
return f"Error: {format_error_for_display(e)}"
real_directory = os.path.dirname(real_filepath)
display_filepath = format_path_for_display(real_filepath, real_directory)
# A target that resolves to an existing directory is not a valid
# file destination.
if os.path.isdir(real_filepath):
return (
"Error: Invalid file path — the target must be a file, "
"not a directory."
)
if kwargs.get("directory"):
os.makedirs(real_directory, exist_ok=True)

View File

@@ -17,12 +17,23 @@ def temp_env():
test_file = "test.txt"
test_content = "Hello, World!"
# FileWriterTool confines writes to an allow-listed root (cwd plus
# CREWAI_TOOLS_ALLOWED_DIRS). Explicitly permit this temp dir — this is the
# supported way for a developer to widen the write scope to an external
# directory, and lets the happy-path tests below write into it.
prev_allowed = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS")
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = temp_dir
yield {
"temp_dir": temp_dir,
"test_file": test_file,
"test_content": test_content,
}
if prev_allowed is None:
os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None)
else:
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev_allowed
shutil.rmtree(temp_dir, ignore_errors=True)
@@ -196,3 +207,24 @@ def test_blocks_symlink_escape(tool, temp_env):
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)
def test_blocks_unbounded_directory_arg(tool, temp_env):
# The core fix: the `directory` argument is itself untrusted (LLM-chosen).
# A directory outside the allow-list must be rejected even when filename
# is benign — previously this let an agent write anywhere on disk
# (e.g. ~/.ssh/authorized_keys).
outside_dir = tempfile.mkdtemp() # NOT added to CREWAI_TOOLS_ALLOWED_DIRS
outside_file = os.path.join(outside_dir, "test.txt")
try:
result = tool._run(
filename="test.txt",
directory=outside_dir,
content="should not be written",
overwrite=True,
)
assert "Error" in result
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)

View File

@@ -32,12 +32,12 @@ class TestValidateFilePath:
def test_rejects_dotdot_traversal(self, tmp_path):
"""Reject ../ traversal that escapes base_dir."""
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed director"):
validate_file_path("../../etc/passwd", str(tmp_path))
def test_rejects_absolute_path_outside_base(self, tmp_path):
"""Reject absolute path outside base_dir."""
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed directories"):
validate_file_path("/etc/passwd", str(tmp_path))
def test_allows_absolute_path_inside_base(self, tmp_path):
@@ -50,7 +50,7 @@ class TestValidateFilePath:
"""Reject symlinks that point outside base_dir."""
link = tmp_path / "sneaky_link"
os.symlink("/etc/passwd", str(link))
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed directories"):
validate_file_path("sneaky_link", str(tmp_path))
def test_defaults_to_cwd(self):
@@ -113,7 +113,7 @@ class TestValidateDirectoryPath:
validate_directory_path("file.txt", str(tmp_path))
def test_rejects_traversal(self, tmp_path):
with pytest.raises(ValueError, match="outside the allowed directory"):
with pytest.raises(ValueError, match="outside the allowed directories"):
validate_directory_path("../../", str(tmp_path))
@@ -191,3 +191,95 @@ class TestValidateUrl:
# file:// would normally be blocked
result = validate_url("file:///etc/passwd")
assert result == "file:///etc/passwd"
class TestAllowList:
"""Tests for the configurable deny-by-default allow-list of roots."""
def test_param_extends_allowed_roots(self, tmp_path):
"""A directory passed via allowed_dirs is permitted."""
extra = tmp_path / "extra"
extra.mkdir()
(extra / "data.txt").touch()
result = validate_file_path(
str(extra / "data.txt"),
base_dir=str(tmp_path / "base"),
allowed_dirs=[str(extra)],
)
assert result == str(extra / "data.txt")
def test_env_extends_allowed_roots(self, tmp_path, monkeypatch):
"""A directory listed in CREWAI_TOOLS_ALLOWED_DIRS is permitted."""
base = tmp_path / "base"
base.mkdir()
extra = tmp_path / "extra"
extra.mkdir()
(extra / "data.txt").touch()
monkeypatch.setenv("CREWAI_TOOLS_ALLOWED_DIRS", str(extra))
result = validate_file_path(str(extra / "data.txt"), base_dir=str(base))
assert result == str(extra / "data.txt")
def test_denied_without_allow_listing(self, tmp_path, monkeypatch):
"""The same external dir is rejected when not allow-listed."""
base = tmp_path / "base"
base.mkdir()
extra = tmp_path / "extra"
extra.mkdir()
(extra / "data.txt").touch()
monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False)
with pytest.raises(ValueError, match="outside the allowed directories"):
validate_file_path(str(extra / "data.txt"), base_dir=str(base))
def test_multiple_env_roots(self, tmp_path, monkeypatch):
"""Multiple os.pathsep-separated roots are each honored."""
base = tmp_path / "base"
base.mkdir()
a = tmp_path / "a"
a.mkdir()
b = tmp_path / "b"
b.mkdir()
(a / "fa.txt").touch()
(b / "fb.txt").touch()
monkeypatch.setenv(
"CREWAI_TOOLS_ALLOWED_DIRS", os.pathsep.join([str(a), str(b)])
)
assert validate_file_path(str(a / "fa.txt"), base_dir=str(base)) == str(
a / "fa.txt"
)
assert validate_file_path(str(b / "fb.txt"), base_dir=str(base)) == str(
b / "fb.txt"
)
def test_cwd_root_default_is_not_an_allowed_root(self, tmp_path, monkeypatch):
"""An unconfigured cwd of '/' must not open the whole filesystem.
Regression for the deny-by-default allow-list silently defaulting to the
filesystem root in containers started without a WORKDIR.
"""
monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False)
monkeypatch.delenv("CREWAI_TOOLS_ALLOW_UNSAFE_PATHS", raising=False)
monkeypatch.setattr(os, "getcwd", lambda: os.sep)
with pytest.raises(ValueError, match="filesystem root"):
validate_file_path("/etc/passwd")
def test_cwd_root_with_explicit_allowed_dirs_confines(
self, tmp_path, monkeypatch
):
"""With cwd '/', confinement falls back to the explicit allow-list."""
monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False)
monkeypatch.setattr(os, "getcwd", lambda: os.sep)
(tmp_path / "data.txt").touch()
assert validate_file_path(
str(tmp_path / "data.txt"), allowed_dirs=[str(tmp_path)]
) == str(tmp_path / "data.txt")
with pytest.raises(ValueError, match="outside the allowed directories"):
validate_file_path("/etc/passwd", allowed_dirs=[str(tmp_path)])
def test_explicit_base_dir_root_is_opt_in(self, monkeypatch):
"""An explicit base_dir of '/' is honored as a deliberate opt-in."""
monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False)
monkeypatch.delenv("CREWAI_TOOLS_ALLOW_UNSAFE_PATHS", raising=False)
assert validate_file_path("/etc/passwd", base_dir=os.sep) == os.path.realpath(
"/etc/passwd"
)