mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-07 23:28:30 +00:00
Compare commits
5 Commits
devin/1744
...
devin/1746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1867c798ec | ||
|
|
29ebdbf474 | ||
|
|
1b9cbb67f7 | ||
|
|
58a120608b | ||
|
|
51439c3c0a |
123
docs/examples/custom_storage_knowledge_source_example.py
Normal file
123
docs/examples/custom_storage_knowledge_source_example.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""Example of using a custom storage with CrewAI."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import chromadb
|
||||
from chromadb.config import Settings
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.knowledge.source.custom_storage_knowledge_source import (
|
||||
CustomStorageKnowledgeSource,
|
||||
)
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
|
||||
|
||||
class CustomKnowledgeStorage(KnowledgeStorage):
    """Custom knowledge storage that uses a specific persistent directory.

    Args:
        persist_directory (str): Path to the directory where ChromaDB will persist data.
        embedder: Embedding function to use for the collection. Defaults to None.
        collection_name (str, optional): Name of the collection. Defaults to None.

    Raises:
        ValueError: If persist_directory is empty.
    """

    def __init__(self, persist_directory: str, embedder=None, collection_name=None):
        # Reject a falsy path up front, before the base class is initialized.
        if not persist_directory:
            raise ValueError("persist_directory cannot be empty")
        self.persist_directory = persist_directory
        super().__init__(embedder=embedder, collection_name=collection_name)

    def initialize_knowledge_storage(self):
        """Initialize the knowledge storage with a custom persistent directory.

        Creates a ChromaDB PersistentClient rooted at ``self.persist_directory``
        and gets or creates the collection named ``self.collection_name``
        (falling back to ``"knowledge"`` when no name was given).

        Raises:
            Exception: If the client cannot be created or the collection cannot
                be created/retrieved. The original error is attached as
                ``__cause__`` so its traceback is not lost.
        """
        try:
            chroma_client = chromadb.PersistentClient(
                path=self.persist_directory,
                settings=Settings(allow_reset=True),
            )
            self.app = chroma_client

            collection_name = (
                "knowledge" if not self.collection_name else self.collection_name
            )
            self.collection = self.app.get_or_create_collection(
                name=collection_name,
                embedding_function=self.embedder_config,
            )
        except Exception as e:
            # Chain the cause: the bare re-raise hid where the failure came from.
            raise Exception(f"Failed to create or get collection: {e}") from e
|
||||
|
||||
|
||||
def get_knowledge_source_with_custom_storage(
    folder_name: str,
    embedder=None
) -> CustomStorageKnowledgeSource:
    """Create a knowledge source with a custom storage.

    The storage persists under ``vectorstores/knowledge_<folder_name>`` and the
    collection is named after ``folder_name``.

    Args:
        folder_name (str): Name of the folder to store embeddings and collection.
        embedder: Embedding function to use. Defaults to None.

    Returns:
        CustomStorageKnowledgeSource: Configured knowledge source with custom storage.

    Raises:
        Exception: If storage or source initialization fails. The original
            error is attached as ``__cause__``.
    """
    try:
        persist_path = f"vectorstores/knowledge_{folder_name}"
        storage = CustomKnowledgeStorage(
            persist_directory=persist_path,
            embedder=embedder,
            collection_name=folder_name,
        )
        storage.initialize_knowledge_storage()

        source = CustomStorageKnowledgeSource(collection_name=folder_name)
        source.storage = storage

        # Fails with ValueError if the storage was not attached correctly.
        source.validate_content()

        return source
    except Exception as e:
        # Chain the cause so the underlying traceback stays visible.
        raise Exception(f"Failed to initialize knowledge source: {e}") from e
|
||||
|
||||
|
||||
def main() -> None:
    """Run the custom-storage example end to end.

    Steps:
    1. Build a knowledge source backed by pre-existing embeddings.
    2. Attach it to a single-agent, single-task Crew.
    3. Kick the Crew off and print its result.

    Any failure is reported on stdout instead of being raised.
    """
    try:
        source = get_knowledge_source_with_custom_storage(folder_name="example")

        worker = Agent(role="test", goal="test", backstory="test")
        job = Task(description="test", expected_output="test", agent=worker)

        crew = Crew(
            agents=[worker],
            tasks=[job],
            knowledge_sources=[source],
        )

        print(crew.kickoff())
    except Exception as e:
        print(f"Error running example: {e}")


if __name__ == "__main__":
    main()
