mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Fix #2755: Add support for custom knowledge storage with pre-existing embeddings
Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
72
docs/examples/custom_storage_knowledge_source_example.py
Normal file
72
docs/examples/custom_storage_knowledge_source_example.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""Example of using a custom storage with CrewAI."""
|
||||
|
||||
import chromadb
|
||||
from chromadb.config import Settings
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.knowledge.source.custom_storage_knowledge_source import CustomStorageKnowledgeSource
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
|
||||
|
||||
class CustomKnowledgeStorage(KnowledgeStorage):
    """Knowledge storage backed by a Chroma database in a caller-chosen directory.

    The persistent directory is supplied at construction time and used when
    the Chroma client is created in ``initialize_knowledge_storage``.
    """

    def __init__(self, persist_directory: str, embedder=None, collection_name=None):
        """Remember the persistence directory, then run the base initializer.

        Args:
            persist_directory: Filesystem path handed to the Chroma
                persistent client.
            embedder: Forwarded unchanged to ``KnowledgeStorage.__init__``.
            collection_name: Forwarded unchanged to
                ``KnowledgeStorage.__init__``; when falsy, the collection
                name ``"knowledge"`` is used at initialization time.
        """
        # Assigned before delegating so the attribute already exists while
        # the base initializer runs.
        self.persist_directory = persist_directory
        super().__init__(embedder=embedder, collection_name=collection_name)

    def initialize_knowledge_storage(self):
        """Open the persistent Chroma client and get or create the collection.

        Sets ``self.app`` to the client and ``self.collection`` to the
        collection named ``self.collection_name`` (or ``"knowledge"`` when
        that name is falsy).

        Raises:
            Exception: If the collection cannot be created or fetched. The
                underlying error is chained as ``__cause__``.
        """
        self.app = chromadb.PersistentClient(
            path=self.persist_directory,
            settings=Settings(allow_reset=True),
        )

        collection_name = self.collection_name or "knowledge"

        try:
            # NOTE(review): assumes the base class exposes the configured
            # embedding function as ``embedder_config`` - confirm against
            # the KnowledgeStorage version in use.
            self.collection = self.app.get_or_create_collection(
                name=collection_name,
                embedding_function=self.embedder_config,
            )
        except Exception as e:
            # Chain explicitly so the original traceback is preserved as the
            # direct cause rather than as implicit context.
            raise Exception(f"Failed to create or get collection: {e}") from e
|
||||
|
||||
|
||||
def get_knowledge_source_with_custom_storage(folder_name: str, embedder=None):
    """Build a knowledge source wired to an already-initialized custom storage.

    Args:
        folder_name: Used both as the collection name and as the suffix of
            the persistence directory ``vectorstores/knowledge_<folder_name>``.
        embedder: Passed through to ``CustomKnowledgeStorage``.

    Returns:
        A ``CustomStorageKnowledgeSource`` whose ``storage`` attribute is the
        initialized ``CustomKnowledgeStorage``.
    """
    custom_storage = CustomKnowledgeStorage(
        persist_directory=f"vectorstores/knowledge_{folder_name}",
        embedder=embedder,
        collection_name=folder_name,
    )
    # The storage is initialized before the source is created and attached.
    custom_storage.initialize_knowledge_storage()

    knowledge_source = CustomStorageKnowledgeSource(collection_name=folder_name)
    knowledge_source.storage = custom_storage
    return knowledge_source
|
||||
|
||||
|
||||
def main():
    """Run a one-agent, one-task crew that uses a custom-storage knowledge source.

    Builds the knowledge source for the ``"example"`` folder, attaches it to
    a crew, kicks the crew off and prints the result.
    """
    knowledge_source = get_knowledge_source_with_custom_storage(folder_name="example")

    agent = Agent(role="test", goal="test", backstory="test")
    # ``expected_output`` is supplied here, as the Crew test for this feature
    # does; Task construction fails validation without it.
    task = Task(description="test", expected_output="test", agent=agent)

    crew = Crew(
        agents=[agent],
        tasks=[task],
        knowledge_sources=[knowledge_source],
    )

    result = crew.kickoff()
    print(result)
|
||||
|
||||
|
||||
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
||||
@@ -0,0 +1,20 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
|
||||
|
||||
class CustomStorageKnowledgeSource(BaseKnowledgeSource):
    """Knowledge source for a storage that already holds its embeddings.

    Both hooks below are deliberate no-ops: this source neither validates
    nor adds any content of its own.
    """

    # Optional collection name; defaults to None.
    collection_name: Optional[str] = Field(default=None)

    def validate_content(self):
        """Do nothing; there is no content to validate for a pre-existing storage."""

    def add(self) -> None:
        """Do nothing; no content is added when using a pre-existing storage."""
|
||||
69
tests/knowledge/custom_storage_knowledge_source_test.py
Normal file
69
tests/knowledge/custom_storage_knowledge_source_test.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Test CustomStorageKnowledgeSource functionality."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.knowledge.source.custom_storage_knowledge_source import CustomStorageKnowledgeSource
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
|
||||
|
||||
@pytest.fixture
def custom_storage():
    """Provide a ``KnowledgeStorage`` bound to the ``test_collection`` collection."""
    return KnowledgeStorage(collection_name="test_collection")
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source(custom_storage):
    """Test that a CustomStorageKnowledgeSource can be created with a pre-existing storage."""
    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    # Attach the fixture so the test exercises the pre-existing storage it
    # requests instead of leaving it unused.
    source.storage = custom_storage

    assert source is not None
    assert source.collection_name == "test_collection"
    assert source.storage is custom_storage
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source_with_knowledge(custom_storage):
    """Check that ``Knowledge`` accepts a custom-storage source and storage."""
    knowledge_source = CustomStorageKnowledgeSource(collection_name="test_collection")

    # Storage initialization and the source's ``add`` are replaced with mocks
    # while the Knowledge object is constructed.
    with patch.object(KnowledgeStorage, 'initialize_knowledge_storage'), \
            patch.object(CustomStorageKnowledgeSource, 'add'):
        knowledge = Knowledge(
            sources=[knowledge_source],
            storage=custom_storage,
            collection_name="test_collection",
        )

    assert knowledge is not None
    assert knowledge.sources[0] == knowledge_source
    assert knowledge.storage == custom_storage
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source_with_crew():
    """Check that a ``Crew`` accepts a custom-storage knowledge source."""
    from crewai.agent import Agent
    from crewai.crew import Crew
    from crewai.task import Task

    prebuilt_storage = KnowledgeStorage(collection_name="test_collection")

    knowledge_source = CustomStorageKnowledgeSource(collection_name="test_collection")
    knowledge_source.storage = prebuilt_storage

    worker = Agent(role="test", goal="test", backstory="test")
    job = Task(description="test", expected_output="test", agent=worker)

    # Storage initialization and the source's ``add`` are replaced with mocks
    # while the Crew is constructed.
    with patch.object(KnowledgeStorage, 'initialize_knowledge_storage'), \
            patch.object(CustomStorageKnowledgeSource, 'add'):
        crew = Crew(
            agents=[worker],
            tasks=[job],
            knowledge_sources=[knowledge_source],
        )

    assert crew is not None
    assert crew.knowledge_sources[0] == knowledge_source
|
||||
Reference in New Issue
Block a user