Compare commits

..

1 Commits

Author SHA1 Message Date
Greyson LaLonde
c1776aca8a feat: add utils for deferred imports 2026-01-13 11:27:02 -05:00
11 changed files with 370 additions and 1154 deletions

View File

@@ -1,12 +1,4 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -31,20 +23,14 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"BaseEnvironmentTool",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"EnvironmentTools",
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",
"ExperimentResults",
"ExperimentRunner",
"FileReadTool",
"FileSearchTool",
"GoalAlignmentEvaluator",
"GrepTool",
"ListDirTool",
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",

View File

@@ -1,24 +0,0 @@
"""Environment tools for file system operations.
These tools provide agents with the ability to explore and read from
the filesystem for context engineering purposes.
"""
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
from crewai.experimental.environment_tools.environment_tools import EnvironmentTools
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
__all__ = [
"BaseEnvironmentTool",
"EnvironmentTools",
"FileReadTool",
"FileSearchTool",
"GrepTool",
"ListDirTool",
]

View File

@@ -1,84 +0,0 @@
"""Base class for environment tools with path security."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from pydantic import Field
from crewai.tools.base_tool import BaseTool
class BaseEnvironmentTool(BaseTool):
"""Base class for environment/file system tools with path security.
Provides path validation to restrict file operations to allowed directories.
This prevents path traversal attacks and enforces security sandboxing.
Attributes:
allowed_paths: List of paths that operations are restricted to.
Empty list means allow all paths (no restrictions).
"""
allowed_paths: list[str] = Field(
default_factory=lambda: ["."],
description="Restrict operations to these paths. Defaults to current directory.",
)
def _validate_path(self, path: str) -> tuple[bool, Path | str]:
"""Validate and resolve a path against allowed_paths whitelist.
Args:
path: The path to validate.
Returns:
A tuple of (is_valid, result) where:
- If valid: (True, resolved_path as Path)
- If invalid: (False, error_message as str)
"""
try:
resolved = Path(path).resolve()
# If no restrictions, allow all paths
if not self.allowed_paths:
return True, resolved
# Check if path is within any allowed path
for allowed in self.allowed_paths:
allowed_resolved = Path(allowed).resolve()
try:
# This will raise ValueError if resolved is not relative to allowed_resolved
resolved.relative_to(allowed_resolved)
return True, resolved
except ValueError:
continue
return (
False,
f"Path '{path}' is outside allowed paths: {self.allowed_paths}",
)
except Exception as e:
return False, f"Invalid path '{path}': {e}"
def _format_size(self, size: int) -> str:
"""Format file size in human-readable format.
Args:
size: Size in bytes.
Returns:
Human-readable size string (e.g., "1.5KB", "2.3MB").
"""
if size < 1024:
return f"{size}B"
if size < 1024 * 1024:
return f"{size / 1024:.1f}KB"
if size < 1024 * 1024 * 1024:
return f"{size / (1024 * 1024):.1f}MB"
return f"{size / (1024 * 1024 * 1024):.1f}GB"
def _run(self, *args: Any, **kwargs: Any) -> Any:
"""Subclasses must implement this method."""
raise NotImplementedError("Subclasses must implement _run method")

View File

@@ -1,77 +0,0 @@
"""Manager class for environment tools."""
from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.experimental.environment_tools.file_read_tool import FileReadTool
from crewai.experimental.environment_tools.file_search_tool import FileSearchTool
from crewai.experimental.environment_tools.grep_tool import GrepTool
from crewai.experimental.environment_tools.list_dir_tool import ListDirTool
if TYPE_CHECKING:
from crewai.tools.base_tool import BaseTool
class EnvironmentTools:
"""Manager class for file system/environment tools.
Provides a convenient way to create a set of file system tools
with shared security configuration (allowed_paths).
Similar to AgentTools but for file system operations. Use this to
give agents the ability to explore and read files for context engineering.
Example:
from crewai.experimental import EnvironmentTools
# Create tools with security sandbox
env_tools = EnvironmentTools(
allowed_paths=["./src", "./docs"],
)
# Use with an agent
agent = Agent(
role="Code Analyst",
tools=env_tools.tools(),
)
"""
def __init__(
self,
allowed_paths: list[str] | None = None,
include_grep: bool = True,
include_search: bool = True,
) -> None:
"""Initialize EnvironmentTools.
Args:
allowed_paths: List of paths to restrict operations to.
Defaults to current directory ["."] if None.
Pass empty list [] to allow all paths (not recommended).
include_grep: Whether to include GrepTool (requires grep installed).
include_search: Whether to include FileSearchTool.
"""
self.allowed_paths = allowed_paths if allowed_paths is not None else ["."]
self.include_grep = include_grep
self.include_search = include_search
def tools(self) -> list[BaseTool]:
"""Get all configured environment tools.
Returns:
List of BaseTool instances with shared allowed_paths configuration.
"""
tool_list: list[BaseTool] = [
FileReadTool(allowed_paths=self.allowed_paths),
ListDirTool(allowed_paths=self.allowed_paths),
]
if self.include_grep:
tool_list.append(GrepTool(allowed_paths=self.allowed_paths))
if self.include_search:
tool_list.append(FileSearchTool(allowed_paths=self.allowed_paths))
return tool_list

View File

@@ -1,124 +0,0 @@
"""Tool for reading file contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileReadInput(BaseModel):
"""Input schema for reading files."""
path: str = Field(..., description="Path to the file to read")
start_line: int | None = Field(
default=None,
description="Line to start reading from (1-indexed). If None, starts from beginning.",
)
line_count: int | None = Field(
default=None,
description="Number of lines to read. If None, reads to end of file.",
)
class FileReadTool(BaseEnvironmentTool):
"""Read contents of text files with optional line ranges.
Use this tool to:
- Read configuration files, source code, logs
- Inspect file contents before making decisions
- Load reference documentation or data files
Supports reading entire files or specific line ranges for efficiency.
"""
name: str = "read_file"
description: str = """Read the contents of a text file.
Use this to read configuration files, source code, logs, or any text file.
You can optionally specify start_line and line_count to read specific portions.
Examples:
- Read entire file: path="config.yaml"
- Read lines 100-149: path="large.log", start_line=100, line_count=50
"""
args_schema: type[BaseModel] = FileReadInput
def _run(
self,
path: str,
start_line: int | None = None,
line_count: int | None = None,
) -> str:
"""Read file contents with optional line range.
Args:
path: Path to the file to read.
start_line: Line to start reading from (1-indexed).
line_count: Number of lines to read.
Returns:
File contents with metadata header, or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
file_path = result
# Check file exists and is a file
if not file_path.exists():
return f"Error: File not found: {path}"
if not file_path.is_file():
return f"Error: Not a file: {path}"
try:
with open(file_path, "r", encoding="utf-8") as f:
if start_line is None and line_count is None:
# Read entire file
content = f.read()
else:
# Read specific line range
lines = f.readlines()
start_idx = (start_line or 1) - 1 # Convert to 0-indexed
start_idx = max(0, start_idx) # Ensure non-negative
if line_count is not None:
end_idx = start_idx + line_count
else:
end_idx = len(lines)
content = "".join(lines[start_idx:end_idx])
# Get file metadata
stat = file_path.stat()
total_lines = content.count("\n") + (
1 if content and not content.endswith("\n") else 0
)
# Format output with metadata header
header = f"File: {path}\n"
header += f"Size: {self._format_size(stat.st_size)} | Lines: {total_lines}"
if start_line is not None or line_count is not None:
header += (
f" | Range: {start_line or 1}-{(start_line or 1) + total_lines - 1}"
)
header += "\n" + "=" * 60 + "\n"
return header + content
except UnicodeDecodeError:
return f"Error: File is not a text file or has encoding issues: {path}"
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error reading file: {e}"

View File

@@ -1,127 +0,0 @@
"""Tool for finding files by name pattern."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class FileSearchInput(BaseModel):
"""Input schema for file search."""
pattern: str = Field(
...,
description="Filename pattern to search for (glob syntax, e.g., '*.py', 'test_*.py')",
)
path: str = Field(
default=".",
description="Directory to search in",
)
file_type: Literal["file", "dir", "all"] | None = Field(
default="all",
description="Filter by type: 'file' for files only, 'dir' for directories only, 'all' for both",
)
class FileSearchTool(BaseEnvironmentTool):
"""Find files by name pattern.
Use this tool to:
- Find specific files in a codebase
- Locate configuration files
- Search for files matching a pattern
"""
name: str = "find_files"
description: str = """Find files by name pattern using glob syntax.
Searches recursively through directories to find matching files.
Examples:
- Find Python files: pattern="*.py", path="src/"
- Find test files: pattern="test_*.py"
- Find configs: pattern="*.yaml", path="."
- Find directories only: pattern="*", file_type="dir"
"""
args_schema: type[BaseModel] = FileSearchInput
def _run(
self,
pattern: str,
path: str = ".",
file_type: Literal["file", "dir", "all"] | None = "all",
) -> str:
"""Find files matching a pattern.
Args:
pattern: Glob pattern for filenames.
path: Directory to search in.
file_type: Filter by type ('file', 'dir', or 'all').
Returns:
List of matching files or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
search_path = result
# Check directory exists
if not search_path.exists():
return f"Error: Directory not found: {path}"
if not search_path.is_dir():
return f"Error: Not a directory: {path}"
try:
# Find matching entries recursively
matches = list(search_path.rglob(pattern))
# Filter by type
if file_type == "file":
matches = [m for m in matches if m.is_file()]
elif file_type == "dir":
matches = [m for m in matches if m.is_dir()]
# Filter out hidden files
matches = [
m for m in matches if not any(part.startswith(".") for part in m.parts)
]
# Sort alphabetically
matches.sort(key=lambda x: str(x).lower())
if not matches:
return f"No {file_type if file_type != 'all' else 'files'} matching '{pattern}' found in {path}"
# Format output
result_lines = [f"Found {len(matches)} matches for '{pattern}' in {path}:"]
result_lines.append("=" * 60)
for match in matches:
# Get relative path from search directory
rel_path = match.relative_to(search_path)
if match.is_dir():
result_lines.append(f"📁 {rel_path}/")
else:
try:
size = match.stat().st_size
except (OSError, PermissionError):
continue # Skip files we can't stat
size_str = self._format_size(size)
result_lines.append(f"📄 {rel_path} ({size_str})")
return "\n".join(result_lines)
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error searching files: {e}"

View File

@@ -1,149 +0,0 @@
"""Tool for searching patterns in files using grep."""
from __future__ import annotations
import subprocess
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class GrepInput(BaseModel):
"""Input schema for grep search."""
pattern: str = Field(..., description="Search pattern (supports regex)")
path: str = Field(..., description="File or directory to search in")
recursive: bool = Field(
default=True,
description="Search recursively in directories",
)
ignore_case: bool = Field(
default=False,
description="Case-insensitive search",
)
context_lines: int = Field(
default=2,
description="Number of context lines to show before/after matches",
)
class GrepTool(BaseEnvironmentTool):
"""Search for text patterns in files using grep.
Use this tool to:
- Find where a function or class is defined
- Search for error messages in logs
- Locate configuration values
- Find TODO comments or specific patterns
"""
name: str = "grep_search"
description: str = """Search for text patterns in files using grep.
Supports regex patterns. Returns matching lines with context.
Examples:
- Find function: pattern="def process_data", path="src/"
- Search logs: pattern="ERROR", path="logs/app.log"
- Case-insensitive: pattern="todo", path=".", ignore_case=True
"""
args_schema: type[BaseModel] = GrepInput
def _run(
self,
pattern: str,
path: str,
recursive: bool = True,
ignore_case: bool = False,
context_lines: int = 2,
) -> str:
"""Search for patterns in files.
Args:
pattern: Search pattern (regex supported).
path: File or directory to search in.
recursive: Whether to search recursively.
ignore_case: Whether to ignore case.
context_lines: Lines of context around matches.
Returns:
Search results or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
search_path = result
# Check path exists
if not search_path.exists():
return f"Error: Path not found: {path}"
try:
# Build grep command safely
cmd = ["grep", "--color=never"]
# Add recursive flag if searching directory
if recursive and search_path.is_dir():
cmd.append("-r")
# Case insensitive
if ignore_case:
cmd.append("-i")
# Context lines
if context_lines > 0:
cmd.extend(["-C", str(context_lines)])
# Show line numbers
cmd.append("-n")
# Use -- to prevent pattern from being interpreted as option
cmd.append("--")
cmd.append(pattern)
cmd.append(str(search_path))
# Execute with timeout
# Security: cmd is a list (no shell injection), path is validated above
result = subprocess.run( # noqa: S603
cmd,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
# Found matches
output = result.stdout
# Count actual match lines (not context lines)
match_lines = [
line
for line in output.split("\n")
if line and not line.startswith("--")
]
match_count = len(match_lines)
header = f"Found {match_count} matches for '{pattern}' in {path}\n"
header += "=" * 60 + "\n"
return header + output
if result.returncode == 1:
# No matches found (grep returns 1 for no matches)
return f"No matches found for '{pattern}' in {path}"
# Error occurred
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
return f"Error: {error_msg}"
except subprocess.TimeoutExpired:
return "Error: Search timed out (>30s). Try narrowing the search path."
except FileNotFoundError:
return (
"Error: grep command not found. Ensure grep is installed on the system."
)
except Exception as e:
return f"Error during search: {e}"

View File

@@ -1,147 +0,0 @@
"""Tool for listing directory contents."""
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.experimental.environment_tools.base_environment_tool import (
BaseEnvironmentTool,
)
class ListDirInput(BaseModel):
"""Input schema for listing directories."""
path: str = Field(default=".", description="Directory path to list")
pattern: str | None = Field(
default=None,
description="Glob pattern to filter entries (e.g., '*.py', '*.md')",
)
recursive: bool = Field(
default=False,
description="If True, list contents recursively including subdirectories",
)
class ListDirTool(BaseEnvironmentTool):
"""List contents of a directory with optional filtering.
Use this tool to:
- Explore project structure
- Find specific file types
- Check what files exist in a directory
- Navigate the file system
"""
name: str = "list_directory"
description: str = """List contents of a directory.
Use this to explore directories and find files. You can filter by pattern
and optionally list recursively.
Examples:
- List current dir: path="."
- List src folder: path="src/"
- Find Python files: path=".", pattern="*.py"
- Recursive listing: path="src/", recursive=True
"""
args_schema: type[BaseModel] = ListDirInput
def _run(
self,
path: str = ".",
pattern: str | None = None,
recursive: bool = False,
) -> str:
"""List directory contents.
Args:
path: Directory path to list.
pattern: Glob pattern to filter entries.
recursive: Whether to list recursively.
Returns:
Formatted directory listing or error message.
"""
# Validate path against allowed_paths
valid, result = self._validate_path(path)
if not valid:
return f"Error: {result}"
assert isinstance(result, Path) # noqa: S101
dir_path = result
# Check directory exists
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
try:
# Get entries based on pattern and recursive flag
if pattern:
if recursive:
entries = list(dir_path.rglob(pattern))
else:
entries = list(dir_path.glob(pattern))
else:
if recursive:
entries = list(dir_path.rglob("*"))
else:
entries = list(dir_path.iterdir())
# Filter out hidden files (starting with .)
entries = [e for e in entries if not e.name.startswith(".")]
# Sort: directories first, then files, alphabetically
entries.sort(key=lambda x: (not x.is_dir(), x.name.lower()))
if not entries:
if pattern:
return f"No entries matching '{pattern}' in {path}"
return f"Directory is empty: {path}"
# Format output
result_lines = [f"Contents of {path}:"]
result_lines.append("=" * 60)
dirs = []
files = []
for entry in entries:
# Get relative path for recursive listings
if recursive:
display_name = str(entry.relative_to(dir_path))
else:
display_name = entry.name
if entry.is_dir():
dirs.append(f"📁 {display_name}/")
else:
try:
size = entry.stat().st_size
except (OSError, PermissionError):
continue # Skip files we can't stat
size_str = self._format_size(size)
files.append(f"📄 {display_name} ({size_str})")
# Output directories first, then files
if dirs:
result_lines.extend(dirs)
if files:
if dirs:
result_lines.append("") # Blank line between dirs and files
result_lines.extend(files)
result_lines.append("")
result_lines.append(f"Total: {len(dirs)} directories, {len(files)} files")
return "\n".join(result_lines)
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error listing directory: {e}"

View File

@@ -0,0 +1,370 @@
"""Lazy loader for Python packages.
Makes it easy to load subpackages and functions on demand.
Pulled from https://github.com/scientific-python/lazy-loader/blob/main/src/lazy_loader/__init__.py,
modernized a little.
"""
import ast
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
import importlib
import importlib.metadata
import importlib.util
import inspect
import os
from pathlib import Path
import sys
import threading
import types
from typing import Any, NoReturn
import warnings
import packaging.requirements
_threadlock = threading.Lock()
@dataclass(frozen=True, slots=True)
class _FrameData:
"""Captured stack frame information for delayed error reporting."""
filename: str
lineno: int
function: str
code_context: Sequence[str] | None
def attach(
package_name: str,
submodules: set[str] | None = None,
submod_attrs: dict[str, list[str]] | None = None,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
"""Attach lazily loaded submodules, functions, or other attributes.
Replaces a package's `__getattr__`, `__dir__`, and `__all__` such that
imports work normally but occur upon first use.
Example:
__getattr__, __dir__, __all__ = lazy.attach(
__name__, ["mysubmodule"], {"foo": ["someattr"]}
)
Args:
package_name: The package name, typically ``__name__``.
submodules: Set of submodule names to attach.
submod_attrs: Mapping of submodule names to lists of attributes.
These attributes are imported as they are used.
Returns:
A tuple of (__getattr__, __dir__, __all__) to assign in the package.
"""
submod_attrs = submod_attrs or {}
submodules = set(submodules) if submodules else set()
attr_to_modules = {
attr: mod for mod, attrs in submod_attrs.items() for attr in attrs
}
__all__ = sorted(submodules | attr_to_modules.keys())
def __getattr__(name: str) -> Any: # noqa: N807
if name in submodules:
return importlib.import_module(f"{package_name}.{name}")
if name in attr_to_modules:
submod_path = f"{package_name}.{attr_to_modules[name]}"
submod = importlib.import_module(submod_path)
attr = getattr(submod, name)
# If the attribute lives in a file (module) with the same
# name as the attribute, ensure that the attribute and *not*
# the module is accessible on the package.
if name == attr_to_modules[name]:
pkg = sys.modules[package_name]
pkg.__dict__[name] = attr
return attr
raise AttributeError(f"No {package_name} attribute {name}")
def __dir__() -> list[str]: # noqa: N807
return __all__.copy()
if os.environ.get("EAGER_IMPORT"):
for attr in set(attr_to_modules.keys()) | submodules:
__getattr__(attr)
return __getattr__, __dir__, __all__.copy()
class DelayedImportErrorModule(types.ModuleType):
"""Module type that delays raising ModuleNotFoundError until attribute access.
Captures stack frame data to provide helpful error messages showing where
the original import was attempted.
"""
def __init__(
self,
frame_data: _FrameData,
*args: Any,
message: str,
**kwargs: Any,
) -> None:
"""Initialize the delayed error module.
Args:
frame_data: Captured frame information for error reporting.
*args: Positional arguments passed to ModuleType.
message: The error message to display when accessed.
**kwargs: Keyword arguments passed to ModuleType.
"""
self._frame_data = frame_data
self._message = message
super().__init__(*args, **kwargs)
def __getattr__(self, name: str) -> NoReturn:
"""Raise ModuleNotFoundError with detailed context on any attribute access."""
frame = self._frame_data
code = "".join(frame.code_context) if frame.code_context else ""
raise ModuleNotFoundError(
f"{self._message}\n\n"
"This error is lazily reported, having originally occurred in\n"
f" File {frame.filename}, line {frame.lineno}, in {frame.function}\n\n"
f"----> {code.strip()}"
)
def load(
fullname: str,
*,
require: str | None = None,
error_on_import: bool = False,
suppress_warning: bool = False,
) -> types.ModuleType:
"""Return a lazily imported proxy for a module.
The proxy module delays actual import until first attribute access.
Example:
np = lazy.load("numpy")
def myfunc():
np.norm(...)
Warning:
Lazily loading subpackages causes the parent package to be eagerly
loaded. Use `lazy_loader.attach` instead for subpackages.
Args:
fullname: The full name of the module to import (e.g., "scipy").
require: A PEP-508 dependency requirement (e.g., "numpy >=1.24").
If specified, raises an error if the installed version doesn't match.
error_on_import: If True, raise import errors immediately.
If False (default), delay errors until module is accessed.
suppress_warning: If True, suppress the warning when loading subpackages.
Returns:
A proxy module that loads on first attribute access.
"""
with _threadlock:
module = sys.modules.get(fullname)
# Most common, short-circuit
if module is not None and require is None:
return module
have_module = module is not None
if not suppress_warning and "." in fullname:
msg = (
"subpackages can technically be lazily loaded, but it causes the "
"package to be eagerly loaded even if it is already lazily loaded. "
"So, you probably shouldn't use subpackages with this lazy feature."
)
warnings.warn(msg, RuntimeWarning, stacklevel=2)
spec = None
if not have_module:
spec = importlib.util.find_spec(fullname)
have_module = spec is not None
if not have_module:
not_found_message = f"No module named '{fullname}'"
elif require is not None:
try:
have_module = _check_requirement(require)
except ModuleNotFoundError as e:
raise ValueError(
f"Found module '{fullname}' but cannot test "
"requirement '{require}'. "
"Requirements must match distribution name, not module name."
) from e
not_found_message = f"No distribution can be found matching '{require}'"
if not have_module:
if error_on_import:
raise ModuleNotFoundError(not_found_message)
parent = inspect.stack()[1]
frame_data = _FrameData(
filename=parent.filename,
lineno=parent.lineno,
function=parent.function,
code_context=parent.code_context,
)
del parent
return DelayedImportErrorModule(
frame_data,
"DelayedImportErrorModule",
message=not_found_message,
)
if spec is not None:
module = importlib.util.module_from_spec(spec)
sys.modules[fullname] = module
if spec.loader is not None:
loader = importlib.util.LazyLoader(spec.loader)
loader.exec_module(module)
if module is None:
raise ModuleNotFoundError(f"No module named '{fullname}'")
return module
def _check_requirement(require: str) -> bool:
"""Verify that a package requirement is satisfied.
Args:
require: A dependency requirement as defined in PEP-508.
Returns:
True if the installed version matches the requirement, False otherwise.
Raises:
ModuleNotFoundError: If the package is not installed.
"""
req = packaging.requirements.Requirement(require)
return req.specifier.contains(
importlib.metadata.version(req.name),
prereleases=True,
)
@dataclass
class _StubVisitor(ast.NodeVisitor):
"""AST visitor to parse a stub file for submodules and submod_attrs."""
_submodules: set[str] = field(default_factory=set)
_submod_attrs: dict[str, list[str]] = field(default_factory=dict)
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
"""Visit an ImportFrom node and extract submodule/attribute information.
Args:
node: The AST ImportFrom node to visit.
Raises:
ValueError: If the import is not a relative import or uses star import.
"""
if node.level != 1:
raise ValueError(
"Only within-module imports are supported (`from .* import`)"
)
names = [alias.name for alias in node.names]
if node.module:
if "*" in names:
raise ValueError(
f"lazy stub loader does not support star import "
f"`from {node.module} import *`"
)
self._submod_attrs.setdefault(node.module, []).extend(names)
else:
self._submodules.update(names)
def attach_stub(
package_name: str,
filename: str,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
"""Attach lazily loaded submodules and functions from a type stub.
Parses a `.pyi` stub file to infer submodules and attributes. This allows
static type checkers to find imports while providing lazy loading at runtime.
Args:
package_name: The package name, typically ``__name__``.
filename: Path to `.py` file with an adjacent `.pyi` file.
Typically use ``__file__``.
Returns:
A tuple of (__getattr__, __dir__, __all__) to assign in the package.
Raises:
ValueError: If stub file is not found or contains invalid imports.
"""
path = Path(filename)
stubfile = path if path.suffix == ".pyi" else path.with_suffix(".pyi")
if not stubfile.exists():
raise ValueError(f"Cannot load imports from non-existent stub {stubfile!r}")
visitor = _StubVisitor()
visitor.visit(ast.parse(stubfile.read_text()))
return attach(package_name, visitor._submodules, visitor._submod_attrs)
def lazy_exports_stub(package_name: str, filename: str) -> None:
"""Install lazy loading on a module based on its .pyi stub file.
Parses the adjacent `.pyi` stub file to determine what to export lazily.
Type checkers see the stub, runtime gets lazy loading.
Example:
# __init__.py
from crewai.utilities.lazy import lazy_exports_stub
lazy_exports_stub(__name__, __file__)
# __init__.pyi
from .config import ChromaDBConfig, ChromaDBSettings
from .types import EmbeddingType
Args:
package_name: The package name, typically ``__name__``.
filename: Path to the module file, typically ``__file__``.
"""
__getattr__, __dir__, __all__ = attach_stub(package_name, filename)
module = sys.modules[package_name]
module.__getattr__ = __getattr__ # type: ignore[method-assign]
module.__dir__ = __dir__ # type: ignore[method-assign]
module.__dict__["__all__"] = __all__
def lazy_exports(
package_name: str,
submod_attrs: dict[str, list[str]],
submodules: set[str] | None = None,
) -> None:
"""Install lazy loading on a module.
Example:
from crewai.utilities.lazy import lazy_exports
lazy_exports(__name__, {
'config': ['ChromaDBConfig', 'ChromaDBSettings'],
'types': ['EmbeddingType'],
})
Args:
package_name: The package name, typically ``__name__``.
submod_attrs: Mapping of submodule names to lists of attributes.
submodules: Optional set of submodule names to expose directly.
"""
__getattr__, __dir__, __all__ = attach(package_name, submodules, submod_attrs)
module = sys.modules[package_name]
module.__getattr__ = __getattr__ # type: ignore[method-assign]
module.__dir__ = __dir__ # type: ignore[method-assign]
module.__dict__["__all__"] = __all__

View File

@@ -1,408 +0,0 @@
"""Tests for experimental environment tools."""
from __future__ import annotations
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from crewai.experimental.environment_tools import (
BaseEnvironmentTool,
EnvironmentTools,
FileReadTool,
FileSearchTool,
GrepTool,
ListDirTool,
)
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def temp_dir() -> Generator[str, None, None]:
"""Create a temporary directory with test files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files
test_file = Path(tmpdir) / "test.txt"
test_file.write_text("Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n")
python_file = Path(tmpdir) / "example.py"
python_file.write_text("def hello():\n print('Hello World')\n")
# Create subdirectory with files
subdir = Path(tmpdir) / "subdir"
subdir.mkdir()
(subdir / "nested.txt").write_text("Nested content\n")
(subdir / "another.py").write_text("# Another Python file\n")
yield tmpdir
@pytest.fixture
def restricted_temp_dir() -> Generator[tuple[str, str], None, None]:
"""Create two directories - one allowed, one not."""
with tempfile.TemporaryDirectory() as allowed_dir:
with tempfile.TemporaryDirectory() as forbidden_dir:
# Create files in both
(Path(allowed_dir) / "allowed.txt").write_text("Allowed content\n")
(Path(forbidden_dir) / "forbidden.txt").write_text("Forbidden content\n")
yield allowed_dir, forbidden_dir
# ============================================================================
# BaseEnvironmentTool Tests
# ============================================================================
class TestBaseEnvironmentTool:
"""Tests for BaseEnvironmentTool path validation."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default allowed_paths should be current directory for security."""
tool = FileReadTool()
assert tool.allowed_paths == ["."]
def test_validate_path_explicit_no_restrictions(self, temp_dir: str) -> None:
"""With explicit empty allowed_paths, all paths should be allowed."""
tool = FileReadTool(allowed_paths=[])
valid, result = tool._validate_path(temp_dir)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_within_allowed(self, temp_dir: str) -> None:
"""Paths within allowed_paths should be valid."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
valid, result = tool._validate_path(test_file)
assert valid is True
assert isinstance(result, Path)
def test_validate_path_outside_allowed(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Paths outside allowed_paths should be rejected."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
forbidden_file = os.path.join(forbidden_dir, "forbidden.txt")
valid, result = tool._validate_path(forbidden_file)
assert valid is False
assert isinstance(result, str)
assert "outside allowed paths" in result
def test_format_size(self) -> None:
"""Test human-readable size formatting."""
tool = FileReadTool()
assert tool._format_size(500) == "500B"
assert tool._format_size(1024) == "1.0KB"
assert tool._format_size(1536) == "1.5KB"
assert tool._format_size(1024 * 1024) == "1.0MB"
assert tool._format_size(1024 * 1024 * 1024) == "1.0GB"
# ============================================================================
# FileReadTool Tests
# ============================================================================
class TestFileReadTool:
"""Tests for FileReadTool."""
def test_read_entire_file(self, temp_dir: str) -> None:
"""Should read entire file contents."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file)
assert "Line 1" in result
assert "Line 2" in result
assert "Line 5" in result
assert "File:" in result # Metadata header
def test_read_with_line_range(self, temp_dir: str) -> None:
"""Should read specific line range."""
tool = FileReadTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(path=test_file, start_line=2, line_count=2)
assert "Line 2" in result
assert "Line 3" in result
# Should not include lines outside range
assert "Line 1" not in result.split("=" * 60)[-1] # Check content after header
def test_read_file_not_found(self, temp_dir: str) -> None:
"""Should return error for missing file."""
tool = FileReadTool(allowed_paths=[temp_dir])
missing_file = os.path.join(temp_dir, "nonexistent.txt")
result = tool._run(path=missing_file)
assert "Error: File not found" in result
def test_read_file_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileReadTool(allowed_paths=[allowed_dir])
forbidden_file = os.path.join(forbidden_dir, "forbidden.txt")
result = tool._run(path=forbidden_file)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# ListDirTool Tests
# ============================================================================
class TestListDirTool:
"""Tests for ListDirTool."""
def test_list_directory(self, temp_dir: str) -> None:
"""Should list directory contents."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir)
assert "test.txt" in result
assert "example.py" in result
assert "subdir" in result
assert "Total:" in result
def test_list_with_pattern(self, temp_dir: str) -> None:
"""Should filter by pattern."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, pattern="*.py")
assert "example.py" in result
assert "test.txt" not in result
def test_list_recursive(self, temp_dir: str) -> None:
"""Should list recursively when enabled."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=temp_dir, recursive=True)
assert "nested.txt" in result
assert "another.py" in result
def test_list_nonexistent_directory(self, temp_dir: str) -> None:
"""Should return error for missing directory."""
tool = ListDirTool(allowed_paths=[temp_dir])
result = tool._run(path=os.path.join(temp_dir, "nonexistent"))
assert "Error: Directory not found" in result
def test_list_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = ListDirTool(allowed_paths=[allowed_dir])
result = tool._run(path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# GrepTool Tests
# ============================================================================
class TestGrepTool:
"""Tests for GrepTool."""
def test_grep_finds_pattern(self, temp_dir: str) -> None:
"""Should find matching patterns."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="Line 2", path=test_file)
assert "Line 2" in result
assert "matches" in result.lower() or "found" in result.lower()
def test_grep_no_matches(self, temp_dir: str) -> None:
"""Should report when no matches found."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="nonexistent pattern xyz", path=test_file)
assert "No matches found" in result
def test_grep_recursive(self, temp_dir: str) -> None:
"""Should search recursively in directories."""
tool = GrepTool(allowed_paths=[temp_dir])
result = tool._run(pattern="Nested", path=temp_dir, recursive=True)
assert "Nested" in result
def test_grep_case_insensitive(self, temp_dir: str) -> None:
"""Should support case-insensitive search."""
tool = GrepTool(allowed_paths=[temp_dir])
test_file = os.path.join(temp_dir, "test.txt")
result = tool._run(pattern="LINE", path=test_file, ignore_case=True)
assert "Line" in result or "matches" in result.lower()
def test_grep_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = GrepTool(allowed_paths=[allowed_dir])
result = tool._run(pattern="test", path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# FileSearchTool Tests
# ============================================================================
class TestFileSearchTool:
"""Tests for FileSearchTool."""
def test_find_files_by_pattern(self, temp_dir: str) -> None:
"""Should find files matching pattern."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.py", path=temp_dir)
assert "example.py" in result
assert "another.py" in result
def test_find_no_matches(self, temp_dir: str) -> None:
"""Should report when no files match."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*.xyz", path=temp_dir)
assert "No" in result and "found" in result
def test_find_files_only(self, temp_dir: str) -> None:
"""Should filter to files only."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="file")
# Should include files
assert "test.txt" in result or "example.py" in result
# Directories should have trailing slash in output
# Check that subdir is not listed as a file
def test_find_dirs_only(self, temp_dir: str) -> None:
"""Should filter to directories only."""
tool = FileSearchTool(allowed_paths=[temp_dir])
result = tool._run(pattern="*", path=temp_dir, file_type="dir")
assert "subdir" in result
def test_find_path_restricted(self, restricted_temp_dir: tuple[str, str]) -> None:
"""Should reject paths outside allowed_paths."""
allowed_dir, forbidden_dir = restricted_temp_dir
tool = FileSearchTool(allowed_paths=[allowed_dir])
result = tool._run(pattern="*", path=forbidden_dir)
assert "Error:" in result
assert "outside allowed paths" in result
# ============================================================================
# EnvironmentTools Manager Tests
# ============================================================================
class TestEnvironmentTools:
"""Tests for EnvironmentTools manager class."""
def test_default_allowed_paths_is_current_directory(self) -> None:
"""Default should restrict to current directory for security."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
# All tools should default to current directory
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == ["."]
def test_explicit_empty_allowed_paths_allows_all(self) -> None:
"""Passing empty list should allow all paths."""
env_tools = EnvironmentTools(allowed_paths=[])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == []
def test_returns_all_tools_by_default(self) -> None:
"""Should return all four tools by default."""
env_tools = EnvironmentTools()
tools = env_tools.tools()
assert len(tools) == 4
tool_names = [t.name for t in tools]
assert "read_file" in tool_names
assert "list_directory" in tool_names
assert "grep_search" in tool_names
assert "find_files" in tool_names
def test_exclude_grep(self) -> None:
"""Should exclude grep tool when disabled."""
env_tools = EnvironmentTools(include_grep=False)
tools = env_tools.tools()
assert len(tools) == 3
tool_names = [t.name for t in tools]
assert "grep_search" not in tool_names
def test_exclude_search(self) -> None:
"""Should exclude search tool when disabled."""
env_tools = EnvironmentTools(include_search=False)
tools = env_tools.tools()
assert len(tools) == 3
tool_names = [t.name for t in tools]
assert "find_files" not in tool_names
def test_allowed_paths_propagated(self, temp_dir: str) -> None:
"""Should propagate allowed_paths to all tools."""
env_tools = EnvironmentTools(allowed_paths=[temp_dir])
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseEnvironmentTool)
assert tool.allowed_paths == [temp_dir]
def test_tools_are_base_tool_instances(self) -> None:
"""All returned tools should be BaseTool instances."""
from crewai.tools.base_tool import BaseTool
env_tools = EnvironmentTools()
tools = env_tools.tools()
for tool in tools:
assert isinstance(tool, BaseTool)