Compare commits

...

8 Commits

Author SHA1 Message Date
Devin AI
6b354201cb Fix lint issues: suppress unused chromadb import warnings and update remaining external memory test
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:45:39 +00:00
Devin AI
5566f4716b Update external memory tests to handle optional ChromaDB dependency
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:38:50 +00:00
Devin AI
ea5f6b592c Update memory tests to handle optional ChromaDB dependency
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:37:44 +00:00
Devin AI
a0057afe45 Fix Collection type annotation and test mocking issues
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:27:06 +00:00
Devin AI
ebcb6c6f90 Fix CI failures: restore missing error classes and resolve type issues
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:19:30 +00:00
Devin AI
c7e83a7529 Update optional dependencies tests
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:10:46 +00:00
Devin AI
f0b1cc23f4 Add ChromaDBRequiredError class and improve error handling
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:10:35 +00:00
Devin AI
7b129fc847 Fix #2919: Make chromadb an optional dependency to resolve package conflicts
Co-Authored-By: João <joao@crewai.com>
2025-05-30 08:56:35 +00:00
10 changed files with 570 additions and 171 deletions

View File

@@ -21,7 +21,6 @@ dependencies = [
"opentelemetry-sdk>=1.30.0", "opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0", "opentelemetry-exporter-otlp-proto-http>=1.30.0",
# Data Handling # Data Handling
"chromadb>=0.5.23",
"openpyxl>=3.1.5", "openpyxl>=3.1.5",
"pyvis>=0.3.2", "pyvis>=0.3.2",
# Authentication and Security # Authentication and Security
@@ -49,6 +48,9 @@ tools = ["crewai-tools~=0.45.0"]
embeddings = [ embeddings = [
"tiktoken~=0.7.0" "tiktoken~=0.7.0"
] ]
storage = [
"chromadb>=0.5.23"
]
agentops = ["agentops>=0.3.0"] agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"] fastembed = ["fastembed>=0.4.1"]
pdfplumber = [ pdfplumber = [

View File

@@ -6,16 +6,25 @@ import os
import shutil import shutil
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
import chromadb try:
import chromadb.errors import chromadb
from chromadb.api import ClientAPI import chromadb.errors
from chromadb.api.types import OneOrMany from chromadb.api import ClientAPI
from chromadb.config import Settings from chromadb.api.types import OneOrMany
from chromadb.config import Settings
HAS_CHROMADB = True
except ImportError:
chromadb = None # type: ignore
ClientAPI = Any # type: ignore
OneOrMany = Any # type: ignore
Settings = Any # type: ignore
HAS_CHROMADB = False
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.errors import ChromaDBRequiredError
from crewai.utilities.logger import Logger from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path from crewai.utilities.paths import db_storage_path
@@ -43,7 +52,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
search efficiency. search efficiency.
""" """
collection: Optional[chromadb.Collection] = None collection: Optional[Any] = None # type: ignore
collection_name: Optional[str] = "knowledge" collection_name: Optional[str] = "knowledge"
app: Optional[ClientAPI] = None app: Optional[ClientAPI] = None
@@ -62,6 +71,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filter: Optional[dict] = None, filter: Optional[dict] = None,
score_threshold: float = 0.35, score_threshold: float = 0.35,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
with suppress_logging(): with suppress_logging():
if self.collection: if self.collection:
fetched = self.collection.query( fetched = self.collection.query(
@@ -84,48 +96,63 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized") raise Exception("Collection not initialized")
def initialize_knowledge_storage(self): def initialize_knowledge_storage(self):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
base_path = os.path.join(db_storage_path(), "knowledge") base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try: try:
collection_name = ( chroma_client = chromadb.PersistentClient(
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path, path=base_path,
settings=Settings(allow_reset=True), settings=Settings(allow_reset=True),
) )
self.app.reset() self.app = chroma_client
shutil.rmtree(base_path)
self.app = None try:
self.collection = None collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def reset(self):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
try:
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def save( def save(
self, self,
documents: List[str], documents: List[str],
metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
): ):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
if not self.collection: if not self.collection:
raise Exception("Collection not initialized") raise Exception("Collection not initialized")
@@ -156,7 +183,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filtered_ids.append(doc_id) filtered_ids.append(doc_id)
# If we have no metadata at all, set it to None # If we have no metadata at all, set it to None
final_metadata: Optional[OneOrMany[chromadb.Metadata]] = ( final_metadata: Optional[OneOrMany[Any]] = (
None if all(m is None for m in filtered_metadata) else filtered_metadata None if all(m is None for m in filtered_metadata) else filtered_metadata
) )
@@ -165,29 +192,38 @@ class KnowledgeStorage(BaseKnowledgeStorage):
metadatas=final_metadata, metadatas=final_metadata,
ids=filtered_ids, ids=filtered_ids,
) )
except chromadb.errors.InvalidDimensionException as e: except ImportError:
Logger(verbose=True).log( raise ChromaDBRequiredError("knowledge storage")
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
except Exception as e: except Exception as e:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red") if HAS_CHROMADB and isinstance(e, chromadb.errors.InvalidDimensionException):
raise Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
else:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
def _create_default_embedding_function(self): def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import ( if not HAS_CHROMADB:
OpenAIEmbeddingFunction, raise ChromaDBRequiredError("knowledge storage")
)
return OpenAIEmbeddingFunction( try:
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small" from chromadb.utils.embedding_functions.openai_embedding_function import (
) OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None: def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
"""Set the embedding configuration for the knowledge storage. """Set the embedding configuration for the knowledge storage.

View File

@@ -6,11 +6,17 @@ import shutil
import uuid import uuid
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI try:
from chromadb.api import ClientAPI
HAS_CHROMADB = True
except ImportError:
ClientAPI = Any # type: ignore
HAS_CHROMADB = False
from crewai.memory.storage.base_rag_storage import BaseRAGStorage from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.errors import ChromaDBRequiredError
from crewai.utilities.paths import db_storage_path from crewai.utilities.paths import db_storage_path
@@ -60,26 +66,32 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config) self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self): def _initialize_app(self):
import chromadb if not HAS_CHROMADB:
from chromadb.config import Settings raise ChromaDBRequiredError("memory storage")
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try: try:
self.collection = self.app.get_collection( import chromadb
name=self.type, embedding_function=self.embedder_config from chromadb.config import Settings
)
except Exception: self._set_embedder_config()
self.collection = self.app.create_collection( chroma_client = chromadb.PersistentClient(
name=self.type, embedding_function=self.embedder_config path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
) )
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except ImportError:
raise ChromaDBRequiredError("memory storage")
def _sanitize_role(self, role: str) -> str: def _sanitize_role(self, role: str) -> str:
""" """
Sanitizes agent roles to ensure valid directory names. Sanitizes agent roles to ensure valid directory names.
@@ -165,10 +177,16 @@ class RAGStorage(BaseRAGStorage):
) )
def _create_default_embedding_function(self): def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import ( if not HAS_CHROMADB:
OpenAIEmbeddingFunction, raise ChromaDBRequiredError("memory storage")
)
return OpenAIEmbeddingFunction( try:
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small" from chromadb.utils.embedding_functions.openai_embedding_function import (
) OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("memory storage")

