diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index 9065de902..e6c74a671 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -110,7 +110,7 @@ class Pipeline(BaseModel): check_nesting_and_type(stage) return values - async def process_runs( + async def kickoff( self, run_inputs: List[Dict[str, Any]] ) -> List[PipelineRunResult]: """ @@ -126,7 +126,7 @@ class Pipeline(BaseModel): # Process all runs in parallel all_run_results = await asyncio.gather( - *(self.process_single_run(input_data) for input_data in run_inputs) + *(self.process_single_kickoff(input_data) for input_data in run_inputs) ) # Flatten the list of lists into a single list of results @@ -136,7 +136,7 @@ class Pipeline(BaseModel): return pipeline_results - async def process_single_run( + async def process_single_kickoff( self, run_input: Dict[str, Any] ) -> List[PipelineRunResult]: """ diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 7f3cb4bf9..fef6a87a6 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -86,7 +86,7 @@ async def test_pipeline_with_empty_input(mock_crew_factory): pipeline = Pipeline(stages=[crew]) input_data = [] - pipeline_results = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.kickoff(input_data) assert ( len(pipeline_results) == 0 @@ -103,7 +103,7 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory): mock_crew = mock_crew_factory(name="Test Crew") pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key": "value"}] - pipeline_results = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.kickoff(input_data) mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"}) @@ -130,7 +130,7 @@ async def test_pipeline_result_ordering(mock_crew_factory): ) # Parallel stage to test ordering input_data = [{"id": 1}, {"id": 2}, {"id": 3}] - pipeline_results = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.kickoff(input_data) assert ( len(pipeline_results) == 6 @@ -187,7 +187,7 @@ async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_f ) pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key": "value"}] - pipeline_results = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.kickoff(input_data) assert len(pipeline_results) == 1 pipeline_result = pipeline_results[0] @@ -220,7 +220,7 @@ async def test_pipeline_preserves_original_input(mock_crew_factory): original_input_data = [{"key": "value", "nested": {"a": 1}}] input_data = json.loads(json.dumps(original_input_data)) - await pipeline.process_runs(input_data) + await pipeline.kickoff(input_data) # Assert that the original input hasn't been modified assert ( @@ -247,7 +247,7 @@ async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory): mock_crew = mock_crew_factory(name="Test Crew") pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key1": "value1"}, {"key2": "value2"}] - pipeline_results = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.kickoff(input_data) assert mock_crew.kickoff_async.call_count == 2 assert len(pipeline_results) == 2 @@ -271,7 +271,7 @@ async def test_pipeline_with_parallel_stages(mock_crew_factory): pipeline = Pipeline(stages=[crew1, [crew2, crew3]]) input_data = [{"initial": "data"}] - pipeline_result = await pipeline.process_runs(input_data) + pipeline_result = await pipeline.kickoff(input_data) crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) @@ -310,7 +310,7 @@ async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_facto pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4]) input_data = [{"initial": "data"}] - pipeline_result = await pipeline.process_runs(input_data) + pipeline_result = await pipeline.kickoff(input_data) crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) @@ -378,7 +378,7 @@ async def test_pipeline_parallel_crews_to_parallel_crews(mock_crew_factory): pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]]) input_data = [{"input": "test"}] - pipeline_results = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.kickoff(input_data) assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage" @@ -450,7 +450,7 @@ async def test_pipeline_data_accumulation(mock_crew_factory): pipeline = Pipeline(stages=[crew1, crew2]) input_data = [{"initial": "data"}] - results = await pipeline.process_runs(input_data) + results = await pipeline.kickoff(input_data) # Check that crew1 was called with only the initial input crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})