diff --git a/poetry.lock b/poetry.lock
index 19cb150ea..69ff84b99 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2395,8 +2395,8 @@ langchain-core = ">=0.2.10,<0.3.0"
 langchain-text-splitters = ">=0.2.0,<0.3.0"
 langsmith = ">=0.1.17,<0.2.0"
 numpy = [
-    {version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
     {version = ">=1,<2", markers = "python_version < \"3.12\""},
+    {version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
 ]
 pydantic = ">=1,<3"
 PyYAML = ">=5.3"
@@ -2437,8 +2437,8 @@ langchain = ">=0.2.6,<0.3.0"
 langchain-core = ">=0.2.10,<0.3.0"
 langsmith = ">=0.1.0,<0.2.0"
 numpy = [
-    {version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
     {version = ">=1,<2", markers = "python_version < \"3.12\""},
+    {version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
 ]
 PyYAML = ">=5.3"
 requests = ">=2,<3"
@@ -2461,8 +2461,8 @@ jsonpatch = ">=1.33,<2.0"
 langsmith = ">=0.1.75,<0.2.0"
 packaging = ">=23.2,<25"
 pydantic = [
-    {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
     {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""},
+    {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
 ]
 PyYAML = ">=5.3"
 tenacity = ">=8.1.0,<8.4.0 || >8.4.0,<9.0.0"
@@ -2511,8 +2511,8 @@ files = [
 
 [package.dependencies]
 orjson = ">=3.9.14,<4.0.0"
 pydantic = [
-    {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
     {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""},
+    {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
 ]
 requests = ">=2,<3"
@@ -3989,8 +3989,8 @@ files = [
 annotated-types = ">=0.4.0"
 pydantic-core = "2.20.1"
 typing-extensions = [
-    {version = ">=4.12.2", markers = "python_version >= \"3.13\""},
     {version = ">=4.6.1", markers = "python_version < \"3.13\""},
+    {version = ">=4.12.2", markers = "python_version >= \"3.13\""},
 ]
 
 [package.extras]
@@ -4279,6 +4279,24 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""}
 [package.extras]
 dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
 
+[[package]]
+name = "pytest-asyncio"
+version = "0.23.7"
+description = "Pytest support for asyncio"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "pytest_asyncio-0.23.7-py3-none-any.whl", hash = "sha256:009b48127fbe44518a547bddd25611551b0e43ccdbf1e67d12479f569832c20b"},
+    {file = "pytest_asyncio-0.23.7.tar.gz", hash = "sha256:5f5c72948f4c49e7db4f29f2521d4031f1c27f86e57b046126654083d4770268"},
+]
+
+[package.dependencies]
+pytest = ">=7.0.0,<9"
+
+[package.extras]
+docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
+testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
+
 [[package]]
 name = "pytest-vcr"
 version = "1.0.2"
@@ -6090,4 +6108,4 @@ tools = ["crewai-tools"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.10,<=3.13"
-content-hash = "0dbf6f6e2e841fb3eec4ff87ea5d6b430f29702118fee91307983c6b2581e59e"
+content-hash = "91b755743f562d0c830916fc49df0f4fb5798cbd61d313b90165f050dfc3e34f"
diff --git a/pyproject.toml b/pyproject.toml
index eeae5cadb..69e9c4bd8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -51,6 +51,7 @@ crewai-tools = "^0.4.8"
 pytest = "^8.0.0"
 pytest-vcr = "^1.0.2"
 python-dotenv = "1.0.0"
+pytest-asyncio = "^0.23.7"
 
 [tool.poetry.scripts]
 crewai = "crewai.cli.cli:crewai"
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 954a8f583..a7a7b1fed 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -6,15 +6,15 @@ from typing import Any, Dict, List, Optional, Tuple, Union
 
 from langchain_core.callbacks import BaseCallbackHandler
 from pydantic import (
-    UUID4,
-    BaseModel,
-    ConfigDict,
-    Field,
-    InstanceOf,
-    Json,
-    PrivateAttr,
-    field_validator,
-    model_validator,
+    UUID4,
+    BaseModel,
+    ConfigDict,
+    Field,
+    InstanceOf,
+    Json,
+    PrivateAttr,
+    field_validator,
+    model_validator,
 )
 from pydantic_core import PydanticCustomError
 
@@ -34,8 +34,8 @@ from crewai.utilities import I18N, FileHandler, Logger, RPMController
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
 from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
 from crewai.utilities.formatter import (
-    aggregate_raw_outputs_from_task_outputs,
-    aggregate_raw_outputs_from_tasks,
+    aggregate_raw_outputs_from_task_outputs,
+    aggregate_raw_outputs_from_tasks,
 )
 from crewai.utilities.training_handler import CrewTrainingHandler
 
@@ -86,7 +86,7 @@ class Crew(BaseModel):
     tasks: List[Task] = Field(default_factory=list)
     agents: List[BaseAgent] = Field(default_factory=list)
     process: Process = Field(default=Process.sequential)
-    verbose: Union[int, bool] = Field(default=0)
+    verbose: int = Field(default=0)
     memory: bool = Field(
         default=False,
         description="Whether the crew should use memory to store memories of it's execution",
@@ -131,8 +131,8 @@
         default=None,
         description="Path to the prompt json file to be used for the crew.",
     )
-    output_log_file: Optional[Union[bool, str]] = Field(
-        default=False,
+    output_log_file: Optional[str] = Field(
+        default="",
         description="output_log_file",
     )
 
diff --git a/src/crewai/procedure/__init__.py b/src/crewai/procedure/__init__.py
new file mode 100644
index 000000000..394874eb2
--- /dev/null
+++ b/src/crewai/procedure/__init__.py
@@ -0,0 +1,3 @@
+from crewai.procedure.procedure import Procedure
+
+__all__ = ["Procedure"]
diff --git a/src/crewai/procedure/procedure.py b/src/crewai/procedure/procedure.py
new file mode 100644
index 000000000..26316937b
--- /dev/null
+++ b/src/crewai/procedure/procedure.py
@@ -0,0 +1,38 @@
+import asyncio
+from typing import Any, Dict, List
+
+from pydantic import BaseModel, Field
+
+from crewai.crew import Crew
+from crewai.crews.crew_output import CrewOutput
+
+
+class Procedure(BaseModel):
+    crews: List[Crew] = Field(
+        ..., description="List of crews to be executed in sequence"
+    )
+
+    async def kickoff(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
+        current_inputs = inputs
+
+        for crew in self.crews:
+            # Process all inputs for the current crew
+            crew_outputs = await self._process_crew(crew, current_inputs)
+            print("Crew Outputs", crew_outputs)
+
+            # Prepare inputs for the next crew
+            current_inputs = [output.to_dict() for output in crew_outputs]
+
+        # Return the final outputs
+        return crew_outputs
+
+    async def _process_crew(
+        self, crew: Crew, inputs: List[Dict[str, Any]]
+    ) -> List[CrewOutput]:
+        # Kickoff crew asynchronously for each input
+        crew_kickoffs = [crew.kickoff_async(inputs=input_data) for input_data in inputs]
+
+        # Wait for all kickoffs to complete
+        outputs = await asyncio.gather(*crew_kickoffs)
+
+        return outputs
diff --git a/tests/procedure/test_procedure.py b/tests/procedure/test_procedure.py
new file mode 100644
index 000000000..b96a719c3
--- /dev/null
+++ b/tests/procedure/test_procedure.py
@@ -0,0 +1,201 @@
+from unittest.mock import Mock, PropertyMock
+
+import pytest
+
+from crewai.crew import Crew
+from crewai.crews.crew_output import CrewOutput
+from crewai.procedure.procedure import Procedure
+from crewai.tasks.task_output import TaskOutput
+
+
+@pytest.fixture
+def mock_crew():
+    crew = Mock(spec=Crew)
+    task_output = TaskOutput(
+        description="Test task", raw="Task output", agent="Test Agent"
+    )
+    crew_output = CrewOutput(
+        raw="Test output",
+        tasks_output=[task_output],
+        token_usage={"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50},
+        json_dict={"key": "value"},
+    )
+
+    async def async_kickoff(inputs=None):
+        return crew_output
+
+    crew.kickoff.return_value = crew_output
+    crew.kickoff_async.side_effect = async_kickoff
+    return crew
+
+
+def test_procedure_initialization():
+    """
+    Test that a Procedure is correctly initialized with the given crews.
+    """
+    crew1 = Mock(spec=Crew)
+    crew2 = Mock(spec=Crew)
+
+    # Add properties required by validators
+    type(crew1).verbose = PropertyMock(return_value=True)
+    type(crew2).verbose = PropertyMock(return_value=True)
+    type(crew1).output_log_file = PropertyMock(return_value=False)
+    type(crew2).output_log_file = PropertyMock(return_value=False)
+
+    procedure = Procedure(crews=[crew1, crew2])
+    assert len(procedure.crews) == 2
+    assert procedure.crews[0] == crew1
+    assert procedure.crews[1] == crew2
+
+
+@pytest.mark.asyncio
+async def test_procedure_kickoff_single_input(mock_crew):
+    """
+    Test that Procedure.kickoff() correctly processes a single input
+    and returns the expected CrewOutput.
+    """
+    procedure = Procedure(crews=[mock_crew])
+    input_data = {"key": "value"}
+    result = await procedure.kickoff([input_data])
+
+    mock_crew.kickoff_async.assert_called_once_with(inputs=input_data)
+    assert len(result) == 1
+    assert isinstance(result[0], CrewOutput)
+    assert result[0].raw == "Test output"
+    assert len(result[0].tasks_output) == 1
+    assert result[0].tasks_output[0].raw == "Task output"
+    assert result[0].token_usage == {
+        "total_tokens": 100,
+        "prompt_tokens": 50,
+        "completion_tokens": 50,
+    }
+
+
+@pytest.mark.asyncio
+async def test_procedure_kickoff_multiple_inputs(mock_crew):
+    """
+    Test that Procedure.kickoff() correctly processes multiple inputs
+    and returns the expected CrewOutputs.
+    """
+    procedure = Procedure(crews=[mock_crew, mock_crew])
+    input_data = [{"key1": "value1"}, {"key2": "value2"}]
+    result = await procedure.kickoff(input_data)
+
+    expected_call_count = 4  # 2 crews x 2 inputs = 4
+    assert mock_crew.kickoff_async.call_count == expected_call_count
+    assert len(result) == 2
+    assert all(isinstance(r, CrewOutput) for r in result)
+    assert all(len(r.tasks_output) == 1 for r in result)
+    assert all(
+        r.token_usage
+        == {"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50}
+        for r in result
+    )
+
+
+@pytest.mark.asyncio
+async def test_procedure_chaining():
+    """
+    Test that Procedure correctly chains multiple crews, passing the output
+    of one crew as input to the next crew in the sequence.
+
+    This test verifies:
+    1. The first crew receives the initial input.
+    2. The second crew receives the output from the first crew as its input.
+    3. The final output contains the result from the last crew in the chain.
+    4. Task outputs and token usage are correctly propagated through the chain.
+ """ + crew1, crew2 = Mock(spec=Crew), Mock(spec=Crew) + task_output1 = TaskOutput(description="Task 1", raw="Output 1", agent="Agent 1") + task_output2 = TaskOutput(description="Task 2", raw="Final output", agent="Agent 2") + + crew_output1 = CrewOutput( + raw="Output 1", + tasks_output=[task_output1], + token_usage={"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50}, + json_dict={"key1": "value1"}, + ) + crew_output2 = CrewOutput( + raw="Final output", + tasks_output=[task_output2], + token_usage={"total_tokens": 150, "prompt_tokens": 75, "completion_tokens": 75}, + json_dict={"key2": "value2"}, + ) + + async def async_kickoff1(inputs=None): + return crew_output1 + + async def async_kickoff2(inputs=None): + return crew_output2 + + crew1.kickoff_async.side_effect = async_kickoff1 + crew2.kickoff_async.side_effect = async_kickoff2 + + procedure = Procedure(crews=[crew1, crew2]) + input_data = [{"initial": "data"}] + result = await procedure.kickoff(input_data) + + # Check that the first crew received the initial input + crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) + + # Check that the second crew received the output from the first crew as its input + crew2.kickoff_async.assert_called_once_with(inputs=crew_output1.to_dict()) + + # Check the final output + assert len(result) == 1 + assert isinstance(result[0], CrewOutput) + assert result[0].raw == "Final output" + assert len(result[0].tasks_output) == 1 + assert result[0].tasks_output[0].raw == "Final output" + assert result[0].token_usage == { + "total_tokens": 150, + "prompt_tokens": 75, + "completion_tokens": 75, + } + assert result[0].json_dict == {"key2": "value2"} + + +@pytest.mark.asyncio +async def test_procedure_invalid_input_type(): + """ + Test that Procedure.kickoff() raises a TypeError when given an invalid input type. + """ + procedure = Procedure(crews=[Mock(spec=Crew)]) + with pytest.raises(TypeError): + await procedure.kickoff("invalid input") + + +@pytest.mark.asyncio +async def test_procedure_token_usage_aggregation(): + """ + Test that Procedure correctly aggregates token usage across multiple crews. + """ + crew1, crew2 = Mock(spec=Crew), Mock(spec=Crew) + crew1.kickoff.return_value = CrewOutput( + raw="Output 1", + tasks_output=[ + TaskOutput(description="Task 1", raw="Output 1", agent="Agent 1") + ], + token_usage={"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50}, + ) + crew2.kickoff.return_value = CrewOutput( + raw="Output 2", + tasks_output=[ + TaskOutput(description="Task 2", raw="Output 2", agent="Agent 2") + ], + token_usage={"total_tokens": 150, "prompt_tokens": 75, "completion_tokens": 75}, + ) + + procedure = Procedure([crew1, crew2]) + result = await procedure.kickoff([{"initial": "data"}]) + + assert result[0].token_usage == { + "total_tokens": 250, + "prompt_tokens": 125, + "completion_tokens": 125, + } + assert result[0].token_usage == { + "total_tokens": 250, + "prompt_tokens": 125, + "completion_tokens": 125, + }