Compare commits

...

1 Commits

Author SHA1 Message Date
Rip&Tear
d3fc0d31f8 [codex] Redact file tool paths (#6134)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
* Redact file tool paths

* Fix for pull request finding 'Empty except'

* Potential fix for pull request finding

---------
2026-06-12 15:50:40 +08:00
6 changed files with 150 additions and 27 deletions

View File

@@ -22,6 +22,31 @@ logger = logging.getLogger(__name__)
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
def format_path_for_display(path: str, base_dir: str | None = None) -> str:
"""Return a path label that does not expose absolute directory prefixes."""
if base_dir is None:
base_dir = os.getcwd()
try:
resolved_base = os.path.realpath(base_dir)
resolved_path = os.path.realpath(
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
)
if os.path.commonpath([resolved_base, resolved_path]) == resolved_base:
return os.path.relpath(resolved_path, resolved_base)
except (OSError, ValueError) as exc:
logger.debug("Falling back to basename for display path formatting: %s", exc)
return os.path.basename(os.path.realpath(path)) or "[redacted path]"
def format_error_for_display(error: Exception) -> str:
"""Return exception details without OS-added absolute path context."""
if isinstance(error, OSError):
return error.strerror or error.__class__.__name__
return str(error)
def _is_escape_hatch_enabled() -> bool:
"""Check if the unsafe paths escape hatch is enabled."""
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
@@ -66,8 +91,8 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str:
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
raise ValueError(
f"Path '{path}' resolves to '{resolved_path}' which is outside "
f"the allowed directory '{resolved_base}'. "
f"Path '{format_path_for_display(resolved_path, resolved_base)}' is "
f"outside the allowed directory. "
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
)

View File

@@ -3,7 +3,11 @@ from typing import Any
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from crewai_tools.security.safe_path import validate_file_path
from crewai_tools.security.safe_path import (
format_error_for_display,
format_path_for_display,
validate_file_path,
)
class FileReadToolSchema(BaseModel):
@@ -58,8 +62,9 @@ class FileReadTool(BaseTool):
**kwargs: Additional keyword arguments passed to BaseTool.
"""
if file_path is not None:
display_path = format_path_for_display(file_path)
kwargs["description"] = (
f"A tool that reads file content. The default file is {file_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
f"A tool that reads file content. The default file is {display_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
)
super().__init__(**kwargs)
@@ -78,7 +83,12 @@ class FileReadTool(BaseTool):
if file_path is None:
return "Error: No file path provided. Please provide a file path either in the constructor or as an argument."
file_path = validate_file_path(file_path)
try:
file_path = validate_file_path(file_path)
except ValueError as e:
return f"Error: Invalid file path: {e!s}"
display_path = format_path_for_display(file_path)
try:
with open(file_path, "r") as file:
if start_line == 1 and line_count is None:
@@ -98,8 +108,11 @@ class FileReadTool(BaseTool):
return "".join(selected_lines)
except FileNotFoundError:
return f"Error: File not found at path: {file_path}"
return f"Error: File not found at path: {display_path}"
except PermissionError:
return f"Error: Permission denied when trying to read file: {file_path}"
return f"Error: Permission denied when trying to read file: {display_path}"
except Exception as e:
return f"Error: Failed to read file {file_path}. {e!s}"
return (
f"Error: Failed to read file {display_path}. "
f"{format_error_for_display(e)}"
)

View File

@@ -5,6 +5,11 @@ from typing import Any
from crewai.tools import BaseTool
from pydantic import BaseModel
from crewai_tools.security.safe_path import (
format_error_for_display,
format_path_for_display,
)
def strtobool(val: str | bool) -> bool:
if isinstance(val, bool):
@@ -44,6 +49,9 @@ class FileWriterTool(BaseTool):
# itself, since that is not a valid file target.
real_directory = Path(directory).resolve()
real_filepath = Path(filepath).resolve()
display_filepath = format_path_for_display(
str(real_filepath), str(real_directory)
)
if (
not real_filepath.is_relative_to(real_directory)
or real_filepath == real_directory
@@ -56,15 +64,18 @@ class FileWriterTool(BaseTool):
kwargs["overwrite"] = strtobool(kwargs["overwrite"])
if os.path.exists(real_filepath) and not kwargs["overwrite"]:
return f"File {real_filepath} already exists and overwrite option was not passed."
return f"File {display_filepath} already exists and overwrite option was not passed."
mode = "w" if kwargs["overwrite"] else "x"
with open(real_filepath, mode) as file:
file.write(kwargs["content"])
return f"Content successfully written to {real_filepath}"
return f"Content successfully written to {display_filepath}"
except FileExistsError:
return f"File {real_filepath} already exists and overwrite option was not passed."
return f"File {display_filepath} already exists and overwrite option was not passed."
except KeyError as e:
return f"An error occurred while accessing key: {e!s}"
except Exception as e:
return f"An error occurred while writing to the file: {e!s}"
return (
"An error occurred while writing to the file: "
f"{format_error_for_display(e)}"
)

View File

@@ -1,4 +1,3 @@
import os
from unittest.mock import mock_open, patch
from crewai_tools import FileReadTool
@@ -6,21 +5,16 @@ from crewai_tools import FileReadTool
def test_file_read_tool_constructor():
"""Test FileReadTool initialization with file_path."""
test_file = "/tmp/test_file.txt"
test_content = "Hello, World!"
with open(test_file, "w") as f:
f.write(test_content)
test_file = "test_file.txt"
tool = FileReadTool(file_path=test_file)
assert tool.file_path == test_file
assert "test_file.txt" in tool.description
os.remove(test_file)
def test_file_read_tool_run():
"""Test FileReadTool _run method with file_path at runtime."""
test_file = "/tmp/test_file.txt"
test_file = "test_file.txt"
test_content = "Hello, World!"
# Use mock_open to mock file operations
@@ -36,18 +30,18 @@ def test_file_read_tool_error_handling():
result = tool._run()
assert "Error: No file path provided" in result
result = tool._run(file_path="/nonexistent/file.txt")
result = tool._run(file_path="nonexistent/file.txt")
assert "Error: File not found at path:" in result
with patch("builtins.open", side_effect=PermissionError()):
result = tool._run(file_path="/tmp/no_permission.txt")
result = tool._run(file_path="no_permission.txt")
assert "Error: Permission denied" in result
def test_file_read_tool_constructor_and_run():
"""Test FileReadTool using both constructor and runtime file paths."""
test_file1 = "/tmp/test1.txt"
test_file2 = "/tmp/test2.txt"
test_file1 = "test1.txt"
test_file2 = "test2.txt"
content1 = "File 1 content"
content2 = "File 2 content"
@@ -64,7 +58,7 @@ def test_file_read_tool_constructor_and_run():
def test_file_read_tool_chunk_reading():
"""Test FileReadTool reading specific chunks of a file."""
test_file = "/tmp/multiline_test.txt"
test_file = "multiline_test.txt"
lines = [
"Line 1\n",
"Line 2\n",
@@ -104,7 +98,7 @@ def test_file_read_tool_chunk_reading():
def test_file_read_tool_chunk_error_handling():
"""Test error handling for chunk reading."""
test_file = "/tmp/short_test.txt"
test_file = "short_test.txt"
lines = ["Line 1\n", "Line 2\n", "Line 3\n"]
file_content = "".join(lines)
@@ -122,7 +116,7 @@ def test_file_read_tool_chunk_error_handling():
def test_file_read_tool_zero_or_negative_start_line():
"""Test that start_line values of 0 or negative read from the start of the file."""
test_file = "/tmp/negative_test.txt"
test_file = "negative_test.txt"
lines = ["Line 1\n", "Line 2\n", "Line 3\n", "Line 4\n", "Line 5\n"]
file_content = "".join(lines)
@@ -150,3 +144,45 @@ def test_file_read_tool_zero_or_negative_start_line():
result = tool._run(file_path=test_file, start_line=-10, line_count=2)
expected = "".join(lines[0:2]) # Should read first 2 lines
assert result == expected
def test_file_read_tool_error_messages_do_not_disclose_absolute_paths(
tmp_path, monkeypatch
):
"""FileReadTool should redact absolute prefixes from user-visible errors."""
monkeypatch.chdir(tmp_path)
tool = FileReadTool()
target = tmp_path / "secret.txt"
result = tool._run(file_path=str(target))
assert "secret.txt" in result
assert str(tmp_path) not in result
target.touch()
with patch("builtins.open", side_effect=PermissionError()):
result = tool._run(file_path=str(target))
assert "secret.txt" in result
assert str(tmp_path) not in result
with patch(
"builtins.open",
side_effect=OSError(5, "Input/output error", str(target)),
):
result = tool._run(file_path=str(target))
assert "secret.txt" in result
assert str(tmp_path) not in result
def test_file_read_tool_invalid_path_error_does_not_disclose_workspace(
tmp_path, monkeypatch
):
"""Validation errors should not echo the resolved workspace path."""
monkeypatch.chdir(tmp_path)
outside = tmp_path.parent / "outside.txt"
result = FileReadTool()._run(file_path=str(outside))
assert "Invalid file path" in result
assert "outside.txt" in result
assert str(tmp_path) not in result
assert str(tmp_path.parent) not in result

View File

@@ -47,6 +47,8 @@ def test_basic_file_write(tool, temp_env):
assert os.path.exists(path)
assert read_file(path) == temp_env["test_content"]
assert "successfully written" in result
assert temp_env["test_file"] in result
assert temp_env["temp_dir"] not in result
def test_directory_creation(tool, temp_env):
@@ -62,6 +64,8 @@ def test_directory_creation(tool, temp_env):
assert os.path.exists(new_dir)
assert os.path.exists(path)
assert "successfully written" in result
assert temp_env["test_file"] in result
assert new_dir not in result
@pytest.mark.parametrize(
@@ -134,6 +138,8 @@ def test_file_exists_error_handling(tool, temp_env, overwrite):
)
assert "already exists and overwrite option was not passed" in result
assert temp_env["test_file"] in result
assert temp_env["temp_dir"] not in result
assert read_file(path) == "Pre-existing content"

View File

@@ -7,6 +7,7 @@ import os
import pytest
from crewai_tools.security.safe_path import (
format_path_for_display,
validate_directory_path,
validate_file_path,
validate_url,
@@ -66,6 +67,37 @@ class TestValidateFilePath:
result = validate_file_path("/etc/passwd", str(tmp_path))
assert result == os.path.realpath("/etc/passwd")
def test_rejection_message_redacts_absolute_prefixes(self, tmp_path):
outside = tmp_path.parent / "outside.txt"
with pytest.raises(ValueError) as exc_info:
validate_file_path(str(outside), str(tmp_path))
message = str(exc_info.value)
assert "outside.txt" in message
assert str(tmp_path) not in message
assert str(tmp_path.parent) not in message
class TestFormatPathForDisplay:
"""Tests for user-visible path labels."""
def test_returns_relative_path_inside_base(self, tmp_path):
nested_file = tmp_path / "nested" / "file.txt"
nested_file.parent.mkdir()
nested_file.touch()
result = format_path_for_display(str(nested_file), str(tmp_path))
assert result == os.path.join("nested", "file.txt")
def test_redacts_absolute_prefix_outside_base(self, tmp_path):
outside_file = tmp_path.parent / "outside.txt"
result = format_path_for_display(str(outside_file), str(tmp_path))
assert result == "outside.txt"
class TestValidateDirectoryPath:
"""Tests for validate_directory_path."""