mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 00:58:30 +00:00
Change names
This commit is contained in:
@@ -110,7 +110,7 @@ class Pipeline(BaseModel):
|
|||||||
check_nesting_and_type(stage)
|
check_nesting_and_type(stage)
|
||||||
return values
|
return values
|
||||||
|
|
||||||
async def process_runs(
|
async def kickoff(
|
||||||
self, run_inputs: List[Dict[str, Any]]
|
self, run_inputs: List[Dict[str, Any]]
|
||||||
) -> List[PipelineRunResult]:
|
) -> List[PipelineRunResult]:
|
||||||
"""
|
"""
|
||||||
@@ -126,7 +126,7 @@ class Pipeline(BaseModel):
|
|||||||
|
|
||||||
# Process all runs in parallel
|
# Process all runs in parallel
|
||||||
all_run_results = await asyncio.gather(
|
all_run_results = await asyncio.gather(
|
||||||
*(self.process_single_run(input_data) for input_data in run_inputs)
|
*(self.process_single_kickoff(input_data) for input_data in run_inputs)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Flatten the list of lists into a single list of results
|
# Flatten the list of lists into a single list of results
|
||||||
@@ -136,7 +136,7 @@ class Pipeline(BaseModel):
|
|||||||
|
|
||||||
return pipeline_results
|
return pipeline_results
|
||||||
|
|
||||||
async def process_single_run(
|
async def process_single_kickoff(
|
||||||
self, run_input: Dict[str, Any]
|
self, run_input: Dict[str, Any]
|
||||||
) -> List[PipelineRunResult]:
|
) -> List[PipelineRunResult]:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ async def test_pipeline_with_empty_input(mock_crew_factory):
|
|||||||
pipeline = Pipeline(stages=[crew])
|
pipeline = Pipeline(stages=[crew])
|
||||||
|
|
||||||
input_data = []
|
input_data = []
|
||||||
pipeline_results = await pipeline.process_runs(input_data)
|
pipeline_results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
len(pipeline_results) == 0
|
len(pipeline_results) == 0
|
||||||
@@ -103,7 +103,7 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory):
|
|||||||
mock_crew = mock_crew_factory(name="Test Crew")
|
mock_crew = mock_crew_factory(name="Test Crew")
|
||||||
pipeline = Pipeline(stages=[mock_crew])
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
input_data = [{"key": "value"}]
|
input_data = [{"key": "value"}]
|
||||||
pipeline_results = await pipeline.process_runs(input_data)
|
pipeline_results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
|
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
|
||||||
|
|
||||||
@@ -130,7 +130,7 @@ async def test_pipeline_result_ordering(mock_crew_factory):
|
|||||||
) # Parallel stage to test ordering
|
) # Parallel stage to test ordering
|
||||||
|
|
||||||
input_data = [{"id": 1}, {"id": 2}, {"id": 3}]
|
input_data = [{"id": 1}, {"id": 2}, {"id": 3}]
|
||||||
pipeline_results = await pipeline.process_runs(input_data)
|
pipeline_results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
len(pipeline_results) == 6
|
len(pipeline_results) == 6
|
||||||
@@ -187,7 +187,7 @@ async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_f
|
|||||||
)
|
)
|
||||||
pipeline = Pipeline(stages=[mock_crew])
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
input_data = [{"key": "value"}]
|
input_data = [{"key": "value"}]
|
||||||
pipeline_results = await pipeline.process_runs(input_data)
|
pipeline_results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
assert len(pipeline_results) == 1
|
assert len(pipeline_results) == 1
|
||||||
pipeline_result = pipeline_results[0]
|
pipeline_result = pipeline_results[0]
|
||||||
@@ -220,7 +220,7 @@ async def test_pipeline_preserves_original_input(mock_crew_factory):
|
|||||||
original_input_data = [{"key": "value", "nested": {"a": 1}}]
|
original_input_data = [{"key": "value", "nested": {"a": 1}}]
|
||||||
input_data = json.loads(json.dumps(original_input_data))
|
input_data = json.loads(json.dumps(original_input_data))
|
||||||
|
|
||||||
await pipeline.process_runs(input_data)
|
await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
# Assert that the original input hasn't been modified
|
# Assert that the original input hasn't been modified
|
||||||
assert (
|
assert (
|
||||||
@@ -247,7 +247,7 @@ async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
|
|||||||
mock_crew = mock_crew_factory(name="Test Crew")
|
mock_crew = mock_crew_factory(name="Test Crew")
|
||||||
pipeline = Pipeline(stages=[mock_crew])
|
pipeline = Pipeline(stages=[mock_crew])
|
||||||
input_data = [{"key1": "value1"}, {"key2": "value2"}]
|
input_data = [{"key1": "value1"}, {"key2": "value2"}]
|
||||||
pipeline_results = await pipeline.process_runs(input_data)
|
pipeline_results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
assert mock_crew.kickoff_async.call_count == 2
|
assert mock_crew.kickoff_async.call_count == 2
|
||||||
assert len(pipeline_results) == 2
|
assert len(pipeline_results) == 2
|
||||||
@@ -271,7 +271,7 @@ async def test_pipeline_with_parallel_stages(mock_crew_factory):
|
|||||||
pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
|
pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
|
||||||
input_data = [{"initial": "data"}]
|
input_data = [{"initial": "data"}]
|
||||||
|
|
||||||
pipeline_result = await pipeline.process_runs(input_data)
|
pipeline_result = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||||
|
|
||||||
@@ -310,7 +310,7 @@ async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_facto
|
|||||||
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
|
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
|
||||||
input_data = [{"initial": "data"}]
|
input_data = [{"initial": "data"}]
|
||||||
|
|
||||||
pipeline_result = await pipeline.process_runs(input_data)
|
pipeline_result = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||||
|
|
||||||
@@ -378,7 +378,7 @@ async def test_pipeline_parallel_crews_to_parallel_crews(mock_crew_factory):
|
|||||||
pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]])
|
pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]])
|
||||||
|
|
||||||
input_data = [{"input": "test"}]
|
input_data = [{"input": "test"}]
|
||||||
pipeline_results = await pipeline.process_runs(input_data)
|
pipeline_results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage"
|
assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage"
|
||||||
|
|
||||||
@@ -450,7 +450,7 @@ async def test_pipeline_data_accumulation(mock_crew_factory):
|
|||||||
|
|
||||||
pipeline = Pipeline(stages=[crew1, crew2])
|
pipeline = Pipeline(stages=[crew1, crew2])
|
||||||
input_data = [{"initial": "data"}]
|
input_data = [{"initial": "data"}]
|
||||||
results = await pipeline.process_runs(input_data)
|
results = await pipeline.kickoff(input_data)
|
||||||
|
|
||||||
# Check that crew1 was called with only the initial input
|
# Check that crew1 was called with only the initial input
|
||||||
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
|
||||||
|
|||||||
Reference in New Issue
Block a user