fix: ensure parameters in RagTool.add, add typing, tests (#3979)
Some checks failed
Mark stale issues and pull requests / stale (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled

* fix: ensure parameters in RagTool.add, add typing, tests

* feat: substitute pymupdf for pypdf, better parsing performance

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
This commit is contained in:
Greyson LaLonde
2025-11-27 01:32:43 -05:00
committed by GitHub
parent bed9a3847a
commit 2025a26fc3
8 changed files with 733 additions and 80 deletions

View File

@@ -3,8 +3,7 @@
from __future__ import annotations
import hashlib
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeAlias, TypedDict, cast
from typing import TYPE_CHECKING, Any, cast
import uuid
from crewai.rag.config.types import RagConfigType
@@ -19,15 +18,13 @@ from typing_extensions import TypeIs, Unpack
from crewai_tools.rag.data_types import DataType
from crewai_tools.rag.misc import sanitize_metadata_for_chromadb
from crewai_tools.tools.rag.rag_tool import Adapter
from crewai_tools.tools.rag.types import AddDocumentParams, ContentItem
if TYPE_CHECKING:
from crewai.rag.qdrant.config import QdrantConfig
ContentItem: TypeAlias = str | Path | dict[str, Any]
def _is_qdrant_config(config: Any) -> TypeIs[QdrantConfig]:
"""Check if config is a QdrantConfig using safe duck typing.
@@ -46,19 +43,6 @@ def _is_qdrant_config(config: Any) -> TypeIs[QdrantConfig]:
return False
class AddDocumentParams(TypedDict, total=False):
"""Parameters for adding documents to the RAG system."""
data_type: DataType
metadata: dict[str, Any]
website: str
url: str
file_path: str | Path
github_url: str
youtube_url: str
directory_path: str | Path
class CrewAIRagAdapter(Adapter):
"""Adapter that uses CrewAI's native RAG system.
@@ -131,13 +115,26 @@ class CrewAIRagAdapter(Adapter):
def add(self, *args: ContentItem, **kwargs: Unpack[AddDocumentParams]) -> None:
"""Add content to the knowledge base.
This method handles various input types and converts them to documents
for the vector database. It supports the data_type parameter for
compatibility with existing tools.
Args:
*args: Content items to add (strings, paths, or document dicts)
**kwargs: Additional parameters including data_type, metadata, etc.
**kwargs: Additional parameters including:
- data_type: DataType enum or string (e.g., "file", "pdf_file", "text")
- path: Path to file or directory (alternative to positional arg)
- file_path: Alias for path
- metadata: Additional metadata to attach to documents
- url: URL to fetch content from
- website: Website URL to scrape
- github_url: GitHub repository URL
- youtube_url: YouTube video URL
- directory_path: Path to directory
Examples:
rag_tool.add("path/to/document.pdf", data_type=DataType.PDF_FILE)
rag_tool.add(path="path/to/document.pdf", data_type="file")
rag_tool.add(file_path="path/to/document.pdf", data_type="pdf_file")
rag_tool.add("path/to/document.pdf") # auto-detects PDF
"""
import os
@@ -146,10 +143,54 @@ class CrewAIRagAdapter(Adapter):
from crewai_tools.rag.source_content import SourceContent
documents: list[BaseRecord] = []
data_type: DataType | None = kwargs.get("data_type")
raw_data_type = kwargs.get("data_type")
base_metadata: dict[str, Any] = kwargs.get("metadata", {})
for arg in args:
data_type: DataType | None = None
if raw_data_type is not None:
if isinstance(raw_data_type, DataType):
if raw_data_type != DataType.FILE:
data_type = raw_data_type
elif isinstance(raw_data_type, str):
if raw_data_type != "file":
try:
data_type = DataType(raw_data_type)
except ValueError:
raise ValueError(
f"Invalid data_type: '{raw_data_type}'. "
f"Valid values are: 'file' (auto-detect), or one of: "
f"{', '.join(dt.value for dt in DataType)}"
) from None
content_items: list[ContentItem] = list(args)
path_value = kwargs.get("path") or kwargs.get("file_path")
if path_value is not None:
content_items.append(path_value)
if url := kwargs.get("url"):
content_items.append(url)
if website := kwargs.get("website"):
content_items.append(website)
if github_url := kwargs.get("github_url"):
content_items.append(github_url)
if youtube_url := kwargs.get("youtube_url"):
content_items.append(youtube_url)
if directory_path := kwargs.get("directory_path"):
content_items.append(directory_path)
file_extensions = {
".pdf",
".txt",
".csv",
".json",
".xml",
".docx",
".mdx",
".md",
}
for arg in content_items:
source_ref: str
if isinstance(arg, dict):
source_ref = str(arg.get("source", arg.get("content", "")))
@@ -157,6 +198,14 @@ class CrewAIRagAdapter(Adapter):
source_ref = str(arg)
if not data_type:
ext = os.path.splitext(source_ref)[1].lower()
is_url = source_ref.startswith(("http://", "https://", "file://"))
if (
ext in file_extensions
and not is_url
and not os.path.isfile(source_ref)
):
raise FileNotFoundError(f"File does not exist: {source_ref}")
data_type = DataTypes.from_content(source_ref)
if data_type == DataType.DIRECTORY:

View File

@@ -1,6 +1,8 @@
from enum import Enum
from importlib import import_module
import os
from pathlib import Path
from typing import cast
from urllib.parse import urlparse
from crewai_tools.rag.base_loader import BaseLoader
@@ -8,6 +10,7 @@ from crewai_tools.rag.chunkers.base_chunker import BaseChunker
class DataType(str, Enum):
FILE = "file"
PDF_FILE = "pdf_file"
TEXT_FILE = "text_file"
CSV = "csv"
@@ -15,22 +18,14 @@ class DataType(str, Enum):
XML = "xml"
DOCX = "docx"
MDX = "mdx"
# Database types
MYSQL = "mysql"
POSTGRES = "postgres"
# Repository types
GITHUB = "github"
DIRECTORY = "directory"
# Web types
WEBSITE = "website"
DOCS_SITE = "docs_site"
YOUTUBE_VIDEO = "youtube_video"
YOUTUBE_CHANNEL = "youtube_channel"
# Raw types
TEXT = "text"
def get_chunker(self) -> BaseChunker:
@@ -63,13 +58,11 @@ class DataType(str, Enum):
try:
module = import_module(module_path)
return getattr(module, class_name)()
return cast(BaseChunker, getattr(module, class_name)())
except Exception as e:
raise ValueError(f"Error loading chunker for {self}: {e}") from e
def get_loader(self) -> BaseLoader:
from importlib import import_module
loaders = {
DataType.PDF_FILE: ("pdf_loader", "PDFLoader"),
DataType.TEXT_FILE: ("text_loader", "TextFileLoader"),
@@ -98,7 +91,7 @@ class DataType(str, Enum):
module_path = f"crewai_tools.rag.loaders.{module_name}"
try:
module = import_module(module_path)
return getattr(module, class_name)()
return cast(BaseLoader, getattr(module, class_name)())
except Exception as e:
raise ValueError(f"Error loading loader for {self}: {e}") from e

View File

@@ -2,70 +2,112 @@
import os
from pathlib import Path
from typing import Any
from typing import Any, cast
from urllib.parse import urlparse
import urllib.request
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class PDFLoader(BaseLoader):
"""Loader for PDF files."""
"""Loader for PDF files and URLs."""
def load(self, source: SourceContent, **kwargs) -> LoaderResult: # type: ignore[override]
"""Load and extract text from a PDF file.
@staticmethod
def _is_url(path: str) -> bool:
"""Check if the path is a URL."""
try:
parsed = urlparse(path)
return parsed.scheme in ("http", "https")
except Exception:
return False
@staticmethod
def _download_pdf(url: str) -> bytes:
"""Download PDF content from a URL.
Args:
source: The source content containing the PDF file path
url: The URL to download from.
Returns:
LoaderResult with extracted text content
The PDF content as bytes.
Raises:
FileNotFoundError: If the PDF file doesn't exist
ImportError: If required PDF libraries aren't installed
ValueError: If the download fails.
"""
try:
with urllib.request.urlopen(url, timeout=30) as response: # noqa: S310
return cast(bytes, response.read())
except Exception as e:
raise ValueError(f"Failed to download PDF from {url}: {e!s}") from e
def load(self, source: SourceContent, **kwargs: Any) -> LoaderResult: # type: ignore[override]
"""Load and extract text from a PDF file or URL.
Args:
source: The source content containing the PDF file path or URL.
Returns:
LoaderResult with extracted text content.
Raises:
FileNotFoundError: If the PDF file doesn't exist.
ImportError: If required PDF libraries aren't installed.
ValueError: If the PDF cannot be read or downloaded.
"""
try:
import pypdf
except ImportError:
try:
import PyPDF2 as pypdf # type: ignore[import-not-found,no-redef] # noqa: N813
except ImportError as e:
raise ImportError(
"PDF support requires pypdf or PyPDF2. Install with: uv add pypdf"
) from e
import pymupdf # type: ignore[import-untyped]
except ImportError as e:
raise ImportError(
"PDF support requires pymupdf. Install with: uv add pymupdf"
) from e
file_path = source.source
is_url = self._is_url(file_path)
if not os.path.isfile(file_path):
raise FileNotFoundError(f"PDF file not found: {file_path}")
if is_url:
source_name = Path(urlparse(file_path).path).name or "downloaded.pdf"
else:
source_name = Path(file_path).name
text_content = []
text_content: list[str] = []
metadata: dict[str, Any] = {
"source": str(file_path),
"file_name": Path(file_path).name,
"source": file_path,
"file_name": source_name,
"file_type": "pdf",
}
try:
with open(file_path, "rb") as file:
pdf_reader = pypdf.PdfReader(file)
metadata["num_pages"] = len(pdf_reader.pages)
if is_url:
pdf_bytes = self._download_pdf(file_path)
doc = pymupdf.open(stream=pdf_bytes, filetype="pdf")
else:
if not os.path.isfile(file_path):
raise FileNotFoundError(f"PDF file not found: {file_path}")
doc = pymupdf.open(file_path)
for page_num, page in enumerate(pdf_reader.pages, 1):
page_text = page.extract_text()
if page_text.strip():
text_content.append(f"Page {page_num}:\n{page_text}")
metadata["num_pages"] = len(doc)
for page_num, page in enumerate(doc, 1):
page_text = page.get_text()
if page_text.strip():
text_content.append(f"Page {page_num}:\n{page_text}")
doc.close()
except FileNotFoundError:
raise
except Exception as e:
raise ValueError(f"Error reading PDF file {file_path}: {e!s}") from e
raise ValueError(f"Error reading PDF from {file_path}: {e!s}") from e
if not text_content:
content = f"[PDF file with no extractable text: {Path(file_path).name}]"
content = f"[PDF file with no extractable text: {source_name}]"
else:
content = "\n\n".join(text_content)
return LoaderResult(
content=content,
source=str(file_path),
source=file_path,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=str(file_path), content=content),
doc_id=self.generate_doc_id(source_ref=file_path, content=content),
)

View File

@@ -14,9 +14,14 @@ from pydantic import (
field_validator,
model_validator,
)
from typing_extensions import Self
from typing_extensions import Self, Unpack
from crewai_tools.tools.rag.types import RagToolConfig, VectorDbConfig
from crewai_tools.tools.rag.types import (
AddDocumentParams,
ContentItem,
RagToolConfig,
VectorDbConfig,
)
def _validate_embedding_config(
@@ -72,6 +77,8 @@ def _validate_embedding_config(
class Adapter(BaseModel, ABC):
"""Abstract base class for RAG adapters."""
model_config = ConfigDict(arbitrary_types_allowed=True)
@abstractmethod
@@ -86,8 +93,8 @@ class Adapter(BaseModel, ABC):
@abstractmethod
def add(
self,
*args: Any,
**kwargs: Any,
*args: ContentItem,
**kwargs: Unpack[AddDocumentParams],
) -> None:
"""Add content to the knowledge base."""
@@ -102,7 +109,11 @@ class RagTool(BaseTool):
) -> str:
raise NotImplementedError
def add(self, *args: Any, **kwargs: Any) -> None:
def add(
self,
*args: ContentItem,
**kwargs: Unpack[AddDocumentParams],
) -> None:
raise NotImplementedError
name: str = "Knowledge base"
@@ -207,9 +218,34 @@ class RagTool(BaseTool):
def add(
self,
*args: Any,
**kwargs: Any,
*args: ContentItem,
**kwargs: Unpack[AddDocumentParams],
) -> None:
"""Add content to the knowledge base.
Args:
*args: Content items to add (strings, paths, or document dicts)
data_type: DataType enum or string (e.g., "file", "pdf_file", "text")
path: Path to file or directory, alias to positional arg
file_path: Alias for path
metadata: Additional metadata to attach to documents
url: URL to fetch content from
website: Website URL to scrape
github_url: GitHub repository URL
youtube_url: YouTube video URL
directory_path: Path to directory
Examples:
rag_tool.add("path/to/document.pdf", data_type=DataType.PDF_FILE)
# Keyword argument (documented API)
rag_tool.add(path="path/to/document.pdf", data_type="file")
rag_tool.add(file_path="path/to/document.pdf", data_type="pdf_file")
# Auto-detect type from extension
rag_tool.add("path/to/document.pdf") # auto-detects PDF
"""
self.adapter.add(*args, **kwargs)
def _run(

View File

@@ -1,10 +1,50 @@
"""Type definitions for RAG tool configuration."""
from typing import Any, Literal
from pathlib import Path
from typing import Any, Literal, TypeAlias
from crewai.rag.embeddings.types import ProviderSpec
from typing_extensions import TypedDict
from crewai_tools.rag.data_types import DataType
DataTypeStr: TypeAlias = Literal[
"file",
"pdf_file",
"text_file",
"csv",
"json",
"xml",
"docx",
"mdx",
"mysql",
"postgres",
"github",
"directory",
"website",
"docs_site",
"youtube_video",
"youtube_channel",
"text",
]
ContentItem: TypeAlias = str | Path | dict[str, Any]
class AddDocumentParams(TypedDict, total=False):
"""Parameters for adding documents to the RAG system."""
data_type: DataType | DataTypeStr
metadata: dict[str, Any]
path: str | Path
file_path: str | Path
website: str
url: str
github_url: str
youtube_url: str
directory_path: str | Path
class VectorDbConfig(TypedDict):
"""Configuration for vector database provider.