Compare commits

..

7 Commits

Author SHA1 Message Date
lorenzejay
bb823b047a Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/experimental-environment-tools 2026-01-12 09:49:53 -08:00
Koushiv
8f99fa76ed feat: additional a2a transports
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Co-authored-by: Koushiv Sadhukhan <koushiv.777@gmail.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-01-12 12:03:06 -05:00
GininDenis
17e3fcbe1f fix: unlink task in execution spans
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-01-12 02:58:42 -05:00
lorenzejay
9d33706fd5 refactor: update allowed_paths default behavior in environment tools
Modified the default value of allowed_paths in BaseEnvironmentTool and EnvironmentTools to default to the current directory (".") instead of an empty list. This change enhances security by ensuring that operations are restricted to the current directory unless explicitly specified otherwise. Additionally, updated related tests to reflect this new default behavior and improved type hints for better clarity.
2026-01-08 16:33:13 -08:00
lorenzejay
4364503615 linted 2026-01-08 16:20:30 -08:00
lorenzejay
d0e4c356e1 refactor: improve code readability in environment tools
Refactored the file_search_tool.py, grep_tool.py, and list_dir_tool.py to enhance code readability. Changes include formatting list comprehensions for better clarity, adjusting error handling for consistency, and removing unnecessary imports. These improvements aim to streamline the codebase and maintain a clean structure for future development.
2026-01-08 16:13:07 -08:00
lorenzejay
6c2dfdff56 feat: add environment tools for file system operations
Introduced a new set of environment tools to enhance file system interactions within the CrewAI framework. This includes tools for reading files, searching for files by name patterns, and listing directory contents, all with built-in path security to prevent unauthorized access. The new tools are designed to facilitate context engineering for agents, improving their ability to interact with the file system effectively. Additionally, updated the experimental module's `__init__.py` to include these new tools in the public API.
2026-01-08 16:10:57 -08:00
15 changed files with 1208 additions and 249 deletions

View File

@@ -91,6 +91,10 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>
<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:

View File

@@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated, Any, ClassVar
from typing import Annotated, Any, ClassVar, Literal
from pydantic import (
BaseModel,
@@ -53,6 +53,7 @@ class A2AConfig(BaseModel):
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@@ -82,3 +83,7 @@ class A2AConfig(BaseModel):
default_factory=_get_default_update_config,
description="Update mechanism config",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)

View File

