commit ad1ea46bbb
parent 807dfe0558
Author: Devin AI
Date: 2025-05-12 13:30:50 +00:00

Apply automatic linting fixes to src directory

Co-Authored-By: Joe Moura <joao@crewai.com>

160 changed files with 3218 additions and 3197 deletions

src/crewai/knowledge/source/base_file_knowledge_source.py

@@ -1,6 +1,5 @@
 from abc import ABC, abstractmethod
 from pathlib import Path
-from typing import Dict, List, Optional, Union

 from pydantic import Field, field_validator
@@ -14,43 +13,43 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
     """Base class for knowledge sources that load content from files."""

     _logger: Logger = Logger(verbose=True)
-    file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
+    file_path: Path | list[Path] | str | list[str] | None = Field(
         default=None,
         description="[Deprecated] The path to the file. Use file_paths instead.",
     )
-    file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
-        default_factory=list, description="The path to the file"
+    file_paths: Path | list[Path] | str | list[str] | None = Field(
+        default_factory=list, description="The path to the file",
     )
-    content: Dict[Path, str] = Field(init=False, default_factory=dict)
-    storage: Optional[KnowledgeStorage] = Field(default=None)
-    safe_file_paths: List[Path] = Field(default_factory=list)
+    content: dict[Path, str] = Field(init=False, default_factory=dict)
+    storage: KnowledgeStorage | None = Field(default=None)
+    safe_file_paths: list[Path] = Field(default_factory=list)

     @field_validator("file_path", "file_paths", mode="before")
-    def validate_file_path(cls, v, info):
+    def validate_file_path(self, v, info):
         """Validate that at least one of file_path or file_paths is provided."""
         # Single check if both are None, O(1) instead of nested conditions
         if (
             v is None
             and info.data.get(
-                "file_path" if info.field_name == "file_paths" else "file_paths"
+                "file_path" if info.field_name == "file_paths" else "file_paths",
             )
             is None
         ):
-            raise ValueError("Either file_path or file_paths must be provided")
+            msg = "Either file_path or file_paths must be provided"
+            raise ValueError(msg)
         return v

-    def model_post_init(self, _):
+    def model_post_init(self, _) -> None:
         """Post-initialization method to load content."""
         self.safe_file_paths = self._process_file_paths()
         self.validate_content()
         self.content = self.load_content()

     @abstractmethod
-    def load_content(self) -> Dict[Path, str]:
+    def load_content(self) -> dict[Path, str]:
         """Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
-        pass

-    def validate_content(self):
+    def validate_content(self) -> None:
         """Validate the paths."""
         for path in self.safe_file_paths:
             if not path.exists():
@@ -59,7 +58,8 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
                     f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
                     color="red",
                 )
-                raise FileNotFoundError(f"File not found: {path}")
+                msg = f"File not found: {path}"
+                raise FileNotFoundError(msg)
             if not path.is_file():
                 self._logger.log(
                     "error",
@@ -67,20 +67,20 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
                     color="red",
                 )

-    def _save_documents(self):
+    def _save_documents(self) -> None:
         """Save the documents to the storage."""
         if self.storage:
             self.storage.save(self.chunks)
         else:
-            raise ValueError("No storage found to save documents.")
+            msg = "No storage found to save documents."
+            raise ValueError(msg)

-    def convert_to_path(self, path: Union[Path, str]) -> Path:
+    def convert_to_path(self, path: Path | str) -> Path:
         """Convert a path to a Path object."""
         return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path

-    def _process_file_paths(self) -> List[Path]:
+    def _process_file_paths(self) -> list[Path]:
         """Convert file_path to a list of Path objects."""
         if hasattr(self, "file_path") and self.file_path is not None:
             self._logger.log(
                 "warning",
@@ -90,10 +90,11 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
             self.file_paths = self.file_path

         if self.file_paths is None:
-            raise ValueError("Your source must be provided with a file_paths: []")
+            msg = "Your source must be provided with a file_paths: []"
+            raise ValueError(msg)

         # Convert single path to list
-        path_list: List[Union[Path, str]] = (
+        path_list: list[Path | str] = (
             [self.file_paths]
             if isinstance(self.file_paths, (str, Path))
             else list(self.file_paths)
@@ -102,8 +103,9 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
         )

         if not path_list:
+            msg = "file_path/file_paths must be a Path, str, or a list of these types"
             raise ValueError(
-                "file_path/file_paths must be a Path, str, or a list of these types"
+                msg,
             )

         return [self.convert_to_path(path) for path in path_list]
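
Across this file (and the whole commit) the annotations drop typing.Dict/List/Optional/Union in favor of builtin generics and PEP 604 unions, consistent with ruff's pyupgrade-style UP006/UP007 rules. A minimal before/after sketch (hypothetical function, not from the codebase), assuming Python 3.10+, where the X | Y syntax is valid at runtime in annotations:

    from pathlib import Path


    # Before: def load(paths: Optional[Union[Path, List[Path]]]) -> Dict[Path, str]
    # After, with builtin generics and | unions:
    def load(paths: Path | list[Path] | None) -> dict[Path, str]:
        """Map each given path to its file name; empty when nothing is given."""
        if paths is None:
            return {}
        path_list = [paths] if isinstance(paths, Path) else list(paths)
        return {p: p.name for p in path_list}


    print(load([Path("a.txt"), Path("docs/b.md")]))

On 3.8/3.9 the same spelling is legal only inside annotations, behind from __future__ import annotations.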

src/crewai/knowledge/source/base_knowledge_source.py

@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Optional
+from typing import Any

 import numpy as np
 from pydantic import BaseModel, ConfigDict, Field
@@ -12,41 +12,39 @@ class BaseKnowledgeSource(BaseModel, ABC):
     chunk_size: int = 4000
     chunk_overlap: int = 200
-    chunks: List[str] = Field(default_factory=list)
-    chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
+    chunks: list[str] = Field(default_factory=list)
+    chunk_embeddings: list[np.ndarray] = Field(default_factory=list)

     model_config = ConfigDict(arbitrary_types_allowed=True)
-    storage: Optional[KnowledgeStorage] = Field(default=None)
-    metadata: Dict[str, Any] = Field(default_factory=dict)  # Currently unused
-    collection_name: Optional[str] = Field(default=None)
+    storage: KnowledgeStorage | None = Field(default=None)
+    metadata: dict[str, Any] = Field(default_factory=dict)  # Currently unused
+    collection_name: str | None = Field(default=None)

     @abstractmethod
     def validate_content(self) -> Any:
         """Load and preprocess content from the source."""
-        pass

     @abstractmethod
     def add(self) -> None:
         """Process content, chunk it, compute embeddings, and save them."""
-        pass

-    def get_embeddings(self) -> List[np.ndarray]:
+    def get_embeddings(self) -> list[np.ndarray]:
         """Return the list of embeddings for the chunks."""
         return self.chunk_embeddings

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]
             for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
         ]

-    def _save_documents(self):
-        """
-        Save the documents to the storage.
+    def _save_documents(self) -> None:
+        """Save the documents to the storage.

         This method should be called after the chunks and embeddings are generated.
         """
         if self.storage:
             self.storage.save(self.chunks)
         else:
-            raise ValueError("No storage found to save documents.")
+            msg = "No storage found to save documents."
+            raise ValueError(msg)
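
The _chunk_text comprehension above slides a window of chunk_size characters with stride chunk_size - chunk_overlap, so consecutive chunks share chunk_overlap characters. A standalone demo with small illustrative values (the class defaults are 4000/200):

    def chunk_text(text: str, chunk_size: int = 10, chunk_overlap: int = 3) -> list[str]:
        # stride of 7 here: each new chunk re-reads the previous chunk's last 3 characters
        return [
            text[i : i + chunk_size]
            for i in range(0, len(text), chunk_size - chunk_overlap)
        ]


    print(chunk_text("abcdefghijklmnop"))
    # ['abcdefghij', 'hijklmnop', 'op']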

src/crewai/knowledge/source/crew_docling_source.py

@@ -1,5 +1,6 @@
+from collections.abc import Iterator
 from pathlib import Path
-from typing import Iterator, List, Optional, Union
+from typing import TYPE_CHECKING
 from urllib.parse import urlparse

 try:
@@ -7,7 +8,6 @@ try:
     from docling.document_converter import DocumentConverter
     from docling.exceptions import ConversionError
     from docling_core.transforms.chunker.hierarchical_chunker import HierarchicalChunker
-    from docling_core.types.doc.document import DoclingDocument

     DOCLING_AVAILABLE = True
 except ImportError:
@@ -19,27 +19,33 @@ from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
 from crewai.utilities.logger import Logger

+if TYPE_CHECKING:
+    from docling_core.types.doc.document import DoclingDocument
+

 class CrewDoclingSource(BaseKnowledgeSource):
     """Default Source class for converting documents to markdown or json
     This will auto support PDF, DOCX, and TXT, XLSX, Images, and HTML files without any additional dependencies and follows the docling package as the source of truth.
     """

-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, **kwargs) -> None:
         if not DOCLING_AVAILABLE:
-            raise ImportError(
+            msg = (
                 "The docling package is required to use CrewDoclingSource. "
                 "Please install it using: uv add docling"
             )
+            raise ImportError(
+                msg,
+            )
         super().__init__(*args, **kwargs)

     _logger: Logger = Logger(verbose=True)

-    file_path: Optional[List[Union[Path, str]]] = Field(default=None)
-    file_paths: List[Union[Path, str]] = Field(default_factory=list)
-    chunks: List[str] = Field(default_factory=list)
-    safe_file_paths: List[Union[Path, str]] = Field(default_factory=list)
-    content: List["DoclingDocument"] = Field(default_factory=list)
+    file_path: list[Path | str] | None = Field(default=None)
+    file_paths: list[Path | str] = Field(default_factory=list)
+    chunks: list[str] = Field(default_factory=list)
+    safe_file_paths: list[Path | str] = Field(default_factory=list)
+    content: list["DoclingDocument"] = Field(default_factory=list)
     document_converter: "DocumentConverter" = Field(
         default_factory=lambda: DocumentConverter(
             allowed_formats=[
@@ -51,8 +57,8 @@ class CrewDoclingSource(BaseKnowledgeSource):
                 InputFormat.IMAGE,
                 InputFormat.XLSX,
                 InputFormat.PPTX,
-            ]
-        )
+            ],
+        ),
     )

     def model_post_init(self, _) -> None:
@@ -66,7 +72,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
         self.safe_file_paths = self.validate_content()
         self.content = self._load_content()

-    def _load_content(self) -> List["DoclingDocument"]:
+    def _load_content(self) -> list["DoclingDocument"]:
         try:
             return self._convert_source_to_docling_documents()
         except ConversionError as e:
@@ -75,10 +81,10 @@ class CrewDoclingSource(BaseKnowledgeSource):
                 f"Error loading content: {e}. Supported formats: {self.document_converter.allowed_formats}",
                 "red",
             )
-            raise e
+            raise
         except Exception as e:
             self._logger.log("error", f"Error loading content: {e}")
-            raise e
+            raise

     def add(self) -> None:
         if self.content is None:
@@ -88,7 +94,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
             self.chunks.extend(list(new_chunks_iterable))
         self._save_documents()

-    def _convert_source_to_docling_documents(self) -> List["DoclingDocument"]:
+    def _convert_source_to_docling_documents(self) -> list["DoclingDocument"]:
         conv_results_iter = self.document_converter.convert_all(self.safe_file_paths)
         return [result.document for result in conv_results_iter]
@@ -97,8 +103,8 @@ class CrewDoclingSource(BaseKnowledgeSource):
         for chunk in chunker.chunk(doc):
             yield chunk.text

-    def validate_content(self) -> List[Union[Path, str]]:
-        processed_paths: List[Union[Path, str]] = []
+    def validate_content(self) -> list[Path | str]:
+        processed_paths: list[Path | str] = []
         for path in self.file_paths:
             if isinstance(path, str):
                 if path.startswith(("http://", "https://")):
@@ -106,15 +112,18 @@ class CrewDoclingSource(BaseKnowledgeSource):
                     try:
                         if self._validate_url(path):
                             processed_paths.append(path)
                         else:
-                            raise ValueError(f"Invalid URL format: {path}")
+                            msg = f"Invalid URL format: {path}"
+                            raise ValueError(msg)
                     except Exception as e:
-                        raise ValueError(f"Invalid URL: {path}. Error: {str(e)}")
+                        msg = f"Invalid URL: {path}. Error: {e!s}"
+                        raise ValueError(msg)
                 else:
                     local_path = Path(KNOWLEDGE_DIRECTORY + "/" + path)
                     if local_path.exists():
                         processed_paths.append(local_path)
                     else:
-                        raise FileNotFoundError(f"File not found: {local_path}")
+                        msg = f"File not found: {local_path}"
+                        raise FileNotFoundError(msg)
             else:
                 # this is an instance of Path
                 processed_paths.append(path)
@@ -128,7 +137,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
                     result.scheme in ("http", "https"),
                     result.netloc,
                     len(result.netloc.split(".")) >= 2,  # Ensure domain has TLD
-                ]
+                ],
             )
         except Exception:
             return False
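
Two idioms in this file are worth spelling out. The DoclingDocument import moves behind typing.TYPE_CHECKING, so type checkers see it while the runtime never imports it, and annotations refer to the name as a string; and raise e inside except blocks becomes a bare raise, the idiomatic re-raise of the active exception (ruff's TRY201). A minimal sketch of the TYPE_CHECKING pattern, using fractions.Fraction as a stand-in for the optional heavy dependency:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # seen by type checkers only; never executed at runtime
        from fractions import Fraction  # stand-in for docling's DoclingDocument


    def halve(x: "Fraction") -> "Fraction":
        # the quoted annotation keeps this module importable even
        # when the guarded dependency is not installed
        return x / 2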

src/crewai/knowledge/source/csv_knowledge_source.py

@@ -1,6 +1,5 @@
 import csv
 from pathlib import Path
-from typing import Dict, List

 from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -8,11 +7,11 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
 class CSVKnowledgeSource(BaseFileKnowledgeSource):
     """A knowledge source that stores and queries CSV file content using embeddings."""

-    def load_content(self) -> Dict[Path, str]:
+    def load_content(self) -> dict[Path, str]:
         """Load and preprocess CSV file content."""
         content_dict = {}
         for file_path in self.safe_file_paths:
-            with open(file_path, "r", encoding="utf-8") as csvfile:
+            with open(file_path, encoding="utf-8") as csvfile:
                 reader = csv.reader(csvfile)
                 content = ""
                 for row in reader:
@@ -21,8 +20,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
         return content_dict

     def add(self) -> None:
-        """
-        Add CSV file content to the knowledge source, chunk it, compute embeddings,
+        """Add CSV file content to the knowledge source, chunk it, compute embeddings,
         and save the embeddings.
         """
         content_str = (
@@ -32,7 +30,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
         self.chunks.extend(new_chunks)
         self._save_documents()

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]
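
The open() change is the UP015-style cleanup: "r" is already the default mode, so only the encoding needs stating. A self-contained check (the file name is hypothetical):

    from pathlib import Path

    sample = Path("sample.csv")  # hypothetical throwaway file for the demo
    sample.write_text("a,b\n1,2\n", encoding="utf-8")

    with open(sample, encoding="utf-8") as csvfile:  # same as open(sample, "r", ...)
        print(csvfile.read())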

src/crewai/knowledge/source/excel_knowledge_source.py

@@ -1,6 +1,4 @@
 from pathlib import Path
-from typing import Dict, Iterator, List, Optional, Union
-from urllib.parse import urlparse

 from pydantic import Field, field_validator
@@ -16,34 +14,34 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
     _logger: Logger = Logger(verbose=True)

-    file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
+    file_path: Path | list[Path] | str | list[str] | None = Field(
         default=None,
         description="[Deprecated] The path to the file. Use file_paths instead.",
     )
-    file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
-        default_factory=list, description="The path to the file"
+    file_paths: Path | list[Path] | str | list[str] | None = Field(
+        default_factory=list, description="The path to the file",
    )
-    chunks: List[str] = Field(default_factory=list)
-    content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
-    safe_file_paths: List[Path] = Field(default_factory=list)
+    chunks: list[str] = Field(default_factory=list)
+    content: dict[Path, dict[str, str]] = Field(default_factory=dict)
+    safe_file_paths: list[Path] = Field(default_factory=list)

     @field_validator("file_path", "file_paths", mode="before")
-    def validate_file_path(cls, v, info):
+    def validate_file_path(self, v, info):
         """Validate that at least one of file_path or file_paths is provided."""
         # Single check if both are None, O(1) instead of nested conditions
         if (
             v is None
             and info.data.get(
-                "file_path" if info.field_name == "file_paths" else "file_paths"
+                "file_path" if info.field_name == "file_paths" else "file_paths",
             )
             is None
         ):
-            raise ValueError("Either file_path or file_paths must be provided")
+            msg = "Either file_path or file_paths must be provided"
+            raise ValueError(msg)
         return v

-    def _process_file_paths(self) -> List[Path]:
+    def _process_file_paths(self) -> list[Path]:
         """Convert file_path to a list of Path objects."""
         if hasattr(self, "file_path") and self.file_path is not None:
             self._logger.log(
                 "warning",
@@ -53,10 +51,11 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
             self.file_paths = self.file_path

         if self.file_paths is None:
-            raise ValueError("Your source must be provided with a file_paths: []")
+            msg = "Your source must be provided with a file_paths: []"
+            raise ValueError(msg)

         # Convert single path to list
-        path_list: List[Union[Path, str]] = (
+        path_list: list[Path | str] = (
             [self.file_paths]
             if isinstance(self.file_paths, (str, Path))
             else list(self.file_paths)
@@ -65,13 +64,14 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
         )

         if not path_list:
+            msg = "file_path/file_paths must be a Path, str, or a list of these types"
             raise ValueError(
-                "file_path/file_paths must be a Path, str, or a list of these types"
+                msg,
             )

         return [self.convert_to_path(path) for path in path_list]

-    def validate_content(self):
+    def validate_content(self) -> None:
         """Validate the paths."""
         for path in self.safe_file_paths:
             if not path.exists():
@@ -80,7 +80,8 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
                     f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
                     color="red",
                 )
-                raise FileNotFoundError(f"File not found: {path}")
+                msg = f"File not found: {path}"
+                raise FileNotFoundError(msg)
             if not path.is_file():
                 self._logger.log(
                     "error",
@@ -100,7 +101,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
         self.validate_content()
         self.content = self._load_content()

-    def _load_content(self) -> Dict[Path, Dict[str, str]]:
+    def _load_content(self) -> dict[Path, dict[str, str]]:
         """Load and preprocess Excel file content from multiple sheets.

         Each sheet's content is converted to CSV format and stored.
@@ -111,6 +112,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
         Raises:
             ImportError: If required dependencies are missing.
             FileNotFoundError: If the specified Excel file cannot be opened.
+
         """
         pd = self._import_dependencies()
         content_dict = {}
@@ -119,14 +121,14 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
             with pd.ExcelFile(file_path) as xl:
                 sheet_dict = {
                     str(sheet_name): str(
-                        pd.read_excel(xl, sheet_name).to_csv(index=False)
+                        pd.read_excel(xl, sheet_name).to_csv(index=False),
                     )
                     for sheet_name in xl.sheet_names
                 }
                 content_dict[file_path] = sheet_dict
         return content_dict

-    def convert_to_path(self, path: Union[Path, str]) -> Path:
+    def convert_to_path(self, path: Path | str) -> Path:
         """Convert a path to a Path object."""
         return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
@@ -138,13 +140,13 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
             return pd
         except ImportError as e:
             missing_package = str(e).split()[-1]
+            msg = f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
             raise ImportError(
-                f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
+                msg,
             )

     def add(self) -> None:
-        """
-        Add Excel file content to the knowledge source, chunk it, compute embeddings,
+        """Add Excel file content to the knowledge source, chunk it, compute embeddings,
         and save the embeddings.
         """
         # Convert dictionary values to a single string if content is a dictionary
@@ -161,7 +163,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
         self.chunks.extend(new_chunks)
         self._save_documents()

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]
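
The msg = ...; raise ...(msg) rewrite that recurs in every file matches ruff's EM101/EM102 rules: binding the message first keeps the raise statement short, and the traceback line then reads raise ValueError(msg) instead of a long literal expression. In isolation (hypothetical function):

    def require_positive(n: int) -> int:
        if n <= 0:
            msg = f"expected a positive value, got {n}"
            raise ValueError(msg)
        return n


    print(require_positive(3))  # 3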

src/crewai/knowledge/source/json_knowledge_source.py

@@ -1,6 +1,6 @@
 import json
 from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any

 from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -8,12 +8,12 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
 class JSONKnowledgeSource(BaseFileKnowledgeSource):
     """A knowledge source that stores and queries JSON file content using embeddings."""

-    def load_content(self) -> Dict[Path, str]:
+    def load_content(self) -> dict[Path, str]:
         """Load and preprocess JSON file content."""
-        content: Dict[Path, str] = {}
+        content: dict[Path, str] = {}
         for path in self.safe_file_paths:
             path = self.convert_to_path(path)
-            with open(path, "r", encoding="utf-8") as json_file:
+            with open(path, encoding="utf-8") as json_file:
                 data = json.load(json_file)
                 content[path] = self._json_to_text(data)
         return content
@@ -29,12 +29,11 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
             for item in data:
                 text += f"{indent}- {self._json_to_text(item, level + 1)}\n"
         else:
-            text += f"{str(data)}"
+            text += f"{data!s}"
         return text

     def add(self) -> None:
-        """
-        Add JSON file content to the knowledge source, chunk it, compute embeddings,
+        """Add JSON file content to the knowledge source, chunk it, compute embeddings,
         and save the embeddings.
         """
         content_str = (
@@ -44,7 +43,7 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
         self.chunks.extend(new_chunks)
         self._save_documents()

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]
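
f"{data!s}" is the f-string conversion-flag spelling of str(data) (ruff's RUF010); !r and !a likewise replace repr() and ascii() inside f-strings:

    data = {"k": 1}
    assert f"{data!s}" == f"{str(data)}" == str(data)
    print(f"{data!s} / {data!r}")  # {'k': 1} / {'k': 1}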

src/crewai/knowledge/source/pdf_knowledge_source.py

@@ -1,5 +1,4 @@
 from pathlib import Path
-from typing import Dict, List

 from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -7,7 +6,7 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
 class PDFKnowledgeSource(BaseFileKnowledgeSource):
     """A knowledge source that stores and queries PDF file content using embeddings."""

-    def load_content(self) -> Dict[Path, str]:
+    def load_content(self) -> dict[Path, str]:
         """Load and preprocess PDF file content."""
         pdfplumber = self._import_pdfplumber()
@@ -31,21 +30,21 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource):
             return pdfplumber
         except ImportError:
+            msg = "pdfplumber is not installed. Please install it with: pip install pdfplumber"
             raise ImportError(
-                "pdfplumber is not installed. Please install it with: pip install pdfplumber"
+                msg,
             )

     def add(self) -> None:
-        """
-        Add PDF file content to the knowledge source, chunk it, compute embeddings,
+        """Add PDF file content to the knowledge source, chunk it, compute embeddings,
         and save the embeddings.
         """
-        for _, text in self.content.items():
+        for text in self.content.values():
             new_chunks = self._chunk_text(text)
             self.chunks.extend(new_chunks)
         self._save_documents()

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]
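
Iterating .values() instead of unpacking an unused key from .items() (flagged by ruff's PERF102) is behavior-preserving and drops the throwaway _ name:

    content = {"page1": "alpha", "page2": "beta"}  # illustrative stand-in data

    chunks: list[str] = []
    for text in content.values():  # was: for _, text in content.items()
        chunks.append(text.upper())

    print(chunks)  # ['ALPHA', 'BETA']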

src/crewai/knowledge/source/string_knowledge_source.py

@@ -1,4 +1,3 @@
-from typing import List, Optional

 from pydantic import Field
@@ -9,16 +8,17 @@ class StringKnowledgeSource(BaseKnowledgeSource):
     """A knowledge source that stores and queries plain text content using embeddings."""

     content: str = Field(...)
-    collection_name: Optional[str] = Field(default=None)
+    collection_name: str | None = Field(default=None)

-    def model_post_init(self, _):
+    def model_post_init(self, _) -> None:
         """Post-initialization method to validate content."""
         self.validate_content()

-    def validate_content(self):
+    def validate_content(self) -> None:
         """Validate string content."""
         if not isinstance(self.content, str):
-            raise ValueError("StringKnowledgeSource only accepts string content")
+            msg = "StringKnowledgeSource only accepts string content"
+            raise ValueError(msg)

     def add(self) -> None:
         """Add string content to the knowledge source, chunk it, compute embeddings, and save them."""
@@ -26,7 +26,7 @@ class StringKnowledgeSource(BaseKnowledgeSource):
         self.chunks.extend(new_chunks)
         self._save_documents()

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]

src/crewai/knowledge/source/text_file_knowledge_source.py

@@ -1,5 +1,4 @@
 from pathlib import Path
-from typing import Dict, List

 from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -7,26 +6,25 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
 class TextFileKnowledgeSource(BaseFileKnowledgeSource):
     """A knowledge source that stores and queries text file content using embeddings."""

-    def load_content(self) -> Dict[Path, str]:
+    def load_content(self) -> dict[Path, str]:
         """Load and preprocess text file content."""
         content = {}
         for path in self.safe_file_paths:
             path = self.convert_to_path(path)
-            with open(path, "r", encoding="utf-8") as f:
+            with open(path, encoding="utf-8") as f:
                 content[path] = f.read()
         return content

     def add(self) -> None:
-        """
-        Add text file content to the knowledge source, chunk it, compute embeddings,
+        """Add text file content to the knowledge source, chunk it, compute embeddings,
         and save the embeddings.
         """
-        for _, text in self.content.items():
+        for text in self.content.values():
             new_chunks = self._chunk_text(text)
             self.chunks.extend(new_chunks)
         self._save_documents()

-    def _chunk_text(self, text: str) -> List[str]:
+    def _chunk_text(self, text: str) -> list[str]:
         """Utility method to split text into chunks."""
         return [
             text[i : i + self.chunk_size]
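
Finally, the docstring reflow repeated across these add() methods follows the D212-style convention: a multi-line docstring's summary starts on the line with the opening quotes rather than one line below:

    def add() -> None:  # skeletal example; real method bodies follow the docstring
        """Add text file content to the knowledge source, chunk it, compute embeddings,
        and save the embeddings.
        """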