|
||||
17
src/crewai/agents/cache/cache_handler.py
vendored
17
src/crewai/agents/cache/cache_handler.py
vendored
@@ -1,28 +1,15 @@
|
||||
from typing import Any, Dict, Optional
|
||||
import threading
|
||||
from threading import local
|
||||
|
||||
from pydantic import BaseModel, PrivateAttr
|
||||
|
||||
|
||||
_thread_local = local()
|
||||
|
||||
|
||||
class CacheHandler(BaseModel):
|
||||
"""Callback handler for tool usage."""
|
||||
|
||||
_cache: Dict[str, Any] = PrivateAttr(default_factory=dict)
|
||||
|
||||
def _get_lock(self):
|
||||
"""Get a thread-local lock to avoid pickling issues."""
|
||||
if not hasattr(_thread_local, "cache_lock"):
|
||||
_thread_local.cache_lock = threading.Lock()
|
||||
return _thread_local.cache_lock
|
||||
|
||||
def add(self, tool, input, output):
|
||||
with self._get_lock():
|
||||
self._cache[f"{tool}-{input}"] = output
|
||||
self._cache[f"{tool}-{input}"] = output
|
||||
|
||||
def read(self, tool, input) -> Optional[str]:
|
||||
with self._get_lock():
|
||||
return self._cache.get(f"{tool}-{input}")
|
||||
return self._cache.get(f"{tool}-{input}")
|
||||
|
||||
@@ -88,7 +88,7 @@ class Crew(BaseModel):
|
||||
_rpm_controller: RPMController = PrivateAttr()
|
||||
_logger: Logger = PrivateAttr()
|
||||
_file_handler: FileHandler = PrivateAttr()
|
||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr()
|
||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
||||
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
|
||||
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
|
||||
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CustomStorageKnowledgeSource(BaseKnowledgeSource):
    """A knowledge source that uses a pre-existing storage with embeddings.

    This class allows users to use pre-existing vector embeddings without re-embedding
    when using CrewAI. It acts as a bridge between BaseKnowledgeSource and KnowledgeStorage.

    Args:
        collection_name (Optional[str]): Name of the collection in the vector database.
            Defaults to None.

    Attributes:
        storage (KnowledgeStorage): The underlying storage implementation that contains
            the pre-existing embeddings.
    """

    collection_name: Optional[str] = Field(default=None)

    def validate_content(self):
        """Validate that a storage has been attached to this source.

        Raises:
            ValueError: If ``storage`` is missing or None.
        """
        if getattr(self, "storage", None) is None:
            raise ValueError("Storage not initialized. Please set storage before use.")
        # Lazy %-args: the message is only formatted when DEBUG is enabled.
        logger.debug("Storage validated for collection: %s", self.collection_name)

    def add(self) -> None:
        """Do nothing except log: the embeddings already exist in the storage.

        This method intentionally adds no content.
        """
        logger.debug(
            "Skipping add operation for pre-existing storage: %s",
            self.collection_name,
        )
