Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
6cfc105d54 chore: re-trigger CI
Co-Authored-By: João <joao@crewai.com>
2026-01-12 06:19:45 +00:00
Devin AI
703e0f6191 fix: memory leak in execution_spans dictionary
Fix memory leak in EventListener where completed/failed tasks were never
fully removed from the execution_spans dictionary. Instead of removing
entries, the code was setting them to None, causing task objects to
remain referenced indefinitely and preventing garbage collection.

Changes:
- Replace 'self.execution_spans[source] = None' with
  'self.execution_spans.pop(source, None)' in both TaskCompletedEvent
  and TaskFailedEvent handlers
- Add comprehensive tests to verify execution_spans cleanup behavior

This fixes unbounded memory growth in long-running processes or systems
executing many tasks.

Fixes #4222

Co-Authored-By: João <joao@crewai.com>
2026-01-12 06:16:23 +00:00
15 changed files with 249 additions and 1208 deletions

View File

@@ -91,10 +91,6 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>
<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:

View File

@@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated, Any, ClassVar, Literal
from typing import Annotated, Any, ClassVar
from pydantic import (
BaseModel,
@@ -53,7 +53,6 @@ class A2AConfig(BaseModel):
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@@ -83,7 +82,3 @@ class A2AConfig(BaseModel):
default_factory=_get_default_update_config,
description="Update mechanism config",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)

View File

