Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
9fa65f724f Fix lint errors: sort imports
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-21 05:15:54 +00:00
Devin AI
13e1aa96de Fix type-checker errors in embedding_configurator.py and knowledge_storage.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-21 05:14:42 +00:00
Devin AI
7fb76bb858 Fix lint error: sort imports in test_numpy_compatibility.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-21 05:12:46 +00:00
Devin AI
486cf58c3b Fix NumPy 2.x compatibility issue (#2431)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-21 05:10:53 +00:00
4 changed files with 464 additions and 200 deletions

View File

@@ -4,13 +4,34 @@ import io
import logging
import os
import shutil
import warnings
from typing import Any, Dict, List, Optional, Union, cast
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
# Initialize module import status
CHROMADB_AVAILABLE = False
# Define placeholder types
class DummyClientAPI:
pass
class DummySettings:
pass
# Try to import chromadb-related modules with proper error handling
try:
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
CHROMADB_AVAILABLE = True
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import chromadb: {str(e)}. Knowledge functionality will be limited.")
# Use dummy classes when imports fail
chromadb = None
ClientAPI = DummyClientAPI
OneOrMany = Any
Settings = DummySettings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
@@ -42,9 +63,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
search efficiency.
"""
collection: Optional[chromadb.Collection] = None
collection = None # Type annotation removed to handle case when chromadb is not available
collection_name: Optional[str] = "knowledge"
app: Optional[ClientAPI] = None
app = None # Type annotation removed to handle case when chromadb is not available
def __init__(
self,
@@ -61,37 +82,52 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
if not CHROMADB_AVAILABLE:
logging.warning("Cannot search knowledge as chromadb is not available.")
return []
with suppress_logging():
if self.collection:
fetched = self.collection.query(
query_texts=query,
n_results=limit,
where=filter,
)
results = []
for i in range(len(fetched["ids"][0])): # type: ignore
result = {
"id": fetched["ids"][0][i], # type: ignore
"metadata": fetched["metadatas"][0][i], # type: ignore
"context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore
}
if result["score"] >= score_threshold:
results.append(result)
return results
try:
fetched = self.collection.query(
query_texts=query,
n_results=limit,
where=filter,
)
results = []
for i in range(len(fetched["ids"][0])): # type: ignore
result = {
"id": fetched["ids"][0][i], # type: ignore
"metadata": fetched["metadatas"][0][i], # type: ignore
"context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore
}
if result["score"] >= score_threshold:
results.append(result)
return results
except Exception as e:
logging.error(f"Error during knowledge search: {str(e)}")
return []
else:
raise Exception("Collection not initialized")
logging.warning("Collection not initialized")
return []
def initialize_knowledge_storage(self):
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
if not CHROMADB_AVAILABLE:
logging.warning("Cannot initialize knowledge storage as chromadb is not available.")
self.app = None
self.collection = None
return
try:
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
@@ -102,30 +138,46 @@ class KnowledgeStorage(BaseKnowledgeStorage):
name=collection_name, embedding_function=self.embedder
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
logging.warning("Vector Database Client not initialized")
self.collection = None
except Exception as e:
logging.error(f"Failed to create or get collection: {str(e)}")
self.app = None
self.collection = None
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
if not CHROMADB_AVAILABLE:
logging.warning("Cannot reset knowledge storage as chromadb is not available.")
return
try:
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
self.app.reset()
shutil.rmtree(base_path)
except Exception as e:
logging.error(f"Error during knowledge reset: {str(e)}")
finally:
self.app = None
self.collection = None
def save(
self,
documents: List[str],
metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
if not CHROMADB_AVAILABLE:
logging.warning("Cannot save to knowledge storage as chromadb is not available.")
return
if not self.collection:
raise Exception("Collection not initialized")
logging.warning("Collection not initialized")
return
try:
# Create a dictionary to store unique documents
@@ -154,38 +206,46 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filtered_ids.append(doc_id)
# If we have no metadata at all, set it to None
final_metadata: Optional[OneOrMany[chromadb.Metadata]] = (
None if all(m is None for m in filtered_metadata) else filtered_metadata
)
final_metadata = None
if not all(m is None for m in filtered_metadata):
final_metadata = filtered_metadata
self.collection.upsert(
documents=filtered_docs,
metadatas=final_metadata,
ids=filtered_ids,
)
except chromadb.errors.InvalidDimensionException as e:
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
except Exception as e:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
if hasattr(chromadb, 'errors') and isinstance(e, chromadb.errors.InvalidDimensionException):
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
logging.error(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
)
else:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
logging.error(f"Failed to upsert documents: {e}")
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return None
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except (ImportError, AttributeError) as e:
logging.warning(f"Failed to create default embedding function: {str(e)}")
return None
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
"""Set the embedding configuration for the knowledge storage.
@@ -194,8 +254,12 @@ class KnowledgeStorage(BaseKnowledgeStorage):
embedder_config (Optional[Dict[str, Any]]): Configuration dictionary for the embedder.
If None or empty, defaults to the default embedding function.
"""
self.embedder = (
EmbeddingConfigurator().configure_embedder(embedder)
if embedder
else self._create_default_embedding_function()
)
try:
self.embedder = (
EmbeddingConfigurator().configure_embedder(embedder)
if embedder
else self._create_default_embedding_function()
)
except Exception as e:
logging.warning(f"Failed to configure embedder: {str(e)}")
self.embedder = None

View File

@@ -60,26 +60,32 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except (ImportError, AttributeError) as e:
import logging
logging.warning(f"Failed to initialize chromadb: {str(e)}. Memory functionality will be limited.")
self.app = None
self.collection = None
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.
@@ -103,6 +109,9 @@ class RAGStorage(BaseRAGStorage):
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
if self.app is None or self.collection is None:
logging.warning("Cannot save to memory as chromadb is not available.")
return
try:
self._generate_embedding(value, metadata)
except Exception as e:
@@ -115,8 +124,12 @@ class RAGStorage(BaseRAGStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Any]:
if not hasattr(self, "app"):
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
if self.app is None or self.collection is None:
logging.warning("Cannot search memory as chromadb is not available.")
return []
try:
with suppress_logging():
@@ -141,6 +154,10 @@ class RAGStorage(BaseRAGStorage):
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
if self.app is None or self.collection is None:
logging.warning("Cannot generate embeddings as chromadb is not available.")
return
self.collection.add(
documents=[text],
@@ -160,15 +177,7 @@ class RAGStorage(BaseRAGStorage):
# Ignore this specific error
pass
else:
raise Exception(
f"An error occurred while resetting the {self.type} memory: {e}"
)
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
logging.error(f"An error occurred while resetting the {self.type} memory: {e}")
# Don't raise exception to prevent crashes
self.app = None
self.collection = None

View File

@@ -1,8 +1,40 @@
import os
from typing import Any, Dict, Optional, cast
import warnings
from typing import Any, Callable, Dict, List, Optional, Union, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
# Initialize with None to indicate module import status
CHROMADB_AVAILABLE = False
# Define placeholder types for when chromadb is not available
class EmbeddingFunction:
def __call__(self, texts):
raise NotImplementedError("Chromadb is not available")
Documents = List[str]
Embeddings = List[List[float]]
def validate_embedding_function(func):
return func
# Try to import chromadb-related modules with proper error handling
try:
from chromadb.api.types import Documents as ChromaDocuments
from chromadb.api.types import EmbeddingFunction as ChromaEmbeddingFunction
from chromadb.api.types import Embeddings as ChromaEmbeddings
from chromadb.utils import (
validate_embedding_function as chroma_validate_embedding_function,
)
# Override our placeholder types with the real ones
Documents = ChromaDocuments
EmbeddingFunction = ChromaEmbeddingFunction
Embeddings = ChromaEmbeddings
validate_embedding_function = chroma_validate_embedding_function
CHROMADB_AVAILABLE = True
except (ImportError, AttributeError) as e:
# This captures both ImportError and AttributeError (which can happen with NumPy 2.x)
warnings.warn(f"Failed to import chromadb: {str(e)}. Embedding functionality will be limited.")
class EmbeddingConfigurator:
@@ -26,6 +58,9 @@ class EmbeddingConfigurator:
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if not CHROMADB_AVAILABLE:
return self._create_unavailable_embedding_function()
if embedder_config is None:
return self._create_default_embedding_function()
@@ -44,143 +79,230 @@ class EmbeddingConfigurator:
if provider == "custom"
else embedding_function(config, model_name)
)
@staticmethod
def _create_unavailable_embedding_function():
"""Creates a fallback embedding function when chromadb is not available."""
class UnavailableEmbeddingFunction(EmbeddingFunction):
def __call__(self, input):
raise ImportError(
"Chromadb is not available due to NumPy compatibility issues. "
"Either downgrade to NumPy<2 or upgrade chromadb and related dependencies."
)
return UnavailableEmbeddingFunction()
@staticmethod
def _create_default_embedding_function():
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except (ImportError, AttributeError) as e:
import warnings
warnings.warn(f"Failed to import OpenAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_openai(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import OpenAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_azure(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import OpenAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_ollama(config, model_name):
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import OllamaEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_vertexai(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import GoogleVertexEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_google(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import GoogleGenerativeAiEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_cohere(config, model_name):
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import CohereEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_voyageai(config, model_name):
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import VoyageAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_bedrock(config, model_name):
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import AmazonBedrockEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_huggingface(config, model_name):
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import HuggingFaceEmbeddingServer: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
@staticmethod
def _configure_watson(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
import ibm_watsonx_ai.foundation_models as watson_models
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams
except ImportError as e:
raise ImportError(
warnings.warn(
"IBM Watson dependencies are not installed. Please install them to use Watson embedding."
) from e
)
return EmbeddingConfigurator._create_unavailable_embedding_function()
class WatsonEmbeddingFunction(EmbeddingFunction):
def __call__(self, input: Documents) -> Embeddings:
@@ -212,25 +334,30 @@ class EmbeddingConfigurator:
@staticmethod
def _configure_custom(config):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:
validate_embedding_function(custom_embedder)
return custom_embedder
except Exception as e:
raise ValueError(f"Invalid custom embedding function: {str(e)}")
warnings.warn(f"Invalid custom embedding function: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
elif callable(custom_embedder):
try:
instance = custom_embedder()
if isinstance(instance, EmbeddingFunction):
validate_embedding_function(instance)
return instance
raise ValueError(
"Custom embedder does not create an EmbeddingFunction instance"
)
warnings.warn("Custom embedder does not create an EmbeddingFunction instance")
return EmbeddingConfigurator._create_unavailable_embedding_function()
except Exception as e:
raise ValueError(f"Error instantiating custom embedder: {str(e)}")
warnings.warn(f"Error instantiating custom embedder: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
else:
raise ValueError(
warnings.warn(
"Custom embedder must be an instance of `EmbeddingFunction` or a callable that creates one"
)
return EmbeddingConfigurator._create_unavailable_embedding_function()

View File

@@ -0,0 +1,64 @@
import importlib
import sys
import warnings
import pytest
def test_crew_import_with_numpy():
"""Test that crewai can be imported even with NumPy compatibility issues."""
try:
# Force reload to ensure we test our fix
if "crewai" in sys.modules:
importlib.reload(sys.modules["crewai"])
# This should not raise an exception
from crewai import Crew
assert Crew is not None
except Exception as e:
pytest.fail(f"Failed to import Crew: {e}")
def test_embedding_configurator_with_numpy():
"""Test that EmbeddingConfigurator can be imported with NumPy."""
try:
# Force reload
if "crewai.utilities.embedding_configurator" in sys.modules:
importlib.reload(sys.modules["crewai.utilities.embedding_configurator"])
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
# Test that we can create an embedder (might be unavailable but shouldn't crash)
embedder = configurator.configure_embedder()
assert embedder is not None
except Exception as e:
pytest.fail(f"Failed to use EmbeddingConfigurator: {e}")
def test_rag_storage_with_numpy():
"""Test that RAGStorage can be imported and used with NumPy."""
try:
# Force reload
if "crewai.memory.storage.rag_storage" in sys.modules:
importlib.reload(sys.modules["crewai.memory.storage.rag_storage"])
from crewai.memory.storage.rag_storage import RAGStorage
# Initialize with minimal config to avoid actual DB operations
storage = RAGStorage(type="test", crew=None)
# Just verify we can create the object without errors
assert storage is not None
except Exception as e:
pytest.fail(f"Failed to use RAGStorage: {e}")
def test_knowledge_storage_with_numpy():
"""Test that KnowledgeStorage can be imported and used with NumPy."""
try:
# Force reload
if "crewai.knowledge.storage.knowledge_storage" in sys.modules:
importlib.reload(sys.modules["crewai.knowledge.storage.knowledge_storage"])
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
# Initialize with minimal config
storage = KnowledgeStorage()
# Just verify we can create the object without errors
assert storage is not None
except Exception as e:
pytest.fail(f"Failed to use KnowledgeStorage: {e}")