|
||||
@@ -4,15 +4,11 @@ import asyncio
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import threading
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from importlib.metadata import version
|
||||
from threading import local
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
_thread_local = local()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def suppress_warnings():
|
||||
@@ -80,20 +76,12 @@ class Telemetry:
|
||||
raise # Re-raise the exception to not interfere with system signals
|
||||
self.ready = False
|
||||
|
||||
def _get_lock(self):
|
||||
"""Get a thread-local lock to avoid pickling issues."""
|
||||
if not hasattr(_thread_local, "telemetry_lock"):
|
||||
_thread_local.telemetry_lock = threading.Lock()
|
||||
return _thread_local.telemetry_lock
|
||||
|
||||
def set_tracer(self):
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with self._get_lock():
|
||||
if not self.trace_set: # Double-check to avoid race condition
|
||||
with suppress_warnings():
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
with suppress_warnings():
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception:
|
||||
self.ready = False
|
||||
self.trace_set = False
|
||||
@@ -102,8 +90,7 @@ class Telemetry:
|
||||
if not self.ready:
|
||||
return
|
||||
try:
|
||||
with self._get_lock():
|
||||
operation()
|
||||
operation()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,186 +0,0 @@
|
||||
import asyncio
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
|
||||
|
||||
class MockLLM:
    """Stand-in LLM for tests: canned responses, no network access."""

    def __init__(self, model="gpt-3.5-turbo", **kwargs):
        self.model = model

        # Generation/connection settings that all start out unset.
        unset_fields = (
            "stop",
            "timeout",
            "temperature",
            "top_p",
            "n",
            "max_completion_tokens",
            "max_tokens",
            "presence_penalty",
            "frequency_penalty",
            "logit_bias",
            "response_format",
            "seed",
            "logprobs",
            "top_logprobs",
            "base_url",
            "api_version",
            "api_key",
        )
        for field_name in unset_fields:
            setattr(self, field_name, None)

        self.callbacks = []
        self.context_window_size = 8192
        self.kwargs = {}

        # Caller-supplied keyword arguments override any default above.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    def complete(self, prompt, **kwargs):
        """Return a canned completion echoing the first 20 prompt characters."""
        preview = prompt[:20]
        return f"Mock response for: {preview}..."

    def chat_completion(self, messages, **kwargs):
        """Return a canned chat-completion payload."""
        message = {"content": "Mock response"}
        return {"choices": [{"message": message}]}

    def function_call(self, messages, functions, **kwargs):
        """Return a canned payload that includes a function call."""
        invocation = {
            "name": "test_function",
            "arguments": '{"arg1": "value1"}',
        }
        message = {"content": "Mock response", "function_call": invocation}
        return {"choices": [{"message": message}]}

    def supports_stop_words(self):
        """Report that stop words are not supported."""
        return False

    def supports_function_calling(self):
        """Report that function calling is supported."""
        return True

    def get_context_window_size(self):
        """Return the configured context window size."""
        return self.context_window_size

    def call(self, messages, callbacks=None):
        """Return a canned response for a direct call."""
        return "Mock response from call method"

    def set_callbacks(self, callbacks):
        """Store the given callbacks on the instance."""
        self.callbacks = callbacks

    def set_env_callbacks(self):
        """Do nothing; present only to satisfy the expected interface."""
        return None
|
||||
|
||||
|
||||
def create_test_crew():
    """Build a one-agent, one-task crew with the LLM patched to MockLLM."""
    with patch("crewai.agent.LLM", MockLLM):
        test_agent = Agent(
            role="Test Agent",
            goal="Test concurrent execution",
            backstory="I am a test agent for concurrent execution",
        )
        test_task = Task(
            description="Test task for concurrent execution",
            expected_output="Test output",
            agent=test_agent,
        )
        return Crew(
            agents=[test_agent],
            tasks=[test_task],
            verbose=False,
        )
|
||||
|
||||
|
||||
def test_threading_concurrency():
    """Test concurrent execution using ThreadPoolExecutor.

    Kicks off one crew per worker thread and checks that every thread
    produced a non-None result.
    """
    num_threads = 5
    results = []

    def generate_response(idx):
        try:
            crew = create_test_crew()
            with patch("crewai.agent.LLM", MockLLM):
                return crew.kickoff(inputs={"test_input": f"input_{idx}"})
        except Exception as e:
            # pytest.fail always raises, so nothing after it would ever run.
            pytest.fail(f"Exception in thread {idx}: {e}")

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(generate_response, i) for i in range(num_threads)]

        for future in as_completed(futures):
            result = future.result()
            assert result is not None
            results.append(result)

    assert len(results) == num_threads
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_asyncio_concurrency():
    """Test concurrent execution using asyncio.

    Runs several ``kickoff_async`` calls through ``asyncio.gather`` and checks
    that each one returned a non-None result.
    """
    num_tasks = 5
    sem = asyncio.Semaphore(num_tasks)

    async def generate_response_async(idx):
        async with sem:
            try:
                crew = create_test_crew()
                with patch("crewai.agent.LLM", MockLLM):
                    return await crew.kickoff_async(
                        inputs={"test_input": f"input_{idx}"}
                    )
            except Exception as e:
                # pytest.fail always raises, so nothing after it would ever run.
                pytest.fail(f"Exception in task {idx}: {e}")

    tasks = [generate_response_async(i) for i in range(num_tasks)]
    results = await asyncio.gather(*tasks)

    assert len(results) == num_tasks
    assert all(result is not None for result in results)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_extended_asyncio_concurrency():
    """Extended test for asyncio concurrency with more iterations.

    Each concurrent task reuses a single crew across several sequential
    ``kickoff_async`` calls.
    """
    num_tasks = 5  # Reduced from 10 for faster testing
    iterations = 2  # Reduced from 3 for faster testing
    sem = asyncio.Semaphore(num_tasks)

    async def generate_response_async(idx):
        async with sem:
            crew = create_test_crew()
            for i in range(iterations):
                try:
                    with patch("crewai.agent.LLM", MockLLM):
                        output = await crew.kickoff_async(
                            inputs={"test_input": f"input_{idx}_{i}"}
                        )
                        assert output is not None
                except Exception as e:
                    # pytest.fail always raises, so nothing after it would ever run.
                    pytest.fail(f"Exception in task {idx}, iteration {i}: {e}")
            return True

    tasks = [generate_response_async(i) for i in range(num_tasks)]
    results = await asyncio.gather(*tasks)

    assert all(results)
