Implemented additional tests for pipeline. One test is failing. Need team support

This commit is contained in:
Brandon Hancock
2024-07-22 16:35:16 -04:00
parent e95ef6fca9
commit 33d9828edc
3 changed files with 384 additions and 78 deletions

View File

@@ -35,9 +35,6 @@ class CrewOutput(BaseModel):
def to_dict(self) -> Dict[str, Any]: def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary.""" """Convert json_output and pydantic_output to a dictionary."""
print("Crew Output RAW", self.raw)
print("Crew Output JSON", self.json_dict)
print("Crew Output Pydantic", self.pydantic)
output_dict = {} output_dict = {}
if self.json_dict: if self.json_dict:
output_dict.update(self.json_dict) output_dict.update(self.json_dict)

View File

@@ -2,7 +2,8 @@ import asyncio
from collections import deque from collections import deque
from typing import Any, Dict, List, Union from typing import Any, Dict, List, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticCustomError
from crewai.crew import Crew from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput from crewai.crews.crew_output import CrewOutput
@@ -47,6 +48,26 @@ class Pipeline(BaseModel):
..., description="List of crews representing stages to be executed in sequence" ..., description="List of crews representing stages to be executed in sequence"
) )
@model_validator(mode="before")
@classmethod
def validate_stages(cls, values):
stages = values.get("stages", [])
def check_nesting_and_type(item, depth=0):
if depth > 1:
raise ValueError("Double nesting is not allowed in pipeline stages")
if isinstance(item, list):
for sub_item in item:
check_nesting_and_type(sub_item, depth + 1)
elif not isinstance(item, Crew):
raise ValueError(
f"Expected Crew instance or list of Crews, got {type(item)}"
)
for stage in stages:
check_nesting_and_type(stage)
return values
async def process_runs( async def process_runs(
self, run_inputs: List[Dict[str, Any]] self, run_inputs: List[Dict[str, Any]]
) -> List[PipelineRunResult]: ) -> List[PipelineRunResult]:
@@ -58,6 +79,7 @@ class Pipeline(BaseModel):
def format_traces( def format_traces(
traces: List[List[Union[str, Dict[str, Any]]]], traces: List[List[Union[str, Dict[str, Any]]]],
) -> List[List[Trace]]: ) -> List[List[Trace]]:
print("INCOMING TRACES: ", traces)
formatted_traces: List[Trace] = [] formatted_traces: List[Trace] = []
# Process all traces except the last one # Process all traces except the last one
@@ -67,12 +89,15 @@ class Pipeline(BaseModel):
else: else:
formatted_traces.append(trace) formatted_traces.append(trace)
print("FORMATTED TRACES PRE LAST TRACE: ", formatted_traces)
# Handle the final stage trace # Handle the final stage trace
traces_to_return: List[List[Trace]] = [] traces_to_return: List[List[Trace]] = []
final_trace = traces[-1] final_trace = traces[-1]
print("FINAL TRACE: ", final_trace)
if len(final_trace) == 1: if len(final_trace) == 1:
formatted_traces.append(final_trace) formatted_traces.append(final_trace[0])
traces_to_return.append(formatted_traces) traces_to_return.append(formatted_traces)
else: else:
for trace in final_trace: for trace in final_trace:
@@ -80,6 +105,8 @@ class Pipeline(BaseModel):
copied_traces.append(trace) copied_traces.append(trace)
traces_to_return.append(copied_traces) traces_to_return.append(copied_traces)
print("TRACES TO RETURN", traces_to_return)
return traces_to_return return traces_to_return
def format_crew_outputs( def format_crew_outputs(
@@ -136,11 +163,17 @@ class Pipeline(BaseModel):
async def process_single_run( async def process_single_run(
run_input: Dict[str, Any] run_input: Dict[str, Any]
) -> List[PipelineRunResult]: ) -> List[PipelineRunResult]:
stages_queue = deque(self.stages) # TODO: Change over to forloop initial_input = run_input.copy() # Create a copy of the initial input
current_input = (
run_input.copy()
) # Create a working copy that will be updated
stages_queue = deque(self.stages)
usage_metrics = {} usage_metrics = {}
stage_outputs: List[CrewOutput] = [] stage_outputs: List[CrewOutput] = []
all_stage_outputs: List[List[CrewOutput]] = [] all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[run_input]] traces: List[List[Union[str, Dict[str, Any]]]] = [
[initial_input]
] # Use the initial input here
stage = None stage = None
while stages_queue: while stages_queue:
@@ -148,35 +181,37 @@ class Pipeline(BaseModel):
if isinstance(stage, Crew): if isinstance(stage, Crew):
# Process single crew # Process single crew
output = await stage.kickoff_async(inputs=run_input) output = await stage.kickoff_async(inputs=current_input)
# Update usage metrics and setup inputs for next stage # Update usage metrics and setup inputs for next stage
usage_metrics[stage.name] = output.token_usage usage_metrics[stage.name or stage.id] = output.token_usage
run_input.update(output.to_dict()) current_input.update(output.to_dict()) # Update the working copy
# Update traces for single crew stage # Update traces for single crew stage
traces.append([stage.name or "No name"]) traces.append([stage.name or str(stage.id)])
# Store output for final results # Store output for final results
stage_outputs = [output] stage_outputs = [output]
else: else:
# Process each crew in parallel # Process each crew in parallel
parallel_outputs = await asyncio.gather( parallel_outputs = await asyncio.gather(
*[crew.kickoff_async(inputs=run_input) for crew in stage] *[crew.kickoff_async(inputs=current_input) for crew in stage]
) )
# Update usage metrics and setup inputs for next stage # Update usage metrics and setup inputs for next stage
for crew, output in zip(stage, parallel_outputs): for crew, output in zip(stage, parallel_outputs):
usage_metrics[crew.name] = output.token_usage usage_metrics[crew.name] = output.token_usage
run_input.update(output.to_dict()) current_input.update(
output.to_dict()
) # Update the working copy
# Update traces for parallel stage # Update traces for parallel stage
traces.append([crew.name or "No name" for crew in stage]) traces.append([crew.name or str(crew.id) for crew in stage])
# Store output for final results # Store output for final results
stage_outputs = parallel_outputs stage_outputs = parallel_outputs
all_stage_outputs.append(stage_outputs) all_stage_outputs.append(stage_outputs)
print("STAGE OUTPUTS: ", stage_outputs) # print("STAGE OUTPUTS: ", stage_outputs)
print("TRACES: ", traces) # print("TRACES: ", traces)
print("TOKEN USAGE: ", usage_metrics) # print("TOKEN USAGE: ", usage_metrics)
print("ALL STAGE OUTPUTS: ", all_stage_outputs) # print("ALL STAGE OUTPUTS: ", all_stage_outputs)
# Build final pipeline run results # Build final pipeline run results
final_results = build_pipeline_run_results( final_results = build_pipeline_run_results(

View File

@@ -1,3 +1,4 @@
import json
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
@@ -9,11 +10,18 @@ from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.process import Process from crewai.process import Process
from crewai.task import Task from crewai.task import Task
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
from pydantic import BaseModel, ValidationError
DEFAULT_TOKEN_USAGE = {
"total_tokens": 100,
"prompt_tokens": 50,
"completion_tokens": 50,
}
@pytest.fixture @pytest.fixture
def mock_crew_factory(): def mock_crew_factory():
def _create_mock_crew(output_json_dict=None): def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None):
crew = MagicMock(spec=Crew) crew = MagicMock(spec=Crew)
task_output = TaskOutput( task_output = TaskOutput(
description="Test task", raw="Task output", agent="Test Agent" description="Test task", raw="Task output", agent="Test Agent"
@@ -21,12 +29,9 @@ def mock_crew_factory():
crew_output = CrewOutput( crew_output = CrewOutput(
raw="Test output", raw="Test output",
tasks_output=[task_output], tasks_output=[task_output],
token_usage={ token_usage=DEFAULT_TOKEN_USAGE,
"total_tokens": 100, json_dict=output_json_dict if output_json_dict else None,
"prompt_tokens": 50, pydantic=pydantic_output,
"completion_tokens": 50,
},
json_dict=output_json_dict if output_json_dict else {"key": "value"},
) )
async def async_kickoff(inputs=None): async def async_kickoff(inputs=None):
@@ -43,6 +48,7 @@ def mock_crew_factory():
crew.process = Process.sequential crew.process = Process.sequential
crew.config = None crew.config = None
crew.cache = True crew.cache = True
crew.name = name
# Add non-empty agents and tasks # Add non-empty agents and tasks
mock_agent = MagicMock(spec=Agent) mock_agent = MagicMock(spec=Agent)
@@ -63,8 +69,8 @@ def test_pipeline_initialization(mock_crew_factory):
""" """
Test that a Pipeline is correctly initialized with the given stages. Test that a Pipeline is correctly initialized with the given stages.
""" """
crew1 = mock_crew_factory() crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory() crew2 = mock_crew_factory(name="Crew 2")
pipeline = Pipeline(stages=[crew1, crew2]) pipeline = Pipeline(stages=[crew1, crew2])
assert len(pipeline.stages) == 2 assert len(pipeline.stages) == 2
@@ -72,32 +78,165 @@ def test_pipeline_initialization(mock_crew_factory):
assert pipeline.stages[1] == crew2 assert pipeline.stages[1] == crew2
@pytest.mark.asyncio
async def test_pipeline_with_empty_input(mock_crew_factory):
"""
Ensure the pipeline handles an empty input list correctly.
"""
crew = mock_crew_factory(name="Test Crew")
pipeline = Pipeline(stages=[crew])
input_data = []
pipeline_results = await pipeline.process_runs(input_data)
assert (
len(pipeline_results) == 0
), "Pipeline should return empty results for empty input"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pipeline_process_streams_single_input(mock_crew_factory): async def test_pipeline_process_streams_single_input(mock_crew_factory):
""" """
Test that Pipeline.process_streams() correctly processes a single input Test that Pipeline.process_streams() correctly processes a single input
and returns the expected CrewOutput. and returns the expected CrewOutput.
""" """
mock_crew = mock_crew_factory() crew_name = "Test Crew"
mock_crew = mock_crew_factory(name="Test Crew")
pipeline = Pipeline(stages=[mock_crew]) pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key": "value"}] input_data = [{"key": "value"}]
pipeline_result = await pipeline.process_runs(input_data) pipeline_results = await pipeline.process_runs(input_data)
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"}) mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
for pipeline_line_result in pipeline_result: for pipeline_result in pipeline_results:
assert isinstance(pipeline_line_result, PipelineRunResult) assert isinstance(pipeline_result, PipelineRunResult)
assert pipeline_result.raw == "Test output"
assert len(pipeline_result.crews_outputs) == 1
print("pipeline_result.token_usage", pipeline_result.token_usage)
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
assert pipeline_result.trace == [input_data[0], "Test Crew"]
# for stream_result in pipeline_result:
# assert isinstance(stream_result[0], CrewOutput) @pytest.mark.asyncio
# assert stream_result[0].raw == "Test output" async def test_pipeline_result_ordering(mock_crew_factory):
# assert len(stream_result[0].tasks_output) == 1 """
# assert stream_result[0].tasks_output[0].raw == "Task output" Ensure that results are returned in the same order as the inputs, especially with parallel processing.
# assert stream_result[0].token_usage == { """
# "total_tokens": 100, crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output": "crew1"})
# "prompt_tokens": 50, crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output": "crew2"})
# "completion_tokens": 50, crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output": "crew3"})
# }
pipeline = Pipeline(
stages=[crew1, [crew2, crew3]]
) # Parallel stage to test ordering
input_data = [{"id": 1}, {"id": 2}, {"id": 3}]
pipeline_results = await pipeline.process_runs(input_data)
assert (
len(pipeline_results) == 6
), "Should have 2 results for each input due to the parallel final stage"
# Group results by their original input id
grouped_results = {}
for result in pipeline_results:
input_id = result.trace[0]["id"]
if input_id not in grouped_results:
grouped_results[input_id] = []
grouped_results[input_id].append(result)
# Check that we have the correct number of groups and results per group
assert len(grouped_results) == 3, "Should have results for each of the 3 inputs"
for input_id, results in grouped_results.items():
assert (
len(results) == 2
), f"Each input should have 2 results, but input {input_id} has {len(results)}"
# Check the ordering and content of the results
for input_id in range(1, 4):
group = grouped_results[input_id]
assert group[0].trace == [
{"id": input_id},
"Crew 1",
"Crew 2",
], f"Unexpected trace for first result of input {input_id}"
assert group[1].trace == [
{"id": input_id},
"Crew 1",
"Crew 3",
], f"Unexpected trace for second result of input {input_id}"
assert (
group[0].json_dict["output"] == "crew2"
), f"Unexpected output for first result of input {input_id}"
assert (
group[1].json_dict["output"] == "crew3"
), f"Unexpected output for second result of input {input_id}"
class TestPydanticOutput(BaseModel):
key: str
value: int
@pytest.mark.asyncio
async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_factory):
crew_name = "Test Crew"
mock_crew = mock_crew_factory(
name=crew_name,
output_json_dict=None,
pydantic_output=TestPydanticOutput(key="test", value=42),
)
pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key": "value"}]
pipeline_results = await pipeline.process_runs(input_data)
assert len(pipeline_results) == 1
pipeline_result = pipeline_results[0]
print("pipeline_result.trace", pipeline_result.trace)
assert isinstance(pipeline_result, PipelineRunResult)
assert pipeline_result.raw == "Test output"
assert len(pipeline_result.crews_outputs) == 1
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
print("INPUT DATA POST PROCESS", input_data)
assert pipeline_result.trace == [input_data[0], "Test Crew"]
assert isinstance(pipeline_result.pydantic, TestPydanticOutput)
assert pipeline_result.pydantic.key == "test"
assert pipeline_result.pydantic.value == 42
assert pipeline_result.json_dict is None
@pytest.mark.asyncio
async def test_pipeline_preserves_original_input(mock_crew_factory):
crew_name = "Test Crew"
mock_crew = mock_crew_factory(
name=crew_name,
output_json_dict={"new_key": "new_value"},
)
pipeline = Pipeline(stages=[mock_crew])
# Create a deep copy of the input data to ensure we're not comparing references
original_input_data = [{"key": "value", "nested": {"a": 1}}]
input_data = json.loads(json.dumps(original_input_data))
await pipeline.process_runs(input_data)
# Assert that the original input hasn't been modified
assert (
input_data == original_input_data
), "The original input data should not be modified"
# Ensure that even nested structures haven't been modified
assert (
input_data[0]["nested"] == original_input_data[0]["nested"]
), "Nested structures should not be modified"
# Verify that adding new keys to the crew output doesn't affect the original input
assert (
"new_key" not in input_data[0]
), "New keys from crew output should not be added to the original input"
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -106,15 +245,19 @@ async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
Test that Pipeline.process_streams() correctly processes multiple inputs Test that Pipeline.process_streams() correctly processes multiple inputs
and returns the expected CrewOutputs. and returns the expected CrewOutputs.
""" """
mock_crew = mock_crew_factory() mock_crew = mock_crew_factory(name="Test Crew")
pipeline = Pipeline(stages=[mock_crew]) pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key1": "value1"}, {"key2": "value2"}] input_data = [{"key1": "value1"}, {"key2": "value2"}]
pipeline_result = await pipeline.process_runs(input_data) pipeline_results = await pipeline.process_runs(input_data)
assert mock_crew.kickoff_async.call_count == 2 assert mock_crew.kickoff_async.call_count == 2
assert len(pipeline_result) == 2 assert len(pipeline_results) == 2
for run_result in pipeline_result: for pipeline_result in pipeline_results:
assert all(isinstance(run_output, CrewOutput) for run_output in run_result) print("pipeline_result,", pipeline_result)
assert all(
isinstance(crew_output, CrewOutput)
for crew_output in pipeline_result.crews_outputs
)
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -122,37 +265,89 @@ async def test_pipeline_with_parallel_stages(mock_crew_factory):
""" """
Test that Pipeline correctly handles parallel stages. Test that Pipeline correctly handles parallel stages.
""" """
crew1 = mock_crew_factory() crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory() crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory() crew3 = mock_crew_factory(name="Crew 3")
pipeline = Pipeline(stages=[crew1, [crew2, crew3]]) pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
input_data = [{"initial": "data"}] input_data = [{"initial": "data"}]
pipeline_result = await pipeline.process_runs(input_data) pipeline_result = await pipeline.process_runs(input_data)
crew1.kickoff_async.assert_called_once_with( crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
inputs={"initial": "data", "key": "value"}
) assert len(pipeline_result) == 2
crew2.kickoff_async.assert_called_once_with( pipeline_result_1, pipeline_result_2 = pipeline_result
inputs={"initial": "data", "key": "value"}
) pipeline_result_1.trace = [
crew3.kickoff_async.assert_called_once_with( "Crew 1",
inputs={"initial": "data", "key": "value"} "Crew 2",
) ]
pipeline_result_2.trace = [
"Crew 1",
"Crew 3",
]
expected_token_usage = {
"Crew 1": DEFAULT_TOKEN_USAGE,
"Crew 2": DEFAULT_TOKEN_USAGE,
"Crew 3": DEFAULT_TOKEN_USAGE,
}
assert pipeline_result_1.token_usage == expected_token_usage
assert pipeline_result_2.token_usage == expected_token_usage
@pytest.mark.asyncio
async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_factory):
"""
Test that Pipeline correctly handles parallel stages.
"""
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory(name="Crew 3")
crew4 = mock_crew_factory(name="Crew 4")
pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4])
input_data = [{"initial": "data"}]
pipeline_result = await pipeline.process_runs(input_data)
crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
assert len(pipeline_result) == 1 assert len(pipeline_result) == 1
for stage_result in pipeline_result: pipeline_result_1 = pipeline_result[0]
assert isinstance(stage_result[0], CrewOutput)
pipeline_result_1.trace = [
input_data[0],
"Crew 1",
["Crew 2", "Crew 3"],
"Crew 4",
]
expected_token_usage = {
"Crew 1": DEFAULT_TOKEN_USAGE,
"Crew 2": DEFAULT_TOKEN_USAGE,
"Crew 3": DEFAULT_TOKEN_USAGE,
"Crew 4": DEFAULT_TOKEN_USAGE,
}
assert pipeline_result_1.token_usage == expected_token_usage
@pytest.mark.asyncio
async def test_pipeline_with_parallel_stages_multiple_inputs(mock_crew_factory):
# TODO: implement
pass
def test_pipeline_rshift_operator(mock_crew_factory): def test_pipeline_rshift_operator(mock_crew_factory):
""" """
Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews. Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews.
""" """
crew1 = mock_crew_factory() crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory() crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory() crew3 = mock_crew_factory(name="Crew 3")
# Test single crew addition # Test single crew addition
pipeline = Pipeline(stages=[]) >> crew1 pipeline = Pipeline(stages=[]) >> crew1
@@ -171,6 +366,75 @@ def test_pipeline_rshift_operator(mock_crew_factory):
pipeline >> "not a crew" pipeline >> "not a crew"
@pytest.mark.asyncio
async def test_pipeline_parallel_crews_to_parallel_crews(mock_crew_factory):
"""
Test that feeding parallel crews to parallel crews works correctly.
"""
crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output1": "crew1"})
crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output2": "crew2"})
crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output3": "crew3"})
crew4 = mock_crew_factory(name="Crew 4", output_json_dict={"output4": "crew4"})
pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]])
input_data = [{"input": "test"}]
pipeline_results = await pipeline.process_runs(input_data)
assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage"
pipeline_result_1, pipeline_result_2 = pipeline_results
# Check the outputs
assert pipeline_result_1.json_dict == {"output3": "crew3"}
assert pipeline_result_2.json_dict == {"output4": "crew4"}
# Check the traces
expected_traces = [
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 3"],
[{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 4"],
]
for result, expected_trace in zip(pipeline_results, expected_traces):
assert result.trace == expected_trace, f"Unexpected trace: {result.trace}"
def test_pipeline_double_nesting_not_allowed(mock_crew_factory):
"""
Test that double nesting in pipeline stages is not allowed.
"""
crew1 = mock_crew_factory(name="Crew 1")
crew2 = mock_crew_factory(name="Crew 2")
crew3 = mock_crew_factory(name="Crew 3")
crew4 = mock_crew_factory(name="Crew 4")
with pytest.raises(ValidationError) as exc_info:
Pipeline(stages=[crew1, [[crew2, crew3], crew4]])
error_msg = str(exc_info.value)
print(f"Full error message: {error_msg}") # For debugging
assert (
"Double nesting is not allowed in pipeline stages" in error_msg
), f"Unexpected error message: {error_msg}"
def test_pipeline_invalid_crew(mock_crew_factory):
"""
Test that non-Crew objects are not allowed in pipeline stages.
"""
crew1 = mock_crew_factory(name="Crew 1")
not_a_crew = "This is not a crew"
with pytest.raises(ValidationError) as exc_info:
Pipeline(stages=[crew1, not_a_crew])
error_msg = str(exc_info.value)
print(f"Full error message: {error_msg}") # For debugging
assert (
"Expected Crew instance or list of Crews, got <class 'str'>" in error_msg
), f"Unexpected error message: {error_msg}"
""" """
TODO: Figure out what is the proper output for a pipeline with multiple stages TODO: Figure out what is the proper output for a pipeline with multiple stages
@@ -178,27 +442,37 @@ Options:
- Should the final output only include the last stage's output? - Should the final output only include the last stage's output?
- Should the final output include the accumulation of previous stages' outputs? - Should the final output include the accumulation of previous stages' outputs?
""" """
# TODO: GET HELP FROM TEAM ON THIS ONE
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pipeline_data_accumulation(mock_crew_factory): async def test_pipeline_data_accumulation(mock_crew_factory):
""" crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
Test that data is correctly accumulated through the pipeline stages. crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"})
"""
crew1 = mock_crew_factory(output_json_dict={"key1": "value1"})
crew2 = mock_crew_factory(output_json_dict={"key2": "value2"})
pipeline = Pipeline(stages=[crew1, crew2]) pipeline = Pipeline(stages=[crew1, crew2])
input_data = [{"initial": "data"}] input_data = [{"initial": "data"}]
pipeline_result = await pipeline.process_runs(input_data) results = await pipeline.process_runs(input_data)
assert len(pipeline_result) == 1 # Check that crew1 was called with only the initial input
print("RESULT: ", pipeline_result) crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})
for run_result in pipeline_result:
print("RUN RESULT: ", run_result) # Check that crew2 was called with the combined input from the initial data and crew1's output
assert run_result[0].json_dict == { crew2.kickoff_async.assert_called_once_with(
"initial": "data", inputs={"initial": "data", "key1": "value1"}
"key1": "value1", )
"key2": "value2",
} # Check the final output
assert len(results) == 1
final_result = results[0]
assert final_result.json_dict == {"key2": "value2"}
# Check that the trace includes all stages
assert final_result.trace == [{"initial": "data"}, "Crew 1", "Crew 2"]
# Check that crews_outputs contain the correct information
assert len(final_result.crews_outputs) == 2
assert final_result.crews_outputs[0].json_dict == {"key1": "value1"}
assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}