View File

@@ -1,8 +1,29 @@
import os import os
from typing import Any, Dict, Optional, cast from typing import Any, Dict, Optional, cast, Protocol, Sequence, TYPE_CHECKING, TypeVar, List, Union
from chromadb import Documents, EmbeddingFunction, Embeddings from crewai.utilities.errors import ChromaDBRequiredError
from chromadb.api.types import validate_embedding_function
if TYPE_CHECKING:
from numpy import ndarray
from numpy import dtype, floating, signedinteger, unsignedinteger
try:
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
Documents = List[str] # type: ignore
Embeddings = List[List[float]] # type: ignore
class EmbeddingFunction(Protocol): # type: ignore
"""Protocol for embedding functions when ChromaDB is not available."""
def __call__(self, input: List[str]) -> List[List[float]]: ...
def validate_embedding_function(func: Any) -> None: # type: ignore
"""Stub for validate_embedding_function when ChromaDB is not available."""
pass
class EmbeddingConfigurator: class EmbeddingConfigurator:
@@ -26,6 +47,9 @@ class EmbeddingConfigurator:
embedder_config: Optional[Dict[str, Any]] = None, embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction: ) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config.""" """Configures and returns an embedding function based on the provided config."""
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
if embedder_config is None: if embedder_config is None:
return self._create_default_embedding_function() return self._create_default_embedding_function()
@@ -47,129 +71,189 @@ class EmbeddingConfigurator:
@staticmethod @staticmethod
def _create_default_embedding_function(): def _create_default_embedding_function():
from chromadb.utils.embedding_functions.openai_embedding_function import ( if not HAS_CHROMADB:
OpenAIEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return OpenAIEmbeddingFunction( try:
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small" from chromadb.utils.embedding_functions.openai_embedding_function import (
) OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_openai(config, model_name): def _configure_openai(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import ( if not HAS_CHROMADB:
OpenAIEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return OpenAIEmbeddingFunction( try:
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"), from chromadb.utils.embedding_functions.openai_embedding_function import (
model_name=model_name, OpenAIEmbeddingFunction,
api_base=config.get("api_base", None), )
api_type=config.get("api_type", None),
api_version=config.get("api_version", None), return OpenAIEmbeddingFunction(
default_headers=config.get("default_headers", None), api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
dimensions=config.get("dimensions", None), model_name=model_name,
deployment_id=config.get("deployment_id", None), api_base=config.get("api_base", None),
organization_id=config.get("organization_id", None), api_type=config.get("api_type", None),
) api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_azure(config, model_name): def _configure_azure(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import ( if not HAS_CHROMADB:
OpenAIEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return OpenAIEmbeddingFunction( try:
api_key=config.get("api_key"), from chromadb.utils.embedding_functions.openai_embedding_function import (
api_base=config.get("api_base"), OpenAIEmbeddingFunction,
api_type=config.get("api_type", "azure"), )
api_version=config.get("api_version"),
model_name=model_name, return OpenAIEmbeddingFunction(
default_headers=config.get("default_headers"), api_key=config.get("api_key"),
dimensions=config.get("dimensions"), api_base=config.get("api_base"),
deployment_id=config.get("deployment_id"), api_type=config.get("api_type", "azure"),
organization_id=config.get("organization_id"), api_version=config.get("api_version"),
) model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_ollama(config, model_name): def _configure_ollama(config, model_name):
from chromadb.utils.embedding_functions.ollama_embedding_function import ( if not HAS_CHROMADB:
OllamaEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return OllamaEmbeddingFunction( try:
url=config.get("url", "http://localhost:11434/api/embeddings"), from chromadb.utils.embedding_functions.ollama_embedding_function import (
model_name=model_name, OllamaEmbeddingFunction,
) )
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_vertexai(config, model_name): def _configure_vertexai(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import ( if not HAS_CHROMADB:
GoogleVertexEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return GoogleVertexEmbeddingFunction( try:
model_name=model_name, from chromadb.utils.embedding_functions.google_embedding_function import (
api_key=config.get("api_key"), GoogleVertexEmbeddingFunction,
project_id=config.get("project_id"), )
region=config.get("region"),
) return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_google(config, model_name): def _configure_google(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import ( if not HAS_CHROMADB:
GoogleGenerativeAiEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return GoogleGenerativeAiEmbeddingFunction( try:
model_name=model_name, from chromadb.utils.embedding_functions.google_embedding_function import (
api_key=config.get("api_key"), GoogleGenerativeAiEmbeddingFunction,
task_type=config.get("task_type"), )
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_cohere(config, model_name): def _configure_cohere(config, model_name):
from chromadb.utils.embedding_functions.cohere_embedding_function import ( if not HAS_CHROMADB:
CohereEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return CohereEmbeddingFunction( try:
model_name=model_name, from chromadb.utils.embedding_functions.cohere_embedding_function import (
api_key=config.get("api_key"), CohereEmbeddingFunction,
) )
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_voyageai(config, model_name): def _configure_voyageai(config, model_name):
from chromadb.utils.embedding_functions.voyageai_embedding_function import ( if not HAS_CHROMADB:
VoyageAIEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
return VoyageAIEmbeddingFunction( try:
model_name=model_name, from chromadb.utils.embedding_functions.voyageai_embedding_function import (
api_key=config.get("api_key"), VoyageAIEmbeddingFunction,
) )
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_bedrock(config, model_name): def _configure_bedrock(config, model_name):
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import ( if not HAS_CHROMADB:
AmazonBedrockEmbeddingFunction, raise ChromaDBRequiredError("embedding functionality")
)
# Allow custom model_name override with backwards compatibility try:
kwargs = {"session": config.get("session")} from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
if model_name is not None: AmazonBedrockEmbeddingFunction,
kwargs["model_name"] = model_name )
return AmazonBedrockEmbeddingFunction(**kwargs)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_huggingface(config, model_name): def _configure_huggingface(config, model_name):
from chromadb.utils.embedding_functions.huggingface_embedding_function import ( if not HAS_CHROMADB:
HuggingFaceEmbeddingServer, raise ChromaDBRequiredError("embedding functionality")
)
return HuggingFaceEmbeddingServer( try:
url=config.get("api_url"), from chromadb.utils.embedding_functions.huggingface_embedding_function import (
) HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod @staticmethod
def _configure_watson(config, model_name): def _configure_watson(config, model_name):
@@ -182,7 +266,7 @@ class EmbeddingConfigurator:
"IBM Watson dependencies are not installed. Please install them to use Watson embedding." "IBM Watson dependencies are not installed. Please install them to use Watson embedding."
) from e ) from e
class WatsonEmbeddingFunction(EmbeddingFunction): class WatsonEmbeddingFunction:
def __call__(self, input: Documents) -> Embeddings: def __call__(self, input: Documents) -> Embeddings:
if isinstance(input, str): if isinstance(input, str):
input = [input] input = [input]
@@ -212,6 +296,9 @@ class EmbeddingConfigurator:
@staticmethod @staticmethod
def _configure_custom(config): def _configure_custom(config):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
custom_embedder = config.get("embedder") custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction): if isinstance(custom_embedder, EmbeddingFunction):
try: try:

View File

@@ -0,0 +1,62 @@
"""Custom error classes for CrewAI."""
from typing import Optional
class ChromaDBRequiredError(ImportError):
"""Error raised when ChromaDB is required but not installed."""
def __init__(self, feature: str):
"""Initialize the error with a specific feature name.
Args:
feature: The name of the feature that requires ChromaDB.
"""
message = (
f"ChromaDB is required for {feature} features. "
"Please install it with 'pip install crewai[storage]'"
)
super().__init__(message)
class DatabaseOperationError(Exception):
"""Base exception class for database operation errors."""
def __init__(self, message: str, original_error: Optional[Exception] = None):
"""Initialize the database operation error.
Args:
message: The error message to display
original_error: The original exception that caused this error, if any
"""
super().__init__(message)
self.original_error = original_error
class DatabaseError:
"""Standardized error message templates for database operations."""
INIT_ERROR: str = "Database initialization error: {}"
SAVE_ERROR: str = "Error saving task outputs: {}"
UPDATE_ERROR: str = "Error updating task outputs: {}"
LOAD_ERROR: str = "Error loading task outputs: {}"
DELETE_ERROR: str = "Error deleting task outputs: {}"
@classmethod
def format_error(cls, template: str, error: Exception) -> str:
"""Format an error message with the given template and error.
Args:
template: The error message template to use
error: The exception to format into the template
Returns:
The formatted error message
"""
return template.format(str(error))
class AgentRepositoryError(Exception):
"""Exception raised when an agent repository is not found."""
...

View File

@@ -2385,6 +2385,16 @@ def test_multiple_conditional_tasks(researcher, writer):
def test_using_contextual_memory(): def test_using_contextual_memory():
from unittest.mock import patch from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent( math_researcher = Agent(
role="Researcher", role="Researcher",
goal="You research about math.", goal="You research about math.",
@@ -2413,6 +2423,16 @@ def test_using_contextual_memory():
def test_using_contextual_memory_with_long_term_memory(): def test_using_contextual_memory_with_long_term_memory():
from unittest.mock import patch from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent( math_researcher = Agent(
role="Researcher", role="Researcher",
goal="You research about math.", goal="You research about math.",
@@ -2442,6 +2462,16 @@ def test_using_contextual_memory_with_long_term_memory():
def test_warning_long_term_memory_without_entity_memory(): def test_warning_long_term_memory_without_entity_memory():
from unittest.mock import patch from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent( math_researcher = Agent(
role="Researcher", role="Researcher",
goal="You research about math.", goal="You research about math.",
@@ -2479,6 +2509,16 @@ def test_warning_long_term_memory_without_entity_memory():
def test_long_term_memory_with_memory_flag(): def test_long_term_memory_with_memory_flag():
from unittest.mock import patch from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent( math_researcher = Agent(
role="Researcher", role="Researcher",
goal="You research about math.", goal="You research about math.",
@@ -2514,6 +2554,16 @@ def test_long_term_memory_with_memory_flag():
def test_using_contextual_memory_with_short_term_memory(): def test_using_contextual_memory_with_short_term_memory():
from unittest.mock import patch from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent( math_researcher = Agent(
role="Researcher", role="Researcher",
goal="You research about math.", goal="You research about math.",
@@ -2543,6 +2593,16 @@ def test_using_contextual_memory_with_short_term_memory():
def test_disabled_memory_using_contextual_memory(): def test_disabled_memory_using_contextual_memory():
from unittest.mock import patch from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent( math_researcher = Agent(
role="Researcher", role="Researcher",
goal="You research about math.", goal="You research about math.",

View File

@@ -169,6 +169,15 @@ def test_crew_external_memory_reset(mem_type, crew_with_external_memory):
def test_crew_external_memory_save_with_memory_flag( def test_crew_external_memory_save_with_memory_flag(
mem_method, crew_with_external_memory mem_method, crew_with_external_memory
): ):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
with patch( with patch(
f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}" f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}"
) as mock_method: ) as mock_method:
@@ -181,6 +190,15 @@ def test_crew_external_memory_save_with_memory_flag(
def test_crew_external_memory_save_using_crew_without_memory_flag( def test_crew_external_memory_save_using_crew_without_memory_flag(
mem_method, crew_with_external_memory_without_memory_flag mem_method, crew_with_external_memory_without_memory_flag
): ):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
with patch( with patch(
f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}" f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}"
) as mock_method: ) as mock_method:

View File

@@ -7,10 +7,28 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@pytest.fixture @pytest.fixture
def long_term_memory(): def long_term_memory():
"""Fixture to create a LongTermMemory instance""" """Fixture to create a LongTermMemory instance"""
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
return LongTermMemory() return LongTermMemory()
def test_save_and_search(long_term_memory): def test_save_and_search(long_term_memory):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
memory = LongTermMemoryItem( memory = LongTermMemoryItem(
agent="test_agent", agent="test_agent",
task="test_task", task="test_task",

View File

@@ -12,6 +12,15 @@ from crewai.task import Task
@pytest.fixture @pytest.fixture
def short_term_memory(): def short_term_memory():
"""Fixture to create a ShortTermMemory instance""" """Fixture to create a ShortTermMemory instance"""
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
agent = Agent( agent = Agent(
role="Researcher", role="Researcher",
goal="Search relevant data and provide results", goal="Search relevant data and provide results",
@@ -29,6 +38,15 @@ def short_term_memory():
def test_save_and_search(short_term_memory): def test_save_and_search(short_term_memory):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
memory = ShortTermMemoryItem( memory = ShortTermMemoryItem(
data="""test value test value test value test value test value test value data="""test value test value test value test value test value test value
test value test value test value test value test value test value test value test value test value test value test value test value

View File

@@ -0,0 +1,80 @@
import pytest
import importlib
import sys
from unittest.mock import patch
from crewai.utilities.errors import ChromaDBRequiredError
def test_import_without_chromadb():
"""Test that crewai can be imported without chromadb."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
modules_to_reload = [
"crewai.memory.storage.rag_storage",
"crewai.knowledge.storage.knowledge_storage",
"crewai.utilities.embedding_configurator"
]
for module in modules_to_reload:
if module in sys.modules:
importlib.reload(sys.modules[module])
from crewai import Agent, Task, Crew, Process
agent = Agent(role="Test Agent", goal="Test Goal", backstory="Test Backstory")
task = Task(description="Test Task", agent=agent)
_ = Crew(agents=[agent], tasks=[task], process=Process.sequential)
def test_memory_storage_without_chromadb():
"""Test that memory storage raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
if "crewai.memory.storage.rag_storage" in sys.modules:
importlib.reload(sys.modules["crewai.memory.storage.rag_storage"])
from crewai.memory.storage.rag_storage import RAGStorage, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
storage = RAGStorage("memory", allow_reset=True, crew=None)
assert "ChromaDB is required for memory storage" in str(excinfo.value)
def test_knowledge_storage_without_chromadb():
"""Test that knowledge storage raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
modules_to_reload = [
"crewai.knowledge.storage.knowledge_storage",
"crewai.utilities.embedding_configurator"
]
for module in modules_to_reload:
if module in sys.modules:
importlib.reload(sys.modules[module])
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
storage = KnowledgeStorage()
storage.initialize_knowledge_storage()
assert "ChromaDB is required for knowledge storage" in str(excinfo.value)
def test_embedding_configurator_without_chromadb():
"""Test that embedding configurator raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
if "crewai.utilities.embedding_configurator" in sys.modules:
importlib.reload(sys.modules["crewai.utilities.embedding_configurator"])
from crewai.utilities.embedding_configurator import EmbeddingConfigurator, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
configurator = EmbeddingConfigurator()
configurator.configure_embedder()
assert "ChromaDB is required for embedding functionality" in str(excinfo.value)