@@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
@@ -18,7 +18,6 @@ from a2a.types import (
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
@@ -259,6 +258,7 @@ async def _afetch_agent_card_impl(
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -282,6 +282,23 @@ def execute_a2a_delegation(
use aexecute_a2a_delegation directly.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -323,6 +340,7 @@ def execute_a2a_delegation(
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
)
@@ -333,6 +351,7 @@ def execute_a2a_delegation(
async def aexecute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -356,6 +375,23 @@ async def aexecute_a2a_delegation(
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
Args:
endpoint: A2A agent endpoint URL
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: Task to delegate
context: Optional context
context_id: Context ID for correlation
task_id: Specific task identifier
reference_task_ids: Related task IDs
metadata: Additional metadata
extensions: Protocol extensions
conversation_history: Previous Message objects
turn_number: Current turn number
agent_branch: Agent tree branch for logging
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -414,6 +450,7 @@ async def aexecute_a2a_delegation(
agent_role=agent_role,
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
)
crewai_event_bus.emit(
@@ -431,6 +468,7 @@ async def aexecute_a2a_delegation(
async def _aexecute_a2a_delegation_impl(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -524,7 +562,6 @@ async def _aexecute_a2a_delegation_impl(
extensions=extensions,
)
transport_protocol = TransportProtocol("JSONRPC")
new_messages: list[Message] = [*conversation_history, message]
crewai_event_bus.emit(
None,
@@ -596,7 +633,7 @@ async def _aexecute_a2a_delegation_impl(
@asynccontextmanager
async def _create_a2a_client(
agent_card: AgentCard,
transport_protocol: TransportProtocol,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
timeout: int,
headers: MutableMapping[str, str],
streaming: bool,
@@ -640,7 +677,7 @@ async def _create_a2a_client(
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[str(transport_protocol.value)],
supported_transports=[transport_protocol],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],

View File

@@ -771,6 +771,7 @@ def _delegate_to_a2a(
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
)
conversation_history = a2a_result.get("history", [])
@@ -1085,6 +1086,7 @@ async def _adelegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
)

View File

@@ -1,4 +1,12 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -23,14 +31,20 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"BaseEnvironmentTool",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"EnvironmentTools",
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",
"ExperimentResults",
"ExperimentRunner",
"FileReadTool",
"FileSearchTool",
"GoalAlignmentEvaluator",
"GrepTool",
"ListDirTool",
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",

View File

@@ -0,0 +1,24 @@
"""Environment tools for file system operations.
These tools provide agents with the ability to explore and read from
the filesystem for context engineering purposes.
"""
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
from crewai.experimental.environment_tools.environment_tools import EnvironmentTools
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
__all__ = [
"BaseEnvironmentTool",
"EnvironmentTools",
"FileReadTool",
"FileSearchTool",
"GrepTool",
"ListDirTool",
]

View File

@@ -0,0 +1,84 @@
"""Base class for environment tools with path security."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from pydantic import Field
from crewai.tools.base_tool import BaseTool
class BaseEnvironmentTool(BaseTool):
    """Base class for environment/file system tools with path security.

    Provides path validation that restricts file operations to a set of
    allowed directories, preventing path traversal and enforcing a
    security sandbox for agent-driven file access.

    Attributes:
        allowed_paths: List of paths that operations are restricted to.
            Empty list means allow all paths (no restrictions).
    """

    allowed_paths: list[str] = Field(
        default_factory=lambda: ["."],
        description="Restrict operations to these paths. Defaults to current directory.",
    )

    def _validate_path(self, path: str) -> tuple[bool, Path | str]:
        """Validate and resolve a path against the allowed_paths whitelist.

        Args:
            path: The path to validate.

        Returns:
            A tuple of (is_valid, result) where:
            - If valid: (True, resolved_path as Path)
            - If invalid: (False, error_message as str)
        """
        try:
            candidate = Path(path).resolve()
            if not self.allowed_paths:
                # Empty whitelist disables sandboxing entirely.
                return True, candidate
            for root in self.allowed_paths:
                try:
                    # relative_to raises ValueError when candidate is not
                    # under this allowed root.
                    candidate.relative_to(Path(root).resolve())
                except ValueError:
                    continue
                return True, candidate
            return (
                False,
                f"Path '{path}' is outside allowed paths: {self.allowed_paths}",
            )
        except Exception as e:
            return False, f"Invalid path '{path}': {e}"

    def _format_size(self, size: int) -> str:
        """Format file size in human-readable format.

        Args:
            size: Size in bytes.

        Returns:
            Human-readable size string (e.g., "1.5KB", "2.3MB").
        """
        kb, mb, gb = 1024, 1024**2, 1024**3
        if size < kb:
            return f"{size}B"
        if size < mb:
            return f"{size / kb:.1f}KB"
        if size < gb:
            return f"{size / mb:.1f}MB"
        return f"{size / gb:.1f}GB"

    def _run(self, *args: Any, **kwargs: Any) -> Any:
        """Subclasses must implement this method."""
        raise NotImplementedError("Subclasses must implement _run method")

View File

@@ -0,0 +1,77 @@
"""Manager class for environment tools."""
from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
if TYPE_CHECKING:
from crewai.tools.base_tool import BaseTool
class EnvironmentTools:
    """Manager class for file system/environment tools.

    Provides a convenient way to create a set of file system tools
    with shared security configuration (allowed_paths).

    Similar to AgentTools but for file system operations. Use this to
    give agents the ability to explore and read files for context engineering.

    Example:
        from crewai.experimental import EnvironmentTools

        # Create tools with security sandbox
        env_tools = EnvironmentTools(
            allowed_paths=["./src", "./docs"],
        )

        # Use with an agent
        agent = Agent(
            role="Code Analyst",
            tools=env_tools.tools(),
        )
    """

    def __init__(
        self,
        allowed_paths: list[str] | None = None,
        include_grep: bool = True,
        include_search: bool = True,
    ) -> None:
        """Initialize EnvironmentTools.

        Args:
            allowed_paths: List of paths to restrict operations to.
                Defaults to current directory ["."] if None.
                Pass empty list [] to allow all paths (not recommended).
            include_grep: Whether to include GrepTool (requires grep installed).
            include_search: Whether to include FileSearchTool.
        """
        # None means "use the safe default"; an explicit [] disables sandboxing.
        self.allowed_paths = ["."] if allowed_paths is None else allowed_paths
        self.include_grep = include_grep
        self.include_search = include_search

    def tools(self) -> list[BaseTool]:
        """Get all configured environment tools.

        Returns:
            List of BaseTool instances with shared allowed_paths configuration.
        """
        sandbox = self.allowed_paths
        selected: list[BaseTool] = [
            FileReadTool(allowed_paths=sandbox),
            ListDirTool(allowed_paths=sandbox),
        ]
        if self.include_grep:
            selected.append(GrepTool(allowed_paths=sandbox))
        if self.include_search:
            selected.append(FileSearchTool(allowed_paths=sandbox))
        return selected

View File

@@ -0,0 +1,124 @@
"""Tool for reading file contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileReadInput(BaseModel):
    """Input schema for reading files."""

    # Target file; validated against the tool's allowed_paths before reading.
    path: str = Field(..., description="Path to the file to read")
    # Optional window into the file: both default to "read everything".
    start_line: int | None = Field(
        default=None,
        description="Line to start reading from (1-indexed). If None, starts from beginning.",
    )
    line_count: int | None = Field(
        default=None,
        description="Number of lines to read. If None, reads to end of file.",
    )
class FileReadTool(BaseEnvironmentTool):
    """Read contents of text files with optional line ranges.

    Use this tool to:
    - Read configuration files, source code, logs
    - Inspect file contents before making decisions
    - Load reference documentation or data files

    Supports reading entire files or specific line ranges for efficiency.
    """

    name: str = "read_file"
    description: str = """Read the contents of a text file.
Use this to read configuration files, source code, logs, or any text file.
You can optionally specify start_line and line_count to read specific portions.
Examples:
- Read entire file: path="config.yaml"
- Read lines 100-149: path="large.log", start_line=100, line_count=50
"""
    args_schema: type[BaseModel] = FileReadInput

    def _run(
        self,
        path: str,
        start_line: int | None = None,
        line_count: int | None = None,
    ) -> str:
        """Read file contents with optional line range.

        Args:
            path: Path to the file to read.
            start_line: Line to start reading from (1-indexed).
            line_count: Number of lines to read.

        Returns:
            File contents with metadata header, or error message.
        """
        # Enforce the allowed_paths sandbox before touching the filesystem.
        valid, checked = self._validate_path(path)
        if not valid:
            return f"Error: {checked}"
        assert isinstance(checked, Path)  # noqa: S101
        target = checked

        if not target.exists():
            return f"Error: File not found: {path}"
        if not target.is_file():
            return f"Error: Not a file: {path}"

        read_all = start_line is None and line_count is None
        try:
            with open(target, "r", encoding="utf-8") as handle:
                if read_all:
                    content = handle.read()
                else:
                    raw_lines = handle.readlines()
                    begin = max(0, (start_line or 1) - 1)  # 1-indexed -> 0-indexed
                    end = len(raw_lines) if line_count is None else begin + line_count
                    content = "".join(raw_lines[begin:end])

            stat = target.stat()
            # Count returned lines, including a trailing partial line.
            total_lines = content.count("\n") + (
                1 if content and not content.endswith("\n") else 0
            )

            # Metadata header precedes the content.
            header_parts = [
                f"File: {path}\n",
                f"Size: {self._format_size(stat.st_size)} | Lines: {total_lines}",
            ]
            if not read_all:
                header_parts.append(
                    f" | Range: {start_line or 1}-{(start_line or 1) + total_lines - 1}"
                )
            header_parts.append("\n" + "=" * 60 + "\n")
            return "".join(header_parts) + content
        except UnicodeDecodeError:
            return f"Error: File is not a text file or has encoding issues: {path}"
        except PermissionError:
            return f"Error: Permission denied: {path}"
        except Exception as e:
            return f"Error reading file: {e}"

View File

@@ -0,0 +1,127 @@
"""Tool for finding files by name pattern."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileSearchInput(BaseModel):
    """Input schema for file search."""

    # Glob pattern matched against entry names under `path`.
    pattern: str = Field(
        ...,
        description="Filename pattern to search for (glob syntax, e.g., '*.py', 'test_*.py')",
    )
    # Search root; validated against the tool's allowed_paths.
    path: str = Field(
        default=".",
        description="Directory to search in",
    )
    # Optional kind filter; "all" (the default) keeps files and directories.
    file_type: Literal["file", "dir", "all"] | None = Field(
        default="all",
        description="Filter by type: 'file' for files only, 'dir' for directories only, 'all' for both",
    )
class FileSearchTool(BaseEnvironmentTool):
    """Find files by name pattern.

    Use this tool to:
    - Find specific files in a codebase
    - Locate configuration files
    - Search for files matching a pattern
    """

    name: str = "find_files"
    description: str = """Find files by name pattern using glob syntax.
Searches recursively through directories to find matching files.
Examples:
- Find Python files: pattern="*.py", path="src/"
- Find test files: pattern="test_*.py"
- Find configs: pattern="*.yaml", path="."
- Find directories only: pattern="*", file_type="dir"
"""
    args_schema: type[BaseModel] = FileSearchInput

    def _run(
        self,
        pattern: str,
        path: str = ".",
        file_type: Literal["file", "dir", "all"] | None = "all",
    ) -> str:
        """Find files matching a pattern.

        Args:
            pattern: Glob pattern for filenames.
            path: Directory to search in.
            file_type: Filter by type ('file', 'dir', or 'all').

        Returns:
            List of matching files or error message.
        """
        # Validate path against allowed_paths
        valid, result = self._validate_path(path)
        if not valid:
            return f"Error: {result}"
        search_path = result
        # Check directory exists
        if not search_path.exists():
            return f"Error: Directory not found: {path}"
        if not search_path.is_dir():
            return f"Error: Not a directory: {path}"
        try:
            # Find matching entries recursively
            matches = list(search_path.rglob(pattern))
            # Filter by type
            if file_type == "file":
                matches = [m for m in matches if m.is_file()]
            elif file_type == "dir":
                matches = [m for m in matches if m.is_dir()]
            # Filter out hidden entries. BUGFIX: check path components
            # *relative to the search root*, not the absolute path — the
            # resolved root may legitimately live under a dot-directory
            # (e.g. /home/user/.local/project), which previously caused
            # every match to be discarded.
            matches = [
                m
                for m in matches
                if not any(
                    part.startswith(".")
                    for part in m.relative_to(search_path).parts
                )
            ]
            # Sort alphabetically
            matches.sort(key=lambda x: str(x).lower())
            if not matches:
                return f"No {file_type if file_type != 'all' else 'files'} matching '{pattern}' found in {path}"
            # Format output
            result_lines = [f"Found {len(matches)} matches for '{pattern}' in {path}:"]
            result_lines.append("=" * 60)
            for match in matches:
                # Get relative path from search directory
                rel_path = match.relative_to(search_path)
                if match.is_dir():
                    result_lines.append(f"📁 {rel_path}/")
                else:
                    try:
                        size = match.stat().st_size
                    except (OSError, PermissionError):
                        continue  # Skip files we can't stat
                    size_str = self._format_size(size)
                    result_lines.append(f"📄 {rel_path} ({size_str})")
            return "\n".join(result_lines)
        except PermissionError:
            return f"Error: Permission denied: {path}"
        except Exception as e:
            return f"Error searching files: {e}"

View File

@@ -0,0 +1,149 @@
"""Tool for searching patterns in files using grep."""
from __future__ import annotations

import re
import subprocess

from pydantic import BaseModel, Field

from crewai.experimental.environment_tools.base_environment_tool import (
    BaseEnvironmentTool,
)
class GrepInput(BaseModel):
    """Input schema for grep search."""

    # Passed to grep after "--", so regex metacharacters are interpreted
    # by grep, not the shell.
    pattern: str = Field(..., description="Search pattern (supports regex)")
    # Search target; validated against the tool's allowed_paths.
    path: str = Field(..., description="File or directory to search in")
    recursive: bool = Field(
        default=True,
        description="Search recursively in directories",
    )
    ignore_case: bool = Field(
        default=False,
        description="Case-insensitive search",
    )
    # Maps to grep's -C option; 0 disables context.
    context_lines: int = Field(
        default=2,
        description="Number of context lines to show before/after matches",
    )
class GrepTool(BaseEnvironmentTool):
    """Search for text patterns in files using grep.

    Use this tool to:
    - Find where a function or class is defined
    - Search for error messages in logs
    - Locate configuration values
    - Find TODO comments or specific patterns
    """

    name: str = "grep_search"
    description: str = """Search for text patterns in files using grep.
Supports regex patterns. Returns matching lines with context.
Examples:
- Find function: pattern="def process_data", path="src/"
- Search logs: pattern="ERROR", path="logs/app.log"
- Case-insensitive: pattern="todo", path=".", ignore_case=True
"""
    args_schema: type[BaseModel] = GrepInput

    def _run(
        self,
        pattern: str,
        path: str,
        recursive: bool = True,
        ignore_case: bool = False,
        context_lines: int = 2,
    ) -> str:
        """Search for patterns in files.

        Args:
            pattern: Search pattern (regex supported).
            path: File or directory to search in.
            recursive: Whether to search recursively.
            ignore_case: Whether to ignore case.
            context_lines: Lines of context around matches.

        Returns:
            Search results or error message.
        """
        # Validate path against allowed_paths
        valid, result = self._validate_path(path)
        if not valid:
            return f"Error: {result}"
        search_path = result
        # Check path exists
        if not search_path.exists():
            return f"Error: Path not found: {path}"
        try:
            # Build grep command safely (argv list — no shell involved)
            cmd = ["grep", "--color=never"]
            # Add recursive flag if searching directory
            if recursive and search_path.is_dir():
                cmd.append("-r")
            # Case insensitive
            if ignore_case:
                cmd.append("-i")
            # Context lines
            if context_lines > 0:
                cmd.extend(["-C", str(context_lines)])
            # Show line numbers
            cmd.append("-n")
            # Use -- to prevent pattern from being interpreted as option
            cmd.append("--")
            cmd.append(pattern)
            cmd.append(str(search_path))
            # Execute with timeout
            # Security: cmd is a list (no shell injection), path is validated above
            result = subprocess.run(  # noqa: S603
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if result.returncode == 0:
                # Found matches
                output = result.stdout
                # BUGFIX: with -C, grep's output contains context lines in
                # addition to matches. grep -n delimits matched lines with
                # ':' after the line number and context lines with '-', so
                # count only ':'-delimited lines instead of every output
                # line. (Heuristic: could overcount if a context line's
                # content itself mimics a "path:line:" prefix.)
                match_marker = re.compile(r"^(?:.*?:)?\d+:")
                match_count = sum(
                    1
                    for line in output.split("\n")
                    if line and not line.startswith("--") and match_marker.match(line)
                )
                header = f"Found {match_count} matches for '{pattern}' in {path}\n"
                header += "=" * 60 + "\n"
                return header + output
            if result.returncode == 1:
                # No matches found (grep returns 1 for no matches)
                return f"No matches found for '{pattern}' in {path}"
            # Error occurred
            error_msg = result.stderr.strip() if result.stderr else "Unknown error"
            return f"Error: {error_msg}"
        except subprocess.TimeoutExpired:
            return "Error: Search timed out (>30s). Try narrowing the search path."
        except FileNotFoundError:
            return (
                "Error: grep command not found. Ensure grep is installed on the system."
            )
        except Exception as e:
            return f"Error during search: {e}"

View File

@@ -0,0 +1,147 @@
"""Tool for listing directory contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class ListDirInput(BaseModel):
    """Input schema for listing directories."""

    # Directory to list; validated against the tool's allowed_paths.
    path: str = Field(default=".", description="Directory path to list")
    # Optional glob filter; None lists every (non-hidden) entry.
    pattern: str | None = Field(
        default=None,
        description="Glob pattern to filter entries (e.g., '*.py', '*.md')",
    )
    recursive: bool = Field(
        default=False,
        description="If True, list contents recursively including subdirectories",
    )
class ListDirTool(BaseEnvironmentTool):
    """List contents of a directory with optional filtering.

    Use this tool to:
    - Explore project structure
    - Find specific file types
    - Check what files exist in a directory
    - Navigate the file system
    """

    name: str = "list_directory"
    description: str = """List contents of a directory.
Use this to explore directories and find files. You can filter by pattern
and optionally list recursively.
Examples:
- List current dir: path="."
- List src folder: path="src/"
- Find Python files: path=".", pattern="*.py"
- Recursive listing: path="src/", recursive=True
"""
    args_schema: type[BaseModel] = ListDirInput

    def _run(
        self,
        path: str = ".",
        pattern: str | None = None,
        recursive: bool = False,
    ) -> str:
        """List directory contents.

        Args:
            path: Directory path to list.
            pattern: Glob pattern to filter entries.
            recursive: Whether to list recursively.

        Returns:
            Formatted directory listing or error message.
        """
        # Validate path against allowed_paths
        valid, result = self._validate_path(path)
        if not valid:
            return f"Error: {result}"
        assert isinstance(result, Path)  # noqa: S101
        dir_path = result
        # Check directory exists
        if not dir_path.exists():
            return f"Error: Directory not found: {path}"
        if not dir_path.is_dir():
            return f"Error: Not a directory: {path}"
        try:
            # Get entries based on pattern and recursive flag
            if pattern:
                if recursive:
                    entries = list(dir_path.rglob(pattern))
                else:
                    entries = list(dir_path.glob(pattern))
            else:
                if recursive:
                    entries = list(dir_path.rglob("*"))
                else:
                    entries = list(dir_path.iterdir())
            # Filter out hidden entries. BUGFIX: check every path component
            # relative to the listed directory, not just the final name —
            # otherwise a recursive listing hides ".git" itself but still
            # shows its contents (e.g. ".git/config" has name "config").
            entries = [
                e
                for e in entries
                if not any(
                    part.startswith(".") for part in e.relative_to(dir_path).parts
                )
            ]
            # Sort: directories first, then files, alphabetically
            entries.sort(key=lambda x: (not x.is_dir(), x.name.lower()))
            if not entries:
                if pattern:
                    return f"No entries matching '{pattern}' in {path}"
                return f"Directory is empty: {path}"
            # Format output
            result_lines = [f"Contents of {path}:"]
            result_lines.append("=" * 60)
            dirs = []
            files = []
            for entry in entries:
                # Get relative path for recursive listings
                if recursive:
                    display_name = str(entry.relative_to(dir_path))
                else:
                    display_name = entry.name
                if entry.is_dir():
                    dirs.append(f"📁 {display_name}/")
                else:
                    try:
                        size = entry.stat().st_size
                    except (OSError, PermissionError):
                        continue  # Skip files we can't stat
                    size_str = self._format_size(size)
                    files.append(f"📄 {display_name} ({size_str})")
            # Output directories first, then files
            if dirs:
                result_lines.extend(dirs)
            if files:
                if dirs:
                    result_lines.append("")  # Blank line between dirs and files
                result_lines.extend(files)
            result_lines.append("")
            result_lines.append(f"Total: {len(dirs)} directories, {len(files)} files")
            return "\n".join(result_lines)
        except PermissionError:
            return f"Error: Permission denied: {path}"
        except Exception as e:
            return f"Error listing directory: {e}"

View File

@@ -1,243 +0,0 @@
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.tasks.task_output import TaskOutput
class MockAgent:
    """Mock agent for testing."""

    def __init__(self, role: str = "test_role"):
        """Stand-in agent exposing only a role and a mocked crew."""
        self.role, self.crew = role, MagicMock()
class MockTask:
    """Mock task for testing."""

    def __init__(self, task_id: str = "test_task"):
        """Stand-in task with fixed metadata and its own MockAgent."""
        self.id = task_id
        self.agent = MockAgent()
        self.name = "Test Task"
        self.description = "A test task description"
@pytest.fixture
def event_listener():
    """Create a fresh EventListener instance for testing."""
    # Reset the singleton's private state so each test starts with a brand-new
    # listener (and an empty execution_spans mapping).
    EventListener._instance = None
    EventListener._initialized = False
    listener = EventListener()
    # Register handlers on the shared bus so emitted events reach this listener.
    listener.setup_listeners(crewai_event_bus)
    return listener
@pytest.fixture
def mock_task():
    """Provide a default MockTask instance."""
    task = MockTask()
    return task
@pytest.fixture
def mock_task_output():
    """Provide a TaskOutput populated with fixed test values."""
    output = TaskOutput(
        description="Test task description",
        raw="Test output",
        agent="test_agent",
    )
    return output
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_completed(
    event_listener, mock_task, mock_task_output
):
    """Test that execution_spans entries are properly removed when a task completes.

    This test verifies the fix for the memory leak where completed tasks were
    setting execution_spans[source] = None instead of removing the key entirely.
    """
    telemetry = event_listener._telemetry
    with patch.object(telemetry, "task_started") as started_mock, patch.object(
        telemetry, "task_ended"
    ):
        span = MagicMock()
        started_mock.return_value = span

        pending = crewai_event_bus.emit(
            mock_task, TaskStartedEvent(context="test context", task=mock_task)
        )
        if pending:
            await asyncio.wrap_future(pending)

        assert mock_task in event_listener.execution_spans
        assert event_listener.execution_spans[mock_task] == span

        pending = crewai_event_bus.emit(
            mock_task, TaskCompletedEvent(output=mock_task_output, task=mock_task)
        )
        if pending:
            await asyncio.wrap_future(pending)

        assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
    """Test that execution_spans entries are properly removed when a task fails.

    This test verifies the fix for the memory leak where failed tasks were
    setting execution_spans[source] = None instead of removing the key entirely.
    """
    telemetry = event_listener._telemetry
    with patch.object(telemetry, "task_started") as started_mock, patch.object(
        telemetry, "task_ended"
    ):
        span = MagicMock()
        started_mock.return_value = span

        pending = crewai_event_bus.emit(
            mock_task, TaskStartedEvent(context="test context", task=mock_task)
        )
        if pending:
            await asyncio.wrap_future(pending)

        assert mock_task in event_listener.execution_spans
        assert event_listener.execution_spans[mock_task] == span

        pending = crewai_event_bus.emit(
            mock_task, TaskFailedEvent(error="Test error", task=mock_task)
        )
        if pending:
            await asyncio.wrap_future(pending)

        assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_dict_size_does_not_grow_unbounded(
    event_listener, mock_task_output
):
    """Spans for many finished tasks must not accumulate in the dict.

    Simulates the leak scenario: if completed/failed tasks were never
    evicted, ``execution_spans`` would hold one entry per task.
    """
    total = 100
    with patch.object(
        event_listener._telemetry, "task_started"
    ) as started_mock, patch.object(event_listener._telemetry, "task_ended"):
        started_mock.return_value = MagicMock()
        for index in range(total):
            task = MockTask(task_id=f"task_{index}")

            fut = crewai_event_bus.emit(
                task, TaskStartedEvent(context="test context", task=task)
            )
            if fut:
                await asyncio.wrap_future(fut)

            # Alternate between the two terminal outcomes.
            if index % 2 == 0:
                terminal_event = TaskCompletedEvent(
                    output=mock_task_output, task=task
                )
            else:
                terminal_event = TaskFailedEvent(error="Test error", task=task)
            fut = crewai_event_bus.emit(task, terminal_event)
            if fut:
                await asyncio.wrap_future(fut)

        assert len(event_listener.execution_spans) == 0
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_gracefully(
    event_listener, mock_task, mock_task_output
):
    """Completing a task with no recorded span must not raise.

    Exercises the ``pop(source, None)`` edge case in the listener.
    """
    with patch.object(event_listener._telemetry, "task_ended"):
        assert mock_task not in event_listener.execution_spans

        fut = crewai_event_bus.emit(
            mock_task, TaskCompletedEvent(output=mock_task_output, task=mock_task)
        )
        if fut:
            await asyncio.wrap_future(fut)

        assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
    event_listener, mock_task
):
    """Failing a task with no recorded span must not raise.

    Exercises the ``pop(source, None)`` edge case in the listener.
    """
    with patch.object(event_listener._telemetry, "task_ended"):
        assert mock_task not in event_listener.execution_spans

        fut = crewai_event_bus.emit(
            mock_task, TaskFailedEvent(error="Test error", task=mock_task)
        )
        if fut:
            await asyncio.wrap_future(fut)

        assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_completion(
    event_listener, mock_task, mock_task_output
):
    """On completion, ``task_ended`` receives the span created at start."""
    with patch.object(
        event_listener._telemetry, "task_started"
    ) as started_mock, patch.object(
        event_listener._telemetry, "task_ended"
    ) as ended_mock:
        span = MagicMock()
        started_mock.return_value = span

        fut = crewai_event_bus.emit(
            mock_task, TaskStartedEvent(context="test context", task=mock_task)
        )
        if fut:
            await asyncio.wrap_future(fut)

        fut = crewai_event_bus.emit(
            mock_task, TaskCompletedEvent(output=mock_task_output, task=mock_task)
        )
        if fut:
            await asyncio.wrap_future(fut)

        # The span handed back by task_started must be the one closed.
        ended_mock.assert_called_once_with(span, mock_task, mock_task.agent.crew)
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_failure(
    event_listener, mock_task
):
    """On failure, ``task_ended`` receives the span created at start."""
    with patch.object(
        event_listener._telemetry, "task_started"
    ) as started_mock, patch.object(
        event_listener._telemetry, "task_ended"
    ) as ended_mock:
        span = MagicMock()
        started_mock.return_value = span

        fut = crewai_event_bus.emit(
            mock_task, TaskStartedEvent(context="test context", task=mock_task)
        )
        if fut:
            await asyncio.wrap_future(fut)

        fut = crewai_event_bus.emit(
            mock_task, TaskFailedEvent(error="Test error", task=mock_task)
        )
        if fut:
            await asyncio.wrap_future(fut)

        # The span handed back by task_started must be the one closed.
        ended_mock.assert_called_once_with(span, mock_task, mock_task.agent.crew)

View File

@@ -0,0 +1,408 @@
"""Tests for experimental environment tools."""
from __future__ import annotations
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def temp_dir() -> Generator[str, None, None]:
    """Yield a temporary directory populated with sample files.

    Layout:
        test.txt    - five numbered lines
        example.py  - a tiny Python module
        subdir/     - nested.txt and another.py
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        root = Path(tmpdir)
        (root / "test.txt").write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n")
        (root / "example.py").write_text("def hello():\n print('Hello World')\n")
        nested = root / "subdir"
        nested.mkdir()
        (nested / "nested.txt").write_text("Nested content\n")
        (nested / "another.py").write_text("# Another Python file\n")
        yield tmpdir
@pytest.fixture
def restricted_temp_dir() -> Generator[tuple[str, str], None, None]:
    """Yield ``(allowed_dir, forbidden_dir)``, each holding one text file."""
    with tempfile.TemporaryDirectory() as allowed:
        with tempfile.TemporaryDirectory() as forbidden:
            # One marker file per directory so reads can be distinguished.
            (Path(allowed) / "allowed.txt").write_text("Allowed content\n")
            (Path(forbidden) / "forbidden.txt").write_text("Forbidden content\n")
            yield allowed, forbidden
# ============================================================================
# BaseEnvironmentTool Tests
# ============================================================================
class TestBaseEnvironmentTool:
    """Path validation and size formatting shared by all environment tools."""

    def test_default_allowed_paths_is_current_directory(self) -> None:
        """Tools default to restricting access to the current directory."""
        assert FileReadTool().allowed_paths == ["."]

    def test_validate_path_explicit_no_restrictions(self, temp_dir: str) -> None:
        """An explicitly empty allowed_paths disables path restrictions."""
        checker = FileReadTool(allowed_paths=[])
        ok, resolved = checker._validate_path(temp_dir)
        assert ok is True
        assert isinstance(resolved, Path)

    def test_validate_path_within_allowed(self, temp_dir: str) -> None:
        """A path under an allowed root validates successfully."""
        checker = FileReadTool(allowed_paths=[temp_dir])
        ok, resolved = checker._validate_path(os.path.join(temp_dir, "test.txt"))
        assert ok is True
        assert isinstance(resolved, Path)

    def test_validate_path_outside_allowed(
        self, restricted_temp_dir: tuple[str, str]
    ) -> None:
        """A path outside every allowed root is rejected with a message."""
        allowed_dir, forbidden_dir = restricted_temp_dir
        checker = FileReadTool(allowed_paths=[allowed_dir])
        ok, message = checker._validate_path(
            os.path.join(forbidden_dir, "forbidden.txt")
        )
        assert ok is False
        assert isinstance(message, str)
        assert "outside allowed paths" in message

    def test_format_size(self) -> None:
        """Byte counts render as human-readable B/KB/MB/GB strings."""
        sizer = FileReadTool()
        expectations = {
            500: "500B",
            1024: "1.0KB",
            1536: "1.5KB",
            1024 * 1024: "1.0MB",
            1024 * 1024 * 1024: "1.0GB",
        }
        for raw, pretty in expectations.items():
            assert sizer._format_size(raw) == pretty
# ============================================================================
# FileReadTool Tests
# ============================================================================
class TestFileReadTool:
    """Behavior of ``FileReadTool._run``."""

    def test_read_entire_file(self, temp_dir: str) -> None:
        """Reading without a range returns all lines plus a metadata header."""
        reader = FileReadTool(allowed_paths=[temp_dir])
        output = reader._run(path=os.path.join(temp_dir, "test.txt"))
        for expected in ("Line 1", "Line 2", "Line 5", "File:"):
            assert expected in output

    def test_read_with_line_range(self, temp_dir: str) -> None:
        """``start_line``/``line_count`` limit output to the requested slice."""
        reader = FileReadTool(allowed_paths=[temp_dir])
        output = reader._run(
            path=os.path.join(temp_dir, "test.txt"), start_line=2, line_count=2
        )
        assert "Line 2" in output
        assert "Line 3" in output
        # Content after the header separator must exclude out-of-range lines.
        assert "Line 1" not in output.split("=" * 60)[-1]

    def test_read_file_not_found(self, temp_dir: str) -> None:
        """A missing file produces a not-found error string."""
        reader = FileReadTool(allowed_paths=[temp_dir])
        output = reader._run(path=os.path.join(temp_dir, "nonexistent.txt"))
        assert "Error: File not found" in output

    def test_read_file_path_restricted(
        self, restricted_temp_dir: tuple[str, str]
    ) -> None:
        """Reading outside allowed_paths yields a restriction error."""
        allowed_dir, forbidden_dir = restricted_temp_dir
        reader = FileReadTool(allowed_paths=[allowed_dir])
        output = reader._run(path=os.path.join(forbidden_dir, "forbidden.txt"))
        assert "Error:" in output
        assert "outside allowed paths" in output
# ============================================================================
# ListDirTool Tests
# ============================================================================
class TestListDirTool:
    """Behavior of ``ListDirTool._run``."""

    def test_list_directory(self, temp_dir: str) -> None:
        """A plain listing names every entry and reports a total."""
        lister = ListDirTool(allowed_paths=[temp_dir])
        output = lister._run(path=temp_dir)
        for expected in ("test.txt", "example.py", "subdir", "Total:"):
            assert expected in output

    def test_list_with_pattern(self, temp_dir: str) -> None:
        """A glob pattern filters the listing."""
        lister = ListDirTool(allowed_paths=[temp_dir])
        output = lister._run(path=temp_dir, pattern="*.py")
        assert "example.py" in output
        assert "test.txt" not in output

    def test_list_recursive(self, temp_dir: str) -> None:
        """``recursive=True`` descends into subdirectories."""
        lister = ListDirTool(allowed_paths=[temp_dir])
        output = lister._run(path=temp_dir, recursive=True)
        assert "nested.txt" in output
        assert "another.py" in output

    def test_list_nonexistent_directory(self, temp_dir: str) -> None:
        """A missing directory produces a not-found error string."""
        lister = ListDirTool(allowed_paths=[temp_dir])
        output = lister._run(path=os.path.join(temp_dir, "nonexistent"))
        assert "Error: Directory not found" in output

    def test_list_path_restricted(
        self, restricted_temp_dir: tuple[str, str]
    ) -> None:
        """Listing outside allowed_paths yields a restriction error."""
        allowed_dir, forbidden_dir = restricted_temp_dir
        lister = ListDirTool(allowed_paths=[allowed_dir])
        output = lister._run(path=forbidden_dir)
        assert "Error:" in output
        assert "outside allowed paths" in output
# ============================================================================
# GrepTool Tests
# ============================================================================
class TestGrepTool:
    """Behavior of ``GrepTool._run``."""

    def test_grep_finds_pattern(self, temp_dir: str) -> None:
        """A matching pattern reports the matched line."""
        searcher = GrepTool(allowed_paths=[temp_dir])
        output = searcher._run(
            pattern="Line 2", path=os.path.join(temp_dir, "test.txt")
        )
        assert "Line 2" in output
        assert "matches" in output.lower() or "found" in output.lower()

    def test_grep_no_matches(self, temp_dir: str) -> None:
        """A pattern absent from the file reports no matches."""
        searcher = GrepTool(allowed_paths=[temp_dir])
        output = searcher._run(
            pattern="nonexistent pattern xyz",
            path=os.path.join(temp_dir, "test.txt"),
        )
        assert "No matches found" in output

    def test_grep_recursive(self, temp_dir: str) -> None:
        """``recursive=True`` searches files in subdirectories."""
        searcher = GrepTool(allowed_paths=[temp_dir])
        output = searcher._run(pattern="Nested", path=temp_dir, recursive=True)
        assert "Nested" in output

    def test_grep_case_insensitive(self, temp_dir: str) -> None:
        """``ignore_case=True`` matches regardless of letter case."""
        searcher = GrepTool(allowed_paths=[temp_dir])
        output = searcher._run(
            pattern="LINE", path=os.path.join(temp_dir, "test.txt"), ignore_case=True
        )
        assert "Line" in output or "matches" in output.lower()

    def test_grep_path_restricted(
        self, restricted_temp_dir: tuple[str, str]
    ) -> None:
        """Searching outside allowed_paths yields a restriction error."""
        allowed_dir, forbidden_dir = restricted_temp_dir
        searcher = GrepTool(allowed_paths=[allowed_dir])
        output = searcher._run(pattern="test", path=forbidden_dir)
        assert "Error:" in output
        assert "outside allowed paths" in output
# ============================================================================
# FileSearchTool Tests
# ============================================================================
class TestFileSearchTool:
    """Behavior of ``FileSearchTool._run``."""

    def test_find_files_by_pattern(self, temp_dir: str) -> None:
        """A glob pattern matches files at any depth."""
        finder = FileSearchTool(allowed_paths=[temp_dir])
        output = finder._run(pattern="*.py", path=temp_dir)
        assert "example.py" in output
        assert "another.py" in output

    def test_find_no_matches(self, temp_dir: str) -> None:
        """A pattern matching nothing reports that no files were found."""
        finder = FileSearchTool(allowed_paths=[temp_dir])
        output = finder._run(pattern="*.xyz", path=temp_dir)
        assert "No" in output and "found" in output

    def test_find_files_only(self, temp_dir: str) -> None:
        """``file_type='file'`` restricts results to regular files."""
        finder = FileSearchTool(allowed_paths=[temp_dir])
        output = finder._run(pattern="*", path=temp_dir, file_type="file")
        # At least one of the known files must be present.
        assert "test.txt" in output or "example.py" in output
        # NOTE(review): directories are rendered with a trailing slash;
        # asserting subdir's absence here would need the exact output format.

    def test_find_dirs_only(self, temp_dir: str) -> None:
        """``file_type='dir'`` restricts results to directories."""
        finder = FileSearchTool(allowed_paths=[temp_dir])
        output = finder._run(pattern="*", path=temp_dir, file_type="dir")
        assert "subdir" in output

    def test_find_path_restricted(
        self, restricted_temp_dir: tuple[str, str]
    ) -> None:
        """Searching outside allowed_paths yields a restriction error."""
        allowed_dir, forbidden_dir = restricted_temp_dir
        finder = FileSearchTool(allowed_paths=[allowed_dir])
        output = finder._run(pattern="*", path=forbidden_dir)
        assert "Error:" in output
        assert "outside allowed paths" in output
# ============================================================================
# EnvironmentTools Manager Tests
# ============================================================================
class TestEnvironmentTools:
    """Behavior of the ``EnvironmentTools`` manager class."""

    def test_default_allowed_paths_is_current_directory(self) -> None:
        """By default every produced tool is restricted to the current dir."""
        for produced in EnvironmentTools().tools():
            assert isinstance(produced, BaseEnvironmentTool)
            assert produced.allowed_paths == ["."]

    def test_explicit_empty_allowed_paths_allows_all(self) -> None:
        """An explicitly empty list propagates as 'no restriction'."""
        for produced in EnvironmentTools(allowed_paths=[]).tools():
            assert isinstance(produced, BaseEnvironmentTool)
            assert produced.allowed_paths == []

    def test_returns_all_tools_by_default(self) -> None:
        """All four tools are produced when nothing is excluded."""
        produced = EnvironmentTools().tools()
        assert len(produced) == 4
        names = [t.name for t in produced]
        for expected in ("read_file", "list_directory", "grep_search", "find_files"):
            assert expected in names

    def test_exclude_grep(self) -> None:
        """``include_grep=False`` drops the grep tool."""
        produced = EnvironmentTools(include_grep=False).tools()
        assert len(produced) == 3
        assert "grep_search" not in [t.name for t in produced]

    def test_exclude_search(self) -> None:
        """``include_search=False`` drops the file-search tool."""
        produced = EnvironmentTools(include_search=False).tools()
        assert len(produced) == 3
        assert "find_files" not in [t.name for t in produced]

    def test_allowed_paths_propagated(self, temp_dir: str) -> None:
        """The manager passes its allowed_paths through to every tool."""
        for produced in EnvironmentTools(allowed_paths=[temp_dir]).tools():
            assert isinstance(produced, BaseEnvironmentTool)
            assert produced.allowed_paths == [temp_dir]

    def test_tools_are_base_tool_instances(self) -> None:
        """Every produced tool subclasses the framework's BaseTool."""
        from crewai.tools.base_tool import BaseTool

        for produced in EnvironmentTools().tools():
            assert isinstance(produced, BaseTool)