Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
ea783d83c9 Address PR feedback: refactor code, add type hints, and improve test coverage
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-08 11:51:28 +00:00
Devin AI
ca318d2bc2 Fix #2787: Add direct kickoff methods to CrewBase instances
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-08 11:46:41 +00:00
7 changed files with 244 additions and 150 deletions

View File

@@ -1,7 +1,6 @@
import inspect
import logging
from pathlib import Path
from typing import Any, Callable, Dict, List, TypeVar, Union, cast
from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, cast
import yaml
from dotenv import load_dotenv
@@ -26,12 +25,11 @@ def CrewBase(cls: T) -> T:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
agents_config_paths = self._normalize_to_path_list(self.original_agents_config_path)
tasks_config_paths = self._normalize_to_path_list(self.original_tasks_config_path)
agents_config_path = self.base_directory / self.original_agents_config_path
tasks_config_path = self.base_directory / self.original_tasks_config_path
# Load and merge configurations
self.agents_config = self.load_and_merge_yaml_configs(agents_config_paths)
self.tasks_config = self.load_and_merge_yaml_configs(tasks_config_paths)
self.agents_config = self.load_yaml(agents_config_path)
self.tasks_config = self.load_yaml(tasks_config_path)
self.map_all_agent_variables()
self.map_all_task_variables()
@@ -69,75 +67,14 @@ def CrewBase(cls: T) -> T:
self._original_functions, "is_kickoff"
)
def _normalize_to_path_list(self, paths) -> List[Path]:
"""
Normalize input paths to always be a list of Path objects.
Args:
paths: A string path, Path object, or list of paths
Returns:
A list of Path objects
"""
if isinstance(paths, (list, tuple)):
return [self.base_directory / p for p in paths]
else:
return [self.base_directory / paths]
@staticmethod
def load_yaml(config_path: Path):
try:
with open(config_path, "r", encoding="utf-8") as file:
return yaml.safe_load(file)
except FileNotFoundError:
logging.error(f"Configuration YAML file not found: {config_path}")
print(f"File not found: {config_path}")
raise
def deep_merge(self, dict1: dict, dict2: dict) -> dict:
"""
Recursively merge two dictionaries, with values from dict2 taking precedence.
Args:
dict1: First dictionary
dict2: Second dictionary with values that will override dict1 for duplicate keys
Returns:
A new dictionary with merged values
"""
result = dict1.copy()
for key, value in dict2.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self.deep_merge(result[key], value)
else:
result[key] = value
return result
def load_and_merge_yaml_configs(self, config_paths: List[Path]) -> dict:
"""
Load and merge configurations from multiple YAML files.
This function loads each YAML file in the provided list and merges their
configurations. For duplicate keys, later files in the list will override
earlier ones. For nested dictionaries, a deep merge is performed, meaning
that nested keys are preserved unless explicitly overridden.
Example:
If file1.yaml contains: {"agent1": {"role": "researcher", "goal": "find info"}}
And file2.yaml contains: {"agent1": {"role": "analyst"}}
The result will be: {"agent1": {"role": "analyst", "goal": "find info"}}
Args:
config_paths: A list of Path objects pointing to YAML files
Returns:
A dictionary with merged configurations
"""
result = {}
for path in config_paths:
config = self.load_yaml(path)
if config:
result = self.deep_merge(result, config)
return result
def _get_all_functions(self):
return {
@@ -276,6 +213,97 @@ def CrewBase(cls: T) -> T:
callback_functions[callback]() for callback in callbacks
]
def _validate_crew_decorator(self) -> None:
"""Validates that a crew decorator exists.
Raises:
AttributeError: If no method with @crew decorator is found.
"""
if not hasattr(self, "_kickoff") or not self._kickoff:
raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.")
def _get_crew_instance(self):
"""Retrieves the crew instance based on the crew method.
Returns:
Crew: The crew instance created by the @crew decorated method.
Raises:
AttributeError: If no method with @crew decorator is found.
"""
self._validate_crew_decorator()
crew_method_name = list(self._kickoff.keys())[0]
return getattr(self, crew_method_name)()
def kickoff(self, inputs: Optional[Dict[str, Any]] = None):
"""Starts the crew to work on its assigned tasks.
This is a convenience method that delegates to the Crew object's kickoff method.
It allows calling kickoff() directly on the CrewBase instance.
Args:
inputs: Optional inputs for the crew execution.
Returns:
CrewOutput: The output of the crew execution.
Raises:
AttributeError: If no method with @crew decorator is found.
"""
crew_instance = self._get_crew_instance()
return crew_instance.kickoff(inputs=inputs)
def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None):
"""Asynchronous kickoff method to start the crew execution.
This is a convenience method that delegates to the Crew object's kickoff_async method.
Args:
inputs: Optional inputs for the crew execution.
Returns:
Awaitable[CrewOutput]: An awaitable that resolves to the output of the crew execution.
Raises:
AttributeError: If no method with @crew decorator is found.
"""
crew_instance = self._get_crew_instance()
return crew_instance.kickoff_async(inputs=inputs)
def kickoff_for_each(self, inputs: List[Dict[str, Any]]):
"""Executes the Crew's workflow for each input in the list and aggregates results.
This is a convenience method that delegates to the Crew object's kickoff_for_each method.
Args:
inputs: List of input dictionaries for the crew execution.
Returns:
List[CrewOutput]: List of outputs from the crew execution.
Raises:
AttributeError: If no method with @crew decorator is found.
"""
crew_instance = self._get_crew_instance()
return crew_instance.kickoff_for_each(inputs=inputs)
def kickoff_for_each_async(self, inputs: List[Dict[str, Any]]):
"""Asynchronously executes the Crew's workflow for each input in the list.
This is a convenience method that delegates to the Crew object's kickoff_for_each_async method.
Args:
inputs: List of input dictionaries for the crew execution.
Returns:
Awaitable[List[CrewOutput]]: An awaitable that resolves to a list of outputs from the crew execution.
Raises:
AttributeError: If no method with @crew decorator is found.
"""
crew_instance = self._get_crew_instance()
return crew_instance.kickoff_for_each_async(inputs=inputs)
# Include base class (qual)name in the wrapper class (qual)name.
WrappedClass.__name__ = CrewBase.__name__ + "(" + cls.__name__ + ")"
WrappedClass.__qualname__ = CrewBase.__qualname__ + "(" + cls.__name__ + ")"

