mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-26 16:48:13 +00:00
Compare commits
5 Commits
devin/1769
...
devin/1746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1867c798ec | ||
|
|
29ebdbf474 | ||
|
|
1b9cbb67f7 | ||
|
|
58a120608b | ||
|
|
51439c3c0a |
123
docs/examples/custom_storage_knowledge_source_example.py
Normal file
123
docs/examples/custom_storage_knowledge_source_example.py
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
"""Example of using a custom storage with CrewAI."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import chromadb
|
||||||
|
from chromadb.config import Settings
|
||||||
|
|
||||||
|
from crewai import Agent, Crew, Task
|
||||||
|
from crewai.knowledge.source.custom_storage_knowledge_source import (
|
||||||
|
CustomStorageKnowledgeSource,
|
||||||
|
)
|
||||||
|
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||||
|
|
||||||
|
|
||||||
|
class CustomKnowledgeStorage(KnowledgeStorage):
    """Custom knowledge storage that uses a specific persistent directory.

    Args:
        persist_directory (str): Path to the directory where ChromaDB will persist data.
        embedder: Embedder argument forwarded unchanged to ``KnowledgeStorage``.
            Defaults to None.
        collection_name (str, optional): Name of the collection. Defaults to None,
            in which case ``initialize_knowledge_storage`` uses "knowledge".

    Raises:
        ValueError: If persist_directory is empty.
    """

    def __init__(self, persist_directory: str, embedder=None, collection_name=None):
        # Reject falsy values (None, "") up front, before any client is created.
        if not persist_directory:
            raise ValueError("persist_directory cannot be empty")
        # Read later by initialize_knowledge_storage() as the ChromaDB path.
        self.persist_directory = persist_directory
        super().__init__(embedder=embedder, collection_name=collection_name)

    def initialize_knowledge_storage(self) -> None:
        """Initialize the knowledge storage with a custom persistent directory.

        Creates a ChromaDB PersistentClient rooted at ``persist_directory``,
        stores it on ``self.app``, and gets or creates the collection, which is
        stored on ``self.collection``.

        Raises:
            Exception: If client creation or collection creation/retrieval
                fails. The original error is chained as ``__cause__``.
        """
        try:
            self.app = chromadb.PersistentClient(
                path=self.persist_directory,
                settings=Settings(allow_reset=True),
            )
            self.collection = self.app.get_or_create_collection(
                # Fall back to the default name when none (or an empty one) was given.
                name=self.collection_name or "knowledge",
                embedding_function=self.embedder_config,
            )
        except Exception as e:
            # Same exception type and message as before; "from e" keeps the
            # underlying failure attached for debugging.
            raise Exception(f"Failed to create or get collection: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
|
def get_knowledge_source_with_custom_storage(
    folder_name: str,
    embedder=None,
) -> CustomStorageKnowledgeSource:
    """Create a knowledge source with a custom storage.

    The storage persists under ``vectorstores/knowledge_<folder_name>`` and
    uses ``folder_name`` as its collection name.

    Args:
        folder_name (str): Name of the folder to store embeddings and collection.
        embedder: Embedder argument passed through to the storage. Defaults to None.

    Returns:
        CustomStorageKnowledgeSource: Configured knowledge source with custom storage.

    Raises:
        Exception: If storage construction, initialization, or validation
            fails. The original error is chained as ``__cause__``.
    """
    try:
        storage = CustomKnowledgeStorage(
            persist_directory=f"vectorstores/knowledge_{folder_name}",
            embedder=embedder,
            collection_name=folder_name,
        )
        storage.initialize_knowledge_storage()

        source = CustomStorageKnowledgeSource(collection_name=folder_name)
        source.storage = storage
        # Fail here, rather than later inside a crew run, if the storage is missing.
        source.validate_content()
        return source
    except Exception as e:
        # Same exception type and message as before; "from e" preserves the cause.
        raise Exception(f"Failed to initialize knowledge source: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Run the custom-storage example end to end.

    Steps:
    1. Build a knowledge source backed by the custom storage
    2. Attach it to a Crew with one agent and one task
    3. Kick off the Crew and print the result

    Any exception is caught and reported on stdout instead of propagating.
    """
    try:
        source = get_knowledge_source_with_custom_storage(folder_name="example")

        worker = Agent(role="test", goal="test", backstory="test")
        job = Task(description="test", expected_output="test", agent=worker)

        print(
            Crew(
                agents=[worker],
                tasks=[job],
                knowledge_sources=[source],
            ).kickoff()
        )
    except Exception as e:
        print(f"Error running example: {e}")


if __name__ == "__main__":
    main()
