From e0df891bdd31f1e8c3566f539d3837feed48a4d7 Mon Sep 17 00:00:00 2001 From: Rip&Tear <84775494+theCyberTech@users.noreply.github.com> Date: Sat, 20 Jun 2026 01:29:19 +0800 Subject: [PATCH] fix: confine file tools to an allow-listed root to block path traversal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LLM/prompt-injection-controlled file paths could escape the working directory. The RAG search tools and FileReadTool already routed through validate_file_path, but FileWriterTool only checked that `filename` did not escape the caller-supplied `directory` — and `directory` is itself LLM-controlled, so an agent fed untrusted content could be steered into writing anywhere on disk (e.g. ~/.ssh/authorized_keys). - safe_path: replace the single base_dir cwd jail with a deny-by-default allow-list of roots, sourced from cwd + CREWAI_TOOLS_ALLOWED_DIRS + a caller-passed allowed_dirs. Backward compatible for existing callers. - FileWriterTool: route the resolved write target through validate_file_path so writes are confined to an allow-listed root regardless of the directory argument. - Tests: allow-list extension via env/param, deny-by-default, multi-root, and a regression test for the unbounded-directory write. BREAKING: FileWriterTool no longer writes to arbitrary absolute directories by default. Set CREWAI_TOOLS_ALLOWED_DIRS to permit out-of-cwd writes. Co-Authored-By: Claude Opus 4.8 --- .../src/crewai_tools/security/safe_path.py | 109 ++++++++++++++---- .../file_writer_tool/file_writer_tool.py | 39 ++++--- .../tests/tools/test_file_writer_tool.py | 32 +++++ .../tests/utilities/test_safe_path.py | 67 ++++++++++- 4 files changed, 202 insertions(+), 45 deletions(-) diff --git a/lib/crewai-tools/src/crewai_tools/security/safe_path.py b/lib/crewai-tools/src/crewai_tools/security/safe_path.py index 986cf799a..6b9a3a1c9 100644 --- a/lib/crewai-tools/src/crewai_tools/security/safe_path.py +++ b/lib/crewai-tools/src/crewai_tools/security/safe_path.py @@ -20,6 +20,55 @@ from urllib.parse import urlparse logger = logging.getLogger(__name__) _UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS" +_ALLOWED_DIRS_ENV = "CREWAI_TOOLS_ALLOWED_DIRS" + + +def _get_allowed_roots( + base_dir: str | None = None, + allowed_dirs: list[str] | None = None, +) -> list[str]: + """Build the deny-by-default set of allowed root directories. + + Roots are drawn from, in order: + + 1. ``base_dir`` (defaults to the current working directory), + 2. the ``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, split on + ``os.pathsep``, + 3. the caller-supplied ``allowed_dirs`` list. + + Every root is resolved with :func:`os.path.realpath` so a symlinked root + is compared by its real location. Empty entries are ignored and duplicates + are collapsed while preserving order. The first element is always the + primary root used to resolve relative candidate paths. + """ + raw_roots: list[str] = [base_dir if base_dir is not None else os.getcwd()] + + env_dirs = os.environ.get(_ALLOWED_DIRS_ENV, "") + if env_dirs: + raw_roots.extend(d for d in env_dirs.split(os.pathsep) if d) + + if allowed_dirs: + raw_roots.extend(d for d in allowed_dirs if d) + + resolved: list[str] = [] + seen: set[str] = set() + for root in raw_roots: + real = os.path.realpath(root) + if real not in seen: + seen.add(real) + resolved.append(real) + return resolved + + +def _is_within_root(resolved_path: str, resolved_root: str) -> bool: + """Return True if *resolved_path* equals *resolved_root* or lives beneath it. + + When ``resolved_root`` already ends with a separator (e.g. the filesystem + root ``"/"``), appending ``os.sep`` would double it, so the root is used + as-is for the prefix in that case. + """ + prefix = resolved_root if resolved_root.endswith(os.sep) else resolved_root + os.sep + return resolved_path == resolved_root or resolved_path.startswith(prefix) def format_path_for_display(path: str, base_dir: str | None = None) -> str: @@ -52,21 +101,32 @@ def _is_escape_hatch_enabled() -> bool: return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes") -def validate_file_path(path: str, base_dir: str | None = None) -> str: +def validate_file_path( + path: str, + base_dir: str | None = None, + *, + allowed_dirs: list[str] | None = None, +) -> str: """Validate that a file path is safe to read. Resolves symlinks and ``..`` components, then checks that the resolved - path falls within *base_dir* (defaults to the current working directory). + path falls within at least one allowed root directory. The allow-list is + built from *base_dir* (defaults to the current working directory), the + ``CREWAI_TOOLS_ALLOWED_DIRS`` environment variable, and *allowed_dirs* — + see :func:`_get_allowed_roots`. Access is denied by default for anything + outside that set. Args: path: The file path to validate. - base_dir: Allowed root directory. Defaults to ``os.getcwd()``. + base_dir: Primary allowed root. Defaults to ``os.getcwd()`` and is + used to resolve relative ``path`` values. + allowed_dirs: Additional allowed root directories. Returns: The resolved, validated absolute path. Raises: - ValueError: If the path escapes the allowed directory. + ValueError: If the path escapes every allowed directory. """ if _is_escape_hatch_enabled(): logger.warning( @@ -76,30 +136,30 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str: ) return os.path.realpath(path) - if base_dir is None: - base_dir = os.getcwd() + allowed_roots = _get_allowed_roots(base_dir, allowed_dirs) + primary_root = allowed_roots[0] - resolved_base = os.path.realpath(base_dir) resolved_path = os.path.realpath( - os.path.join(resolved_base, path) if not os.path.isabs(path) else path + path if os.path.isabs(path) else os.path.join(primary_root, path) ) - # Ensure the resolved path is within the base directory. - # When resolved_base already ends with a separator (e.g. the filesystem - # root "/"), appending os.sep would double it ("//"), so use the base - # as-is in that case. - prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep - if not resolved_path.startswith(prefix) and resolved_path != resolved_base: - raise ValueError( - f"Path '{format_path_for_display(resolved_path, resolved_base)}' is " - f"outside the allowed directory. " - f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check." - ) + if any(_is_within_root(resolved_path, root) for root in allowed_roots): + return resolved_path - return resolved_path + raise ValueError( + f"Path '{format_path_for_display(resolved_path, primary_root)}' is " + f"outside the allowed directories. " + f"Add the directory via {_ALLOWED_DIRS_ENV}, or set " + f"{_UNSAFE_PATHS_ENV}=true to bypass this check." + ) -def validate_directory_path(path: str, base_dir: str | None = None) -> str: +def validate_directory_path( + path: str, + base_dir: str | None = None, + *, + allowed_dirs: list[str] | None = None, +) -> str: """Validate that a directory path is safe to read. Same as :func:`validate_file_path` but also checks that the path @@ -107,15 +167,16 @@ def validate_directory_path(path: str, base_dir: str | None = None) -> str: Args: path: The directory path to validate. - base_dir: Allowed root directory. Defaults to ``os.getcwd()``. + base_dir: Primary allowed root. Defaults to ``os.getcwd()``. + allowed_dirs: Additional allowed root directories. Returns: The resolved, validated absolute path. Raises: - ValueError: If the path escapes the allowed directory or is not a directory. + ValueError: If the path escapes every allowed directory or is not a directory. """ - validated = validate_file_path(path, base_dir) + validated = validate_file_path(path, base_dir, allowed_dirs=allowed_dirs) if not os.path.isdir(validated): raise ValueError(f"Path '{validated}' is not a directory.") return validated diff --git a/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py b/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py index 8ab0c2004..f2c9a8397 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/file_writer_tool/file_writer_tool.py @@ -1,5 +1,4 @@ import os -from pathlib import Path from typing import Any from crewai.tools import BaseTool @@ -8,6 +7,7 @@ from pydantic import BaseModel from crewai_tools.security.safe_path import ( format_error_for_display, format_path_for_display, + validate_file_path, ) @@ -41,22 +41,27 @@ class FileWriterTool(BaseTool): filepath = os.path.join(directory, filename) - # Prevent path traversal: the resolved path must be strictly inside - # filename, and symlink escapes regardless of how directory is set. - # is_relative_to() does a proper path-component comparison that is - # safe on case-insensitive filesystems and avoids the "// " edge case - # We also reject the case where filepath resolves to the directory - # itself, since that is not a valid file target. - real_directory = Path(directory).resolve() - real_filepath = Path(filepath).resolve() - display_filepath = format_path_for_display( - str(real_filepath), str(real_directory) - ) - if ( - not real_filepath.is_relative_to(real_directory) - or real_filepath == real_directory - ): - return "Error: Invalid file path — the filename must not escape the target directory." + # Confine the resolved write target to an allow-listed root + # (cwd + CREWAI_TOOLS_ALLOWED_DIRS), NOT merely inside the + # caller-supplied `directory`. That value is itself untrusted when + # an LLM tool call chooses it, so checking containment against it + # would let an agent write anywhere (e.g. ~/.ssh/authorized_keys). + # validate_file_path resolves symlinks and ".." before checking. + try: + real_filepath = validate_file_path(filepath) + except ValueError as e: + return f"Error: {format_error_for_display(e)}" + + real_directory = os.path.dirname(real_filepath) + display_filepath = format_path_for_display(real_filepath, real_directory) + + # A target that resolves to an existing directory is not a valid + # file destination. + if os.path.isdir(real_filepath): + return ( + "Error: Invalid file path — the target must be a file, " + "not a directory." + ) if kwargs.get("directory"): os.makedirs(real_directory, exist_ok=True) diff --git a/lib/crewai-tools/tests/tools/test_file_writer_tool.py b/lib/crewai-tools/tests/tools/test_file_writer_tool.py index 77f91f152..a41d6a7f0 100644 --- a/lib/crewai-tools/tests/tools/test_file_writer_tool.py +++ b/lib/crewai-tools/tests/tools/test_file_writer_tool.py @@ -17,12 +17,23 @@ def temp_env(): test_file = "test.txt" test_content = "Hello, World!" + # FileWriterTool confines writes to an allow-listed root (cwd plus + # CREWAI_TOOLS_ALLOWED_DIRS). Explicitly permit this temp dir — this is the + # supported way for a developer to widen the write scope to an external + # directory, and lets the happy-path tests below write into it. + prev_allowed = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS") + os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = temp_dir + yield { "temp_dir": temp_dir, "test_file": test_file, "test_content": test_content, } + if prev_allowed is None: + os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None) + else: + os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev_allowed shutil.rmtree(temp_dir, ignore_errors=True) @@ -196,3 +207,24 @@ def test_blocks_symlink_escape(tool, temp_env): assert not os.path.exists(outside_file) finally: shutil.rmtree(outside_dir, ignore_errors=True) + + + +def test_blocks_unbounded_directory_arg(tool, temp_env): + # The core fix: the `directory` argument is itself untrusted (LLM-chosen). + # A directory outside the allow-list must be rejected even when filename + # is benign — previously this let an agent write anywhere on disk + # (e.g. ~/.ssh/authorized_keys). + outside_dir = tempfile.mkdtemp() # NOT added to CREWAI_TOOLS_ALLOWED_DIRS + outside_file = os.path.join(outside_dir, "test.txt") + try: + result = tool._run( + filename="test.txt", + directory=outside_dir, + content="should not be written", + overwrite=True, + ) + assert "Error" in result + assert not os.path.exists(outside_file) + finally: + shutil.rmtree(outside_dir, ignore_errors=True) diff --git a/lib/crewai-tools/tests/utilities/test_safe_path.py b/lib/crewai-tools/tests/utilities/test_safe_path.py index 1a0faa14b..162adf118 100644 --- a/lib/crewai-tools/tests/utilities/test_safe_path.py +++ b/lib/crewai-tools/tests/utilities/test_safe_path.py @@ -32,12 +32,12 @@ class TestValidateFilePath: def test_rejects_dotdot_traversal(self, tmp_path): """Reject ../ traversal that escapes base_dir.""" - with pytest.raises(ValueError, match="outside the allowed directory"): + with pytest.raises(ValueError, match="outside the allowed director"): validate_file_path("../../etc/passwd", str(tmp_path)) def test_rejects_absolute_path_outside_base(self, tmp_path): """Reject absolute path outside base_dir.""" - with pytest.raises(ValueError, match="outside the allowed directory"): + with pytest.raises(ValueError, match="outside the allowed director"): validate_file_path("/etc/passwd", str(tmp_path)) def test_allows_absolute_path_inside_base(self, tmp_path): @@ -50,7 +50,7 @@ class TestValidateFilePath: """Reject symlinks that point outside base_dir.""" link = tmp_path / "sneaky_link" os.symlink("/etc/passwd", str(link)) - with pytest.raises(ValueError, match="outside the allowed directory"): + with pytest.raises(ValueError, match="outside the allowed director"): validate_file_path("sneaky_link", str(tmp_path)) def test_defaults_to_cwd(self): @@ -113,7 +113,7 @@ class TestValidateDirectoryPath: validate_directory_path("file.txt", str(tmp_path)) def test_rejects_traversal(self, tmp_path): - with pytest.raises(ValueError, match="outside the allowed directory"): + with pytest.raises(ValueError, match="outside the allowed director"): validate_directory_path("../../", str(tmp_path)) @@ -191,3 +191,62 @@ class TestValidateUrl: # file:// would normally be blocked result = validate_url("file:///etc/passwd") assert result == "file:///etc/passwd" + + + +class TestAllowList: + """Tests for the configurable deny-by-default allow-list of roots.""" + + def test_param_extends_allowed_roots(self, tmp_path): + """A directory passed via allowed_dirs is permitted.""" + extra = tmp_path / "extra" + extra.mkdir() + (extra / "data.txt").touch() + result = validate_file_path( + str(extra / "data.txt"), + base_dir=str(tmp_path / "base"), + allowed_dirs=[str(extra)], + ) + assert result == str(extra / "data.txt") + + def test_env_extends_allowed_roots(self, tmp_path, monkeypatch): + """A directory listed in CREWAI_TOOLS_ALLOWED_DIRS is permitted.""" + base = tmp_path / "base" + base.mkdir() + extra = tmp_path / "extra" + extra.mkdir() + (extra / "data.txt").touch() + monkeypatch.setenv("CREWAI_TOOLS_ALLOWED_DIRS", str(extra)) + result = validate_file_path(str(extra / "data.txt"), base_dir=str(base)) + assert result == str(extra / "data.txt") + + def test_denied_without_allow_listing(self, tmp_path, monkeypatch): + """The same external dir is rejected when not allow-listed.""" + base = tmp_path / "base" + base.mkdir() + extra = tmp_path / "extra" + extra.mkdir() + (extra / "data.txt").touch() + monkeypatch.delenv("CREWAI_TOOLS_ALLOWED_DIRS", raising=False) + with pytest.raises(ValueError, match="outside the allowed director"): + validate_file_path(str(extra / "data.txt"), base_dir=str(base)) + + def test_multiple_env_roots(self, tmp_path, monkeypatch): + """Multiple os.pathsep-separated roots are each honored.""" + base = tmp_path / "base" + base.mkdir() + a = tmp_path / "a" + a.mkdir() + b = tmp_path / "b" + b.mkdir() + (a / "fa.txt").touch() + (b / "fb.txt").touch() + monkeypatch.setenv( + "CREWAI_TOOLS_ALLOWED_DIRS", os.pathsep.join([str(a), str(b)]) + ) + assert validate_file_path(str(a / "fa.txt"), base_dir=str(base)) == str( + a / "fa.txt" + ) + assert validate_file_path(str(b / "fb.txt"), base_dir=str(base)) == str( + b / "fb.txt" + )