diff --git a/src/crewai/project/crew_base.py b/src/crewai/project/crew_base.py index c82a0e04d..037e449e0 100644 --- a/src/crewai/project/crew_base.py +++ b/src/crewai/project/crew_base.py @@ -1,6 +1,6 @@ import inspect from pathlib import Path -from typing import Any, Callable, Dict, TypeVar, cast +from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, cast import yaml from dotenv import load_dotenv @@ -213,85 +213,95 @@ def CrewBase(cls: T) -> T: callback_functions[callback]() for callback in callbacks ] - def kickoff(self, inputs=None): + def _validate_crew_decorator(self) -> None: + """Validates that a crew decorator exists. + + Raises: + AttributeError: If no method with @crew decorator is found. + """ + if not hasattr(self, "_kickoff") or not self._kickoff: + raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.") + + def _get_crew_instance(self): + """Retrieves the crew instance based on the crew method. + + Returns: + Crew: The crew instance created by the @crew decorated method. + + Raises: + AttributeError: If no method with @crew decorator is found. + """ + self._validate_crew_decorator() + crew_method_name = list(self._kickoff.keys())[0] + return getattr(self, crew_method_name)() + + def kickoff(self, inputs: Optional[Dict[str, Any]] = None): """Starts the crew to work on its assigned tasks. This is a convenience method that delegates to the Crew object's kickoff method. It allows calling kickoff() directly on the CrewBase instance. Args: - inputs (Optional[Dict[str, Any]]): Optional inputs for the crew execution. + inputs: Optional inputs for the crew execution. Returns: CrewOutput: The output of the crew execution. - """ - if not hasattr(self, "_kickoff") or not self._kickoff: - raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.") - # Get the crew instance - crew_method_name = list(self._kickoff.keys())[0] - crew_instance = getattr(self, crew_method_name)() - + Raises: + AttributeError: If no method with @crew decorator is found. + """ + crew_instance = self._get_crew_instance() return crew_instance.kickoff(inputs=inputs) - def kickoff_async(self, inputs=None): + def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None): """Asynchronous kickoff method to start the crew execution. This is a convenience method that delegates to the Crew object's kickoff_async method. Args: - inputs (Optional[Dict[str, Any]]): Optional inputs for the crew execution. + inputs: Optional inputs for the crew execution. Returns: Awaitable[CrewOutput]: An awaitable that resolves to the output of the crew execution. - """ - if not hasattr(self, "_kickoff") or not self._kickoff: - raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.") - # Get the crew instance - crew_method_name = list(self._kickoff.keys())[0] - crew_instance = getattr(self, crew_method_name)() - + Raises: + AttributeError: If no method with @crew decorator is found. + """ + crew_instance = self._get_crew_instance() return crew_instance.kickoff_async(inputs=inputs) - def kickoff_for_each(self, inputs): + def kickoff_for_each(self, inputs: List[Dict[str, Any]]): """Executes the Crew's workflow for each input in the list and aggregates results. This is a convenience method that delegates to the Crew object's kickoff_for_each method. Args: - inputs (List[Dict[str, Any]]): List of input dictionaries for the crew execution. + inputs: List of input dictionaries for the crew execution. Returns: List[CrewOutput]: List of outputs from the crew execution. - """ - if not hasattr(self, "_kickoff") or not self._kickoff: - raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.") - # Get the crew instance - crew_method_name = list(self._kickoff.keys())[0] - crew_instance = getattr(self, crew_method_name)() - + Raises: + AttributeError: If no method with @crew decorator is found. + """ + crew_instance = self._get_crew_instance() return crew_instance.kickoff_for_each(inputs=inputs) - def kickoff_for_each_async(self, inputs): + def kickoff_for_each_async(self, inputs: List[Dict[str, Any]]): """Asynchronously executes the Crew's workflow for each input in the list. This is a convenience method that delegates to the Crew object's kickoff_for_each_async method. Args: - inputs (List[Dict[str, Any]]): List of input dictionaries for the crew execution. + inputs: List of input dictionaries for the crew execution. Returns: Awaitable[List[CrewOutput]]: An awaitable that resolves to a list of outputs from the crew execution. - """ - if not hasattr(self, "_kickoff") or not self._kickoff: - raise AttributeError("No method with @crew decorator found. Add a method with @crew decorator to your class.") - # Get the crew instance - crew_method_name = list(self._kickoff.keys())[0] - crew_instance = getattr(self, crew_method_name)() - + Raises: + AttributeError: If no method with @crew decorator is found. + """ + crew_instance = self._get_crew_instance() return crew_instance.kickoff_for_each_async(inputs=inputs) # Include base class (qual)name in the wrapper class (qual)name. diff --git a/tests/project_test.py b/tests/project_test.py index c5ee1c44f..153f7ae9b 100644 --- a/tests/project_test.py +++ b/tests/project_test.py @@ -232,3 +232,73 @@ def test_direct_kickoff_error_without_crew_decorator(): crew = MockCrewBase() with pytest.raises(AttributeError): crew.kickoff() + + +@pytest.mark.vcr(filter_headers=["authorization"]) +@pytest.mark.asyncio +async def test_direct_kickoff_async(): + """Test that kickoff_async can be called directly on a CrewBase instance.""" + class MockCrewBase: + def __init__(self): + self._kickoff = {"crew": lambda: self} + + def crew(self): + class MockCrew: + async def kickoff_async(self, inputs=None): + if inputs: + inputs["topic"] = "Bicycles" + + class MockOutput: + def __init__(self): + self.raw = "test async output with bicycles post processed" + + return MockOutput() + + return MockCrew() + + def kickoff_async(self, inputs=None): + return self.crew().kickoff_async(inputs=inputs) + + crew = MockCrewBase() + result = await crew.kickoff_async({"topic": "LLMs"}) + + assert "bicycles" in result.raw.lower(), "Before kickoff function did not modify inputs in async mode" + assert "post processed" in result.raw, "After kickoff function did not modify outputs in async mode" + + +@pytest.mark.vcr(filter_headers=["authorization"]) +@pytest.mark.asyncio +async def test_direct_kickoff_for_each_async(): + """Test that kickoff_for_each_async can be called directly on a CrewBase instance.""" + class MockCrewBase: + def __init__(self): + self._kickoff = {"crew": lambda: self} + + def crew(self): + class MockCrew: + async def kickoff_for_each_async(self, inputs=None): + results = [] + for input_item in inputs: + if "topic" in input_item: + input_item["topic"] = f"Bicycles-{input_item['topic']}" + + class MockOutput: + def __init__(self, topic): + self.raw = f"test for_each_async output with {topic} post processed" + + results.append(MockOutput(input_item.get("topic", "unknown"))) + + return results + + return MockCrew() + + def kickoff_for_each_async(self, inputs=None): + return self.crew().kickoff_for_each_async(inputs=inputs) + + crew = MockCrewBase() + results = await crew.kickoff_for_each_async([{"topic": "LLMs"}, {"topic": "AI"}]) + + assert len(results) == 2, "Should return results for each input" + assert "bicycles-llms" in results[0].raw.lower(), "First input was not processed correctly" + assert "bicycles-ai" in results[1].raw.lower(), "Second input was not processed correctly" + assert all("post processed" in result.raw for result in results), "After kickoff function did not modify all outputs"