mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 23:02:50 +00:00
feat: introduce GlobTool for file pattern matching
- Added GlobTool to facilitate finding files that match specified glob patterns. - Enhanced agent_tools module to include GlobTool and GrepTool. - Implemented comprehensive functionality for recursive file searching, output formatting, and handling of hidden files. - Created unit tests for GlobTool to ensure reliability and correctness in various scenarios. This addition complements existing tools and enhances the file management capabilities within the CrewAI framework.
This commit is contained in:
@@ -1,28 +1,61 @@
|
||||
"""Tool for reading file contents from disk with line number support."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
BINARY_CHECK_SIZE = 8192
|
||||
MAX_LINE_LENGTH = 500
|
||||
DEFAULT_LINE_LIMIT = 500
|
||||
|
||||
|
||||
class FileReadToolSchema(BaseModel):
    """Input for FileReadTool."""

    file_path: str = Field(..., description="Mandatory file full path to read the file")
    offset: int | None = Field(
        None,
        description=(
            "Line number to start reading from. Positive values are 1-indexed from "
            "the start. Negative values count from the end (e.g., -10 reads last 10 lines). "
            "If None, reads from the beginning."
        ),
    )
    limit: int | None = Field(
        None,
        description=(
            "Maximum number of lines to read. If None, reads up to the default limit "
            f"({DEFAULT_LINE_LIMIT} lines) for large files, or entire file for small files."
        ),
    )
    include_line_numbers: bool = Field(
        True,
        description="Whether to prefix each line with its line number (format: 'LINE_NUMBER|CONTENT')",
    )
    # Legacy aliases kept for backward compatibility; _run maps them onto
    # offset/limit when the new parameters are not supplied.
    start_line: int | None = Field(
        None,
        description="[DEPRECATED: Use 'offset' instead] Line number to start reading from (1-indexed).",
    )
    line_count: int | None = Field(
        None,
        description="[DEPRECATED: Use 'limit' instead] Number of lines to read.",
    )
|
||||
|
||||
|
||||
class FileReadTool(BaseTool):
|
||||
"""A tool for reading file contents.
|
||||
"""A tool for reading file contents with line number support.
|
||||
|
||||
This tool inherits its schema handling from BaseTool to avoid recursive schema
|
||||
definition issues. The args_schema is set to FileReadToolSchema which defines
|
||||
the required file_path parameter. The schema should not be overridden in the
|
||||
constructor as it would break the inheritance chain and cause infinite loops.
|
||||
This tool provides Claude Code-like file reading capabilities:
|
||||
- Line number prefixes for easy reference
|
||||
- Offset/limit support for reading specific portions of large files
|
||||
- Negative offset support for reading from end of file
|
||||
- Binary file detection
|
||||
- File metadata (total lines) in response header
|
||||
|
||||
The tool supports two ways of specifying the file path:
|
||||
1. At construction time via the file_path parameter
|
||||
@@ -34,16 +67,23 @@ class FileReadTool(BaseTool):
|
||||
**kwargs: Additional keyword arguments passed to BaseTool.
|
||||
|
||||
Example:
|
||||
>>> tool = FileReadTool(file_path="/path/to/file.txt")
|
||||
>>> content = tool.run() # Reads /path/to/file.txt
|
||||
>>> content = tool.run(file_path="/path/to/other.txt") # Reads other.txt
|
||||
>>> tool = FileReadTool()
|
||||
>>> content = tool.run(file_path="/path/to/file.txt") # Reads entire file
|
||||
>>> content = tool.run(
|
||||
... file_path="/path/to/file.txt", start_line=100, line_count=50
|
||||
... ) # Reads lines 100-149
|
||||
... file_path="/path/to/file.txt", offset=100, limit=50
|
||||
... ) # Lines 100-149
|
||||
>>> content = tool.run(
|
||||
... file_path="/path/to/file.txt", offset=-20
|
||||
... ) # Last 20 lines
|
||||
"""
|
||||
|
||||
name: str = "Read a file's content"
|
||||
description: str = "A tool that reads the content of a file. To use this tool, provide a 'file_path' parameter with the path to the file you want to read. Optionally, provide 'start_line' to start reading from a specific line and 'line_count' to limit the number of lines read."
|
||||
name: str = "read_file"
|
||||
description: str = (
|
||||
"Read content from a file on disk. Returns file content with line numbers "
|
||||
"prefixed (format: 'LINE_NUMBER|CONTENT'). Use 'offset' to start from a "
|
||||
"specific line (negative values read from end), and 'limit' to control "
|
||||
"how many lines to read. For large files, reads are automatically limited."
|
||||
)
|
||||
args_schema: type[BaseModel] = FileReadToolSchema
|
||||
file_path: str | None = None
|
||||
|
||||
@@ -57,46 +97,152 @@ class FileReadTool(BaseTool):
|
||||
"""
|
||||
if file_path is not None:
|
||||
kwargs["description"] = (
|
||||
f"A tool that reads file content. The default file is {file_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
|
||||
f"Read content from a file. The default file is {file_path}, but you "
|
||||
"can provide a different 'file_path' parameter. Use 'offset' to start "
|
||||
"from a specific line and 'limit' to control the number of lines read."
|
||||
)
|
||||
|
||||
super().__init__(**kwargs)
|
||||
self.file_path = file_path
|
||||
|
||||
def _is_binary_file(self, file_path: Path) -> bool:
    """Check if a file is binary by looking for null bytes.

    Only the first BINARY_CHECK_SIZE bytes are inspected.

    Args:
        file_path: Path to the file.

    Returns:
        True if the file appears to be binary.
    """
    try:
        with open(file_path, "rb") as f:
            chunk = f.read(BINARY_CHECK_SIZE)
            return b"\x00" in chunk
    except (OSError, PermissionError):
        # Unreadable files are reported as binary so callers skip reading
        # them as text.
        return True
|
||||
|
||||
def _count_lines(self, file_path: Path) -> int:
    """Count total lines in a file efficiently.

    Reads in binary mode so decoding errors cannot interrupt the count.

    Args:
        file_path: Path to the file.

    Returns:
        Total number of lines in the file, or 0 if the file is unreadable.
    """
    try:
        with open(file_path, "rb") as f:
            return sum(1 for _ in f)
    except (OSError, PermissionError):
        return 0
|
||||
|
||||
def _run(
    self,
    file_path: str | None = None,
    offset: int | None = None,
    limit: int | None = None,
    include_line_numbers: bool = True,
    start_line: int | None = None,
    line_count: int | None = None,
) -> str:
    """Read file contents with optional line range.

    Args:
        file_path: Path to the file to read.
        offset: Line to start from (1-indexed, negative counts from end).
        limit: Maximum lines to read.
        include_line_numbers: Whether to prefix lines with numbers.
        start_line: Legacy parameter (maps to offset).
        line_count: Legacy parameter (maps to limit).

    Returns:
        File content with metadata header, or an "Error: ..." string.
    """
    # Map deprecated legacy parameters onto the new ones; explicit
    # offset/limit values always win.
    if start_line is not None and offset is None:
        offset = start_line
    if line_count is not None and limit is None:
        limit = line_count

    file_path = file_path or self.file_path

    if file_path is None:
        return "Error: No file path provided. Please provide a file path either in the constructor or as an argument."

    path = Path(file_path)

    if not path.exists():
        return f"Error: File not found at path: {file_path}"

    if path.is_dir():
        return f"Error: Path is a directory, not a file: {file_path}"

    if self._is_binary_file(path):
        file_size = path.stat().st_size
        return (
            f"Error: '{file_path}' appears to be a binary file ({file_size} bytes). "
            "Binary files cannot be read as text. Use a specialized tool for binary content."
        )

    try:
        total_lines = self._count_lines(path)

        if total_lines == 0:
            return f"File: {file_path}\nTotal lines: 0\n\n(Empty file)"

        # Convert the 1-indexed (possibly negative) offset to a 0-based index.
        if offset is None:
            start_idx = 0
        elif offset < 0:
            start_idx = max(0, total_lines + offset)
        else:
            start_idx = max(0, offset - 1)

        if limit is None:
            # Auto-truncate only unbounded full-file reads of large files.
            if total_lines > DEFAULT_LINE_LIMIT and offset is None:
                effective_limit = DEFAULT_LINE_LIMIT
            else:
                effective_limit = total_lines - start_idx
        else:
            effective_limit = limit

        end_idx = min(start_idx + effective_limit, total_lines)

        with open(path, encoding="utf-8", errors="replace") as f:
            lines: list[str] = []
            for i, line in enumerate(f):
                if i < start_idx:
                    continue
                if i >= end_idx:
                    break

                line_content = line.rstrip("\n\r")

                # Clip very long lines so one line cannot dominate output.
                if len(line_content) > MAX_LINE_LENGTH:
                    line_content = line_content[:MAX_LINE_LENGTH] + "..."

                if include_line_numbers:
                    line_num = i + 1  # 1-indexed
                    lines.append(f"{line_num:6}|{line_content}")
                else:
                    lines.append(line_content)

        header_parts = [f"File: {file_path}", f"Total lines: {total_lines}"]

        if start_idx > 0 or end_idx < total_lines:
            header_parts.append(f"Showing lines: {start_idx + 1}-{end_idx}")

        if end_idx < total_lines and limit is None and offset is None:
            header_parts.append(
                "(File truncated. Use 'offset' and 'limit' to read more.)"
            )

        header = "\n".join(header_parts)
        content = "\n".join(lines)

        return f"{header}\n\n{content}"

    except PermissionError:
        return f"Error: Permission denied when trying to read file: {file_path}"
    except UnicodeDecodeError as e:
        return f"Error: Failed to decode file {file_path} as text: {e!s}"
    except Exception as e:
        return f"Error: Failed to read file {file_path}. {e!s}"
|
||||
|
||||
195
lib/crewai-tools/tests/tools/test_file_read_tool.py
Normal file
195
lib/crewai-tools/tests/tools/test_file_read_tool.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""Unit tests for FileReadTool."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai_tools.tools.file_read_tool.file_read_tool import FileReadTool
|
||||
|
||||
|
||||
@pytest.fixture
def sample_file(tmp_path: Path) -> Path:
    """Create a sample text file with 100 numbered lines."""
    file_path = tmp_path / "sample.txt"
    lines = [f"Line {i}: This is line number {i}." for i in range(1, 101)]
    file_path.write_text("\n".join(lines) + "\n")
    return file_path
|
||||
|
||||
|
||||
@pytest.fixture
def binary_file(tmp_path: Path) -> Path:
    """Create a binary file with null bytes (triggers binary detection)."""
    file_path = tmp_path / "binary.bin"
    file_path.write_bytes(b"\x00\x01\x02\x03binary content\x00\x04\x05")
    return file_path
|
||||
|
||||
|
||||
@pytest.fixture
def empty_file(tmp_path: Path) -> Path:
    """Create an empty file."""
    file_path = tmp_path / "empty.txt"
    file_path.write_text("")
    return file_path
|
||||
|
||||
|
||||
class TestFileReadTool:
    """Tests for FileReadTool."""

    def setup_method(self) -> None:
        """Set up test fixtures."""
        self.tool = FileReadTool()

    def test_tool_metadata(self) -> None:
        """Test tool has correct name and description."""
        assert self.tool.name == "read_file"
        assert "read" in self.tool.description.lower()

    def test_args_schema(self) -> None:
        """Test that args_schema has correct fields."""
        schema = self.tool.args_schema
        fields = schema.model_fields

        assert "file_path" in fields
        assert fields["file_path"].is_required()

        assert "offset" in fields
        assert not fields["offset"].is_required()

        assert "limit" in fields
        assert not fields["limit"].is_required()

        assert "include_line_numbers" in fields
        assert not fields["include_line_numbers"].is_required()

    def test_read_entire_file(self, sample_file: Path) -> None:
        """Test reading entire file with line numbers."""
        result = self.tool._run(file_path=str(sample_file))
        assert "File:" in result
        assert "Total lines: 100" in result
        assert "Line 1:" in result
        assert "|" in result  # Line number separator

    def test_read_with_offset(self, sample_file: Path) -> None:
        """Test reading from a specific line offset."""
        result = self.tool._run(file_path=str(sample_file), offset=50, limit=10)
        assert "Showing lines: 50-59" in result
        assert "Line 50:" in result
        assert "Line 59:" in result
        # Should not include lines before offset
        assert "Line 49:" not in result

    def test_negative_offset_reads_from_end(self, sample_file: Path) -> None:
        """Test negative offset reads from end of file."""
        result = self.tool._run(file_path=str(sample_file), offset=-10)
        assert "Showing lines: 91-100" in result
        assert "Line 91:" in result
        assert "Line 100:" in result

    def test_limit_controls_line_count(self, sample_file: Path) -> None:
        """Test limit parameter controls how many lines are read."""
        result = self.tool._run(file_path=str(sample_file), offset=1, limit=5)
        assert "Showing lines: 1-5" in result
        # Count output lines (excluding header)
        content_lines = [
            line for line in result.split("\n") if "|" in line and line.strip()
        ]
        assert len(content_lines) == 5

    def test_line_numbers_included_by_default(self, sample_file: Path) -> None:
        """Test line numbers are included by default."""
        result = self.tool._run(file_path=str(sample_file), limit=5)
        # Lines should have format "     1|content"
        assert "|" in result
        for line in result.split("\n"):
            if "Line 1:" in line:
                assert "|" in line

    def test_line_numbers_can_be_disabled(self, sample_file: Path) -> None:
        """Test line numbers can be disabled."""
        result = self.tool._run(
            file_path=str(sample_file), limit=5, include_line_numbers=False
        )
        # Content lines shouldn't have the line number prefix
        content_section = result.split("\n\n", 1)[-1]  # Skip header
        for line in content_section.split("\n"):
            if line.strip() and "Line" in line:
                # Should not start with number|
                assert not line.strip()[0].isdigit() or "|" not in line[:10]

    def test_binary_file_detection(self, binary_file: Path) -> None:
        """Test binary files are detected and not read as text."""
        result = self.tool._run(file_path=str(binary_file))
        assert "Error" in result
        assert "binary" in result.lower()

    def test_empty_file(self, empty_file: Path) -> None:
        """Test reading empty file returns appropriate message."""
        result = self.tool._run(file_path=str(empty_file))
        assert "Total lines: 0" in result
        assert "Empty file" in result

    def test_file_not_found(self) -> None:
        """Test error message when file doesn't exist."""
        result = self.tool._run(file_path="/nonexistent/file.txt")
        assert "Error" in result
        assert "not found" in result.lower()

    def test_directory_path_error(self, tmp_path: Path) -> None:
        """Test error when path is a directory."""
        result = self.tool._run(file_path=str(tmp_path))
        assert "Error" in result
        assert "directory" in result.lower()

    def test_file_metadata_in_header(self, sample_file: Path) -> None:
        """Test file metadata is included in response header."""
        result = self.tool._run(file_path=str(sample_file), limit=10)
        # Should have file path
        assert str(sample_file) in result
        # Should have total lines
        assert "Total lines:" in result

    def test_large_file_auto_truncation(self, tmp_path: Path) -> None:
        """Test large files are automatically truncated."""
        # Create a file with 1000 lines
        large_file = tmp_path / "large.txt"
        lines = [f"Line {i}" for i in range(1, 1001)]
        large_file.write_text("\n".join(lines))

        result = self.tool._run(file_path=str(large_file))
        # Should be truncated and include message about it
        assert "truncated" in result.lower() or "Showing lines" in result
        # Should not read all 1000 lines without explicit limit
        assert "Line 1000" not in result or "limit" in result.lower()

    def test_legacy_start_line_parameter(self, sample_file: Path) -> None:
        """Test backward compatibility with start_line parameter."""
        result = self.tool._run(file_path=str(sample_file), start_line=10, line_count=5)
        assert "Showing lines: 10-14" in result
        assert "Line 10:" in result

    def test_constructor_with_file_path(self, sample_file: Path) -> None:
        """Test constructing tool with default file path."""
        tool = FileReadTool(file_path=str(sample_file))
        result = tool._run()
        assert "Line 1:" in result

    def test_constructor_file_path_override(self, sample_file: Path, tmp_path: Path) -> None:
        """Test runtime file_path overrides constructor file_path."""
        other_file = tmp_path / "other.txt"
        other_file.write_text("Different content\n")

        tool = FileReadTool(file_path=str(sample_file))
        result = tool._run(file_path=str(other_file))
        assert "Different content" in result
        assert "Line 1:" not in result

    def test_no_file_path_error(self) -> None:
        """Test error when no file path is provided."""
        result = self.tool._run()
        assert "Error" in result
        assert "No file path" in result

    def test_offset_beyond_file_length(self, sample_file: Path) -> None:
        """Test offset beyond file length returns empty content."""
        result = self.tool._run(file_path=str(sample_file), offset=200)
        # File has 100 lines, offset 200 should show nothing
        # But header should still show file info
        assert "Total lines: 100" in result
