diff --git a/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py b/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py index 2c56a70cd..c1ac1dd46 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py @@ -1,28 +1,61 @@ +"""Tool for reading file contents from disk with line number support.""" + +from __future__ import annotations + +from pathlib import Path from typing import Any from crewai.tools import BaseTool from pydantic import BaseModel, Field +BINARY_CHECK_SIZE = 8192 +MAX_LINE_LENGTH = 500 +DEFAULT_LINE_LIMIT = 500 + + class FileReadToolSchema(BaseModel): """Input for FileReadTool.""" file_path: str = Field(..., description="Mandatory file full path to read the file") + offset: int | None = Field( + None, + description=( + "Line number to start reading from. Positive values are 1-indexed from " + "the start. Negative values count from the end (e.g., -10 reads last 10 lines). " + "If None, reads from the beginning." + ), + ) + limit: int | None = Field( + None, + description=( + "Maximum number of lines to read. If None, reads up to the default limit " + f"({DEFAULT_LINE_LIMIT} lines) for large files, or entire file for small files." + ), + ) + include_line_numbers: bool = Field( + True, + description="Whether to prefix each line with its line number (format: 'LINE_NUMBER|CONTENT')", + ) start_line: int | None = Field( - 1, description="Line number to start reading from (1-indexed)" + None, + description="[DEPRECATED: Use 'offset' instead] Line number to start reading from (1-indexed).", ) line_count: int | None = Field( - None, description="Number of lines to read. If None, reads the entire file" + None, + description="[DEPRECATED: Use 'limit' instead] Number of lines to read.", ) class FileReadTool(BaseTool): - """A tool for reading file contents. + """A tool for reading file contents with line number support. - This tool inherits its schema handling from BaseTool to avoid recursive schema - definition issues. The args_schema is set to FileReadToolSchema which defines - the required file_path parameter. The schema should not be overridden in the - constructor as it would break the inheritance chain and cause infinite loops. + This tool provides Claude Code-like file reading capabilities: + - Line number prefixes for easy reference + - Offset/limit support for reading specific portions of large files + - Negative offset support for reading from end of file + - Binary file detection + - File metadata (total lines) in response header The tool supports two ways of specifying the file path: 1. At construction time via the file_path parameter @@ -34,16 +67,23 @@ class FileReadTool(BaseTool): **kwargs: Additional keyword arguments passed to BaseTool. Example: - >>> tool = FileReadTool(file_path="/path/to/file.txt") - >>> content = tool.run() # Reads /path/to/file.txt - >>> content = tool.run(file_path="/path/to/other.txt") # Reads other.txt + >>> tool = FileReadTool() + >>> content = tool.run(file_path="/path/to/file.txt") # Reads entire file >>> content = tool.run( - ... file_path="/path/to/file.txt", start_line=100, line_count=50 - ... ) # Reads lines 100-149 + ... file_path="/path/to/file.txt", offset=100, limit=50 + ... ) # Lines 100-149 + >>> content = tool.run( + ... file_path="/path/to/file.txt", offset=-20 + ... ) # Last 20 lines """ - name: str = "Read a file's content" - description: str = "A tool that reads the content of a file. To use this tool, provide a 'file_path' parameter with the path to the file you want to read. Optionally, provide 'start_line' to start reading from a specific line and 'line_count' to limit the number of lines read." + name: str = "read_file" + description: str = ( + "Read content from a file on disk. Returns file content with line numbers " + "prefixed (format: 'LINE_NUMBER|CONTENT'). Use 'offset' to start from a " + "specific line (negative values read from end), and 'limit' to control " + "how many lines to read. For large files, reads are automatically limited." + ) args_schema: type[BaseModel] = FileReadToolSchema file_path: str | None = None @@ -57,46 +97,152 @@ class FileReadTool(BaseTool): """ if file_path is not None: kwargs["description"] = ( - f"A tool that reads file content. The default file is {file_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file." + f"Read content from a file. The default file is {file_path}, but you " + "can provide a different 'file_path' parameter. Use 'offset' to start " + "from a specific line and 'limit' to control the number of lines read." ) super().__init__(**kwargs) self.file_path = file_path + def _is_binary_file(self, file_path: Path) -> bool: + """Check if a file is binary by looking for null bytes. + + Args: + file_path: Path to the file. + + Returns: + True if the file appears to be binary. + """ + try: + with open(file_path, "rb") as f: + chunk = f.read(BINARY_CHECK_SIZE) + return b"\x00" in chunk + except (OSError, PermissionError): + return True + + def _count_lines(self, file_path: Path) -> int: + """Count total lines in a file efficiently. + + Args: + file_path: Path to the file. + + Returns: + Total number of lines in the file. + """ + try: + with open(file_path, "rb") as f: + return sum(1 for _ in f) + except (OSError, PermissionError): + return 0 + def _run( self, file_path: str | None = None, - start_line: int | None = 1, + offset: int | None = None, + limit: int | None = None, + include_line_numbers: bool = True, + start_line: int | None = None, line_count: int | None = None, ) -> str: + """Read file contents with optional line range. + + Args: + file_path: Path to the file to read. + offset: Line to start from (1-indexed, negative counts from end). + limit: Maximum lines to read. + include_line_numbers: Whether to prefix lines with numbers. + start_line: Legacy parameter (maps to offset). + line_count: Legacy parameter (maps to limit). + + Returns: + File content with metadata header. + """ + if start_line is not None and offset is None: + offset = start_line + if line_count is not None and limit is None: + limit = line_count + file_path = file_path or self.file_path - start_line = start_line or 1 - line_count = line_count or None if file_path is None: return "Error: No file path provided. Please provide a file path either in the constructor or as an argument." - try: - with open(file_path, "r") as file: - if start_line == 1 and line_count is None: - return file.read() + path = Path(file_path) - start_idx = max(start_line - 1, 0) - - selected_lines = [ - line - for i, line in enumerate(file) - if i >= start_idx - and (line_count is None or i < start_idx + line_count) - ] - - if not selected_lines and start_idx > 0: - return f"Error: Start line {start_line} exceeds the number of lines in the file." - - return "".join(selected_lines) - except FileNotFoundError: + if not path.exists(): return f"Error: File not found at path: {file_path}" + + if path.is_dir(): + return f"Error: Path is a directory, not a file: {file_path}" + + if self._is_binary_file(path): + file_size = path.stat().st_size + return ( + f"Error: '{file_path}' appears to be a binary file ({file_size} bytes). " + "Binary files cannot be read as text. Use a specialized tool for binary content." + ) + + try: + total_lines = self._count_lines(path) + + if total_lines == 0: + return f"File: {file_path}\nTotal lines: 0\n\n(Empty file)" + + if offset is None: + start_idx = 0 + elif offset < 0: + start_idx = max(0, total_lines + offset) + else: + start_idx = max(0, offset - 1) + + if limit is None: + if total_lines > DEFAULT_LINE_LIMIT and offset is None: + effective_limit = DEFAULT_LINE_LIMIT + else: + effective_limit = total_lines - start_idx + else: + effective_limit = limit + + end_idx = min(start_idx + effective_limit, total_lines) + + with open(path, encoding="utf-8", errors="replace") as f: + lines: list[str] = [] + for i, line in enumerate(f): + if i < start_idx: + continue + if i >= end_idx: + break + + line_content = line.rstrip("\n\r") + + if len(line_content) > MAX_LINE_LENGTH: + line_content = line_content[:MAX_LINE_LENGTH] + "..." + + if include_line_numbers: + line_num = i + 1 # 1-indexed + lines.append(f"{line_num:6}|{line_content}") + else: + lines.append(line_content) + + header_parts = [f"File: {file_path}", f"Total lines: {total_lines}"] + + if start_idx > 0 or end_idx < total_lines: + header_parts.append(f"Showing lines: {start_idx + 1}-{end_idx}") + + if end_idx < total_lines and limit is None and offset is None: + header_parts.append( + "(File truncated. Use 'offset' and 'limit' to read more.)" + ) + + header = "\n".join(header_parts) + content = "\n".join(lines) + + return f"{header}\n\n{content}" + except PermissionError: return f"Error: Permission denied when trying to read file: {file_path}" + except UnicodeDecodeError as e: + return f"Error: Failed to decode file {file_path} as text: {e!s}" except Exception as e: return f"Error: Failed to read file {file_path}. {e!s}" diff --git a/lib/crewai-tools/tests/tools/test_file_read_tool.py b/lib/crewai-tools/tests/tools/test_file_read_tool.py new file mode 100644 index 000000000..41e915228 --- /dev/null +++ b/lib/crewai-tools/tests/tools/test_file_read_tool.py @@ -0,0 +1,195 @@ +"""Unit tests for FileReadTool.""" + +from pathlib import Path + +import pytest + +from crewai_tools.tools.file_read_tool.file_read_tool import FileReadTool + + +@pytest.fixture +def sample_file(tmp_path: Path) -> Path: + """Create a sample text file with numbered lines.""" + file_path = tmp_path / "sample.txt" + lines = [f"Line {i}: This is line number {i}." for i in range(1, 101)] + file_path.write_text("\n".join(lines) + "\n") + return file_path + + +@pytest.fixture +def binary_file(tmp_path: Path) -> Path: + """Create a binary file with null bytes.""" + file_path = tmp_path / "binary.bin" + file_path.write_bytes(b"\x00\x01\x02\x03binary content\x00\x04\x05") + return file_path + + +@pytest.fixture +def empty_file(tmp_path: Path) -> Path: + """Create an empty file.""" + file_path = tmp_path / "empty.txt" + file_path.write_text("") + return file_path + + +class TestFileReadTool: + """Tests for FileReadTool.""" + + def setup_method(self) -> None: + """Set up test fixtures.""" + self.tool = FileReadTool() + + def test_tool_metadata(self) -> None: + """Test tool has correct name and description.""" + assert self.tool.name == "read_file" + assert "read" in self.tool.description.lower() + + def test_args_schema(self) -> None: + """Test that args_schema has correct fields.""" + schema = self.tool.args_schema + fields = schema.model_fields + + assert "file_path" in fields + assert fields["file_path"].is_required() + + assert "offset" in fields + assert not fields["offset"].is_required() + + assert "limit" in fields + assert not fields["limit"].is_required() + + assert "include_line_numbers" in fields + assert not fields["include_line_numbers"].is_required() + + def test_read_entire_file(self, sample_file: Path) -> None: + """Test reading entire file with line numbers.""" + result = self.tool._run(file_path=str(sample_file)) + assert "File:" in result + assert "Total lines: 100" in result + assert "Line 1:" in result + assert "|" in result # Line number separator + + def test_read_with_offset(self, sample_file: Path) -> None: + """Test reading from a specific line offset.""" + result = self.tool._run(file_path=str(sample_file), offset=50, limit=10) + assert "Showing lines: 50-59" in result + assert "Line 50:" in result + assert "Line 59:" in result + # Should not include lines before offset + assert "Line 49:" not in result + + def test_negative_offset_reads_from_end(self, sample_file: Path) -> None: + """Test negative offset reads from end of file.""" + result = self.tool._run(file_path=str(sample_file), offset=-10) + assert "Showing lines: 91-100" in result + assert "Line 91:" in result + assert "Line 100:" in result + + def test_limit_controls_line_count(self, sample_file: Path) -> None: + """Test limit parameter controls how many lines are read.""" + result = self.tool._run(file_path=str(sample_file), offset=1, limit=5) + assert "Showing lines: 1-5" in result + # Count output lines (excluding header) + content_lines = [l for l in result.split("\n") if "|" in l and l.strip()] + assert len(content_lines) == 5 + + def test_line_numbers_included_by_default(self, sample_file: Path) -> None: + """Test line numbers are included by default.""" + result = self.tool._run(file_path=str(sample_file), limit=5) + # Lines should have format " 1|content" + assert "|" in result + for line in result.split("\n"): + if "Line 1:" in line: + assert "|" in line + + def test_line_numbers_can_be_disabled(self, sample_file: Path) -> None: + """Test line numbers can be disabled.""" + result = self.tool._run( + file_path=str(sample_file), limit=5, include_line_numbers=False + ) + # Content lines shouldn't have the line number prefix + content_section = result.split("\n\n", 1)[-1] # Skip header + for line in content_section.split("\n"): + if line.strip() and "Line" in line: + # Should not start with number| + assert not line.strip()[0].isdigit() or "|" not in line[:10] + + def test_binary_file_detection(self, binary_file: Path) -> None: + """Test binary files are detected and not read as text.""" + result = self.tool._run(file_path=str(binary_file)) + assert "Error" in result + assert "binary" in result.lower() + + def test_empty_file(self, empty_file: Path) -> None: + """Test reading empty file returns appropriate message.""" + result = self.tool._run(file_path=str(empty_file)) + assert "Total lines: 0" in result + assert "Empty file" in result + + def test_file_not_found(self) -> None: + """Test error message when file doesn't exist.""" + result = self.tool._run(file_path="/nonexistent/file.txt") + assert "Error" in result + assert "not found" in result.lower() + + def test_directory_path_error(self, tmp_path: Path) -> None: + """Test error when path is a directory.""" + result = self.tool._run(file_path=str(tmp_path)) + assert "Error" in result + assert "directory" in result.lower() + + def test_file_metadata_in_header(self, sample_file: Path) -> None: + """Test file metadata is included in response header.""" + result = self.tool._run(file_path=str(sample_file), limit=10) + # Should have file path + assert str(sample_file) in result + # Should have total lines + assert "Total lines:" in result + + def test_large_file_auto_truncation(self, tmp_path: Path) -> None: + """Test large files are automatically truncated.""" + # Create a file with 1000 lines + large_file = tmp_path / "large.txt" + lines = [f"Line {i}" for i in range(1, 1001)] + large_file.write_text("\n".join(lines)) + + result = self.tool._run(file_path=str(large_file)) + # Should be truncated and include message about it + assert "truncated" in result.lower() or "Showing lines" in result + # Should not read all 1000 lines without explicit limit + assert "Line 1000" not in result or "limit" in result.lower() + + def test_legacy_start_line_parameter(self, sample_file: Path) -> None: + """Test backward compatibility with start_line parameter.""" + result = self.tool._run(file_path=str(sample_file), start_line=10, line_count=5) + assert "Showing lines: 10-14" in result + assert "Line 10:" in result + + def test_constructor_with_file_path(self, sample_file: Path) -> None: + """Test constructing tool with default file path.""" + tool = FileReadTool(file_path=str(sample_file)) + result = tool._run() + assert "Line 1:" in result + + def test_constructor_file_path_override(self, sample_file: Path, tmp_path: Path) -> None: + """Test runtime file_path overrides constructor file_path.""" + other_file = tmp_path / "other.txt" + other_file.write_text("Different content\n") + + tool = FileReadTool(file_path=str(sample_file)) + result = tool._run(file_path=str(other_file)) + assert "Different content" in result + assert "Line 1:" not in result + + def test_no_file_path_error(self) -> None: + """Test error when no file path is provided.""" + result = self.tool._run() + assert "Error" in result + assert "No file path" in result + + def test_offset_beyond_file_length(self, sample_file: Path) -> None: + """Test offset beyond file length returns empty content.""" + result = self.tool._run(file_path=str(sample_file), offset=200) + # File has 100 lines, offset 200 should show nothing + # But header should still show file info + assert "Total lines: 100" in result diff --git a/lib/crewai/src/crewai/tools/agent_tools/__init__.py b/lib/crewai/src/crewai/tools/agent_tools/__init__.py index 53c47739b..8c9807c7a 100644 --- a/lib/crewai/src/crewai/tools/agent_tools/__init__.py +++ b/lib/crewai/src/crewai/tools/agent_tools/__init__.py @@ -1 +1,9 @@ """Agent tools for crewAI.""" + +from crewai.tools.agent_tools.glob_tool import GlobTool +from crewai.tools.agent_tools.grep_tool import GrepTool + +__all__ = [ + "GlobTool", + "GrepTool", +] diff --git a/lib/crewai/src/crewai/tools/agent_tools/glob_tool.py b/lib/crewai/src/crewai/tools/agent_tools/glob_tool.py new file mode 100644 index 000000000..b545bcda6 --- /dev/null +++ b/lib/crewai/src/crewai/tools/agent_tools/glob_tool.py @@ -0,0 +1,251 @@ +"""Tool for finding files matching glob patterns.""" + +from __future__ import annotations + +from dataclasses import dataclass +import os +from pathlib import Path +from typing import Literal + +from pydantic import BaseModel, Field + +from crewai.tools.base_tool import BaseTool + + +MAX_FILES = 1000 +MAX_OUTPUT_CHARS = 30_000 + +SKIP_DIRS = frozenset( + { + ".git", + "__pycache__", + "node_modules", + ".venv", + "venv", + ".tox", + ".mypy_cache", + ".pytest_cache", + ".ruff_cache", + ".coverage", + "dist", + "build", + ".eggs", + "*.egg-info", + } +) + + +@dataclass +class FileInfo: + """Information about a matched file.""" + + path: Path + size: int + is_dir: bool + + +class GlobToolSchema(BaseModel): + """Schema for glob tool arguments.""" + + pattern: str = Field( + ..., + description=( + "Glob pattern to match files. Examples: '*.py' (Python files), " + "'**/*.yaml' (all YAML files recursively), 'src/**/*.ts' (TypeScript in src), " + "'test_*.py' (test files). Patterns not starting with '**/' are auto-prefixed for recursive search." + ), + ) + path: str | None = Field( + default=None, + description="Directory to search in. Defaults to current working directory.", + ) + output_mode: Literal["paths", "tree", "detailed"] = Field( + default="paths", + description=( + "Output format: 'paths' shows file paths one per line, " + "'tree' shows directory tree structure, " + "'detailed' includes file sizes." + ), + ) + include_hidden: bool = Field( + default=False, + description="Whether to include hidden files and directories (starting with '.').", + ) + dirs_only: bool = Field( + default=False, + description="If True, only match directories, not files.", + ) + files_only: bool = Field( + default=True, + description="If True (default), only match files, not directories.", + ) + + +class GlobTool(BaseTool): + """Tool for finding files matching glob patterns. + + Recursively searches for files matching a glob pattern within a directory. + Useful for discovering files by name, extension, or path pattern. + Complements GrepTool which searches by file content. + """ + + name: str = "glob" + description: str = ( + "Find files matching a glob pattern. Use to discover files by name or extension. " + "Examples: '*.py' finds all Python files, '**/*.yaml' finds YAML files recursively, " + "'test_*.py' finds test files. Returns matching file paths sorted by modification time." + ) + args_schema: type[BaseModel] = GlobToolSchema + + def _run( + self, + pattern: str, + path: str | None = None, + output_mode: Literal["paths", "tree", "detailed"] = "paths", + include_hidden: bool = False, + dirs_only: bool = False, + files_only: bool = True, + **kwargs: object, + ) -> str: + """Find files matching a glob pattern. + + Args: + pattern: Glob pattern to match. + path: Directory to search in. Defaults to cwd. + output_mode: Output format (paths, tree, detailed). + include_hidden: Whether to include hidden files. + dirs_only: Only match directories. + files_only: Only match files (default True). + + Returns: + Formatted list of matching paths. + """ + # Resolve search path + search_path = Path(path) if path else Path(os.getcwd()) + if not search_path.exists(): + return f"Error: Path '{search_path}' does not exist." + if not search_path.is_dir(): + return f"Error: Path '{search_path}' is not a directory." + + # Normalize pattern for recursive search + normalized_pattern = pattern + if not pattern.startswith("**/") and not pattern.startswith("/"): + if "/" not in pattern: + normalized_pattern = f"**/{pattern}" + + matches: list[FileInfo] = [] + try: + for match_path in search_path.glob(normalized_pattern): + if not include_hidden: + if any( + part.startswith(".") + for part in match_path.relative_to(search_path).parts + ): + continue + + rel_parts = match_path.relative_to(search_path).parts + if any(part in SKIP_DIRS for part in rel_parts): + continue + + is_dir = match_path.is_dir() + if dirs_only and not is_dir: + continue + if files_only and is_dir: + continue + + try: + size = match_path.stat().st_size if not is_dir else 0 + matches.append(FileInfo(path=match_path, size=size, is_dir=is_dir)) + except (OSError, PermissionError): + continue + + if len(matches) >= MAX_FILES: + break + + except Exception as e: + return f"Error: Failed to search with pattern '{pattern}': {e!s}" + + if not matches: + return f"No files found matching pattern '{pattern}' in {search_path}" + + try: + matches.sort(key=lambda f: f.path.stat().st_mtime, reverse=True) + except (OSError, PermissionError): + matches.sort(key=lambda f: str(f.path)) + + if output_mode == "detailed": + output = self._format_detailed(matches, search_path) + elif output_mode == "tree": + output = self._format_tree(matches, search_path) + else: + output = self._format_paths(matches, search_path) + + summary = f"Found {len(matches)} file(s) matching '{pattern}'" + if len(matches) >= MAX_FILES: + summary += f" (limited to {MAX_FILES})" + + result = f"{summary}\n\n{output}" + + if len(result) > MAX_OUTPUT_CHARS: + result = ( + result[:MAX_OUTPUT_CHARS] + + "\n\n... Output truncated. Use a more specific pattern." + ) + + return result + + def _format_paths(self, matches: list[FileInfo], base_path: Path) -> str: + """Format as simple list of paths.""" + return "\n".join(str(f.path) for f in matches) + + def _format_detailed(self, matches: list[FileInfo], base_path: Path) -> str: + """Format with file sizes.""" + lines: list[str] = [] + for f in matches: + size_str = self._format_size(f.size) if not f.is_dir else "