|
||||
125
tests/knowledge/custom_storage_knowledge_source_test.py
Normal file
125
tests/knowledge/custom_storage_knowledge_source_test.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Test CustomStorageKnowledgeSource functionality."""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.knowledge.source.custom_storage_knowledge_source import (
|
||||
CustomStorageKnowledgeSource,
|
||||
)
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
|
||||
|
||||
@pytest.fixture
def custom_storage():
    """Provide a KnowledgeStorage bound to the ``test_collection`` collection."""
    return KnowledgeStorage(collection_name="test_collection")
|
||||
|
||||
|
||||
@pytest.fixture
def temp_dir():
    """Yield a fresh temporary directory path and remove it afterwards."""
    scratch_path = tempfile.mkdtemp()
    yield scratch_path
    # The test may already have deleted the directory itself.
    if not os.path.exists(scratch_path):
        return
    shutil.rmtree(scratch_path)
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source(custom_storage):
    """A CustomStorageKnowledgeSource can be constructed and keeps its collection name."""
    expected_name = "test_collection"
    source = CustomStorageKnowledgeSource(collection_name=expected_name)

    assert source is not None
    assert source.collection_name == expected_name
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source_validation():
    """validate_content raises ValueError when no storage is attached."""
    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    source.storage = None

    with pytest.raises(ValueError, match="Storage not initialized"):
        source.validate_content()
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source_with_knowledge(custom_storage):
    """A CustomStorageKnowledgeSource can be handed to Knowledge as a source."""
    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    source.storage = custom_storage

    # Patch out storage initialization and source.add for the duration of the test.
    with patch.object(KnowledgeStorage, 'initialize_knowledge_storage'), \
            patch.object(CustomStorageKnowledgeSource, 'add'):
        knowledge = Knowledge(
            sources=[source],
            storage=custom_storage,
            collection_name="test_collection",
        )

        assert knowledge is not None
        assert knowledge.sources[0] == source
        assert knowledge.storage == custom_storage
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source_with_crew():
    """A CustomStorageKnowledgeSource can be passed to Crew as a knowledge source."""
    from crewai.agent import Agent
    from crewai.crew import Crew
    from crewai.task import Task

    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    source.storage = KnowledgeStorage(collection_name="test_collection")

    worker = Agent(role="test", goal="test", backstory="test")
    job = Task(description="test", expected_output="test", agent=worker)

    # Patch out storage initialization and source.add for the duration of the test.
    with patch.object(KnowledgeStorage, 'initialize_knowledge_storage'), \
            patch.object(CustomStorageKnowledgeSource, 'add'):
        crew = Crew(
            agents=[worker],
            tasks=[job],
            knowledge_sources=[source],
        )

        assert crew is not None
        assert crew.knowledge_sources[0] == source
|
||||
|
||||
|
||||
def test_custom_storage_knowledge_source_add_method():
    """Test that the add method doesn't modify the storage."""
    source = CustomStorageKnowledgeSource(collection_name="test_collection")
    storage = MagicMock(spec=KnowledgeStorage)
    source.storage = storage

    source.add()

    storage.assert_not_called()
    # assert_not_called only covers invoking the mock object itself; also
    # verify that no method on the storage was called.
    assert storage.method_calls == []
|
||||
|
||||
|
||||
def test_integration_with_existing_storage(temp_dir):
    """A source validates against a storage subclass that was initialized by hand."""
    storage_path = os.path.join(temp_dir, "test_storage")
    os.makedirs(storage_path, exist_ok=True)

    class MockStorage(KnowledgeStorage):
        """Storage whose initialization only records that it ran."""

        def initialize_knowledge_storage(self):
            self.initialized = True

    collection = "test_integration"
    storage = MockStorage(collection_name=collection)
    storage.initialize_knowledge_storage()

    source = CustomStorageKnowledgeSource(collection_name=collection)
    source.storage = storage

    # Must not raise now that a storage is attached.
    source.validate_content()

    assert hasattr(storage, "initialized")
    assert storage.initialized is True
|
||||
Reference in New Issue
Block a user