Compare commits

5 Commits

Author | SHA1 | Message | Date
Devin AI | 1867c798ec | Fix import sorting to resolve lint issues (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-05 14:12:37 +00:00
Devin AI | 29ebdbf474 | Implement PR review suggestions for improved error handling, docstrings, and tests (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-05 14:10:16 +00:00
Devin AI | 1b9cbb67f7 | Fix import formatting to resolve lint issues (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-05 14:05:12 +00:00
Devin AI | 58a120608b | Fix expected_output parameter in Task example (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-05 14:01:48 +00:00
Devin AI | 51439c3c0a | Fix #2755: Add support for custom knowledge storage with pre-existing embeddings (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-05 13:58:37 +00:00
7 changed files with 317 additions and 153 deletions

View File

@@ -0,0 +1,123 @@
"""Example of using a custom storage with CrewAI."""
from pathlib import Path
import chromadb
from chromadb.config import Settings
from crewai import Agent, Crew, Task
from crewai.knowledge.source.custom_storage_knowledge_source import (
CustomStorageKnowledgeSource,
)
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
class CustomKnowledgeStorage(KnowledgeStorage):
"""Custom knowledge storage that uses a specific persistent directory.
Args:
persist_directory (str): Path to the directory where ChromaDB will persist data.
embedder: Embedding function to use for the collection. Defaults to None.
collection_name (str, optional): Name of the collection. Defaults to None.
Raises:
ValueError: If persist_directory is empty or invalid.
"""
def __init__(self, persist_directory: str, embedder=None, collection_name=None):
if not persist_directory:
raise ValueError("persist_directory cannot be empty")
self.persist_directory = persist_directory
super().__init__(embedder=embedder, collection_name=collection_name)
def initialize_knowledge_storage(self):
"""Initialize the knowledge storage with a custom persistent directory.
Creates a ChromaDB PersistentClient with the specified directory and
initializes a collection with the provided name and embedding function.
Raises:
Exception: If collection creation or retrieval fails.
"""
try:
chroma_client = chromadb.PersistentClient(
path=self.persist_directory,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
collection_name = (
"knowledge" if not self.collection_name else self.collection_name
)
self.collection = self.app.get_or_create_collection(
name=collection_name,
embedding_function=self.embedder_config,
)
except Exception as e:
raise Exception(f"Failed to create or get collection: {e}")
def get_knowledge_source_with_custom_storage(
folder_name: str,
embedder=None
) -> CustomStorageKnowledgeSource:
"""Create a knowledge source with a custom storage.
Args:
folder_name (str): Name of the folder to store embeddings and collection.
embedder: Embedding function to use. Defaults to None.
Returns:
CustomStorageKnowledgeSource: Configured knowledge source with custom storage.
Raises:
Exception: If storage initialization fails.
"""
try:
persist_path = f"vectorstores/knowledge_{folder_name}"
storage = CustomKnowledgeStorage(
persist_directory=persist_path,
embedder=embedder,
collection_name=folder_name
)
storage.initialize_knowledge_storage()
source = CustomStorageKnowledgeSource(collection_name=folder_name)
source.storage = storage
source.validate_content()
return source
except Exception as e:
raise Exception(f"Failed to initialize knowledge source: {e}")
def main() -> None:
"""Example of using a custom storage with CrewAI.
This function demonstrates how to:
1. Create a knowledge source with pre-existing embeddings
2. Use it with a Crew
3. Run the Crew to perform tasks
"""
try:
knowledge_source = get_knowledge_source_with_custom_storage(folder_name="example")
agent = Agent(role="test", goal="test", backstory="test")
task = Task(description="test", expected_output="test", agent=agent)
crew = Crew(
agents=[agent],
tasks=[task],
knowledge_sources=[knowledge_source]
)
result = crew.kickoff()
print(result)
except Exception as e:
print(f"Error running example: {e}")
if __name__ == "__main__":
main()

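The example above leaves embedder as None. Below is a minimal sketch of wiring in a concrete embedding function, assuming chromadb's bundled SentenceTransformer helper is available; the model name is an illustrative choice, and it must match whatever model produced the pre-existing embeddings, or queries will not line up with the stored vectors.

from chromadb.utils import embedding_functions

# Hypothetical embedder for illustration; reuse the model that originally
# generated the stored embeddings.
embedder = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)
source = get_knowledge_source_with_custom_storage(
    folder_name="example",
    embedder=embedder,
)
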
View File

@@ -1076,50 +1076,18 @@ class Crew(BaseModel):
        self,
        n_iterations: int,
        openai_model_name: Optional[str] = None,
        llm: Optional[Union[str, LLM]] = None,
        inputs: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Test and evaluate the Crew with the given inputs for n iterations.

        Args:
            n_iterations: Number of test iterations to run
            openai_model_name: (Deprecated) Name of OpenAI model to use for evaluation
            llm: LLM instance or model name to use for evaluation
            inputs: Optional dictionary of inputs to pass to the crew
        """
        if openai_model_name:
            warnings.warn(
                "openai_model_name is deprecated and will be removed in future versions. Use llm parameter instead.",
                DeprecationWarning,
                stacklevel=2
            )
        """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
        test_crew = self.copy()
        model = llm if llm else openai_model_name
        try:
            if not model:
                raise ValueError(
                    "Either llm or openai_model_name must be provided. Please provide either "
                    "a custom LLM instance or an OpenAI model name."
                )
            if isinstance(model, LLM):
                if not hasattr(model, 'model'):
                    raise ValueError("Provided LLM instance must have a 'model' attribute")
            elif isinstance(model, str):
                model = LLM(model=model)
            else:
                raise ValueError("LLM must be either a string model name or an LLM instance")
        except Exception as e:
            raise ValueError(f"Failed to initialize LLM: {str(e)}")
        self._test_execution_span = test_crew._telemetry.test_execution_span(
            test_crew,
            n_iterations,
            inputs,
            str(model),  # type: ignore[arg-type]
            openai_model_name,  # type: ignore[arg-type]
        )  # type: ignore[arg-type]
        evaluator = CrewEvaluator(test_crew, model)
        evaluator = CrewEvaluator(test_crew, openai_model_name)  # type: ignore[arg-type]
        for i in range(1, n_iterations + 1):
            evaluator.set_iteration(i)

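For reference, the two call styles this hunk toggles between, sketched under the assumption of an existing crew instance (LLM is imported from crewai.llm, as in the test file further down):

from crewai.llm import LLM

# llm parameter: accepts an LLM instance or a plain model-name string
crew.test(n_iterations=2, llm=LLM(model="gpt-4o-mini"))

# openai_model_name parameter: kept for backward compatibility and, in the
# llm-based variant, expected to emit a DeprecationWarning
crew.test(n_iterations=2, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
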
View File

@@ -0,0 +1,45 @@
import logging
from typing import Optional

from pydantic import Field

from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

logger = logging.getLogger(__name__)


class CustomStorageKnowledgeSource(BaseKnowledgeSource):
    """A knowledge source that uses a pre-existing storage with embeddings.

    This class allows users to use pre-existing vector embeddings without re-embedding
    when using CrewAI. It acts as a bridge between BaseKnowledgeSource and KnowledgeStorage.

    Args:
        collection_name (Optional[str]): Name of the collection in the vector database.
            Defaults to None.

    Attributes:
        storage (KnowledgeStorage): The underlying storage implementation that contains
            the pre-existing embeddings.
    """

    collection_name: Optional[str] = Field(default=None)

    def validate_content(self):
        """Validates that the storage is properly initialized.

        Raises:
            ValueError: If storage is not initialized before use.
        """
        if not hasattr(self, 'storage') or self.storage is None:
            raise ValueError("Storage not initialized. Please set storage before use.")
        logger.debug(f"Storage validated for collection: {self.collection_name}")

    def add(self) -> None:
        """No need to add content as we're using pre-existing storage.

        This method is intentionally empty as the embeddings already exist in the storage.
        """
        logger.debug(f"Skipping add operation for pre-existing storage: {self.collection_name}")

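A minimal usage sketch for this class, mirroring the example file and tests elsewhere in this diff; it assumes a KnowledgeStorage whose collection already contains embeddings ("docs" is an illustrative name):

storage = KnowledgeStorage(collection_name="docs")
storage.initialize_knowledge_storage()

source = CustomStorageKnowledgeSource(collection_name="docs")
source.storage = storage   # attach the pre-existing storage
source.validate_content()  # raises ValueError if no storage was attached
source.add()               # intentionally a no-op: embeddings already exist
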
View File

@@ -1,10 +1,6 @@
from collections import defaultdict
from typing import Union

from pydantic import BaseModel, Field

from crewai.llm import LLM
from rich.box import HEAVY_EDGE
from rich.console import Console
from rich.table import Table

@@ -27,7 +23,7 @@ class CrewEvaluator:

    Attributes:
        crew (Crew): The crew of agents to evaluate.
        llm (LLM): The language model to use for evaluating the performance of the agents.
        openai_model_name (str): The model to use for evaluating the performance of the agents (for now ONLY OpenAI accepted).
        tasks_scores (defaultdict): A dictionary to store the scores of the agents for each task.
        iteration (int): The current iteration of the evaluation.
    """

@@ -36,20 +32,12 @@ class CrewEvaluator:

    run_execution_times: defaultdict = defaultdict(list)
    iteration: int = 0

    def __init__(self, crew, llm: Union[str, LLM]):
    def __init__(self, crew, openai_model_name: str):
        self.crew = crew
        try:
            self.llm = llm if isinstance(llm, LLM) else LLM(model=llm)
            if not hasattr(self.llm, 'model'):
                raise ValueError("Provided LLM instance must have a 'model' attribute")
        except Exception as e:
            raise ValueError(f"Failed to initialize LLM: {str(e)}")
        self.openai_model_name = openai_model_name
        self._telemetry = Telemetry()
        self._setup_for_evaluating()

    def __str__(self) -> str:
        return f"CrewEvaluator(model={str(self.llm)}, iteration={self.iteration})"

    def _setup_for_evaluating(self) -> None:
        """Sets up the crew for evaluating."""
        for task in self.crew.tasks:

@@ -63,7 +51,7 @@

            ),
            backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
            verbose=False,
            llm=self.llm,
            llm=self.openai_model_name,
        )

    def _evaluation_task(

@@ -193,7 +181,7 @@

            self.crew,
            evaluation_result.pydantic.quality,
            current_task._execution_time,
            str(self.llm),
            self.openai_model_name,
        )
        self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
        self.run_execution_times[self.iteration].append(

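The two constructor shapes this hunk toggles between, sketched with an existing crew; both call forms appear verbatim in the evaluator test diff at the end of this page:

evaluator = CrewEvaluator(crew, llm="gpt-4o-mini")                # Union[str, LLM] variant
evaluator = CrewEvaluator(crew, openai_model_name="gpt-4o-mini")  # string-only variant
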
View File

@@ -13,7 +13,6 @@ import pytest
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
from crewai.llm import LLM
from crewai.crews.crew_output import CrewOutput
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process

@@ -1124,7 +1123,7 @@ def test_kickoff_for_each_empty_input():

    assert results == []

@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_invalid_input():
    """Tests if kickoff_for_each raises TypeError for invalid input types."""

@@ -2813,10 +2812,10 @@ def test_conditional_should_execute():

@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_with_custom_llm(kickoff_mock, copy_mock, crew_evaluator):
def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
    task = Task(
        description="Test task",
        expected_output="Test output",
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )

@@ -2828,107 +2827,23 @@ def test_crew_testing_with_custom_llm(kickoff_mock, copy_mock, crew_evaluator):

    # Create a mock for the copied crew
    copy_mock.return_value = crew
    custom_llm = LLM(model="gpt-4o-mini")
    n_iterations = 2
    crew.test(n_iterations, llm=custom_llm)
    crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
    # Ensure kickoff is called on the copied crew
    kickoff_mock.assert_has_calls([mock.call(inputs=None), mock.call(inputs=None)])
    # Verify CrewEvaluator was called with custom LLM
    crew_evaluator.assert_has_calls([
        mock.call(crew, custom_llm),
        mock.call().set_iteration(1),
        mock.call().set_iteration(2),
        mock.call().print_crew_evaluation_result(),
    ])

@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_backward_compatibility(kickoff_mock, copy_mock, crew_evaluator):
    task = Task(
        description="Test task",
        expected_output="Test output",
        agent=researcher,
    kickoff_mock.assert_has_calls(
        [mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
    )
    crew = Crew(
        agents=[researcher],
        tasks=[task],
    crew_evaluator.assert_has_calls(
        [
            mock.call(crew, "gpt-4o-mini"),
            mock.call().set_iteration(1),
            mock.call().set_iteration(2),
            mock.call().print_crew_evaluation_result(),
        ]
    )
    # Create a mock for the copied crew
    copy_mock.return_value = crew
    n_iterations = 2
    with pytest.warns(DeprecationWarning, match="openai_model_name is deprecated"):
        crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
    # Ensure kickoff is called on the copied crew
    kickoff_mock.assert_has_calls([
        mock.call(inputs={"topic": "AI"}),
        mock.call(inputs={"topic": "AI"})
    ])
    # Verify CrewEvaluator was called with string model name
    crew_evaluator.assert_has_calls([
        mock.call(crew, mock.ANY),
        mock.call().set_iteration(1),
        mock.call().set_iteration(2),
        mock.call().print_crew_evaluation_result(),
    ])

@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_missing_llm(kickoff_mock, copy_mock, crew_evaluator):
    task = Task(
        description="Test task",
        expected_output="Test output",
        agent=researcher,
    )
    crew = Crew(
        agents=[researcher],
        tasks=[task],
    )
    # Create a mock for the copied crew
    copy_mock.return_value = crew
    n_iterations = 2
    with pytest.raises(ValueError, match="Either llm or openai_model_name must be provided"):
        crew.test(n_iterations)

@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_with_invalid_llm(kickoff_mock, copy_mock, crew_evaluator):
    task = Task(
        description="Test task",
        expected_output="Test output",
        agent=researcher,
    )
    crew = Crew(
        agents=[researcher],
        tasks=[task],
    )
    # Create a mock for the copied crew
    copy_mock.return_value = crew
    # Test invalid LLM type
    with pytest.raises(ValueError, match="Failed to initialize LLM"):
        crew.test(n_iterations=2, llm={})
    # Test LLM without model attribute
    class InvalidLLM:
        def __init__(self): pass
    with pytest.raises(ValueError, match="LLM must be either a string model name or an LLM instance"):
        crew.test(n_iterations=2, llm=InvalidLLM())

@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_verbose_manager_agent():

@@ -3210,4 +3125,4 @@ def test_multimodal_agent_live_image_analysis():

    # Verify we got a meaningful response
    assert isinstance(result.raw, str)
    assert len(result.raw) > 100  # Expecting a detailed analysis
    assert "error" not in result.raw.lower()  # No error messages in response

View File

@@ -0,0 +1,125 @@
"""Test CustomStorageKnowledgeSource functionality."""
import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.custom_storage_knowledge_source import (
CustomStorageKnowledgeSource,
)
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
@pytest.fixture
def custom_storage():
"""Create a custom KnowledgeStorage instance."""
storage = KnowledgeStorage(collection_name="test_collection")
return storage
@pytest.fixture
def temp_dir():
"""Create a temporary directory for test files."""
temp_dir = tempfile.mkdtemp()
yield temp_dir
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
def test_custom_storage_knowledge_source(custom_storage):
"""Test that a CustomStorageKnowledgeSource can be created with a pre-existing storage."""
source = CustomStorageKnowledgeSource(collection_name="test_collection")
assert source is not None
assert source.collection_name == "test_collection"
def test_custom_storage_knowledge_source_validation():
"""Test that validation fails when storage is not properly initialized."""
source = CustomStorageKnowledgeSource(collection_name="test_collection")
source.storage = None
with pytest.raises(ValueError, match="Storage not initialized"):
source.validate_content()
def test_custom_storage_knowledge_source_with_knowledge(custom_storage):
"""Test that a CustomStorageKnowledgeSource can be used with Knowledge."""
source = CustomStorageKnowledgeSource(collection_name="test_collection")
source.storage = custom_storage
with patch.object(KnowledgeStorage, 'initialize_knowledge_storage'):
with patch.object(CustomStorageKnowledgeSource, 'add'):
knowledge = Knowledge(
sources=[source],
storage=custom_storage,
collection_name="test_collection"
)
assert knowledge is not None
assert knowledge.sources[0] == source
assert knowledge.storage == custom_storage
def test_custom_storage_knowledge_source_with_crew():
"""Test that a CustomStorageKnowledgeSource can be used with Crew."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
storage = KnowledgeStorage(collection_name="test_collection")
source = CustomStorageKnowledgeSource(collection_name="test_collection")
source.storage = storage
agent = Agent(role="test", goal="test", backstory="test")
task = Task(description="test", expected_output="test", agent=agent)
with patch.object(KnowledgeStorage, 'initialize_knowledge_storage'):
with patch.object(CustomStorageKnowledgeSource, 'add'):
crew = Crew(
agents=[agent],
tasks=[task],
knowledge_sources=[source]
)
assert crew is not None
assert crew.knowledge_sources[0] == source
def test_custom_storage_knowledge_source_add_method():
"""Test that the add method doesn't modify the storage."""
source = CustomStorageKnowledgeSource(collection_name="test_collection")
storage = MagicMock(spec=KnowledgeStorage)
source.storage = storage
source.add()
storage.assert_not_called()
def test_integration_with_existing_storage(temp_dir):
"""Test integration with an existing storage directory."""
storage_path = os.path.join(temp_dir, "test_storage")
os.makedirs(storage_path, exist_ok=True)
class MockStorage(KnowledgeStorage):
def initialize_knowledge_storage(self):
self.initialized = True
storage = MockStorage(collection_name="test_integration")
storage.initialize_knowledge_storage()
source = CustomStorageKnowledgeSource(collection_name="test_integration")
source.storage = storage
source.validate_content()
assert hasattr(storage, "initialized")
assert storage.initialized is True

View File

@@ -23,7 +23,7 @@ class TestCrewEvaluator:
        )
        crew = Crew(agents=[agent], tasks=[task])

        return CrewEvaluator(crew, llm="gpt-4o-mini")
        return CrewEvaluator(crew, openai_model_name="gpt-4o-mini")

    def test_setup_for_evaluating(self, crew_planner):
        crew_planner._setup_for_evaluating()