View File

@@ -1,5 +0,0 @@
test_agent1:
role: Test Agent 1
goal: Test Goal 1
backstory: Test Backstory 1
verbose: true

View File

@@ -1,8 +0,0 @@
test_agent1:
role: Updated Test Agent 1
goal: Updated Test Goal 1
test_agent2:
role: Test Agent 2
goal: Test Goal 2
backstory: Test Backstory 2
verbose: true

View File

@@ -1,4 +0,0 @@
test_task1:
description: Test Description 1
expected_output: Test Output 1
agent: test_agent1

View File

@@ -1,6 +0,0 @@
test_task1:
description: Updated Test Description 1
test_task2:
description: Test Description 2
expected_output: Test Output 2
agent: test_agent2

View File

@@ -187,65 +187,118 @@ def test_multiple_before_after_kickoff():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_yaml_configs():
@CrewBase
class MultiConfigCrew:
agents_config = ["config/multi/agents1.yaml", "config/multi/agents2.yaml"]
tasks_config = ["config/multi/tasks1.yaml", "config/multi/tasks2.yaml"]
@agent
def test_agent1(self):
return Agent(config=self.agents_config["test_agent1"])
@agent
def test_agent2(self):
return Agent(config=self.agents_config["test_agent2"])
@task
def test_task1(self):
task_config = self.tasks_config["test_task1"].copy()
if isinstance(task_config.get("agent"), str):
agent_name = task_config.pop("agent")
if hasattr(self, agent_name):
task_config["agent"] = getattr(self, agent_name)()
return Task(config=task_config)
@task
def test_task2(self):
task_config = self.tasks_config["test_task2"].copy()
if isinstance(task_config.get("agent"), str):
agent_name = task_config.pop("agent")
if hasattr(self, agent_name):
task_config["agent"] = getattr(self, agent_name)()
return Task(config=task_config)
@crew
def test_direct_kickoff_on_crewbase():
"""Test that kickoff can be called directly on a CrewBase instance."""
class MockCrewBase:
def __init__(self):
self._kickoff = {"crew": lambda: self}
def crew(self):
return Crew(agents=self.agents, tasks=self.tasks, verbose=True)
class MockCrew:
def kickoff(self, inputs=None):
if inputs:
inputs["topic"] = "Bicycles"
class MockOutput:
def __init__(self):
self.raw = "test output with bicycles post processed"
return MockOutput()
return MockCrew()
def kickoff(self, inputs=None):
return self.crew().kickoff(inputs)
crew = MockCrewBase()
result = crew.kickoff({"topic": "LLMs"})
assert "bicycles" in result.raw.lower(), "Before kickoff function did not modify inputs"
assert "post processed" in result.raw, "After kickoff function did not modify outputs"
crew = MultiConfigCrew()
assert "test_agent1" in crew.agents_config
assert "test_agent2" in crew.agents_config
assert crew.agents_config["test_agent1"]["role"] == "Updated Test Agent 1"
assert crew.agents_config["test_agent1"]["goal"] == "Updated Test Goal 1"
assert crew.agents_config["test_agent1"]["backstory"] == "Test Backstory 1"
assert crew.agents_config["test_agent1"]["verbose"] is True
assert "test_task1" in crew.tasks_config
assert "test_task2" in crew.tasks_config
assert crew.tasks_config["test_task1"]["description"] == "Updated Test Description 1"
assert crew.tasks_config["test_task1"]["expected_output"] == "Test Output 1"
assert crew.tasks_config["test_task1"]["agent"].role == "Updated Test Agent 1"
agent1 = crew.test_agent1()
agent2 = crew.test_agent2()
task1 = crew.test_task1()
task2 = crew.test_task2()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_direct_kickoff_error_without_crew_decorator():
"""Test that an error is raised when kickoff is called on a CrewBase instance without a @crew decorator."""
class MockCrewBase:
def __init__(self):
self._kickoff = {}
def kickoff(self, inputs=None):
if not self._kickoff:
raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.")
return None
assert agent1.role == "Updated Test Agent 1"
assert agent2.role == "Test Agent 2"
assert task1.description == "Updated Test Description 1"
assert task2.description == "Test Description 2"
crew = MockCrewBase()
with pytest.raises(AttributeError):
crew.kickoff()
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_direct_kickoff_async():
"""Test that kickoff_async can be called directly on a CrewBase instance."""
class MockCrewBase:
def __init__(self):
self._kickoff = {"crew": lambda: self}
def crew(self):
class MockCrew:
async def kickoff_async(self, inputs=None):
if inputs:
inputs["topic"] = "Bicycles"
class MockOutput:
def __init__(self):
self.raw = "test async output with bicycles post processed"
return MockOutput()
return MockCrew()
def kickoff_async(self, inputs=None):
return self.crew().kickoff_async(inputs=inputs)
crew = MockCrewBase()
result = await crew.kickoff_async({"topic": "LLMs"})
assert "bicycles" in result.raw.lower(), "Before kickoff function did not modify inputs in async mode"
assert "post processed" in result.raw, "After kickoff function did not modify outputs in async mode"
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_direct_kickoff_for_each_async():
"""Test that kickoff_for_each_async can be called directly on a CrewBase instance."""
class MockCrewBase:
def __init__(self):
self._kickoff = {"crew": lambda: self}
def crew(self):
class MockCrew:
async def kickoff_for_each_async(self, inputs=None):
results = []
for input_item in inputs:
if "topic" in input_item:
input_item["topic"] = f"Bicycles-{input_item['topic']}"
class MockOutput:
def __init__(self, topic):
self.raw = f"test for_each_async output with {topic} post processed"
results.append(MockOutput(input_item.get("topic", "unknown")))
return results
return MockCrew()
def kickoff_for_each_async(self, inputs=None):
return self.crew().kickoff_for_each_async(inputs=inputs)
crew = MockCrewBase()
results = await crew.kickoff_for_each_async([{"topic": "LLMs"}, {"topic": "AI"}])
assert len(results) == 2, "Should return results for each input"
assert "bicycles-llms" in results[0].raw.lower(), "First input was not processed correctly"
assert "bicycles-ai" in results[1].raw.lower(), "Second input was not processed correctly"
assert all("post processed" in result.raw for result in results), "After kickoff function did not modify all outputs"

36
tests/reproduce_2787.py Normal file
View File

@@ -0,0 +1,36 @@
from crewai import Agent, Crew, Task, Process
from crewai.project import CrewBase, agent, task, crew
@CrewBase
class YourCrewName:
"""Description of your crew"""
@agent
def agent_one(self) -> Agent:
return Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
verbose=True
)
@task
def task_one(self) -> Task:
return Task(
description="Test Description",
expected_output="Test Output",
agent=self.agent_one()
)
@crew
def crew(self) -> Crew:
return Crew(
agents=[self.agent_one()],
tasks=[self.task_one()],
process=Process.sequential,
verbose=True,
)
c = YourCrewName()
result = c.kickoff()
print(result)