mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-19 23:28:11 +00:00
Compare commits
1 Commits
main
...
fix/file-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0df891bdd |
@@ -20,6 +20,55 @@ from urllib.parse import urlparse
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
|
||||
_ALLOWED_DIRS_ENV = "CREWAI_TOOLS_ALLOWED_DIRS"
|
||||
|
||||
|
||||
def _get_allowed_roots(
|
||||
base_dir: str | None = None,
|
||||
allowed_dirs: list[str] | None = None,
|
||||
) -> list[str]:
|
||||
"""Build the deny-by-default set of allowed root directories.
|
||||
|
||||
Roots are drawn from, in order:
|
||||
|
||||
1. ``base_dir`` (defaults to the current working directory),
|
||||
2. the ``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, split on
|
||||
``os.pathsep``,
|
||||
3. the caller-supplied ``allowed_dirs`` list.
|
||||
|
||||
Every root is resolved with :func:`os.path.realpath` so a symlinked root
|
||||
is compared by its real location. Empty entries are ignored and duplicates
|
||||
are collapsed while preserving order. The first element is always the
|
||||
primary root used to resolve relative candidate paths.
|
||||
"""
|
||||
raw_roots: list[str] = [base_dir if base_dir is not None else os.getcwd()]
|
||||
|
||||
env_dirs = os.environ.get(_ALLOWED_DIRS_ENV, "")
|
||||
if env_dirs:
|
||||
raw_roots.extend(d for d in env_dirs.split(os.pathsep) if d)
|
||||
|
||||
if allowed_dirs:
|
||||
raw_roots.extend(d for d in allowed_dirs if d)
|
||||
|
||||
resolved: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for root in raw_roots:
|
||||
real = os.path.realpath(root)
|
||||
if real not in seen:
|
||||
seen.add(real)
|
||||
resolved.append(real)
|
||||
return resolved
|
||||
|
||||
|
||||
def _is_within_root(resolved_path: str, resolved_root: str) -> bool:
|
||||
"""Return True if *resolved_path* equals *resolved_root* or lives beneath it.
|
||||
|
||||
When ``resolved_root`` already ends with a separator (e.g. the filesystem
|
||||
root ``"/"``), appending ``os.sep`` would double it, so the root is used
|
||||
as-is for the prefix in that case.
|
||||
"""
|
||||
prefix = resolved_root if resolved_root.endswith(os.sep) else resolved_root + os.sep
|
||||
return resolved_path == resolved_root or resolved_path.startswith(prefix)
|
||||
|
||||
|
||||
def format_path_for_display(path: str, base_dir: str | None = None) -> str:
|
||||
@@ -52,21 +101,32 @@ def _is_escape_hatch_enabled() -> bool:
|
||||
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
|
||||
|
||||
|
||||
def validate_file_path(path: str, base_dir: str | None = None) -> str:
|
||||
def validate_file_path(
|
||||
path: str,
|
||||
base_dir: str | None = None,
|
||||
*,
|
||||
allowed_dirs: list[str] | None = None,
|
||||
) -> str:
|
||||
"""Validate that a file path is safe to read.
|
||||
|
||||
Resolves symlinks and ``..`` components, then checks that the resolved
|
||||
path falls within *base_dir* (defaults to the current working directory).
|
||||
path falls within at least one allowed root directory. The allow-list is
|
||||
built from *base_dir* (defaults to the current working directory), the
|
||||
``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, and *allowed_dirs* —
|
||||
see :func:`_get_allowed_roots`. Access is denied by default for anything
|
||||
outside that set.
|
||||
|
||||
Args:
|
||||
path: The file path to validate.
|
||||
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
|
||||
base_dir: Primary allowed root. Defaults to ``os.getcwd()`` and is
|
||||
used to resolve relative ``path`` values.
|
||||
allowed_dirs: Additional allowed root directories.
|
||||
|
||||
Returns:
|
||||
The resolved, validated absolute path.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path escapes the allowed directory.
|
||||
ValueError: If the path escapes every allowed directory.
|
||||
"""
|
||||
if _is_escape_hatch_enabled():
|
||||
logger.warning(
|
||||
@@ -76,30 +136,30 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str:
|
||||
)
|
||||
return os.path.realpath(path)
|
||||
|
||||
if base_dir is None:
|
||||
base_dir = os.getcwd()
|
||||
allowed_roots = _get_allowed_roots(base_dir, allowed_dirs)
|
||||
primary_root = allowed_roots[0]
|
||||
|
||||
resolved_base = os.path.realpath(base_dir)
|
||||
resolved_path = os.path.realpath(
|
||||
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
|
||||
path if os.path.isabs(path) else os.path.join(primary_root, path)
|
||||
)
|
||||
|
||||
# Ensure the resolved path is within the base directory.
|
||||
# When resolved_base already ends with a separator (e.g. the filesystem
|
||||
# root "/"), appending os.sep would double it ("//"), so use the base
|
||||
# as-is in that case.
|
||||
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
|
||||
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
|
||||
raise ValueError(
|
||||
f"Path '{format_path_for_display(resolved_path, resolved_base)}' is "
|
||||
f"outside the allowed directory. "
|
||||
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
|
||||
)
|
||||
if any(_is_within_root(resolved_path, root) for root in allowed_roots):
|
||||
return resolved_path
|
||||
|
||||
return resolved_path
|
||||
raise ValueError(
|
||||
f"Path '{format_path_for_display(resolved_path, primary_root)}' is "
|
||||
f"outside the allowed directories. "
|
||||
f"Add the directory via {_ALLOWED_DIRS_ENV}, or set "
|
||||
f"{_UNSAFE_PATHS_ENV}=true to bypass this check."
|
||||
)
|
||||
|
||||
|
||||
def validate_directory_path(path: str, base_dir: str | None = None) -> str:
|
||||
def validate_directory_path(
|
||||
path: str,
|
||||
base_dir: str | None = None,
|
||||
*,
|
||||
allowed_dirs: list[str] | None = None,
|
||||
) -> str:
|
||||
"""Validate that a directory path is safe to read.
|
||||
|
||||
Same as :func:`validate_file_path` but also checks that the path
|
||||
@@ -107,15 +167,16 @@ def validate_directory_path(path: str, base_dir: str | None = None) -> str:
|
||||
|
||||
Args:
|
||||
path: The directory path to validate.
|
||||
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
|
||||
base_dir: Primary allowed root. Defaults to ``os.getcwd()``.
|
||||
allowed_dirs: Additional allowed root directories.
|
||||
|
||||
Returns:
|
||||
The resolved, validated absolute path.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path escapes the allowed directory or is not a directory.
|
||||
ValueError: If the path escapes every allowed directory or is not a directory.
|
||||
"""
|
||||
validated = validate_file_path(path, base_dir)
|
||||
validated = validate_file_path(path, base_dir, allowed_dirs=allowed_dirs)
|
||||
if not os.path.isdir(validated):
|
||||
raise ValueError(f"Path '{validated}' is not a directory.")
|
||||
return validated
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
@@ -8,6 +7,7 @@ from pydantic import BaseModel
|
||||
from crewai_tools.security.safe_path import (
|
||||
format_error_for_display,
|
||||
format_path_for_display,
|
||||
validate_file_path,
|
||||
)
|
||||
|
||||
|
||||
@@ -41,22 +41,27 @@ class FileWriterTool(BaseTool):
|
||||
|
||||
filepath = os.path.join(directory, filename)
|
||||
|
||||
# Prevent path traversal: the resolved path must be strictly inside
|
||||
# filename, and symlink escapes regardless of how directory is set.
|
||||
# is_relative_to() does a proper path-component comparison that is
|
||||
# safe on case-insensitive filesystems and avoids the "// " edge case
|
||||
# We also reject the case where filepath resolves to the directory
|
||||
# itself, since that is not a valid file target.
|
||||
real_directory = Path(directory).resolve()
|
||||
real_filepath = Path(filepath).resolve()
|
||||
display_filepath = format_path_for_display(
|
||||
str(real_filepath), str(real_directory)
|
||||
)
|
||||
if (
|
||||
not real_filepath.is_relative_to(real_directory)
|
||||
or real_filepath == real_directory
|
||||
):
|
||||
return "Error: Invalid file path — the filename must not escape the target directory."
|
||||
# Confine the resolved write target to an allow-listed root
|
||||
# (cwd + CREWAI_TOOLS_ALLOWED_DIRS), NOT merely inside the
|
||||
# caller-supplied `directory`. That value is itself untrusted when
|
||||
# an LLM tool call chooses it, so checking containment against it
|
||||
# would let an agent write anywhere (e.g. ~/.ssh/authorized_keys).
|
||||
# validate_file_path resolves symlinks and ".." before checking.
|
||||
try:
|
||||
real_filepath = validate_file_path(filepath)
|
||||
except ValueError as e:
|
||||
return f"Error: {format_error_for_display(e)}"
|
||||
|
||||
real_directory = os.path.dirname(real_filepath)
|
||||
display_filepath = format_path_for_display(real_filepath, real_directory)
|
||||
|
||||
# A target that resolves to an existing directory is not a valid
|
||||
# file destination.
|
||||
if os.path.isdir(real_filepath):
|
||||
return (
|
||||
"Error: Invalid file path — the target must be a file, "
|
||||
"not a directory."
|
||||
)
|
||||
|
||||
if kwargs.get("directory"):
|
||||
os.makedirs(real_directory, exist_ok=True)
|
||||
|
||||
@@ -17,12 +17,23 @@ def temp_env():
|
||||
test_file = "test.txt"
|
||||
test_content = "Hello, World!"
|
||||
|
||||
# FileWriterTool confines writes to an allow-listed root (cwd plus
|
||||
# CREWAI_TOOLS_ALLOWED_DIRS). Explicitly permit this temp dir — this is the
|
||||
# supported way for a developer to widen the write scope to an external
|
||||
# directory, and lets the happy-path tests below write into it.
|
||||
prev_allowed = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS")
|
||||
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = temp_dir
|
||||
|
||||
yield {
|
||||
"temp_dir": temp_dir,
|
||||
"test_file": test_file,
|
||||
"test_content": test_content,
|
||||
}
|
||||
|
||||
if prev_allowed is None:
|
||||
os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None)
|
||||
else:
|
||||
os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev_allowed
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
|
||||
|
||||
@@ -196,3 +207,24 @@ def test_blocks_symlink_escape(tool, temp_env):
|
||||
assert not os.path.exists(outside_file)
|
||||
finally:
|
||||
shutil.rmtree(outside_dir, ignore_errors=True)
|
||||
|
||||
|
||||
|
||||
def test_blocks_unbounded_directory_arg(tool, temp_env):
|
||||
# The core fix: the `directory` argument is itself untrusted (LLM-chosen).
|
||||
# A directory outside the allow-list must be rejected even when filename
|
||||
# is benign — previously this let an agent write anywhere on disk
|
||||
# (e.g. ~/.ssh/authorized_keys).
|
||||
outside_dir = tempfile.mkdtemp() # NOT added to CREWAI_TOOLS_ALLOWED_DIRS
|
||||
outside_file = os.path.join(outside_dir, "test.txt")
|
||||
try:
|
||||
result = tool._run(
|
||||
filename="test.txt",
|
||||
directory=outside_dir,
|
||||
content="should not be written",
|
||||
overwrite=True,
|
||||
)
|
||||
assert "Error" in result
|
||||
assert not os.path.exists(outside_file)
|
||||
finally:
|
||||
shutil.rmtree(outside_dir, ignore_errors=True)
|
||||
|
||||
@@ -32,12 +32,12 @@ class TestValidateFilePath:
|
||||
|
||||
def test_rejects_dotdot_traversal(self, tmp_path):
|
||||
"""Reject ../ traversal that escapes base_dir."""
|
||||
with pytest.raises(ValueError, match="outside the allowed directory"):
|
||||
with pytest.raises(ValueError, match="outside the allowed director"):
|
||||
validate_file_path("../../etc/passwd", str(tmp_path))
|
||||
|
||||
def test_rejects_absolute_path_outside_base(self, tmp_path):
|
||||
"""Reject absolute path outside base_dir."""
|
||||
with pytest.raises(ValueError, match="outside the allowed directory"):
|
||||
with pytest.raises(ValueError, match="outside the allowed director"):
|
||||
validate_file_path("/etc/passwd", str(tmp_path))
|
||||
|
||||
def test_allows_absolute_path_inside_base(self, tmp_path):
|
||||
@@ -50,7 +50,7 @@ class TestValidateFilePath:
|
||||
"""Reject symlinks that point outside base_dir."""
|
||||
link = tmp_path / "sneaky_link"
|
||||
os.symlink("/etc/passwd", str(link))
|
||||
with pytest.raises(ValueError, match="outside the allowed directory"):
|
||||
with pytest.raises(ValueError, match="outside the allowed director"):
|
||||
validate_file_path("sneaky_link", str(tmp_path))
|
||||
|
||||
def test_defaults_to_cwd(self):
|
||||
@@ -113,7 +113,7 @@ class TestValidateDirectoryPath:
|
||||
validate_directory_path("file.txt", str(tmp_path))
|
||||
|
||||
def test_rejects_traversal(self, tmp_path):
|
||||
with pytest.raises(ValueError, match="outside the allowed directory"):
|
||||
with pytest.raises(ValueError, match="outside the allowed director"):
|
||||
validate_directory_path("../../", str(tmp_path))
|
||||
|
||||
|
||||
@@ -191,3 +191,62 @@ class TestValidateUrl:
|
||||
# file:// would normally be blocked
|
||||
result = validate_url("file:///etc/passwd")
|
||||
assert result == "file:///etc/passwd"
|
||||
|
||||
|
||||
|
||||
class TestAllowList:
|
||||
"""Tests for the configurable deny-by-default allow-list of roots."""
|
||||
|
||||
def test_param_extends_allowed_roots(self, tmp_path):
|
||||
"""A directory passed via allowed_dirs is permitted."""
|
||||
extra = tmp_path / "extra"
|
||||
extra.mkdir()
|
||||
(extra / "data.txt").touch()
|
||||
result = validate_file_path(
|
||||
str(extra / "data.txt"),
|
||||
base_dir=str(tmp_path / "base"),
|
||||
allowed_dirs=[str(extra)],
|
||||
)
|
||||
assert result == str(extra / "data.txt")
|
||||
|
||||
def test_env_extends_allowed_roots(self, tmp_path, monkeypatch):
|
||||
"""A directory listed in CREWAI_TOOLS_ALLOWED_DIRS is permitted."""
|
||||
base = tmp_path / "base"
|
||||
base.mkdir()
|
||||
extra = tmp_path / "extra"
|
||||
extra.mkdir()
|
||||
(extra / "data.txt").touch()
|
||||
monkeypatch.setenv("CREWAI_TOOLS_ALLOWED_DIRS", str(extra))
|
||||
result = validate_file_path(str(extra / "data.txt"), base_dir=str(base))
|
||||
assert result == str(extra / "data.txt")
|
||||
|
||||
def test_denied_without_allow_listing(self, tmp_path, monkeypatch):
|
||||
"""The same external dir is rejected when not allow-listed."""
|
||||
base = tmp_path / "base"
|
||||
base.mkdir()
|
||||
extra = tmp_path / "extra"
|
||||
extra.mkdir()
|
||||
(extra / "data.txt").touch()
|
||||
monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False)
|
||||
with pytest.raises(ValueError, match="outside the allowed director"):
|
||||
validate_file_path(str(extra / "data.txt"), base_dir=str(base))
|
||||
|
||||
def test_multiple_env_roots(self, tmp_path, monkeypatch):
|
||||
"""Multiple os.pathsep-separated roots are each honored."""
|
||||
base = tmp_path / "base"
|
||||
base.mkdir()
|
||||
a = tmp_path / "a"
|
||||
a.mkdir()
|
||||
b = tmp_path / "b"
|
||||
b.mkdir()
|
||||
(a / "fa.txt").touch()
|
||||
(b / "fb.txt").touch()
|
||||
monkeypatch.setenv(
|
||||
"CREWAI_TOOLS_ALLOWED_DIRS", os.pathsep.join([str(a), str(b)])
|
||||
)
|
||||
assert validate_file_path(str(a / "fa.txt"), base_dir=str(base)) == str(
|
||||
a / "fa.txt"
|
||||
)
|
||||
assert validate_file_path(str(b / "fb.txt"), base_dir=str(base)) == str(
|
||||
b / "fb.txt"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user