diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 2e2c66d69..5277f8c18 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -94,6 +94,7 @@ class Crew(BaseModel):
         default_factory=TaskOutputStorageHandler
     )

+    name: Optional[str] = Field(default="")
     cache: bool = Field(default=True)
     model_config = ConfigDict(arbitrary_types_allowed=True)
     tasks: List[Task] = Field(default_factory=list)
diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py
index 629214985..59f15b8b5 100644
--- a/src/crewai/pipeline/pipeline.py
+++ b/src/crewai/pipeline/pipeline.py
@@ -5,15 +5,28 @@ from pydantic import BaseModel, Field

 from crewai.crew import Crew
 from crewai.crews.crew_output import CrewOutput
+from crewai.pipeline.pipeline_output import PipelineOutput

 """
 Pipeline Terminology:
 Pipeline: The overall structure that defines a sequence of operations.
 Stage: A distinct part of the pipeline, which can be either sequential or parallel.
+Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
 Branch: Parallel executions within a stage (e.g., concurrent crew operations).
-Stream: The journey of an individual input through the entire pipeline.
+Trace: The journey of an individual input through the entire pipeline.

 Example pipeline structure:
+crew1 >> crew2 >> crew3
+
+This represents a pipeline with three sequential stages:
+1. crew1 is the first stage, which processes the input and passes its output to crew2.
+2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
+3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
+
+Each input creates its own run, flowing through all stages of the pipeline.
+Multiple runs can be processed concurrently, each following the defined pipeline structure.
+
+Another example pipeline structure:
 crew1 >> [crew2, crew3] >> crew4

 This represents a pipeline with three stages:
@@ -21,8 +34,8 @@ This represents a pipeline with three stages:
 2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
 3. Another sequential stage (crew4)

-Each input creates its own stream, flowing through all stages of the pipeline.
-Multiple streams can be processed concurrently, each following the defined pipeline structure.
+Each input creates its own run, flowing through all stages of the pipeline.
+Multiple runs can be processed concurrently, each following the defined pipeline structure.
 """
@@ -31,44 +44,43 @@ class Pipeline(BaseModel):
         ..., description="List of crews representing stages to be executed in sequence"
     )

-    async def process_streams(
-        self, stream_inputs: List[Dict[str, Any]]
+    async def process_runs(
+        self, run_inputs: List[Dict[str, Any]]
     ) -> List[List[CrewOutput]]:
         """
-        Process multiple streams in parallel, with each stream going through all stages.
+        Process multiple runs in parallel, with each run going through all stages.
""" + pipeline_output = PipelineOutput() - async def process_single_stream( - stream_input: Dict[str, Any] - ) -> List[CrewOutput]: - print("current_input in stream", stream_input) + async def process_single_run(run_input: Dict[str, Any]) -> List[CrewOutput]: + print("current_input in run", run_input) stage_outputs = [] for stage in self.stages: if isinstance(stage, Crew): # Process single crew - stage_output = await stage.kickoff_async(inputs=stream_input) + stage_output = await stage.kickoff_async(inputs=run_input) stage_outputs = [stage_output] else: # Process each crew in parallel parallel_outputs = await asyncio.gather( - *[crew.kickoff_async(inputs=stream_input) for crew in stage] + *[crew.kickoff_async(inputs=run_input) for crew in stage] ) stage_outputs = parallel_outputs # Convert all CrewOutputs from stage into a dictionary for next stage - # and update original stream_input dictionary with new values + # and update original run_input dictionary with new values stage_output_dicts = [output.to_dict() for output in stage_outputs] for stage_dict in stage_output_dicts: - stream_input.update(stage_dict) - print("UPDATING stream_input - new values:", stream_input) + run_input.update(stage_dict) + print("UPDATING run_input - new values:", run_input) - # Return all CrewOutputs from this stream + # Return all CrewOutputs from this run return stage_outputs - # Process all streams in parallel + # Process all runs in parallel return await asyncio.gather( - *(process_single_stream(input_data) for input_data in stream_inputs) + *(process_single_run(input_data) for input_data in run_inputs) ) def __rshift__(self, other: Any) -> "Pipeline": @@ -89,4 +101,4 @@ class Pipeline(BaseModel): async def run_pipeline( pipeline: Pipeline, inputs: List[Dict[str, Any]] ) -> List[List[CrewOutput]]: - return await pipeline.process_streams(inputs) + return await pipeline.process_runs(inputs) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index cd2f250fe..1d4047d64 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -80,7 +80,7 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory): mock_crew = mock_crew_factory() pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key": "value"}] - pipeline_result = await pipeline.process_streams(input_data) + pipeline_result = await pipeline.process_runs(input_data) mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"}) for stream_result in pipeline_result: @@ -104,14 +104,12 @@ async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory): mock_crew = mock_crew_factory() pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key1": "value1"}, {"key2": "value2"}] - pipeline_result = await pipeline.process_streams(input_data) + pipeline_result = await pipeline.process_runs(input_data) assert mock_crew.kickoff_async.call_count == 2 assert len(pipeline_result) == 2 - for stream_result in pipeline_result: - assert all( - isinstance(stream_output, CrewOutput) for stream_output in stream_result - ) + for run_result in pipeline_result: + assert all(isinstance(run_output, CrewOutput) for run_output in run_result) @pytest.mark.asyncio @@ -126,7 +124,7 @@ async def test_pipeline_with_parallel_stages(mock_crew_factory): pipeline = Pipeline(stages=[crew1, [crew2, crew3]]) input_data = [{"initial": "data"}] - pipeline_result = await pipeline.process_streams(input_data) + pipeline_result = await pipeline.process_runs(input_data) 
     crew1.kickoff_async.assert_called_once_with(
         inputs={"initial": "data", "key": "value"}
@@ -188,13 +186,13 @@ async def test_pipeline_data_accumulation(mock_crew_factory):
     pipeline = Pipeline(stages=[crew1, crew2])
     input_data = [{"initial": "data"}]
-    pipeline_result = await pipeline.process_streams(input_data)
+    pipeline_result = await pipeline.process_runs(input_data)

     assert len(pipeline_result) == 1
     print("RESULT: ", pipeline_result)
-    for stream_result in pipeline_result:
-        print("STREAM RESULT: ", stream_result)
-        assert stream_result[0].json_dict == {
+    for run_result in pipeline_result:
+        print("RUN RESULT: ", run_result)
+        assert run_result[0].json_dict == {
             "initial": "data",
             "key1": "value1",
             "key2": "value2",
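Note for reviewers: a minimal sketch of the renamed process_runs API outside the diff. Pipeline(stages=[...]), process_runs, kickoff_async, and to_dict are taken from the changes above; the MagicMock(spec=Crew) stub is an assumption modeled on what the test suite's mock_crew_factory fixture appears to do (its implementation is not shown in this diff), standing in for a fully configured Crew with agents and tasks.

    import asyncio
    from unittest.mock import AsyncMock, MagicMock

    from crewai.crew import Crew
    from crewai.crews.crew_output import CrewOutput
    from crewai.pipeline.pipeline import Pipeline

    # Hypothetical stand-in for a configured Crew; only the calls
    # process_runs makes (kickoff_async, to_dict) are stubbed.
    mock_output = MagicMock(spec=CrewOutput)
    mock_output.to_dict.return_value = {"key": "value"}
    mock_crew = MagicMock(spec=Crew)
    mock_crew.kickoff_async = AsyncMock(return_value=mock_output)

    pipeline = Pipeline(stages=[mock_crew])

    # Each input dict becomes one run; runs execute concurrently
    # via asyncio.gather inside process_runs.
    results = asyncio.run(pipeline.process_runs([{"q": "a"}, {"q": "b"}]))
    print(results)  # List[List[CrewOutput]] -- one inner list per run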