|
||||
@@ -1 +1,9 @@
|
||||
"""Agent tools for crewAI."""
|
||||
|
||||
from crewai.tools.agent_tools.glob_tool import GlobTool
|
||||
from crewai.tools.agent_tools.grep_tool import GrepTool
|
||||
|
||||
__all__ = [
|
||||
"GlobTool",
|
||||
"GrepTool",
|
||||
]
|
||||
|
||||
251
lib/crewai/src/crewai/tools/agent_tools/glob_tool.py
Normal file
251
lib/crewai/src/crewai/tools/agent_tools/glob_tool.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""Tool for finding files matching glob patterns."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
|
||||
|
||||
# Caps that keep tool output bounded for LLM consumption.
MAX_FILES = 1000
MAX_OUTPUT_CHARS = 30_000

# Directory names excluded from matches (VCS, caches, build artifacts).
# Entries are compared by exact set membership against each path component.
SKIP_DIRS = frozenset(
    {
        ".git",
        "__pycache__",
        "node_modules",
        ".venv",
        "venv",
        ".tox",
        ".mypy_cache",
        ".pytest_cache",
        ".ruff_cache",
        ".coverage",
        "dist",
        "build",
        ".eggs",
        # NOTE(review): membership is exact, so this wildcard entry never
        # matches a real "<name>.egg-info" directory — confirm intent.
        "*.egg-info",
    }
)
|
||||
|
||||
|
||||
@dataclass
class FileInfo:
    """Information about a matched file."""

    path: Path  # matched filesystem path
    size: int  # size in bytes (0 for directories)
    is_dir: bool  # True when the match is a directory