@@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any
import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
@@ -18,6 +18,7 @@ from a2a.types import (
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
@@ -258,7 +259,6 @@ async def _afetch_agent_card_impl(
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -282,23 +282,6 @@ def execute_a2a_delegation(
use aexecute_a2a_delegation directly.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -340,7 +323,6 @@ def execute_a2a_delegation(
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
)
@@ -351,7 +333,6 @@ def execute_a2a_delegation(
async def aexecute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -375,23 +356,6 @@ async def aexecute_a2a_delegation(
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
Args:
endpoint: A2A agent endpoint URL
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: Task to delegate
context: Optional context
context_id: Context ID for correlation
task_id: Specific task identifier
reference_task_ids: Related task IDs
metadata: Additional metadata
extensions: Protocol extensions
conversation_history: Previous Message objects
turn_number: Current turn number
agent_branch: Agent tree branch for logging
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -450,7 +414,6 @@ async def aexecute_a2a_delegation(
agent_role=agent_role,
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
)
crewai_event_bus.emit(
@@ -468,7 +431,6 @@ async def aexecute_a2a_delegation(
async def _aexecute_a2a_delegation_impl(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -562,6 +524,7 @@ async def _aexecute_a2a_delegation_impl(
extensions=extensions,
)
transport_protocol = TransportProtocol("JSONRPC")
new_messages: list[Message] = [*conversation_history, message]
crewai_event_bus.emit(
None,
@@ -633,7 +596,7 @@ async def _aexecute_a2a_delegation_impl(
@asynccontextmanager
async def _create_a2a_client(
agent_card: AgentCard,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
transport_protocol: TransportProtocol,
timeout: int,
headers: MutableMapping[str, str],
streaming: bool,
@@ -677,7 +640,7 @@ async def _create_a2a_client(
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[transport_protocol],
supported_transports=[str(transport_protocol.value)],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],

View File

@@ -771,7 +771,6 @@ def _delegate_to_a2a(
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
)
conversation_history = a2a_result.get("history", [])
@@ -1086,7 +1085,6 @@ async def _adelegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
)

View File

@@ -1,12 +1,4 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -31,20 +23,14 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"BaseEnvironmentTool",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"EnvironmentTools",
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",
"ExperimentResults",
"ExperimentRunner",
"FileReadTool",
"FileSearchTool",
"GoalAlignmentEvaluator",
"GrepTool",
"ListDirTool",
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",

View File

@@ -1,24 +0,0 @@
"""Environment tools for file system operations.
These tools provide agents with the ability to explore and read from
the filesystem for context engineering purposes.
"""
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
from crewai.experimental.environment_tools.environment_tools import EnvironmentTools
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
__all__ = [
"BaseEnvironmentTool",
"EnvironmentTools",
"FileReadTool",
"FileSearchTool",
"GrepTool",
"ListDirTool",
]

View File

@@ -1,84 +0,0 @@
"""Base class for environment tools with path security."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from pydantic import Field
from crewai.tools.base_tool import BaseTool
class BaseEnvironmentTool(BaseTool):
"""Base class for environment/file system tools with path security.
Provides path validation to restrict file operations to allowed directories.
This prevents path traversal attacks and enforces security sandboxing.
Attributes:
allowed_paths: List of paths that operations are restricted to.
Empty list means allow all paths (no restrictions).
"""
allowed_paths: list[str] = Field(
default_factory=lambda: ["."],
description="Restrict operations to these paths. Defaults to current directory.",
)
def _validate_path(self, path: str) -> tuple[bool, Path | str]:
"""Validate and resolve a path against allowed_paths whitelist.
Args:
path: The path to validate.
Returns:
A tuple of (is_valid, result) where:
- If valid: (True, resolved_path as Path)
- If invalid: (False, error_message as str)
"""
try:
resolved = Path(path).resolve()
# If no restrictions, allow all paths
if not self.allowed_paths:
return True, resolved
# Check if path is within any allowed path
for allowed in self.allowed_paths:
allowed_resolved = Path(allowed).resolve()
try:
# This will raise ValueError if resolved is not relative to allowed_resolved
resolved.relative_to(allowed_resolved)
return True, resolved
except ValueError:
continue
return (
False,
f"Path '{path}' is outside allowed paths: {self.allowed_paths}",
)
except Exception as e:
return False, f"Invalid path '{path}': {e}"
def _format_size(self, size: int) -> str:
"""Format file size in human-readable format.
Args:
size: Size in bytes.
Returns:
Human-readable size string (e.g., "1.5KB", "2.3MB").
"""
if size < 1024:
return f"{size}B"
if size < 1024 * 1024:
return f"{size / 1024:.1f}KB"
if size < 1024 * 1024 * 1024:
return f"{size / (1024 * 1024):.1f}MB"
return f"{size / (1024 * 1024 * 1024):.1f}GB"
def _run(self, *args: Any, **kwargs: Any) -> Any:
"""Subclasses must implement this method."""
raise NotImplementedError("Subclasses must implement _run method")

View File

@@ -1,77 +0,0 @@
"""Manager class for environment tools."""
from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
if TYPE_CHECKING:
from crewai.tools.base_tool import BaseTool
class EnvironmentTools:
"""Manager class for file system/environment tools.
Provides a convenient way to create a set of file system tools
with shared security configuration (allowed_paths).
Similar to AgentTools but for file system operations. Use this to
give agents the ability to explore and read files for context engineering.
Example:
from crewai.experimental import EnvironmentTools
# Create tools with security sandbox
env_tools = EnvironmentTools(
allowed_paths=["./src", "./docs"],
)
# Use with an agent
agent = Agent(
role="Code Analyst",
tools=env_tools.tools(),
)
"""
def __init__(
self,
allowed_paths: list[str] | None = None,
include_grep: bool = True,
include_search: bool = True,
) -> None:
"""Initialize EnvironmentTools.
Args:
allowed_paths: List of paths to restrict operations to.
Defaults to current directory ["."] if None.
Pass empty list [] to allow all paths (not recommended).
include_grep: Whether to include GrepTool (requires grep installed).
include_search: Whether to include FileSearchTool.
"""
self.allowed_paths = allowed_paths if allowed_paths is not None else ["."]
self.include_grep = include_grep
self.include_search = include_search
def tools(self) -> list[BaseTool]:
"""Get all configured environment tools.
Returns:
List of BaseTool instances with shared allowed_paths configuration.
"""
tool_list: list[BaseTool] = [
FileReadTool(allowed_paths=self.allowed_paths),
ListDirTool(allowed_paths=self.allowed_paths),
]
if self.include_grep:
tool_list.append(GrepTool(allowed_paths=self.allowed_paths))
if self.include_search:
tool_list.append(FileSearchTool(allowed_paths=self.allowed_paths))
return tool_list

View File

@@ -1,124 +0,0 @@
"""Tool for reading file contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileReadInput(BaseModel):
"""Input schema for reading files."""
path: str = Field(..., description="Path to the file to read")
start_line: int | None = Field(
default=None,
description="Line to start reading from (1-indexed). If None, starts from beginning.",
)
line_count: int | None = Field(
default=None,
description="Number of lines to read. If None, reads to end of file.",
)
class FileReadTool(BaseEnvironmentTool):
"""Read contents of text files with optional line ranges.
Use this tool to:
- Read configuration files, source code, logs
- Inspect file contents before making decisions
- Load reference documentation or data files
Supports reading entire files or specific line ranges for efficiency.
"""
name: str = "read_file"
description: str = """Read the contents of a text file.
Use this to read configuration files, source code, logs, or any text file.
You can optionally specify start_line and line_count to read specific portions.
Examples:
- Read entire file: path="config.yaml"
- Read lines 100-149: path="large.log", start_line=100, line_count=50
"""
args_schema: type[BaseModel] = FileReadInput
def _run(
self,
path: str,
start_line: int | None = None,
line_count: int | None = None,
) -> str:
"""Read file contents with optional line range.
Args:
path: Path to the file to read.
start_line: Line to start reading from (1-indexed).
line_count: Number of lines to read.
Returns:
File contents with metadata header, or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
file_path = result
# Check file exists and is a file
if not file_path.exists():
return f"Error: File not found: {path}"
if not file_path.is_file():
return f"Error: Not a file: {path}"
try:
with open(file_path, "r", encoding="utf-8") as f:
if start_line is None and line_count is None:
# Read entire file
content = f.read()
else:
# Read specific line range
lines = f.readlines()
start_idx = (start_line or 1) - 1 # Convert to 0-indexed
start_idx = max(0, start_idx) # Ensure non-negative
if line_count is not None:
end_idx = start_idx + line_count
else:
end_idx = len(lines)
content = "".join(lines[start_idx:end_idx])
# Get file metadata
stat = file_path.stat()
total_lines = content.count("\n") + (
1 if content and not content.endswith("\n") else 0
)
# Format output with metadata header
header = f"File: {path}\n"
header += f"Size: {self._format_size(stat.st_size)} | Lines: {total_lines}"
if start_line is not None or line_count is not None:
header += (
f" | Range: {start_line or 1}-{(start_line or 1) + total_lines - 1}"
)
header += "\n" + "=" * 60 + "\n"
return header + content
except UnicodeDecodeError:
return f"Error: File is not a text file or has encoding issues: {path}"
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error reading file: {e}"

View File

@@ -1,127 +0,0 @@
"""Tool for finding files by name pattern."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileSearchInput(BaseModel):
"""Input schema for file search."""
pattern: str = Field(
...,
description="Filename pattern to search for (glob syntax, e.g., '*.py', 'test_*.py')",
)
path: str = Field(
default=".",
description="Directory to search in",
)
file_type: Literal["file", "dir", "all"] | None = Field(
default="all",
description="Filter by type: 'file' for files only, 'dir' for directories only, 'all' for both",
)
class FileSearchTool(BaseEnvironmentTool):
"""Find files by name pattern.
Use this tool to:
- Find specific files in a codebase
- Locate configuration files
- Search for files matching a pattern
"""
name: str = "find_files"
description: str = """Find files by name pattern using glob syntax.
Searches recursively through directories to find matching files.
Examples:
- Find Python files: pattern="*.py", path="src/"
- Find test files: pattern="test_*.py"
- Find configs: pattern="*.yaml", path="."
- Find directories only: pattern="*", file_type="dir"
"""
args_schema: type[BaseModel] = FileSearchInput
def _run(
self,
pattern: str,
path: str = ".",
file_type: Literal["file", "dir", "all"] | None = "all",
) -> str:
"""Find files matching a pattern.
Args:
pattern: Glob pattern for filenames.
path: Directory to search in.
file_type: Filter by type ('file', 'dir', or 'all').
Returns:
List of matching files or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
search_path = result
# Check directory exists
if not search_path.exists():
return f"Error: Directory not found: {path}"
if not search_path.is_dir():
return f"Error: Not a directory: {path}"
try:
# Find matching entries recursively
matches = list(search_path.rglob(pattern))
# Filter by type
if file_type == "file":
matches = [m for m in matches if m.is_file()]
elif file_type == "dir":
matches = [m for m in matches if m.is_dir()]
# Filter out hidden files
matches = [
m for m in matches if not any(part.startswith(".") for part in m.parts)
]
# Sort alphabetically
matches.sort(key=lambda x: str(x).lower())
if not matches:
return f"No {file_type if file_type != 'all' else 'files'} matching '{pattern}' found in {path}"
# Format output
result_lines = [f"Found {len(matches)} matches for '{pattern}' in {path}:"]
result_lines.append("=" * 60)
for match in matches:
# Get relative path from search directory
rel_path = match.relative_to(search_path)
if match.is_dir():
result_lines.append(f"📁 {rel_path}/")
else:
try:
size = match.stat().st_size
except (OSError, PermissionError):
continue # Skip files we can't stat
size_str = self._format_size(size)
result_lines.append(f"📄 {rel_path} ({size_str})")
return "\n".join(result_lines)
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error searching files: {e}"

View File

@@ -1,149 +0,0 @@
"""Tool for searching patterns in files using grep."""
from __future__ import annotations
import subprocess
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class GrepInput(BaseModel):
"""Input schema for grep search."""
pattern: str = Field(..., description="Search pattern (supports regex)")
path: str = Field(..., description="File or directory to search in")
recursive: bool = Field(
default=True,
description="Search recursively in directories",
)
ignore_case: bool = Field(
default=False,
description="Case-insensitive search",
)
context_lines: int = Field(
default=2,
description="Number of context lines to show before/after matches",
)
class GrepTool(BaseEnvironmentTool):
"""Search for text patterns in files using grep.
Use this tool to:
- Find where a function or class is defined
- Search for error messages in logs
- Locate configuration values
- Find TODO comments or specific patterns
"""
name: str = "grep_search"
description: str = """Search for text patterns in files using grep.
Supports regex patterns. Returns matching lines with context.
Examples:
- Find function: pattern="def process_data", path="src/"
- Search logs: pattern="ERROR", path="logs/app.log"
- Case-insensitive: pattern="todo", path=".", ignore_case=True
"""
args_schema: type[BaseModel] = GrepInput
def _run(
self,
pattern: str,
path: str,
recursive: bool = True,
ignore_case: bool = False,
context_lines: int = 2,
) -> str:
"""Search for patterns in files.
Args:
pattern: Search pattern (regex supported).
path: File or directory to search in.
recursive: Whether to search recursively.
ignore_case: Whether to ignore case.
context_lines: Lines of context around matches.
Returns:
Search results or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
search_path = result
# Check path exists
if not search_path.exists():
return f"Error: Path not found: {path}"
try:
# Build grep command safely
cmd = ["grep", "--color=never"]
# Add recursive flag if searching directory
if recursive and search_path.is_dir():
cmd.append("-r")
# Case insensitive
if ignore_case:
cmd.append("-i")
# Context lines
if context_lines > 0:
cmd.extend(["-C", str(context_lines)])
# Show line numbers
cmd.append("-n")
# Use -- to prevent pattern from being interpreted as option
cmd.append("--")
cmd.append(pattern)
cmd.append(str(search_path))
# Execute with timeout
# Security: cmd is a list (no shell injection), path is validated above
result = subprocess.run( # noqa: S603
cmd,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
# Found matches
output = result.stdout
# Count actual match lines (not context lines)
match_lines = [
line
for line in output.split("\n")
if line and not line.startswith("--")
]
match_count = len(match_lines)
header = f"Found {match_count} matches for '{pattern}' in {path}\n"
header += "=" * 60 + "\n"
return header + output
if result.returncode == 1:
# No matches found (grep returns 1 for no matches)
return f"No matches found for '{pattern}' in {path}"
# Error occurred
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
return f"Error: {error_msg}"
except subprocess.TimeoutExpired:
return "Error: Search timed out (>30s). Try narrowing the search path."
except FileNotFoundError:
return (
"Error: grep command not found. Ensure grep is installed on the system."
)
except Exception as e:
return f"Error during search: {e}"

View File

@@ -1,147 +0,0 @@
"""Tool for listing directory contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class ListDirInput(BaseModel):
"""Input schema for listing directories."""
path: str = Field(default=".", description="Directory path to list")
pattern: str | None = Field(
default=None,
description="Glob pattern to filter entries (e.g., '*.py', '*.md')",
)
recursive: bool = Field(
default=False,
description="If True, list contents recursively including subdirectories",
)
class ListDirTool(BaseEnvironmentTool):
"""List contents of a directory with optional filtering.
Use this tool to:
- Explore project structure
- Find specific file types
- Check what files exist in a directory
- Navigate the file system
"""
name: str = "list_directory"
description: str = """List contents of a directory.
Use this to explore directories and find files. You can filter by pattern
and optionally list recursively.
Examples:
- List current dir: path="."
- List src folder: path="src/"
- Find Python files: path=".", pattern="*.py"
- Recursive listing: path="src/", recursive=True
"""
args_schema: type[BaseModel] = ListDirInput
def _run(
self,
path: str = ".",
pattern: str | None = None,
recursive: bool = False,
) -> str:
"""List directory contents.
Args:
path: Directory path to list.
pattern: Glob pattern to filter entries.
recursive: Whether to list recursively.
Returns:
Formatted directory listing or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
dir_path = result
# Check directory exists
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
try:
# Get entries based on pattern and recursive flag
if pattern:
if recursive:
entries = list(dir_path.rglob(pattern))
else:
entries = list(dir_path.glob(pattern))
else:
if recursive:
entries = list(dir_path.rglob("*"))
else:
entries = list(dir_path.iterdir())
# Filter out hidden files (starting with .)
entries = [e for e in entries if not e.name.startswith(".")]
# Sort: directories first, then files, alphabetically
entries.sort(key=lambda x: (not x.is_dir(), x.name.lower()))
if not entries:
if pattern:
return f"No entries matching '{pattern}' in {path}"
return f"Directory is empty: {path}"
# Format output
result_lines = [f"Contents of {path}:"]
result_lines.append("=" * 60)
dirs = []
files = []
for entry in entries:
# Get relative path for recursive listings
if recursive:
display_name = str(entry.relative_to(dir_path))
else:
display_name = entry.name
if entry.is_dir():
dirs.append(f"📁 {display_name}/")
else:
try:
size = entry.stat().st_size
except (OSError, PermissionError):
continue # Skip files we can't stat
size_str = self._format_size(size)
files.append(f"📄 {display_name} ({size_str})")
# Output directories first, then files
if dirs:
result_lines.extend(dirs)
if files:
if dirs:
result_lines.append("") # Blank line between dirs and files
result_lines.extend(files)
result_lines.append("")
result_lines.append(f"Total: {len(dirs)} directories, {len(files)} files")
return "\n".join(result_lines)
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error listing directory: {e}"

View File

@@ -0,0 +1,243 @@
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.tasks.task_output import TaskOutput
class MockAgent:
"""Mock agent for testing."""
def __init__(self, role: str = "test_role"):
self.role = role
self.crew = MagicMock()
class MockTask:
"""Mock task for testing."""
def __init__(self, task_id: str = "test_task"):
self.id = task_id
self.name = "Test Task"
self.description = "A test task description"
self.agent = MockAgent()
@pytest.fixture
def event_listener():
"""Create a fresh EventListener instance for testing."""
EventListener._instance = None
EventListener._initialized = False
listener = EventListener()
listener.setup_listeners(crewai_event_bus)
return listener
@pytest.fixture
def mock_task():
"""Create a mock task for testing."""
return MockTask()
@pytest.fixture
def mock_task_output():
"""Create a mock task output for testing."""
return TaskOutput(
description="Test task description",
raw="Test output",
agent="test_agent",
)
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_completed(
event_listener, mock_task, mock_task_output
):
"""Test that execution_spans entries are properly removed when a task completes.
This test verifies the fix for the memory leak where completed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
"""Test that execution_spans entries are properly removed when a task fails.
This test verifies the fix for the memory leak where failed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_dict_size_does_not_grow_unbounded(
event_listener, mock_task_output
):
"""Test that execution_spans dictionary size remains bounded after many tasks.
This test simulates the memory leak scenario where many tasks complete/fail
and verifies that the dictionary doesn't grow unboundedly.
"""
num_tasks = 100
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_task_started.return_value = MagicMock()
for i in range(num_tasks):
task = MockTask(task_id=f"task_{i}")
start_event = TaskStartedEvent(context="test context", task=task)
future = crewai_event_bus.emit(task, start_event)
if future:
await asyncio.wrap_future(future)
if i % 2 == 0:
completed_event = TaskCompletedEvent(
output=mock_task_output, task=task
)
future = crewai_event_bus.emit(task, completed_event)
else:
failed_event = TaskFailedEvent(error="Test error", task=task)
future = crewai_event_bus.emit(task, failed_event)
if future:
await asyncio.wrap_future(future)
assert len(event_listener.execution_spans) == 0
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_gracefully(
event_listener, mock_task, mock_task_output
):
"""Test that completing/failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
event_listener, mock_task
):
"""Test that failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_completion(
event_listener, mock_task, mock_task_output
):
"""Test that telemetry.task_ended is called with the correct span on completion."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_failure(
event_listener, mock_task
):
"""Test that telemetry.task_ended is called with the correct span on failure."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)

View File

@@ -1,408 +0,0 @@
"""Tests for experimental environment tools."""
from __future__ import annotations
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def temp_dir() -> Generator[str, None, None]:
"""Create a temporary directory with test files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files
test_file = Path(tmpdir) / "test.txt"
test_file.write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n")
python_file = Path(tmpdir) / "example.py"
python_file.write_text("def hello():\n print('Hello World')\n")
# Create subdirectory with files
subdir = Path(tmpdir) / "subdir"
subdir.mkdir()
(subdir / "nested.txt").write_text("Nested content\n")
(subdir / "another.py").write_text("# Another Python file\n")
yield tmpdir
@pytest.fixture
def restricted_temp_dir() -> Generator[tuple[str, str], None, None]:
"""Create two directories - one allowed, one not."""
with tempfile.TemporaryDirectory() as allowed_dir:
with tempfile.TemporaryDirectory() as forbidden_dir:
# Create files in both
(Path(allowed_dir) / "allowed.txt").write_text("Allowed content\n")
(Path(forbidden_dir) / "forbidden.txt").write_text("Forbidden content\n")
yield allowed_dir, forbidden_dir
# ============================================================================
# BaseEnvironmentTool Tests
# ============================================================================
class TestBaseEnvironmentTool:
"""Tests for BaseEnvironmentTool path validation."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default allowed_paths should be current directory for security."""
tool = FileReadTool()
assert tool.allowed_paths == ["."]
def test_validate_path_explicit_no_restrictions(self, temp_dir: str) -> None:
"""With explicit empty allowed_paths, all paths should be allowed."""
tool = FileReadTool(allowed_paths=[])
valid, result = tool._validate_path(temp_dir)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_within_allowed(self, temp_dir: str) -> None:
"""Paths within allowed_paths should be valid."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
valid, result = tool._validate_path(test_file)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_outside_allowed(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Paths outside allowed_paths should be rejected."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
forbidden_file = os.path.join(forbidden_dir, "forbidden.txt")
valid, result = tool._validate_path(forbidden_file)
assert valid is False
assert isinstance(result, str)
assert "outside allowed paths" in result
def test_format_size(self) -> None:
"""Test human-readable size formatting."""
tool = FileReadTool()
assert tool._format_size(500) == "500B"
assert tool._format_size(1024) == "1.0KB"
assert tool._format_size(1536) == "1.5KB"
assert tool._format_size(1024 * 1024) == "1.0MB"
assert tool._format_size(1024 * 1024 * 1024) == "1.0GB"
# ============================================================================
# FileReadTool Tests
# ============================================================================
class TestFileReadTool:
"""Tests for FileReadTool."""
def test_read_entire_file(self, temp_dir: str) -> None:
"""Should read entire file contents."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file)
assert "Line 1" in result
assert "Line 2" in result
assert "Line 5" in result
assert "File:" in result # Metadata header
def test_read_with_line_range(self, temp_dir: str) -> None:
"""Should read specific line range."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file, start_line=2, line_count=2)
assert "Line 2" in result
assert "Line 3" in result
# Should not include lines outside range
assert "Line 1" not in result.split("=" * 60)[-1] # Check content after header
def test_read_file_not_found(self, temp_dir: str) -> None:
"""Should return error for missing file."""
tool = FileReadTool(allowed_paths=[temp_dir])
missing_file = os.path.join(temp_dir, "nonexistent.txt")
result = tool._run(path=missing_file)
assert "Error: File not found" in result
def test_read_file_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
forbidden_file = os.path.join(forbidden_dir, "forbidden.txt")
result = tool._run(path=forbidden_file)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# ListDirTool Tests
# ============================================================================
class TestListDirTool:
"""Tests for ListDirTool."""
def test_list_directory(self, temp_dir: str) -> None:
"""Should list directory contents."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir)
assert "test.txt" in result
assert "example.py" in result
assert "subdir" in result
assert "Total:" in result
def test_list_with_pattern(self, temp_dir: str) -> None:
"""Should filter by pattern."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, pattern="*.py")
assert "example.py" in result
assert "test.txt" not in result
def test_list_recursive(self, temp_dir: str) -> None:
"""Should list recursively when enabled."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, recursive=True)
assert "nested.txt" in result
assert "another.py" in result
def test_list_nonexistent_directory(self, temp_dir: str) -> None:
"""Should return error for missing directory."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=os.path.join(temp_dir, "nonexistent"))
assert "Error: Directory not found" in result
def test_list_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = ListDirTool(allowed_paths=[allowed_dir])
result = tool._run(path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# GrepTool Tests
# ============================================================================
class TestGrepTool:
"""Tests for GrepTool."""
def test_grep_finds_pattern(self, temp_dir: str) -> None:
"""Should find matching patterns."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="Line 2", path=test_file)
assert "Line 2" in result
assert "matches" in result.lower() or "found" in result.lower()
def test_grep_no_matches(self, temp_dir: str) -> None:
"""Should report when no matches found."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="nonexistent pattern xyz", path=test_file)
assert "No matches found" in result
def test_grep_recursive(self, temp_dir: str) -> None:
"""Should search recursively in directories."""
tool = GrepTool(allowed_paths=[temp_dir])
result = tool._run(pattern="Nested", path=temp_dir, recursive=True)
assert "Nested" in result
def test_grep_case_insensitive(self, temp_dir: str) -> None:
"""Should support case-insensitive search."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="LINE", path=test_file, ignore_case=True)
assert "Line" in result or "matches" in result.lower()
def test_grep_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = GrepTool(allowed_paths=[allowed_dir])
result = tool._run(pattern="test", path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# FileSearchTool Tests
# ============================================================================
class TestFileSearchTool:
"""Tests for FileSearchTool."""
def test_find_files_by_pattern(self, temp_dir: str) -> None:
"""Should find files matching pattern."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.py", path=temp_dir)
assert "example.py" in result
assert "another.py" in result
def test_find_no_matches(self, temp_dir: str) -> None:
"""Should report when no files match."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.xyz", path=temp_dir)
assert "No" in result and "found" in result
def test_find_files_only(self, temp_dir: str) -> None:
"""Should filter to files only."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="file")
# Should include files
assert "test.txt" in result or "example.py" in result
# Directories should have trailing slash in output
# Check that subdir is not listed as a file
def test_find_dirs_only(self, temp_dir: str) -> None:
"""Should filter to directories only."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="dir")
assert "subdir" in result
def test_find_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileSearchTool(allowed_paths=[allowed_dir])
result = tool._run(pattern="*", path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# EnvironmentTools Manager Tests
# ============================================================================
class TestEnvironmentTools:
"""Tests for EnvironmentTools manager class."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default should restrict to current directory for security."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
# All tools should default to current directory
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == ["."]
def test_explicit_empty_allowed_paths_allows_all(self) -> None:
"""Passing empty list should allow all paths."""
env_tools = EnvironmentTools(allowed_paths=[])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == []
def test_returns_all_tools_by_default(self) -> None:
"""Should return all four tools by default."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
assert len(tools) == 4
tool_names = [t.name for t in tools]
assert "read_file" in tool_names
assert "list_directory" in tool_names
assert "grep_search" in tool_names
assert "find_files" in tool_names
def test_exclude_grep(self) -> None:
"""Should exclude grep tool when disabled."""
env_tools = EnvironmentTools(include_grep=False)
tools = env_tools.tools()
assert len(tools) == 3
tool_names = [t.name for t in tools]
assert "grep_search" not in tool_names
def test_exclude_search(self) -> None:
"""Should exclude search tool when disabled."""
env_tools = EnvironmentTools(include_search=False)
tools = env_tools.tools()
assert len(tools) == 3
tool_names = [t.name for t in tools]
assert "find_files" not in tool_names
def test_allowed_paths_propagated(self, temp_dir: str) -> None:
"""Should propagate allowed_paths to all tools."""
env_tools = EnvironmentTools(allowed_paths=[temp_dir])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == [temp_dir]
def test_tools_are_base_tool_instances(self) -> None:
"""All returned tools should be BaseTool instances."""
from crewai.tools.base_tool import BaseTool
env_tools = EnvironmentTools()
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseTool)