Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
ddcc5bb2e9 Fix type-checker and lint issues in rag_storage.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:44:21 +00:00
Devin AI
d9c5ebe1cf Address PR comments: Improve code quality and add validation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:39:27 +00:00
Devin AI
0427da467a Fix linting issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:32:52 +00:00
Devin AI
7a21564743 Fix #2271: Handle SQLite3 version check gracefully for ChromaDB
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:28:09 +00:00
4 changed files with 176 additions and 37 deletions

View File

@@ -83,28 +83,42 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
"""Initialize the knowledge storage with ChromaDB.
Handles SQLite3 version incompatibility gracefully by logging a warning
and continuing without ChromaDB functionality.
"""
try:
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder
)
else:
if not self.app:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder
)
except RuntimeError as e:
if "unsupported version of sqlite3" in str(e).lower():
# Log a warning but continue without ChromaDB
logging.warning("ChromaDB requires SQLite3 >= 3.35.0. Current version is too old. Some features may be limited. Error: %s", e)
self.app = None
self.collection = None
else:
raise
except Exception as e:
raise Exception(f"Failed to create or get collection: {e}")
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)

View File

@@ -4,15 +4,19 @@ import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union, Collection as TypeCollection
from chromadb.api import ClientAPI
from chromadb.api.models.Collection import Collection as ChromaCollection
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
# Constants
SQLITE_VERSION_ERROR = "ChromaDB requires SQLite3 >= 3.35.0. Current version is too old. Some features may be limited. Error: {}"
@contextlib.contextmanager
def suppress_logging(
@@ -60,26 +64,41 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
if self.embedder_config is None:
# ChromaDB is not available, skip initialization
self.app = None
self.collection = None
return
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except RuntimeError as e:
if "unsupported version of sqlite3" in str(e).lower():
# Log a warning but continue without ChromaDB
logging.warning(SQLITE_VERSION_ERROR.format(e))
self.app = None
self.collection = None
else:
raise
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.

View File

@@ -1,12 +1,31 @@
import logging
import os
from typing import Any, Dict, Optional, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
# Import chromadb conditionally to handle SQLite3 version errors
try:
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
CHROMADB_AVAILABLE = True
except RuntimeError as e:
if "unsupported version of sqlite3" in str(e).lower():
logging.warning(f"ChromaDB requires SQLite3 >= 3.35.0. Current version is too old. Some features may be limited. Error: {e}")
CHROMADB_AVAILABLE = False
# Define placeholder types for type hints
Documents = Any
EmbeddingFunction = Any
Embeddings = Any
validate_embedding_function = lambda x: x # noqa: E731
else:
raise
class EmbeddingConfigurator:
def __init__(self):
if not CHROMADB_AVAILABLE:
self.embedding_functions = {}
return
self.embedding_functions = {
"openai": self._configure_openai,
"azure": self._configure_azure,
@@ -21,13 +40,45 @@ class EmbeddingConfigurator:
"custom": self._configure_custom,
}
def _validate_config(self, config: Dict[str, Any]) -> bool:
"""Validates that the configuration contains the required keys.
Args:
config: The configuration dictionary to validate
Returns:
bool: True if the configuration is valid, False otherwise
"""
if not config:
return False
required_keys = {'provider'}
return all(key in config for key in required_keys)
def configure_embedder(
self,
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
) -> Optional[EmbeddingFunction]:
"""Configures and returns an embedding function based on the provided config.
Args:
embedder_config: Configuration dictionary for the embedder
Returns:
Optional[EmbeddingFunction]: The configured embedding function or None if ChromaDB is not available
Raises:
ValueError: If the configuration is invalid
Exception: If the provider is not supported
"""
if not CHROMADB_AVAILABLE:
return None
if embedder_config is None:
return self._create_default_embedding_function()
if not self._validate_config(embedder_config):
raise ValueError("Invalid embedder configuration: missing required keys")
provider = embedder_config.get("provider")
config = embedder_config.get("config", {})
@@ -47,6 +98,9 @@ class EmbeddingConfigurator:
@staticmethod
def _create_default_embedding_function():
if not CHROMADB_AVAILABLE:
return None
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)

View File

@@ -0,0 +1,52 @@
import unittest
from unittest.mock import MagicMock, patch
class TestEmbeddingConfigurator(unittest.TestCase):
    """Exercise EmbeddingConfigurator with CHROMADB_AVAILABLE patched on and off."""

    @patch('crewai.utilities.embedding_configurator.CHROMADB_AVAILABLE', False)
    def test_embedding_configurator_with_chromadb_unavailable(self):
        """Flag off: the provider table is empty and no embedder is returned."""
        from crewai.utilities.embedding_configurator import EmbeddingConfigurator

        subject = EmbeddingConfigurator()

        # No providers are registered when the flag is off.
        self.assertEqual(subject.embedding_functions, {})
        # Without a config the configurator hands back None.
        self.assertIsNone(subject.configure_embedder())

    @patch('crewai.utilities.embedding_configurator.CHROMADB_AVAILABLE', True)
    def test_embedding_configurator_with_chromadb_available(self):
        """Flag on: providers are registered and the default embedder is used."""
        from crewai.utilities.embedding_configurator import EmbeddingConfigurator

        subject = EmbeddingConfigurator()

        # The provider table must be populated when the flag is on.
        self.assertNotEqual(subject.embedding_functions, {})

        # Replace the default factory so the test does not build a real embedder.
        default_stub = MagicMock(return_value="mock_embedding_function")
        subject._create_default_embedding_function = default_stub

        # With no config, the value produced by the default factory is returned.
        self.assertEqual(subject.configure_embedder(), "mock_embedding_function")

    @patch('crewai.utilities.embedding_configurator.CHROMADB_AVAILABLE', True)
    def test_embedding_configurator_with_invalid_config(self):
        """Flag on: malformed configs raise ValueError, unknown providers raise."""
        from crewai.utilities.embedding_configurator import EmbeddingConfigurator

        subject = EmbeddingConfigurator()

        # An empty config, then a config lacking the required keys.
        for rejected in ({}, {"config": {}}):
            with self.assertRaises(ValueError):
                subject.configure_embedder(rejected)

        # A well-formed config naming a provider that is not supported.
        unknown_provider = {"provider": "unsupported_provider", "config": {}}
        with self.assertRaises(Exception):
            subject.configure_embedder(unknown_provider)