|
||||
|
||||
|
||||
class GlobToolSchema(BaseModel):
    """Schema for glob tool arguments."""

    pattern: str = Field(
        ...,
        description=(
            "Glob pattern to match files. Examples: '*.py' (Python files), "
            "'**/*.yaml' (all YAML files recursively), 'src/**/*.ts' (TypeScript in src), "
            "'test_*.py' (test files). Patterns not starting with '**/' are auto-prefixed for recursive search."
        ),
    )
    path: str | None = Field(
        default=None,
        description="Directory to search in. Defaults to current working directory.",
    )
    output_mode: Literal["paths", "tree", "detailed"] = Field(
        default="paths",
        description=(
            "Output format: 'paths' shows file paths one per line, "
            "'tree' shows directory tree structure, "
            "'detailed' includes file sizes."
        ),
    )
    include_hidden: bool = Field(
        default=False,
        description="Whether to include hidden files and directories (starting with '.').",
    )
    dirs_only: bool = Field(
        default=False,
        description="If True, only match directories, not files.",
    )
    files_only: bool = Field(
        default=True,
        description="If True (default), only match files, not directories.",
    )
|
||||
|
||||
|
||||
class GlobTool(BaseTool):
    """Tool for finding files matching glob patterns.

    Recursively searches for files matching a glob pattern within a directory.
    Useful for discovering files by name, extension, or path pattern.
    Complements GrepTool which searches by file content.
    """

    name: str = "glob"
    description: str = (
        "Find files matching a glob pattern. Use to discover files by name or extension. "
        "Examples: '*.py' finds all Python files, '**/*.yaml' finds YAML files recursively, "
        "'test_*.py' finds test files. Returns matching file paths sorted by modification time."
    )
    args_schema: type[BaseModel] = GlobToolSchema

    def _run(
        self,
        pattern: str,
        path: str | None = None,
        output_mode: Literal["paths", "tree", "detailed"] = "paths",
        include_hidden: bool = False,
        dirs_only: bool = False,
        files_only: bool = True,
        **kwargs: object,
    ) -> str:
        """Find files matching a glob pattern.

        Args:
            pattern: Glob pattern to match.
            path: Directory to search in. Defaults to cwd.
            output_mode: Output format (paths, tree, detailed).
            include_hidden: Whether to include hidden files.
            dirs_only: Only match directories.
            files_only: Only match files (default True).

        Returns:
            Formatted list of matching paths, or an "Error: ..." string.
        """
        # Resolve search path
        search_path = Path(path) if path else Path(os.getcwd())
        if not search_path.exists():
            return f"Error: Path '{search_path}' does not exist."
        if not search_path.is_dir():
            return f"Error: Path '{search_path}' is not a directory."

        # Auto-prefix bare filename patterns (no '/') for recursive search.
        normalized_pattern = pattern
        if not pattern.startswith("**/") and not pattern.startswith("/"):
            if "/" not in pattern:
                normalized_pattern = f"**/{pattern}"

        matches: list[FileInfo] = []
        try:
            for match_path in search_path.glob(normalized_pattern):
                # Skip hidden files/dirs unless explicitly requested.
                if not include_hidden:
                    if any(
                        part.startswith(".")
                        for part in match_path.relative_to(search_path).parts
                    ):
                        continue

                # Skip well-known dependency/VCS/build directories.
                rel_parts = match_path.relative_to(search_path).parts
                if any(part in SKIP_DIRS for part in rel_parts):
                    continue

                is_dir = match_path.is_dir()
                if dirs_only and not is_dir:
                    continue
                if files_only and is_dir:
                    continue

                try:
                    size = match_path.stat().st_size if not is_dir else 0
                    matches.append(FileInfo(path=match_path, size=size, is_dir=is_dir))
                except (OSError, PermissionError):
                    continue

                if len(matches) >= MAX_FILES:
                    break

        except Exception as e:
            return f"Error: Failed to search with pattern '{pattern}': {e!s}"

        if not matches:
            return f"No files found matching pattern '{pattern}' in {search_path}"

        # Most recently modified first; fall back to lexical order if
        # stat() fails mid-sort.
        try:
            matches.sort(key=lambda f: f.path.stat().st_mtime, reverse=True)
        except (OSError, PermissionError):
            matches.sort(key=lambda f: str(f.path))

        if output_mode == "detailed":
            output = self._format_detailed(matches, search_path)
        elif output_mode == "tree":
            output = self._format_tree(matches, search_path)
        else:
            output = self._format_paths(matches, search_path)

        summary = f"Found {len(matches)} file(s) matching '{pattern}'"
        if len(matches) >= MAX_FILES:
            summary += f" (limited to {MAX_FILES})"

        result = f"{summary}\n\n{output}"

        if len(result) > MAX_OUTPUT_CHARS:
            result = (
                result[:MAX_OUTPUT_CHARS]
                + "\n\n... Output truncated. Use a more specific pattern."
            )

        return result

    def _format_paths(self, matches: list[FileInfo], base_path: Path) -> str:
        """Format as simple list of paths."""
        return "\n".join(str(f.path) for f in matches)

    def _format_detailed(self, matches: list[FileInfo], base_path: Path) -> str:
        """Format with file sizes, paths shown relative to base_path."""
        lines: list[str] = []
        for f in matches:
            size_str = self._format_size(f.size) if not f.is_dir else "<dir>"
            rel_path = (
                f.path.relative_to(base_path)
                if f.path.is_relative_to(base_path)
                else f.path
            )
            lines.append(f"{size_str:>10} {rel_path}")
        return "\n".join(lines)

    def _format_tree(self, matches: list[FileInfo], base_path: Path) -> str:
        """Format as directory tree structure."""
        # Group entries by parent directory (relative to base_path).
        tree: dict[str, list[str]] = {}
        for f in matches:
            try:
                rel_path = f.path.relative_to(base_path)
            except ValueError:
                rel_path = f.path

            parent = str(rel_path.parent) if rel_path.parent != Path(".") else "."
            if parent not in tree:
                tree[parent] = []
            tree[parent].append(rel_path.name + ("/" if f.is_dir else ""))

        # Format tree output
        lines: list[str] = [str(base_path)]
        for directory in sorted(tree.keys()):
            if directory != ".":
                lines.append(f"  {directory}/")
            for filename in sorted(tree[directory]):
                # Entries under a subdirectory get one extra indent level.
                prefix = "    " if directory != "." else "  "
                # Fix: emit the entry name (the scraped diff had a metadata
                # placeholder here instead of the interpolated filename).
                lines.append(f"{prefix}{filename}")

        return "\n".join(lines)

    def _format_size(self, size: int) -> str:
        """Format file size in human-readable form."""
        size_float = float(size)
        for unit in ["B", "KB", "MB", "GB"]:
            if size_float < 1024:
                # Whole bytes need no decimal; larger units show one.
                return (
                    f"{size_float:.0f}{unit}"
                    if unit == "B"
                    else f"{size_float:.1f}{unit}"
                )
            size_float /= 1024
        return f"{size_float:.1f}TB"
