From 9d33706fd51afb2c85809d6d1870f01de96e01b2 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 8 Jan 2026 16:33:13 -0800 Subject: [PATCH] refactor: update allowed_paths default behavior in environment tools Modified the default value of allowed_paths in BaseEnvironmentTool and EnvironmentTools to default to the current directory (".") instead of an empty list. This change enhances security by ensuring that operations are restricted to the current directory unless explicitly specified otherwise. Additionally, updated related tests to reflect this new default behavior and improved type hints for better clarity. --- .../base_environment_tool.py | 4 +- .../environment_tools/environment_tools.py | 5 +- .../environment_tools/file_read_tool.py | 3 + .../environment_tools/list_dir_tool.py | 3 + .../test_environment_tools.py | 123 +++++++++++------- 5 files changed, 88 insertions(+), 50 deletions(-) diff --git a/lib/crewai/src/crewai/experimental/environment_tools/base_environment_tool.py b/lib/crewai/src/crewai/experimental/environment_tools/base_environment_tool.py index 5f81a56a8..8986cfa05 100644 --- a/lib/crewai/src/crewai/experimental/environment_tools/base_environment_tool.py +++ b/lib/crewai/src/crewai/experimental/environment_tools/base_environment_tool.py @@ -22,8 +22,8 @@ class BaseEnvironmentTool(BaseTool): """ allowed_paths: list[str] = Field( - default_factory=list, - description="Restrict operations to these paths. Empty list allows all.", + default_factory=lambda: ["."], + description="Restrict operations to these paths. Defaults to current directory.", ) def _validate_path(self, path: str) -> tuple[bool, Path | str]: diff --git a/lib/crewai/src/crewai/experimental/environment_tools/environment_tools.py b/lib/crewai/src/crewai/experimental/environment_tools/environment_tools.py index 9f8eec2c3..30a984c75 100644 --- a/lib/crewai/src/crewai/experimental/environment_tools/environment_tools.py +++ b/lib/crewai/src/crewai/experimental/environment_tools/environment_tools.py @@ -48,11 +48,12 @@ class EnvironmentTools: Args: allowed_paths: List of paths to restrict operations to. - Empty or None allows access to all paths. + Defaults to current directory ["."] if None. + Pass empty list [] to allow all paths (not recommended). include_grep: Whether to include GrepTool (requires grep installed). include_search: Whether to include FileSearchTool. """ - self.allowed_paths = allowed_paths or [] + self.allowed_paths = allowed_paths if allowed_paths is not None else ["."] self.include_grep = include_grep self.include_search = include_search diff --git a/lib/crewai/src/crewai/experimental/environment_tools/file_read_tool.py b/lib/crewai/src/crewai/experimental/environment_tools/file_read_tool.py index 5cdfec37a..63ecafe32 100644 --- a/lib/crewai/src/crewai/experimental/environment_tools/file_read_tool.py +++ b/lib/crewai/src/crewai/experimental/environment_tools/file_read_tool.py @@ -2,6 +2,8 @@ from __future__ import annotations +from pathlib import Path + from pydantic import BaseModel, Field from crewai.experimental.environment_tools.base_environment_tool import ( @@ -67,6 +69,7 @@ Examples: if not valid: return f"Error: {result}" + assert isinstance(result, Path) # noqa: S101 file_path = result # Check file exists and is a file diff --git a/lib/crewai/src/crewai/experimental/environment_tools/list_dir_tool.py b/lib/crewai/src/crewai/experimental/environment_tools/list_dir_tool.py index 0bb4e7cae..f4adde842 100644 --- a/lib/crewai/src/crewai/experimental/environment_tools/list_dir_tool.py +++ b/lib/crewai/src/crewai/experimental/environment_tools/list_dir_tool.py @@ -2,6 +2,8 @@ from __future__ import annotations +from pathlib import Path + from pydantic import BaseModel, Field from crewai.experimental.environment_tools.base_environment_tool import ( @@ -68,6 +70,7 @@ Examples: if not valid: return f"Error: {result}" + assert isinstance(result, Path) # noqa: S101 dir_path = result # Check directory exists diff --git a/lib/crewai/tests/experimental/environment_tools/test_environment_tools.py b/lib/crewai/tests/experimental/environment_tools/test_environment_tools.py index 61b23a1a8..7407ae7f2 100644 --- a/lib/crewai/tests/experimental/environment_tools/test_environment_tools.py +++ b/lib/crewai/tests/experimental/environment_tools/test_environment_tools.py @@ -1,12 +1,16 @@ """Tests for experimental environment tools.""" +from __future__ import annotations + import os import tempfile +from collections.abc import Generator from pathlib import Path import pytest from crewai.experimental.environment_tools import ( + BaseEnvironmentTool, EnvironmentTools, FileReadTool, FileSearchTool, @@ -21,7 +25,7 @@ from crewai.experimental.environment_tools import ( @pytest.fixture -def temp_dir(): +def temp_dir() -> Generator[str, None, None]: """Create a temporary directory with test files.""" with tempfile.TemporaryDirectory() as tmpdir: # Create test files @@ -41,7 +45,7 @@ def temp_dir(): @pytest.fixture -def restricted_temp_dir(): +def restricted_temp_dir() -> Generator[tuple[str, str], None, None]: """Create two directories - one allowed, one not.""" with tempfile.TemporaryDirectory() as allowed_dir: with tempfile.TemporaryDirectory() as forbidden_dir: @@ -60,15 +64,21 @@ def restricted_temp_dir(): class TestBaseEnvironmentTool: """Tests for BaseEnvironmentTool path validation.""" - def test_validate_path_no_restrictions(self, temp_dir): - """With no allowed_paths, all paths should be allowed.""" + def test_default_allowed_paths_is_current_directory(self) -> None: + """Default allowed_paths should be current directory for security.""" + tool = FileReadTool() + + assert tool.allowed_paths == ["."] + + def test_validate_path_explicit_no_restrictions(self, temp_dir: str) -> None: + """With explicit empty allowed_paths, all paths should be allowed.""" tool = FileReadTool(allowed_paths=[]) valid, result = tool._validate_path(temp_dir) assert valid is True assert isinstance(result, Path) - def test_validate_path_within_allowed(self, temp_dir): + def test_validate_path_within_allowed(self, temp_dir: str) -> None: """Paths within allowed_paths should be valid.""" tool = FileReadTool(allowed_paths=[temp_dir]) test_file = os.path.join(temp_dir, "test.txt") @@ -78,7 +88,7 @@ class TestBaseEnvironmentTool: assert valid is True assert isinstance(result, Path) - def test_validate_path_outside_allowed(self, restricted_temp_dir): + def test_validate_path_outside_allowed(self, restricted_temp_dir: tuple[str, str]) -> None: """Paths outside allowed_paths should be rejected.""" allowed_dir, forbidden_dir = restricted_temp_dir tool = FileReadTool(allowed_paths=[allowed_dir]) @@ -87,9 +97,10 @@ class TestBaseEnvironmentTool: valid, result = tool._validate_path(forbidden_file) assert valid is False + assert isinstance(result, str) assert "outside allowed paths" in result - def test_format_size(self): + def test_format_size(self) -> None: """Test human-readable size formatting.""" tool = FileReadTool() @@ -108,9 +119,9 @@ class TestBaseEnvironmentTool: class TestFileReadTool: """Tests for FileReadTool.""" - def test_read_entire_file(self, temp_dir): + def test_read_entire_file(self, temp_dir: str) -> None: """Should read entire file contents.""" - tool = FileReadTool() + tool = FileReadTool(allowed_paths=[temp_dir]) test_file = os.path.join(temp_dir, "test.txt") result = tool._run(path=test_file) @@ -120,9 +131,9 @@ class TestFileReadTool: assert "Line 5" in result assert "File:" in result # Metadata header - def test_read_with_line_range(self, temp_dir): + def test_read_with_line_range(self, temp_dir: str) -> None: """Should read specific line range.""" - tool = FileReadTool() + tool = FileReadTool(allowed_paths=[temp_dir]) test_file = os.path.join(temp_dir, "test.txt") result = tool._run(path=test_file, start_line=2, line_count=2) @@ -132,16 +143,16 @@ class TestFileReadTool: # Should not include lines outside range assert "Line 1" not in result.split("=" * 60)[-1] # Check content after header - def test_read_file_not_found(self, temp_dir): + def test_read_file_not_found(self, temp_dir: str) -> None: """Should return error for missing file.""" - tool = FileReadTool() + tool = FileReadTool(allowed_paths=[temp_dir]) missing_file = os.path.join(temp_dir, "nonexistent.txt") result = tool._run(path=missing_file) assert "Error: File not found" in result - def test_read_file_path_restricted(self, restricted_temp_dir): + def test_read_file_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None: """Should reject paths outside allowed_paths.""" allowed_dir, forbidden_dir = restricted_temp_dir tool = FileReadTool(allowed_paths=[allowed_dir]) @@ -161,9 +172,9 @@ class TestFileReadTool: class TestListDirTool: """Tests for ListDirTool.""" - def test_list_directory(self, temp_dir): + def test_list_directory(self, temp_dir: str) -> None: """Should list directory contents.""" - tool = ListDirTool() + tool = ListDirTool(allowed_paths=[temp_dir]) result = tool._run(path=temp_dir) @@ -172,33 +183,33 @@ class TestListDirTool: assert "subdir" in result assert "Total:" in result - def test_list_with_pattern(self, temp_dir): + def test_list_with_pattern(self, temp_dir: str) -> None: """Should filter by pattern.""" - tool = ListDirTool() + tool = ListDirTool(allowed_paths=[temp_dir]) result = tool._run(path=temp_dir, pattern="*.py") assert "example.py" in result assert "test.txt" not in result - def test_list_recursive(self, temp_dir): + def test_list_recursive(self, temp_dir: str) -> None: """Should list recursively when enabled.""" - tool = ListDirTool() + tool = ListDirTool(allowed_paths=[temp_dir]) result = tool._run(path=temp_dir, recursive=True) assert "nested.txt" in result assert "another.py" in result - def test_list_nonexistent_directory(self, temp_dir): + def test_list_nonexistent_directory(self, temp_dir: str) -> None: """Should return error for missing directory.""" - tool = ListDirTool() + tool = ListDirTool(allowed_paths=[temp_dir]) result = tool._run(path=os.path.join(temp_dir, "nonexistent")) assert "Error: Directory not found" in result - def test_list_path_restricted(self, restricted_temp_dir): + def test_list_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None: """Should reject paths outside allowed_paths.""" allowed_dir, forbidden_dir = restricted_temp_dir tool = ListDirTool(allowed_paths=[allowed_dir]) @@ -217,9 +228,9 @@ class TestListDirTool: class TestGrepTool: """Tests for GrepTool.""" - def test_grep_finds_pattern(self, temp_dir): + def test_grep_finds_pattern(self, temp_dir: str) -> None: """Should find matching patterns.""" - tool = GrepTool() + tool = GrepTool(allowed_paths=[temp_dir]) test_file = os.path.join(temp_dir, "test.txt") result = tool._run(pattern="Line 2", path=test_file) @@ -227,33 +238,33 @@ class TestGrepTool: assert "Line 2" in result assert "matches" in result.lower() or "found" in result.lower() - def test_grep_no_matches(self, temp_dir): + def test_grep_no_matches(self, temp_dir: str) -> None: """Should report when no matches found.""" - tool = GrepTool() + tool = GrepTool(allowed_paths=[temp_dir]) test_file = os.path.join(temp_dir, "test.txt") result = tool._run(pattern="nonexistent pattern xyz", path=test_file) assert "No matches found" in result - def test_grep_recursive(self, temp_dir): + def test_grep_recursive(self, temp_dir: str) -> None: """Should search recursively in directories.""" - tool = GrepTool() + tool = GrepTool(allowed_paths=[temp_dir]) result = tool._run(pattern="Nested", path=temp_dir, recursive=True) assert "Nested" in result - def test_grep_case_insensitive(self, temp_dir): + def test_grep_case_insensitive(self, temp_dir: str) -> None: """Should support case-insensitive search.""" - tool = GrepTool() + tool = GrepTool(allowed_paths=[temp_dir]) test_file = os.path.join(temp_dir, "test.txt") result = tool._run(pattern="LINE", path=test_file, ignore_case=True) assert "Line" in result or "matches" in result.lower() - def test_grep_path_restricted(self, restricted_temp_dir): + def test_grep_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None: """Should reject paths outside allowed_paths.""" allowed_dir, forbidden_dir = restricted_temp_dir tool = GrepTool(allowed_paths=[allowed_dir]) @@ -272,26 +283,26 @@ class TestGrepTool: class TestFileSearchTool: """Tests for FileSearchTool.""" - def test_find_files_by_pattern(self, temp_dir): + def test_find_files_by_pattern(self, temp_dir: str) -> None: """Should find files matching pattern.""" - tool = FileSearchTool() + tool = FileSearchTool(allowed_paths=[temp_dir]) result = tool._run(pattern="*.py", path=temp_dir) assert "example.py" in result assert "another.py" in result - def test_find_no_matches(self, temp_dir): + def test_find_no_matches(self, temp_dir: str) -> None: """Should report when no files match.""" - tool = FileSearchTool() + tool = FileSearchTool(allowed_paths=[temp_dir]) result = tool._run(pattern="*.xyz", path=temp_dir) assert "No" in result and "found" in result - def test_find_files_only(self, temp_dir): + def test_find_files_only(self, temp_dir: str) -> None: """Should filter to files only.""" - tool = FileSearchTool() + tool = FileSearchTool(allowed_paths=[temp_dir]) result = tool._run(pattern="*", path=temp_dir, file_type="file") @@ -300,15 +311,15 @@ class TestFileSearchTool: # Directories should have trailing slash in output # Check that subdir is not listed as a file - def test_find_dirs_only(self, temp_dir): + def test_find_dirs_only(self, temp_dir: str) -> None: """Should filter to directories only.""" - tool = FileSearchTool() + tool = FileSearchTool(allowed_paths=[temp_dir]) result = tool._run(pattern="*", path=temp_dir, file_type="dir") assert "subdir" in result - def test_find_path_restricted(self, restricted_temp_dir): + def test_find_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None: """Should reject paths outside allowed_paths.""" allowed_dir, forbidden_dir = restricted_temp_dir tool = FileSearchTool(allowed_paths=[allowed_dir]) @@ -327,7 +338,26 @@ class TestFileSearchTool: class TestEnvironmentTools: """Tests for EnvironmentTools manager class.""" - def test_returns_all_tools_by_default(self): + def test_default_allowed_paths_is_current_directory(self) -> None: + """Default should restrict to current directory for security.""" + env_tools = EnvironmentTools() + tools = env_tools.tools() + + # All tools should default to current directory + for tool in tools: + assert isinstance(tool, BaseEnvironmentTool) + assert tool.allowed_paths == ["."] + + def test_explicit_empty_allowed_paths_allows_all(self) -> None: + """Passing empty list should allow all paths.""" + env_tools = EnvironmentTools(allowed_paths=[]) + tools = env_tools.tools() + + for tool in tools: + assert isinstance(tool, BaseEnvironmentTool) + assert tool.allowed_paths == [] + + def test_returns_all_tools_by_default(self) -> None: """Should return all four tools by default.""" env_tools = EnvironmentTools() tools = env_tools.tools() @@ -340,7 +370,7 @@ class TestEnvironmentTools: assert "grep_search" in tool_names assert "find_files" in tool_names - def test_exclude_grep(self): + def test_exclude_grep(self) -> None: """Should exclude grep tool when disabled.""" env_tools = EnvironmentTools(include_grep=False) tools = env_tools.tools() @@ -349,7 +379,7 @@ class TestEnvironmentTools: tool_names = [t.name for t in tools] assert "grep_search" not in tool_names - def test_exclude_search(self): + def test_exclude_search(self) -> None: """Should exclude search tool when disabled.""" env_tools = EnvironmentTools(include_search=False) tools = env_tools.tools() @@ -358,15 +388,16 @@ class TestEnvironmentTools: tool_names = [t.name for t in tools] assert "find_files" not in tool_names - def test_allowed_paths_propagated(self, temp_dir): + def test_allowed_paths_propagated(self, temp_dir: str) -> None: """Should propagate allowed_paths to all tools.""" env_tools = EnvironmentTools(allowed_paths=[temp_dir]) tools = env_tools.tools() for tool in tools: + assert isinstance(tool, BaseEnvironmentTool) assert tool.allowed_paths == [temp_dir] - def test_tools_are_base_tool_instances(self): + def test_tools_are_base_tool_instances(self) -> None: """All returned tools should be BaseTool instances.""" from crewai.tools.base_tool import BaseTool