fix: add batch_size support to prevent embedder token limit errors

- add batch_size field to BaseRagConfig (default=100)
- update ChromaDB/Qdrant clients and factories to use batch_size
- extract and filter batch_size from the embedder config in KnowledgeStorage
- fix large CSV files exceeding embedder token limits (#3574)
- remove an unneeded type conditional

Co-authored-by: Vini Brasil <vini@hey.com>
Author: Greyson LaLonde
Date: 2025-09-24 00:05:43 -04:00
Committed by: GitHub
Parent: 4ac65eb0a6
Commit: 1dbe8aab52
12 changed files with 558 additions and 56 deletions
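
For reference, a minimal sketch of how the new option is meant to be supplied, assuming the nested "config" dict shape that KnowledgeStorage and RAGStorage read in the diffs below (the provider and model here are illustrative, not prescriptive):

    embedder = {
        "provider": "openai",
        "config": {
            "model": "text-embedding-3-small",
            "batch_size": 50,  # documents per upsert batch; the default is 100
        },
    }

This dict is what gets passed as the embedder argument to KnowledgeStorage below.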

View File

@@ -8,7 +8,7 @@ from crewai.rag.chromadb.config import ChromaDBConfig
 from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
 from crewai.rag.config.utils import get_rag_client
 from crewai.rag.core.base_client import BaseClient
-from crewai.rag.embeddings.factory import get_embedding_function
+from crewai.rag.embeddings.factory import EmbedderConfig, get_embedding_function
 from crewai.rag.factory import create_client
 from crewai.rag.types import BaseRecord, SearchResult
 from crewai.utilities.logger import Logger
@@ -27,6 +27,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
     ) -> None:
         self.collection_name = collection_name
         self._client: BaseClient | None = None
+        self._embedder_config = embedder  # Store embedder config
         warnings.filterwarnings(
             "ignore",
@@ -35,12 +36,29 @@ class KnowledgeStorage(BaseKnowledgeStorage):
         )
         if embedder:
-            embedding_function = get_embedding_function(embedder)
-            config = ChromaDBConfig(
-                embedding_function=cast(
-                    ChromaEmbeddingFunctionWrapper, embedding_function
-                )
-            )
+            # Cast to EmbedderConfig for type checking
+            embedder_typed = cast(EmbedderConfig, embedder)
+            embedding_function = get_embedding_function(embedder_typed)
+            batch_size = None
+            if isinstance(embedder, dict) and "config" in embedder:
+                nested_config = embedder["config"]
+                if isinstance(nested_config, dict):
+                    batch_size = nested_config.get("batch_size")
+            # Create config with batch_size if provided
+            if batch_size is not None:
+                config = ChromaDBConfig(
+                    embedding_function=cast(
+                        ChromaEmbeddingFunctionWrapper, embedding_function
+                    ),
+                    batch_size=batch_size,
+                )
+            else:
+                config = ChromaDBConfig(
+                    embedding_function=cast(
+                        ChromaEmbeddingFunctionWrapper, embedding_function
+                    )
+                )
             self._client = create_client(config)

     def _get_client(self) -> BaseClient:
@@ -105,9 +123,23 @@ class KnowledgeStorage(BaseKnowledgeStorage):
             rag_documents: list[BaseRecord] = [{"content": doc} for doc in documents]
-            client.add_documents(
-                collection_name=collection_name, documents=rag_documents
-            )
+            batch_size = None
+            if self._embedder_config and isinstance(self._embedder_config, dict):
+                if "config" in self._embedder_config:
+                    nested_config = self._embedder_config["config"]
+                    if isinstance(nested_config, dict):
+                        batch_size = nested_config.get("batch_size")
+            if batch_size is not None:
+                client.add_documents(
+                    collection_name=collection_name,
+                    documents=rag_documents,
+                    batch_size=batch_size,
+                )
+            else:
+                client.add_documents(
+                    collection_name=collection_name, documents=rag_documents
+                )
         except Exception as e:
             if "dimension mismatch" in str(e).lower():
                 Logger(verbose=True).log(
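
The nested batch_size lookup above reappears almost verbatim in RAGStorage in the next file; factored out, the shared logic would amount to something like this hypothetical helper (not part of this commit):

    def _get_batch_size(embedder_config: object) -> int | None:
        """Return the nested config["batch_size"] if present, else None."""
        if isinstance(embedder_config, dict) and "config" in embedder_config:
            nested = embedder_config["config"]
            if isinstance(nested, dict):
                return nested.get("batch_size")
        return None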

View File

@@ -66,11 +66,28 @@ class RAGStorage(BaseRAGStorage):
                 f"Error: {e}"
             ) from e
-        config = ChromaDBConfig(
-            embedding_function=cast(
-                ChromaEmbeddingFunctionWrapper, embedding_function
-            )
-        )
+        batch_size = None
+        if (
+            isinstance(self.embedder_config, dict)
+            and "config" in self.embedder_config
+        ):
+            nested_config = self.embedder_config["config"]
+            if isinstance(nested_config, dict):
+                batch_size = nested_config.get("batch_size")
+        if batch_size is not None:
+            config = ChromaDBConfig(
+                embedding_function=cast(
+                    ChromaEmbeddingFunctionWrapper, embedding_function
+                ),
+                batch_size=batch_size,
+            )
+        else:
+            config = ChromaDBConfig(
+                embedding_function=cast(
+                    ChromaEmbeddingFunctionWrapper, embedding_function
+                )
+            )
         self._client = create_client(config)

     def _get_client(self) -> BaseClient:
@@ -111,7 +128,26 @@ class RAGStorage(BaseRAGStorage):
             if metadata:
                 document["metadata"] = metadata
-            client.add_documents(collection_name=collection_name, documents=[document])
+            batch_size = None
+            if (
+                self.embedder_config
+                and isinstance(self.embedder_config, dict)
+                and "config" in self.embedder_config
+            ):
+                nested_config = self.embedder_config["config"]
+                if isinstance(nested_config, dict):
+                    batch_size = nested_config.get("batch_size")
+            if batch_size is not None:
+                client.add_documents(
+                    collection_name=collection_name,
+                    documents=[document],
+                    batch_size=batch_size,
+                )
+            else:
+                client.add_documents(
+                    collection_name=collection_name, documents=[document]
+                )
         except Exception as e:
             logging.error(
                 f"Error during {self.type} save: {e!s}\n{traceback.format_exc()}"
View File

@@ -17,6 +17,7 @@ from crewai.rag.chromadb.types import (
     ChromaDBCollectionSearchParams,
 )
 from crewai.rag.chromadb.utils import (
+    _create_batch_slice,
     _extract_search_params,
     _is_async_client,
     _is_sync_client,
@@ -52,6 +53,7 @@ class ChromaDBClient(BaseClient):
         embedding_function: ChromaEmbeddingFunction,
         default_limit: int = 5,
         default_score_threshold: float = 0.6,
+        default_batch_size: int = 100,
     ) -> None:
         """Initialize ChromaDBClient with client and embedding function.
@@ -60,11 +62,13 @@ class ChromaDBClient(BaseClient):
             embedding_function: Embedding function for text to vector conversion.
             default_limit: Default number of results to return in searches.
             default_score_threshold: Default minimum score for search results.
+            default_batch_size: Default batch size for adding documents.
         """
         self.client = client
         self.embedding_function = embedding_function
         self.default_limit = default_limit
         self.default_score_threshold = default_score_threshold
+        self.default_batch_size = default_batch_size

     def create_collection(
         self, **kwargs: Unpack[ChromaDBCollectionCreateParams]
@@ -291,6 +295,7 @@ class ChromaDBClient(BaseClient):
                 - content: The text content (required)
                 - doc_id: Optional unique identifier (auto-generated if missing)
                 - metadata: Optional metadata dictionary
+            batch_size: Optional batch size for processing documents (default: 100)

         Raises:
             TypeError: If AsyncClientAPI is used instead of ClientAPI for sync operations.
@@ -305,6 +310,7 @@ class ChromaDBClient(BaseClient):
         collection_name = kwargs["collection_name"]
         documents = kwargs["documents"]
+        batch_size = kwargs.get("batch_size", self.default_batch_size)

         if not documents:
             raise ValueError("Documents list cannot be empty")
@@ -315,13 +321,17 @@ class ChromaDBClient(BaseClient):
         )
         prepared = _prepare_documents_for_chromadb(documents)
-        # ChromaDB doesn't accept empty metadata dicts, so pass None if all are empty
-        metadatas = prepared.metadatas if any(m for m in prepared.metadatas) else None
-        collection.upsert(
-            ids=prepared.ids,
-            documents=prepared.texts,
-            metadatas=metadatas,
-        )
+        for i in range(0, len(prepared.ids), batch_size):
+            batch_ids, batch_texts, batch_metadatas = _create_batch_slice(
+                prepared=prepared, start_index=i, batch_size=batch_size
+            )
+            collection.upsert(
+                ids=batch_ids,
+                documents=batch_texts,
+                metadatas=batch_metadatas,
+            )

     async def aadd_documents(self, **kwargs: Unpack[BaseCollectionAddParams]) -> None:
         """Add documents with their embeddings to a collection asynchronously.
@@ -335,6 +345,7 @@ class ChromaDBClient(BaseClient):
                 - content: The text content (required)
                 - doc_id: Optional unique identifier (auto-generated if missing)
                 - metadata: Optional metadata dictionary
+            batch_size: Optional batch size for processing documents (default: 100)

         Raises:
             TypeError: If ClientAPI is used instead of AsyncClientAPI for async operations.
@@ -349,6 +360,7 @@ class ChromaDBClient(BaseClient):
         collection_name = kwargs["collection_name"]
         documents = kwargs["documents"]
+        batch_size = kwargs.get("batch_size", self.default_batch_size)

         if not documents:
             raise ValueError("Documents list cannot be empty")
@@ -358,13 +370,17 @@ class ChromaDBClient(BaseClient):
             embedding_function=self.embedding_function,
         )
         prepared = _prepare_documents_for_chromadb(documents)
-        # ChromaDB doesn't accept empty metadata dicts, so pass None if all are empty
-        metadatas = prepared.metadatas if any(m for m in prepared.metadatas) else None
-        await collection.upsert(
-            ids=prepared.ids,
-            documents=prepared.texts,
-            metadatas=metadatas,
-        )
+        for i in range(0, len(prepared.ids), batch_size):
+            batch_ids, batch_texts, batch_metadatas = _create_batch_slice(
+                prepared=prepared, start_index=i, batch_size=batch_size
+            )
+            await collection.upsert(
+                ids=batch_ids,
+                documents=batch_texts,
+                metadatas=batch_metadatas,
+            )

     def search(
         self, **kwargs: Unpack[ChromaDBCollectionSearchParams]
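
With batch_size now accepted by add_documents, a caller can also override the per-request batch explicitly; a usage sketch, assuming the collection already exists:

    client.add_documents(
        collection_name="knowledge",
        documents=[{"content": "row 1"}, {"content": "row 2"}],
        batch_size=1,  # one upsert per document
    )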

View File

@@ -41,4 +41,5 @@ def create_client(config: ChromaDBConfig) -> ChromaDBClient:
         embedding_function=config.embedding_function,
         default_limit=config.limit,
         default_score_threshold=config.score_threshold,
+        default_batch_size=config.batch_size,
     )

View File

@@ -1,6 +1,7 @@
 """Utility functions for ChromaDB client implementation."""

 import hashlib
+import json
 from collections.abc import Mapping
 from typing import Literal, TypeGuard, cast
@@ -72,7 +73,15 @@ def _prepare_documents_for_chromadb(
         if "doc_id" in doc:
             ids.append(doc["doc_id"])
         else:
-            content_hash = hashlib.sha256(doc["content"].encode()).hexdigest()[:16]
+            content_for_hash = doc["content"]
+            metadata = doc.get("metadata")
+            if metadata:
+                metadata_str = json.dumps(metadata, sort_keys=True)
+                content_for_hash = f"{content_for_hash}|{metadata_str}"
+            content_hash = hashlib.blake2b(
+                content_for_hash.encode(), digest_size=32
+            ).hexdigest()
             ids.append(content_hash)
         texts.append(doc["content"])
@@ -88,6 +97,32 @@ def _prepare_documents_for_chromadb(
     return PreparedDocuments(ids, texts, metadatas)


+def _create_batch_slice(
+    prepared: PreparedDocuments, start_index: int, batch_size: int
+) -> tuple[list[str], list[str], list[Mapping[str, str | int | float | bool]] | None]:
+    """Create a batch slice from prepared documents.
+
+    Args:
+        prepared: PreparedDocuments containing ids, texts, and metadatas.
+        start_index: Starting index for the batch.
+        batch_size: Size of the batch.
+
+    Returns:
+        Tuple of (batch_ids, batch_texts, batch_metadatas).
+    """
+    batch_end = min(start_index + batch_size, len(prepared.ids))
+    batch_ids = prepared.ids[start_index:batch_end]
+    batch_texts = prepared.texts[start_index:batch_end]
+    batch_metadatas = (
+        prepared.metadatas[start_index:batch_end] if prepared.metadatas else None
+    )
+    if batch_metadatas and not any(m for m in batch_metadatas):
+        batch_metadatas = None
+    return batch_ids, batch_texts, batch_metadatas


 def _extract_search_params(
     kwargs: ChromaDBCollectionSearchParams,
 ) -> ExtractedSearchParams:
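
To make the slicing concrete: with 250 prepared documents and batch_size=100, the loops in add_documents/aadd_documents perform three upserts of 100, 100, and 50 items, with min() clamping the final slice. A standalone check of that arithmetic:

    ids = [f"doc-{i}" for i in range(250)]
    batch_size = 100
    bounds = [
        (start, min(start + batch_size, len(ids)))
        for start in range(0, len(ids), batch_size)
    ]
    assert bounds == [(0, 100), (100, 200), (200, 250)]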

View File

@@ -16,3 +16,4 @@ class BaseRagConfig:
     embedding_function: Any | None = field(default=None)
     limit: int = field(default=5)
     score_threshold: float = field(default=0.6)
+    batch_size: int = field(default=100)
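
Because ChromaDBConfig and QdrantConfig extend this dataclass, the batch size can also be set directly at config construction time; a sketch, with the embedding function left as a placeholder:

    config = ChromaDBConfig(
        embedding_function=my_embedding_function,  # placeholder, not from this commit
        batch_size=50,  # overrides the default of 100
    )
    client = create_client(config)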

View File

@@ -29,7 +29,7 @@ class BaseCollectionParams(TypedDict):
     ]


-class BaseCollectionAddParams(BaseCollectionParams):
+class BaseCollectionAddParams(BaseCollectionParams, total=False):
     """Parameters for adding documents to a collection.

     Extends BaseCollectionParams with document-specific fields.
@@ -37,9 +37,11 @@ class BaseCollectionAddParams(BaseCollectionParams):
     Attributes:
         collection_name: The name of the collection to add documents to.
        documents: List of BaseRecord dictionaries containing document data.
+        batch_size: Optional batch size for processing documents to avoid token limits.
     """

-    documents: list[BaseRecord]
+    documents: Required[list[BaseRecord]]
+    batch_size: int


 class BaseCollectionSearchParams(BaseCollectionParams, total=False):
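
The combination of total=False on the class and Required on documents keeps documents mandatory while making batch_size genuinely optional; a type-checking sketch:

    params: BaseCollectionAddParams = {
        "collection_name": "knowledge",
        "documents": [{"content": "hello"}],
        # "batch_size" may be omitted; clients fall back to default_batch_size
    }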

View File

@@ -244,4 +244,6 @@ def get_embedding_function(
     _inject_api_key_from_env(provider, config_dict)
+
+    config_dict.pop("batch_size", None)
     return EMBEDDING_PROVIDERS[provider](**config_dict)
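
The pop matters because batch_size is a storage-layer knob, not an embedding-provider parameter; leaving it in config_dict would pass an unexpected keyword to the provider constructor. A sketch of the filtering, with an illustrative config dict:

    config_dict = {"model": "text-embedding-3-small", "batch_size": 50}
    config_dict.pop("batch_size", None)  # stripped before instantiating the provider
    assert config_dict == {"model": "text-embedding-3-small"}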

View File

@@ -48,6 +48,7 @@ class QdrantClient(BaseClient):
         embedding_function: EmbeddingFunction | AsyncEmbeddingFunction,
         default_limit: int = 5,
         default_score_threshold: float = 0.6,
+        default_batch_size: int = 100,
     ) -> None:
         """Initialize QdrantClient with client and embedding function.
@@ -56,11 +57,13 @@ class QdrantClient(BaseClient):
             embedding_function: Embedding function for text to vector conversion.
             default_limit: Default number of results to return in searches.
             default_score_threshold: Default minimum score for search results.
+            default_batch_size: Default batch size for adding documents.
         """
         self.client = client
         self.embedding_function = embedding_function
         self.default_limit = default_limit
         self.default_score_threshold = default_score_threshold
+        self.default_batch_size = default_batch_size

     def create_collection(self, **kwargs: Unpack[QdrantCollectionCreateParams]) -> None:
         """Create a new collection in Qdrant.
@@ -234,6 +237,7 @@ class QdrantClient(BaseClient):
         Keyword Args:
             collection_name: The name of the collection to add documents to.
             documents: List of BaseRecord dicts containing document data.
+            batch_size: Optional batch size for processing documents (default: 100)

         Raises:
             ValueError: If collection doesn't exist or documents list is empty.
@@ -249,6 +253,7 @@ class QdrantClient(BaseClient):
         collection_name = kwargs["collection_name"]
         documents = kwargs["documents"]
+        batch_size = kwargs.get("batch_size", self.default_batch_size)

         if not documents:
             raise ValueError("Documents list cannot be empty")
@@ -256,19 +261,20 @@ class QdrantClient(BaseClient):
         if not self.client.collection_exists(collection_name):
             raise ValueError(f"Collection '{collection_name}' does not exist")

-        points = []
-        for doc in documents:
-            if _is_async_embedding_function(self.embedding_function):
-                raise TypeError(
-                    "Async embedding function cannot be used with sync add_documents. "
-                    "Use aadd_documents instead."
-                )
-            sync_fn = cast(EmbeddingFunction, self.embedding_function)
-            embedding = sync_fn(doc["content"])
-            point = _create_point_from_document(doc, embedding)
-            points.append(point)
-        self.client.upsert(collection_name=collection_name, points=points)
+        for i in range(0, len(documents), batch_size):
+            batch_docs = documents[i : min(i + batch_size, len(documents))]
+            points = []
+            for doc in batch_docs:
+                if _is_async_embedding_function(self.embedding_function):
+                    raise TypeError(
+                        "Async embedding function cannot be used with sync add_documents. "
+                        "Use aadd_documents instead."
+                    )
+                sync_fn = cast(EmbeddingFunction, self.embedding_function)
+                embedding = sync_fn(doc["content"])
+                point = _create_point_from_document(doc, embedding)
+                points.append(point)
+            self.client.upsert(collection_name=collection_name, points=points)

     async def aadd_documents(self, **kwargs: Unpack[BaseCollectionAddParams]) -> None:
         """Add documents with their embeddings to a collection asynchronously.
@@ -276,6 +282,7 @@ class QdrantClient(BaseClient):
         Keyword Args:
             collection_name: The name of the collection to add documents to.
             documents: List of BaseRecord dicts containing document data.
+            batch_size: Optional batch size for processing documents (default: 100)

         Raises:
             ValueError: If collection doesn't exist or documents list is empty.
@@ -291,6 +298,7 @@ class QdrantClient(BaseClient):
         collection_name = kwargs["collection_name"]
         documents = kwargs["documents"]
+        batch_size = kwargs.get("batch_size", self.default_batch_size)

         if not documents:
             raise ValueError("Documents list cannot be empty")
@@ -298,18 +306,19 @@ class QdrantClient(BaseClient):
         if not await self.client.collection_exists(collection_name):
             raise ValueError(f"Collection '{collection_name}' does not exist")

-        points = []
-        for doc in documents:
-            if _is_async_embedding_function(self.embedding_function):
-                async_fn = cast(AsyncEmbeddingFunction, self.embedding_function)
-                embedding = await async_fn(doc["content"])
-            else:
-                sync_fn = cast(EmbeddingFunction, self.embedding_function)
-                embedding = sync_fn(doc["content"])
-            point = _create_point_from_document(doc, embedding)
-            points.append(point)
-        await self.client.upsert(collection_name=collection_name, points=points)
+        for i in range(0, len(documents), batch_size):
+            batch_docs = documents[i : min(i + batch_size, len(documents))]
+            points = []
+            for doc in batch_docs:
+                if _is_async_embedding_function(self.embedding_function):
+                    async_fn = cast(AsyncEmbeddingFunction, self.embedding_function)
+                    embedding = await async_fn(doc["content"])
+                else:
+                    sync_fn = cast(EmbeddingFunction, self.embedding_function)
+                    embedding = sync_fn(doc["content"])
+                point = _create_point_from_document(doc, embedding)
+                points.append(point)
+            await self.client.upsert(collection_name=collection_name, points=points)

     def search(
         self, **kwargs: Unpack[BaseCollectionSearchParams]

View File

@@ -22,4 +22,5 @@ def create_client(config: QdrantConfig) -> QdrantClient:
         embedding_function=config.embedding_function,
         default_limit=config.limit,
         default_score_threshold=config.score_threshold,
+        default_batch_size=config.batch_size,
     )