|
||||
230
lib/crewai/tests/tools/agent_tools/test_glob_tool.py
Normal file
230
lib/crewai/tests/tools/agent_tools/test_glob_tool.py
Normal file
@@ -0,0 +1,230 @@
|
||||
"""Unit tests for GlobTool."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.tools.agent_tools.glob_tool import GlobTool
|
||||
|
||||
|
||||
@pytest.fixture
def sample_dir(tmp_path: Path) -> Path:
    """Create a temp directory with sample files for testing."""
    # src/ with Python sources and a YAML config
    src = tmp_path / "src"
    src.mkdir()
    (src / "main.py").write_text("def main(): pass\n")
    (src / "utils.py").write_text("def helper(): pass\n")
    (src / "config.yaml").write_text("key: value\n")

    # src/components/ with TypeScript files
    components = src / "components"
    components.mkdir()
    (components / "button.tsx").write_text("export const Button = () => {};\n")
    (components / "input.tsx").write_text("export const Input = () => {};\n")
    (components / "index.ts").write_text("export * from './button';\n")

    # tests/
    tests = tmp_path / "tests"
    tests.mkdir()
    (tests / "test_main.py").write_text("def test_main(): pass\n")
    (tests / "test_utils.py").write_text("def test_utils(): pass\n")

    # docs/
    docs = tmp_path / "docs"
    docs.mkdir()
    (docs / "readme.md").write_text("# Project\n")
    (docs / "api.md").write_text("# API\n")

    # data/binary.bin
    data = tmp_path / "data"
    data.mkdir()
    (data / "binary.bin").write_bytes(b"\x00\x01\x02\x03binary content")

    # empty.txt
    (tmp_path / "empty.txt").write_text("")

    # Hidden files (should be skipped by default)
    (tmp_path / ".hidden").write_text("hidden content\n")
    hidden_dir = tmp_path / ".hidden_dir"
    hidden_dir.mkdir()
    (hidden_dir / "secret.txt").write_text("secret\n")

    # .git/config (should be skipped)
    git_dir = tmp_path / ".git"
    git_dir.mkdir()
    (git_dir / "config").write_text("[core]\n repositoryformatversion = 0\n")

    # node_modules (should be skipped)
    node_modules = tmp_path / "node_modules"
    node_modules.mkdir()
    (node_modules / "package.json").write_text('{"name": "test"}\n')

    return tmp_path
|
||||
|
||||
|
||||
class TestGlobTool:
    """Unit tests covering GlobTool's matching, output modes, and error handling."""

    def setup_method(self) -> None:
        """Create a fresh tool instance before each test."""
        self.tool = GlobTool()

    def test_tool_metadata(self) -> None:
        """The tool exposes the expected name and a meaningful description."""
        assert self.tool.name == "glob"
        description = self.tool.description.lower()
        assert "find" in description or "pattern" in description

    def test_args_schema(self) -> None:
        """The args schema declares the expected fields with correct requiredness."""
        fields = self.tool.args_schema.model_fields

        assert "pattern" in fields
        assert fields["pattern"].is_required()

        # Every field other than `pattern` must be optional.
        for optional_field in ("path", "output_mode", "include_hidden"):
            assert optional_field in fields
            assert not fields[optional_field].is_required()

    def test_basic_pattern_match(self, sample_dir: Path) -> None:
        """A simple glob pattern finds matching files throughout the tree."""
        output = self.tool._run(pattern="*.py", path=str(sample_dir))
        for expected in ("main.py", "utils.py", "test_main.py", "test_utils.py"):
            assert expected in output

    def test_recursive_pattern(self, sample_dir: Path) -> None:
        """An explicit ** pattern recurses into subdirectories."""
        output = self.tool._run(pattern="**/*.tsx", path=str(sample_dir))
        assert "button.tsx" in output
        assert "input.tsx" in output

    def test_auto_recursive_prefix(self, sample_dir: Path) -> None:
        """Patterns lacking ** are auto-prefixed so nested files still match."""
        output = self.tool._run(pattern="*.yaml", path=str(sample_dir))
        assert "config.yaml" in output

    def test_specific_directory_pattern(self, sample_dir: Path) -> None:
        """A directory-scoped pattern only matches inside that directory."""
        output = self.tool._run(pattern="src/**/*.py", path=str(sample_dir))
        assert "main.py" in output
        assert "utils.py" in output
        # Files under tests/ must be excluded by the src/ scope.
        assert "test_main.py" not in output

    def test_output_mode_paths(self, sample_dir: Path) -> None:
        """'paths' output mode lists the matched file paths."""
        output = self.tool._run(pattern="*.md", path=str(sample_dir), output_mode="paths")
        assert "readme.md" in output
        assert "api.md" in output

    def test_output_mode_detailed(self, sample_dir: Path) -> None:
        """'detailed' output mode appends size information to each match."""
        output = self.tool._run(pattern="*.md", path=str(sample_dir), output_mode="detailed")
        assert "readme.md" in output
        assert "B" in output  # size rendered with a bytes unit

    def test_output_mode_tree(self, sample_dir: Path) -> None:
        """'tree' output mode renders the matched directory structure."""
        output = self.tool._run(pattern="*.py", path=str(sample_dir), output_mode="tree")
        assert "src" in output
        assert "tests" in output

    def test_hidden_files_excluded_by_default(self, sample_dir: Path) -> None:
        """Dotfiles and their contents are omitted unless explicitly requested."""
        output = self.tool._run(pattern="*", path=str(sample_dir))
        assert ".hidden" not in output
        assert "secret.txt" not in output

    def test_hidden_files_included_when_requested(self, sample_dir: Path) -> None:
        """Passing include_hidden=True surfaces dotfiles."""
        output = self.tool._run(pattern="*", path=str(sample_dir), include_hidden=True)
        assert ".hidden" in output

    def test_git_directory_skipped(self, sample_dir: Path) -> None:
        """.git contents stay hidden even when hidden files are included."""
        output = self.tool._run(pattern="*", path=str(sample_dir), include_hidden=True)
        # The .git directory name itself may appear, but its files must not
        # be listed as matches (checked by inspecting the line before "config").
        assert "config" not in output or ".git" not in output.split("config")[0].split("\n")[-1]

    def test_node_modules_skipped(self, sample_dir: Path) -> None:
        """node_modules contents are never returned as matches."""
        output = self.tool._run(pattern="*.json", path=str(sample_dir))
        assert "package.json" not in output

    def test_path_not_found(self) -> None:
        """A nonexistent search root yields a descriptive error string."""
        output = self.tool._run(pattern="*.py", path="/nonexistent/path")
        assert "Error" in output
        assert "does not exist" in output

    def test_path_is_not_directory(self, sample_dir: Path) -> None:
        """Pointing the tool at a file (not a directory) yields an error."""
        target = str(sample_dir / "empty.txt")
        output = self.tool._run(pattern="*.py", path=target)
        assert "Error" in output
        assert "not a directory" in output

    def test_no_matches_found(self, sample_dir: Path) -> None:
        """A pattern matching nothing produces a 'No files found' message."""
        output = self.tool._run(pattern="*.nonexistent", path=str(sample_dir))
        assert "No files found" in output

    def test_files_only_default(self, sample_dir: Path) -> None:
        """By default only files are matched; directories are not listed."""
        output = self.tool._run(pattern="*", path=str(sample_dir))
        # At least some regular files should be present.
        assert ".txt" in output or ".py" in output
        # No path line should present src/ as a trailing-slash directory match.
        dir_lines = [
            line
            for line in output.split("\n")
            if "src/" in line and line.strip().endswith("/")
        ]
        assert not dir_lines or "tree" in output.lower()

    def test_dirs_only(self, sample_dir: Path) -> None:
        """dirs_only=True restricts matches to directories."""
        output = self.tool._run(
            pattern="*", path=str(sample_dir), dirs_only=True, files_only=False
        )
        for directory in ("src", "tests", "docs"):
            assert directory in output
        # No file extensions should appear when only directories match.
        assert ".py" not in output
        assert ".txt" not in output

    def test_match_count_summary(self, sample_dir: Path) -> None:
        """Results start with a summary counting the matched files."""
        output = self.tool._run(pattern="*.py", path=str(sample_dir))
        assert "Found" in output
        assert "file" in output.lower()

    def test_run_with_kwargs(self, sample_dir: Path) -> None:
        """Unknown keyword arguments are accepted and ignored."""
        output = self.tool._run(
            pattern="*.py", path=str(sample_dir), extra_arg="ignored"
        )
        assert "main.py" in output

    def test_test_file_pattern(self, sample_dir: Path) -> None:
        """A test_*.py pattern selects only test modules."""
        output = self.tool._run(pattern="test_*.py", path=str(sample_dir))
        assert "test_main.py" in output
        assert "test_utils.py" in output
        # "main.py" is a substring of "test_main.py", so only assert exclusion
        # when the test file itself is absent.
        assert "main.py" not in output or "test_main.py" in output

    def test_typescript_files(self, sample_dir: Path) -> None:
        """*.ts matches .ts files but not .tsx files."""
        output = self.tool._run(pattern="*.ts", path=str(sample_dir))
        assert "index.ts" in output
        assert "button.tsx" not in output
|
||||
Reference in New Issue
Block a user