|
||||||
@@ -0,0 +1,45 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||||
|
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomStorageKnowledgeSource(BaseKnowledgeSource):
    """A knowledge source that uses a pre-existing storage with embeddings.

    This class allows users to use pre-existing vector embeddings without re-embedding
    when using CrewAI. It acts as a bridge between BaseKnowledgeSource and KnowledgeStorage.

    Args:
        collection_name (Optional[str]): Name of the collection in the vector database.
            Defaults to None.

    Attributes:
        storage (KnowledgeStorage): The underlying storage implementation that contains
            the pre-existing embeddings.
    """

    collection_name: Optional[str] = Field(default=None)

    def validate_content(self) -> None:
        """Validate that a storage has been attached to this source.

        Raises:
            ValueError: If ``storage`` is missing or None.
        """
        # getattr with a default covers both "attribute absent" and "set to None".
        if getattr(self, "storage", None) is None:
            raise ValueError("Storage not initialized. Please set storage before use.")
        # Lazy %-style args: the message is only formatted if DEBUG is enabled.
        logger.debug("Storage validated for collection: %s", self.collection_name)

    def add(self) -> None:
        """No need to add content as we're using pre-existing storage.

        This method only logs; the embeddings are expected to already exist
        in the storage.
        """
        logger.debug(
            "Skipping add operation for pre-existing storage: %s",
            self.collection_name,
        )
|
||||||
125
tests/knowledge/custom_storage_knowledge_source_test.py
Normal file
125
tests/knowledge/custom_storage_knowledge_source_test.py
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
"""Test CustomStorageKnowledgeSource functionality."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai.knowledge.knowledge import Knowledge
|
||||||
|
from crewai.knowledge.source.custom_storage_knowledge_source import (
|
||||||
|
CustomStorageKnowledgeSource,
|
||||||
|
)
|
||||||
|
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def custom_storage():
    """Provide a KnowledgeStorage bound to the "test_collection" collection."""
    return KnowledgeStorage(collection_name="test_collection")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def temp_dir():
    """Yield the path of a temporary directory that is removed after the test.

    Yields:
        str: Path to a freshly created temporary directory.
    """
    # The context manager owns creation and removal, replacing the manual
    # mkdtemp / exists-check / rmtree sequence.
    with tempfile.TemporaryDirectory() as path:
        yield path
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_storage_knowledge_source(custom_storage):
    """A CustomStorageKnowledgeSource can be constructed and keeps its collection name."""
    knowledge_source = CustomStorageKnowledgeSource(collection_name="test_collection")

    assert knowledge_source is not None
    assert knowledge_source.collection_name == "test_collection"
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_storage_knowledge_source_validation():
    """validate_content() raises ValueError when no storage is attached."""
    unready_source = CustomStorageKnowledgeSource(collection_name="test_collection")
    unready_source.storage = None

    with pytest.raises(ValueError, match="Storage not initialized"):
        unready_source.validate_content()
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_storage_knowledge_source_with_knowledge(custom_storage):
    """A CustomStorageKnowledgeSource can be handed to Knowledge along with its storage."""
    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    source.storage = custom_storage

    # Both the storage initialisation and the source's add() are patched out.
    with patch.object(KnowledgeStorage, "initialize_knowledge_storage"), patch.object(
        CustomStorageKnowledgeSource, "add"
    ):
        knowledge = Knowledge(
            sources=[source],
            storage=custom_storage,
            collection_name="test_collection",
        )

        assert knowledge is not None
        assert knowledge.sources[0] == source
        assert knowledge.storage == custom_storage
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_storage_knowledge_source_with_crew():
    """A CustomStorageKnowledgeSource can be passed to Crew as a knowledge source."""
    from crewai.agent import Agent
    from crewai.crew import Crew
    from crewai.task import Task

    backing_storage = KnowledgeStorage(collection_name="test_collection")

    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    source.storage = backing_storage

    worker = Agent(role="test", goal="test", backstory="test")
    job = Task(description="test", expected_output="test", agent=worker)

    # Both the storage initialisation and the source's add() are patched out.
    with patch.object(KnowledgeStorage, "initialize_knowledge_storage"), patch.object(
        CustomStorageKnowledgeSource, "add"
    ):
        crew = Crew(
            agents=[worker],
            tasks=[job],
            knowledge_sources=[source],
        )

        assert crew is not None
        assert crew.knowledge_sources[0] == source
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_storage_knowledge_source_add_method():
    """Test that the add method doesn't call into the storage."""
    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    storage = MagicMock(spec=KnowledgeStorage)
    source.storage = storage
    # Discard anything recorded while wiring the mock in, so only add() is measured.
    storage.reset_mock()

    source.add()

    # assert_not_called() only verifies the mock object itself was not invoked.
    storage.assert_not_called()
    # method_calls also records calls made to the mock's methods, which is what
    # "add() leaves the storage alone" actually requires.
    assert storage.method_calls == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_integration_with_existing_storage(temp_dir):
    """A source wired to an initialised storage subclass passes validation."""
    storage_path = os.path.join(temp_dir, "test_storage")
    os.makedirs(storage_path, exist_ok=True)

    class MockStorage(KnowledgeStorage):
        # Replaces the real initialisation with a flag the test can inspect.
        def initialize_knowledge_storage(self):
            self.initialized = True

    mock_storage = MockStorage(collection_name="test_integration")
    mock_storage.initialize_knowledge_storage()

    knowledge_source = CustomStorageKnowledgeSource(collection_name="test_integration")
    knowledge_source.storage = mock_storage

    knowledge_source.validate_content()

    assert hasattr(mock_storage, "initialized")
    assert mock_storage.initialized is True
|
||||||
Reference in New Issue
Block a user