From 834c62fecaa3cc3ff36346ae1b33dcfbb3749898 Mon Sep 17 00:00:00 2001
From: Brandon Hancock
Date: Thu, 18 Jul 2024 11:20:26 -0400
Subject: [PATCH] Going to start refactoring for pipeline_output

---
 src/crewai/crew.py                     |  10 +-
 src/crewai/pipeline/__init__.py        |   3 +
 src/crewai/pipeline/pipeline.py        | 100 ++++++++++
 src/crewai/pipeline/pipeline_output.py |  21 +++
 src/crewai/procedure/__init__.py       |   3 -
 src/crewai/procedure/procedure.py      |  76 ---------
 tests/pipeline/test_pipeline.py        | 209 +++++++++++++++++++++++
 tests/procedure/test_procedure.py      | 215 -------------------------
 8 files changed, 338 insertions(+), 299 deletions(-)
 create mode 100644 src/crewai/pipeline/__init__.py
 create mode 100644 src/crewai/pipeline/pipeline.py
 create mode 100644 src/crewai/pipeline/pipeline_output.py
 delete mode 100644 src/crewai/procedure/__init__.py
 delete mode 100644 src/crewai/procedure/procedure.py
 create mode 100644 tests/pipeline/test_pipeline.py
 delete mode 100644 tests/procedure/test_procedure.py

diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 62f9421ee..2e2c66d69 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -48,7 +48,7 @@
 except ImportError:
     agentops = None
 if TYPE_CHECKING:
-    from crewai.procedure.procedure import Procedure
+    from crewai.pipeline.pipeline import Pipeline
 
 
 class Crew(BaseModel):
@@ -946,17 +946,17 @@ class Crew(BaseModel):
 
         return total_usage_metrics
 
-    def __rshift__(self, other: "Crew") -> "Procedure":
+    def __rshift__(self, other: "Crew") -> "Pipeline":
         """
-        Implements the >> operator to add another Crew to an existing Procedure.
+        Implements the >> operator to add another Crew to an existing Pipeline.
         """
-        from crewai.procedure.procedure import Procedure
+        from crewai.pipeline.pipeline import Pipeline
 
         if not isinstance(other, Crew):
             raise TypeError(
                 f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
             )
-        return Procedure(crews=[self, other])
+        return Pipeline(stages=[self, other])
 
     def __repr__(self):
         return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
diff --git a/src/crewai/pipeline/__init__.py b/src/crewai/pipeline/__init__.py
new file mode 100644
index 000000000..88933b0d9
--- /dev/null
+++ b/src/crewai/pipeline/__init__.py
@@ -0,0 +1,3 @@
+from crewai.pipeline.pipeline import Pipeline
+
+__all__ = ["Pipeline"]
diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py
new file mode 100644
index 000000000..629214985
--- /dev/null
+++ b/src/crewai/pipeline/pipeline.py
@@ -0,0 +1,100 @@
+import asyncio
+from typing import Any, Dict, List, Union
+
+from pydantic import BaseModel, Field
+
+from crewai.crew import Crew
+from crewai.crews.crew_output import CrewOutput
+
+"""
+Pipeline Terminology:
+Pipeline: The overall structure that defines a sequence of operations.
+Stage: A distinct part of the pipeline, which can be either sequential or parallel.
+Branch: Parallel executions within a stage (e.g., concurrent crew operations).
+Stream: The journey of an individual input through the entire pipeline.
+
+Example pipeline structure:
+crew1 >> [crew2, crew3] >> crew4
+
+This represents a pipeline with three stages:
+1. A sequential stage (crew1)
+2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
+3. Another sequential stage (crew4)
+
+Each input creates its own stream, flowing through all stages of the pipeline.
+Multiple streams can be processed concurrently, each following the defined pipeline structure.
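+
+Minimal usage sketch (illustrative only; crew1..crew4 stand in for configured
+Crew instances). Note that Crew.__rshift__ currently chains only single Crews,
+so a parallel stage is expressed through the Pipeline constructor or
+Pipeline.__rshift__:
+
+    pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
+    results = asyncio.run(pipeline.process_streams([{"topic": "AI"}]))
+    # results: List[List[CrewOutput]], one inner list per input stream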
+""" + + +class Pipeline(BaseModel): + stages: List[Union[Crew, List[Crew]]] = Field( + ..., description="List of crews representing stages to be executed in sequence" + ) + + async def process_streams( + self, stream_inputs: List[Dict[str, Any]] + ) -> List[List[CrewOutput]]: + """ + Process multiple streams in parallel, with each stream going through all stages. + """ + + async def process_single_stream( + stream_input: Dict[str, Any] + ) -> List[CrewOutput]: + print("current_input in stream", stream_input) + stage_outputs = [] + + for stage in self.stages: + if isinstance(stage, Crew): + # Process single crew + stage_output = await stage.kickoff_async(inputs=stream_input) + stage_outputs = [stage_output] + else: + # Process each crew in parallel + parallel_outputs = await asyncio.gather( + *[crew.kickoff_async(inputs=stream_input) for crew in stage] + ) + stage_outputs = parallel_outputs + + # Convert all CrewOutputs from stage into a dictionary for next stage + # and update original stream_input dictionary with new values + stage_output_dicts = [output.to_dict() for output in stage_outputs] + for stage_dict in stage_output_dicts: + stream_input.update(stage_dict) + print("UPDATING stream_input - new values:", stream_input) + + # Return all CrewOutputs from this stream + return stage_outputs + + # Process all streams in parallel + return await asyncio.gather( + *(process_single_stream(input_data) for input_data in stream_inputs) + ) + + def __rshift__(self, other: Any) -> "Pipeline": + """ + Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline. + """ + if isinstance(other, Crew): + return type(self)(stages=self.stages + [other]) + elif isinstance(other, list) and all(isinstance(crew, Crew) for crew in other): + return type(self)(stages=self.stages + [other]) + else: + raise TypeError( + f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'" + ) + + +# Helper function to run the pipeline +async def run_pipeline( + pipeline: Pipeline, inputs: List[Dict[str, Any]] +) -> List[List[CrewOutput]]: + return await pipeline.process_streams(inputs) diff --git a/src/crewai/pipeline/pipeline_output.py b/src/crewai/pipeline/pipeline_output.py new file mode 100644 index 000000000..242799a12 --- /dev/null +++ b/src/crewai/pipeline/pipeline_output.py @@ -0,0 +1,21 @@ +from typing import Any, Dict, List + +from pydantic import BaseModel, Field + +from crewai.crews.crew_output import CrewOutput + + +class PipelineOutput(BaseModel): + final_outputs: List[CrewOutput] = Field( + description="List of final outputs from the last crew in the pipeline", + default=[], + ) + token_usage: List[List[Dict[str, Any]]] = Field( + description="Token usage for each crew in each stream", default=[] + ) + + def add_final_output(self, output: CrewOutput): + self.final_outputs.append(output) + + def add_token_usage(self, usage: List[Dict[str, Any]]): + self.token_usage.append(usage) diff --git a/src/crewai/procedure/__init__.py b/src/crewai/procedure/__init__.py deleted file mode 100644 index 394874eb2..000000000 --- a/src/crewai/procedure/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from crewai.procedure.procedure import Procedure - -__all__ = ["Procedure"] diff --git a/src/crewai/procedure/procedure.py b/src/crewai/procedure/procedure.py deleted file mode 100644 index d6c640f9f..000000000 --- a/src/crewai/procedure/procedure.py +++ /dev/null @@ -1,76 +0,0 @@ -import asyncio -from typing import Any, Dict, List - -from pydantic import BaseModel, Field - 
-from crewai.crew import Crew
-from crewai.crews.crew_output import CrewOutput
-
-
-class Procedure(BaseModel):
-    crews: List[Crew] = Field(
-        ..., description="List of crews to be executed in sequence"
-    )
-
-    def kickoff(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
-        current_inputs = inputs
-
-        crew_outputs = []
-        for index, crew in enumerate(self.crews):
-            # Process all inputs for the current crew
-            crew_outputs = self._process_crew(crew, current_inputs)
-
-            # If this is not the last crew, prepare inputs for the next crew
-            if index < len(self.crews) - 1:
-                current_inputs = [output.to_dict() for output in crew_outputs]
-            else:
-                # For the last crew, we don't need to convert the output to input
-                return crew_outputs
-
-        return crew_outputs
-
-    async def kickoff_async(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
-        current_inputs = inputs
-
-        crew_outputs = []
-        for index, crew in enumerate(self.crews):
-            # Process all inputs for the current crew
-            crew_outputs = await self._process_crew(crew, current_inputs)
-
-            # If this is not the last crew, prepare inputs for the next crew
-            if index < len(self.crews) - 1:
-                current_inputs = [output.to_dict() for output in crew_outputs]
-            else:
-                # For the last crew, we don't need to convert the output to input
-                return crew_outputs
-
-        return crew_outputs
-
-    def _process_crew(
-        self, crew: Crew, inputs: List[Dict[str, Any]]
-    ) -> List[CrewOutput]:
-        # Kickoff crew for each input
-        outputs = [crew.kickoff(inputs=input_data) for input_data in inputs]
-
-        return outputs
-
-    async def _process_crew_async(
-        self, crew: Crew, inputs: List[Dict[str, Any]]
-    ) -> List[CrewOutput]:
-        # Kickoff crew asynchronously for each input
-        crew_kickoffs = [crew.kickoff_async(inputs=input_data) for input_data in inputs]
-
-        # Wait for all kickoffs to complete
-        outputs = await asyncio.gather(*crew_kickoffs)
-
-        return outputs
-
-    def __rshift__(self, other: Crew) -> "Procedure":
-        """
-        Implements the >> operator to add another Crew to an existing Procedure.
-        """
-        if not isinstance(other, Crew):
-            raise TypeError(
-                f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
-            )
-        return type(self)(crews=self.crews + [other])
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
new file mode 100644
index 000000000..cd2f250fe
--- /dev/null
+++ b/tests/pipeline/test_pipeline.py
@@ -0,0 +1,209 @@
+from unittest.mock import MagicMock
+
+import pytest
+from crewai.agent import Agent
+from crewai.crew import Crew
+from crewai.crews.crew_output import CrewOutput
+from crewai.pipeline.pipeline import Pipeline
+from crewai.process import Process
+from crewai.task import Task
+from crewai.tasks.task_output import TaskOutput
+
+
+@pytest.fixture
+def mock_crew_factory():
+    def _create_mock_crew(output_json_dict=None):
+        crew = MagicMock(spec=Crew)
+        task_output = TaskOutput(
+            description="Test task", raw="Task output", agent="Test Agent"
+        )
+        crew_output = CrewOutput(
+            raw="Test output",
+            tasks_output=[task_output],
+            token_usage={
+                "total_tokens": 100,
+                "prompt_tokens": 50,
+                "completion_tokens": 50,
+            },
+            json_dict=output_json_dict if output_json_dict else {"key": "value"},
+        )
+
+        async def async_kickoff(inputs=None):
+            print("inputs in async_kickoff", inputs)
+            return crew_output
+
+        crew.kickoff_async.side_effect = async_kickoff
+
+        # Add more attributes that Pipeline might be expecting
+        crew.verbose = False
+        crew.output_log_file = None
+        crew.max_rpm = None
+        crew.memory = False
+        crew.process = Process.sequential
+        crew.config = None
+        crew.cache = True
+
+        # Add non-empty agents and tasks
+        mock_agent = MagicMock(spec=Agent)
+        mock_task = MagicMock(spec=Task)
+        mock_task.agent = mock_agent
+        mock_task.async_execution = False
+        mock_task.context = None
+
+        crew.agents = [mock_agent]
+        crew.tasks = [mock_task]
+
+        return crew
+
+    return _create_mock_crew
+
+
+def test_pipeline_initialization(mock_crew_factory):
+    """
+    Test that a Pipeline is correctly initialized with the given stages.
+    """
+    crew1 = mock_crew_factory()
+    crew2 = mock_crew_factory()
+
+    pipeline = Pipeline(stages=[crew1, crew2])
+    assert len(pipeline.stages) == 2
+    assert pipeline.stages[0] == crew1
+    assert pipeline.stages[1] == crew2
+
+
+@pytest.mark.asyncio
+async def test_pipeline_process_streams_single_input(mock_crew_factory):
+    """
+    Test that Pipeline.process_streams() correctly processes a single input
+    and returns the expected CrewOutput.
+    """
+    mock_crew = mock_crew_factory()
+    pipeline = Pipeline(stages=[mock_crew])
+    input_data = [{"key": "value"}]
+    pipeline_result = await pipeline.process_streams(input_data)
+
+    mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
+    for stream_result in pipeline_result:
+        assert isinstance(stream_result[0], CrewOutput)
+        assert stream_result[0].raw == "Test output"
+        assert len(stream_result[0].tasks_output) == 1
+        assert stream_result[0].tasks_output[0].raw == "Task output"
+        assert stream_result[0].token_usage == {
+            "total_tokens": 100,
+            "prompt_tokens": 50,
+            "completion_tokens": 50,
+        }
+
+
+@pytest.mark.asyncio
+async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
+    """
+    Test that Pipeline.process_streams() correctly processes multiple inputs
+    and returns the expected CrewOutputs.
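+
+    Expected result shape (illustrative): one inner list of CrewOutputs per
+    input stream, e.g. [[CrewOutput(...)], [CrewOutput(...)]] for two inputs.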
+    """
+    mock_crew = mock_crew_factory()
+    pipeline = Pipeline(stages=[mock_crew])
+    input_data = [{"key1": "value1"}, {"key2": "value2"}]
+    pipeline_result = await pipeline.process_streams(input_data)
+
+    assert mock_crew.kickoff_async.call_count == 2
+    assert len(pipeline_result) == 2
+    for stream_result in pipeline_result:
+        assert all(
+            isinstance(stream_output, CrewOutput) for stream_output in stream_result
+        )
+
+
+@pytest.mark.asyncio
+async def test_pipeline_with_parallel_stages(mock_crew_factory):
+    """
+    Test that Pipeline correctly handles parallel stages.
+    """
+    crew1 = mock_crew_factory()
+    crew2 = mock_crew_factory()
+    crew3 = mock_crew_factory()
+
+    pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
+    input_data = [{"initial": "data"}]
+
+    pipeline_result = await pipeline.process_streams(input_data)
+
+    crew1.kickoff_async.assert_called_once_with(
+        inputs={"initial": "data", "key": "value"}
+    )
+    crew2.kickoff_async.assert_called_once_with(
+        inputs={"initial": "data", "key": "value"}
+    )
+    crew3.kickoff_async.assert_called_once_with(
+        inputs={"initial": "data", "key": "value"}
+    )
+
+    assert len(pipeline_result) == 1
+    for stage_result in pipeline_result:
+        assert isinstance(stage_result[0], CrewOutput)
+
+
+def test_pipeline_rshift_operator(mock_crew_factory):
+    """
+    Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews.
+    """
+    crew1 = mock_crew_factory()
+    crew2 = mock_crew_factory()
+    crew3 = mock_crew_factory()
+
+    # Test single crew addition
+    pipeline = Pipeline(stages=[]) >> crew1
+    assert len(pipeline.stages) == 1
+    assert pipeline.stages[0] == crew1
+
+    # Test adding a list of crews
+    pipeline = Pipeline(stages=[crew1])
+    pipeline = pipeline >> [crew2, crew3]
+    print("pipeline.stages:", pipeline.stages)
+    assert len(pipeline.stages) == 2
+    assert pipeline.stages[1] == [crew2, crew3]
+
+    # Test error case: trying to shift with non-Crew object
+    with pytest.raises(TypeError):
+        pipeline >> "not a crew"
+
+
+"""
+TODO: Figure out the proper output for a pipeline with multiple stages.
+
+Options:
+- Should the final output only include the last stage's output?
+- Should the final output include the accumulation of previous stages' outputs?
+
+"""
+
+
+@pytest.mark.asyncio
+async def test_pipeline_data_accumulation(mock_crew_factory):
+    """
+    Test that data is correctly accumulated through the pipeline stages.
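+
+    Illustrative accumulation (assuming each stage merges its json_dict into
+    the stream input in place):
+        {"initial": "data"} -> {"initial": "data", "key1": "value1"}
+                            -> {"initial": "data", "key1": "value1", "key2": "value2"}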
+    """
+    crew1 = mock_crew_factory(output_json_dict={"key1": "value1"})
+    crew2 = mock_crew_factory(output_json_dict={"key2": "value2"})
+
+    pipeline = Pipeline(stages=[crew1, crew2])
+    input_data = [{"initial": "data"}]
+    pipeline_result = await pipeline.process_streams(input_data)
+
+    assert len(pipeline_result) == 1
+    print("RESULT: ", pipeline_result)
+    for stream_result in pipeline_result:
+        print("STREAM RESULT: ", stream_result)
+        assert stream_result[0].json_dict == {
+            "initial": "data",
+            "key1": "value1",
+            "key2": "value2",
+        }
diff --git a/tests/procedure/test_procedure.py b/tests/procedure/test_procedure.py
deleted file mode 100644
index 696166973..000000000
--- a/tests/procedure/test_procedure.py
+++ /dev/null
@@ -1,215 +0,0 @@
-from unittest.mock import MagicMock
-
-import pytest
-
-from crewai.agent import Agent
-from crewai.crew import Crew
-from crewai.crews.crew_output import CrewOutput
-from crewai.procedure.procedure import Procedure
-from crewai.process import Process
-from crewai.task import Task
-from crewai.tasks.task_output import TaskOutput
-
-
-@pytest.fixture
-def mock_crew_factory():
-    def _create_mock_crew():
-        crew = MagicMock(spec=Crew)
-        task_output = TaskOutput(
-            description="Test task", raw="Task output", agent="Test Agent"
-        )
-        crew_output = CrewOutput(
-            raw="Test output",
-            tasks_output=[task_output],
-            token_usage={
-                "total_tokens": 100,
-                "prompt_tokens": 50,
-                "completion_tokens": 50,
-            },
-            json_dict={"key": "value"},
-        )
-
-        async def async_kickoff(inputs=None):
-            return crew_output
-
-        crew.kickoff.return_value = crew_output
-        crew.kickoff_async.side_effect = async_kickoff
-
-        # Add more attributes that Procedure might be expecting
-        crew.verbose = 0
-        crew.output_log_file = None
-        crew.max_rpm = None
-        crew.memory = False
-        crew.process = Process.sequential
-        crew.config = None
-        crew.cache = True
-
-        # Add non-empty agents and tasks
-        mock_agent = MagicMock(spec=Agent)
-        mock_task = MagicMock(spec=Task)
-        mock_task.agent = mock_agent
-        mock_task.async_execution = False
-        mock_task.context = None
-
-        crew.agents = [mock_agent]
-        crew.tasks = [mock_task]
-
-        return crew
-
-    return _create_mock_crew
-
-
-def test_procedure_initialization(mock_crew_factory):
-    """
-    Test that a Procedure is correctly initialized with the given crews.
-    """
-    crew1 = mock_crew_factory()
-    crew2 = mock_crew_factory()
-
-    procedure = Procedure(crews=[crew1, crew2])
-    assert len(procedure.crews) == 2
-    assert procedure.crews[0] == crew1
-    assert procedure.crews[1] == crew2
-
-
-@pytest.mark.asyncio
-async def test_procedure_kickoff_single_input(mock_crew_factory):
-    """
-    Test that Procedure.kickoff() correctly processes a single input
-    and returns the expected CrewOutput.
-    """
-    mock_crew_1 = mock_crew_factory()
-    procedure = Procedure(crews=[mock_crew_1])
-    input_data = {"key": "value"}
-    result = await procedure.kickoff([input_data])
-
-    mock_crew_1.kickoff_async.assert_called_once_with(inputs=input_data)
-    assert len(result) == 1
-    assert isinstance(result[0], CrewOutput)
-    assert result[0].raw == "Test output"
-    assert len(result[0].tasks_output) == 1
-    assert result[0].tasks_output[0].raw == "Task output"
-    assert result[0].token_usage == {
-        "total_tokens": 100,
-        "prompt_tokens": 50,
-        "completion_tokens": 50,
-    }
-
-
-@pytest.mark.asyncio
-async def test_procedure_kickoff_multiple_inputs(mock_crew_factory):
-    """
-    Test that Procedure.kickoff() correctly processes multiple inputs
-    and returns the expected CrewOutputs.
-    """
-    mock_crew_1, mock_crew_2 = mock_crew_factory(), mock_crew_factory()
-    procedure = Procedure(crews=[mock_crew_1, mock_crew_2])
-    input_data = [{"key1": "value1"}, {"key2": "value2"}]
-    result = await procedure.kickoff(input_data)
-
-    expected_call_count_per_crew = 2
-    assert mock_crew_1.kickoff_async.call_count == expected_call_count_per_crew
-    assert mock_crew_2.kickoff_async.call_count == expected_call_count_per_crew
-    assert len(result) == 2
-    assert all(isinstance(r, CrewOutput) for r in result)
-    assert all(len(r.tasks_output) == 1 for r in result)
-    assert all(
-        r.token_usage
-        == {"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50}
-        for r in result
-    )
-
-
-@pytest.mark.asyncio
-async def test_procedure_chaining(mock_crew_factory):
-    """
-    Test that Procedure correctly chains multiple crews, passing the output
-    of one crew as input to the next crew in the sequence.
-
-    This test verifies:
-    1. The first crew receives the initial input.
-    2. The second crew receives the output from the first crew as its input.
-    3. The final output contains the result from the last crew in the chain.
-    4. Task outputs and token usage are correctly propagated through the chain.
-    """
-    crew1, crew2 = mock_crew_factory(), mock_crew_factory()
-    task_output1 = TaskOutput(description="Task 1", raw="Output 1", agent="Agent 1")
-    task_output2 = TaskOutput(description="Task 2", raw="Final output", agent="Agent 2")
-
-    crew_output1 = CrewOutput(
-        raw="Output 1",
-        tasks_output=[task_output1],
-        token_usage={"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50},
-        json_dict={"key1": "value1"},
-    )
-    crew_output2 = CrewOutput(
-        raw="Final output",
-        tasks_output=[task_output2],
-        token_usage={"total_tokens": 150, "prompt_tokens": 75, "completion_tokens": 75},
-        json_dict={"key2": "value2"},
-    )
-
-    async def async_kickoff1(inputs=None):
-        return crew_output1
-
-    async def async_kickoff2(inputs=None):
-        return crew_output2
-
-    crew1.kickoff_async.side_effect = async_kickoff1
-    crew2.kickoff_async.side_effect = async_kickoff2
-
-    procedure = Procedure(crews=[crew1, crew2])
-    input_data = [{"initial": "data"}]
-    result = await procedure.kickoff(input_data)
-
-    # Check that the first crew received the initial input
-    crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
-
-    # Check that the second crew received the output from the first crew as its input
-    crew2.kickoff_async.assert_called_once_with(inputs=crew_output1.to_dict())
-
-    # Check the final output
-    assert len(result) == 1
-    assert isinstance(result[0], CrewOutput)
-    assert result[0].raw == "Final output"
-    assert len(result[0].tasks_output) == 1
-    assert result[0].tasks_output[0].raw == "Final output"
-    assert result[0].token_usage == {
-        "total_tokens": 150,
-        "prompt_tokens": 75,
-        "completion_tokens": 75,
-    }
-    assert result[0].json_dict == {"key2": "value2"}
-
-
-def test_crew_rshift_operator():
-    """
-    Test that the >> operator correctly creates a Procedure from two Crews.
- """ - # Create minimal Crew instances - agent = Agent(role="Test Agent", goal="Test Goal", backstory="Test Backstory") - task = Task(agent=agent, description="Test Task", expected_output="Test Output") - crew1 = Crew(agents=[agent], tasks=[task]) - crew2 = Crew(agents=[agent], tasks=[task]) - crew3 = Crew(agents=[agent], tasks=[task]) - - # Test the >> operator - procedure = crew1 >> crew2 - - assert isinstance(procedure, Procedure) - assert len(procedure.crews) == 2 - assert procedure.crews[0] == crew1 - assert procedure.crews[1] == crew2 - - # Test chaining multiple crews - procedure = crew1 >> crew2 >> crew3 - - assert isinstance(procedure, Procedure) - assert len(procedure.crews) == 3 - assert procedure.crews[0] == crew1 - assert procedure.crews[1] == crew2 - assert procedure.crews[2] == crew3 - - # Test error case: trying to shift with non-Crew object - with pytest.raises(TypeError): - crew1 >> "not a crew"