diff --git a/src/crewai/crews/crew_output.py b/src/crewai/crews/crew_output.py index 81a8752c8..e630c1f3a 100644 --- a/src/crewai/crews/crew_output.py +++ b/src/crewai/crews/crew_output.py @@ -35,9 +35,6 @@ class CrewOutput(BaseModel): def to_dict(self) -> Dict[str, Any]: """Convert json_output and pydantic_output to a dictionary.""" - print("Crew Output RAW", self.raw) - print("Crew Output JSON", self.json_dict) - print("Crew Output Pydantic", self.pydantic) output_dict = {} if self.json_dict: output_dict.update(self.json_dict) diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index fed3eb395..8ee5747de 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -2,7 +2,8 @@ import asyncio from collections import deque from typing import Any, Dict, List, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator +from pydantic_core import PydanticCustomError from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput @@ -47,6 +48,26 @@ class Pipeline(BaseModel): ..., description="List of crews representing stages to be executed in sequence" ) + @model_validator(mode="before") + @classmethod + def validate_stages(cls, values): + stages = values.get("stages", []) + + def check_nesting_and_type(item, depth=0): + if depth > 1: + raise ValueError("Double nesting is not allowed in pipeline stages") + if isinstance(item, list): + for sub_item in item: + check_nesting_and_type(sub_item, depth + 1) + elif not isinstance(item, Crew): + raise ValueError( + f"Expected Crew instance or list of Crews, got {type(item)}" + ) + + for stage in stages: + check_nesting_and_type(stage) + return values + async def process_runs( self, run_inputs: List[Dict[str, Any]] ) -> List[PipelineRunResult]: @@ -58,6 +79,7 @@ class Pipeline(BaseModel): def format_traces( traces: List[List[Union[str, Dict[str, Any]]]], ) -> List[List[Trace]]: + print("INCOMING TRACES: ", traces) formatted_traces: List[Trace] = [] # Process all traces except the last one @@ -67,12 +89,15 @@ class Pipeline(BaseModel): else: formatted_traces.append(trace) + print("FORMATTED TRACES PRE LAST TRACE: ", formatted_traces) + # Handle the final stage trace traces_to_return: List[List[Trace]] = [] final_trace = traces[-1] + print("FINAL TRACE: ", final_trace) if len(final_trace) == 1: - formatted_traces.append(final_trace) + formatted_traces.append(final_trace[0]) traces_to_return.append(formatted_traces) else: for trace in final_trace: @@ -80,6 +105,8 @@ class Pipeline(BaseModel): copied_traces.append(trace) traces_to_return.append(copied_traces) + print("TRACES TO RETURN", traces_to_return) + return traces_to_return def format_crew_outputs( @@ -136,11 +163,17 @@ class Pipeline(BaseModel): async def process_single_run( run_input: Dict[str, Any] ) -> List[PipelineRunResult]: - stages_queue = deque(self.stages) # TODO: Change over to forloop + initial_input = run_input.copy() # Create a copy of the initial input + current_input = ( + run_input.copy() + ) # Create a working copy that will be updated + stages_queue = deque(self.stages) usage_metrics = {} stage_outputs: List[CrewOutput] = [] all_stage_outputs: List[List[CrewOutput]] = [] - traces: List[List[Union[str, Dict[str, Any]]]] = [[run_input]] + traces: List[List[Union[str, Dict[str, Any]]]] = [ + [initial_input] + ] # Use the initial input here stage = None while stages_queue: @@ -148,35 +181,37 @@ class Pipeline(BaseModel): if isinstance(stage, Crew): # Process single crew - output = await stage.kickoff_async(inputs=run_input) + output = await stage.kickoff_async(inputs=current_input) # Update usage metrics and setup inputs for next stage - usage_metrics[stage.name] = output.token_usage - run_input.update(output.to_dict()) + usage_metrics[stage.name or stage.id] = output.token_usage + current_input.update(output.to_dict()) # Update the working copy # Update traces for single crew stage - traces.append([stage.name or "No name"]) + traces.append([stage.name or str(stage.id)]) # Store output for final results stage_outputs = [output] else: # Process each crew in parallel parallel_outputs = await asyncio.gather( - *[crew.kickoff_async(inputs=run_input) for crew in stage] + *[crew.kickoff_async(inputs=current_input) for crew in stage] ) # Update usage metrics and setup inputs for next stage for crew, output in zip(stage, parallel_outputs): usage_metrics[crew.name] = output.token_usage - run_input.update(output.to_dict()) + current_input.update( + output.to_dict() + ) # Update the working copy # Update traces for parallel stage - traces.append([crew.name or "No name" for crew in stage]) + traces.append([crew.name or str(crew.id) for crew in stage]) # Store output for final results stage_outputs = parallel_outputs all_stage_outputs.append(stage_outputs) - print("STAGE OUTPUTS: ", stage_outputs) - print("TRACES: ", traces) - print("TOKEN USAGE: ", usage_metrics) - print("ALL STAGE OUTPUTS: ", all_stage_outputs) + # print("STAGE OUTPUTS: ", stage_outputs) + # print("TRACES: ", traces) + # print("TOKEN USAGE: ", usage_metrics) + # print("ALL STAGE OUTPUTS: ", all_stage_outputs) # Build final pipeline run results final_results = build_pipeline_run_results( diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 8615391ac..7993c0804 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1,3 +1,4 @@ +import json from unittest.mock import MagicMock import pytest @@ -9,11 +10,18 @@ from crewai.pipeline.pipeline_run_result import PipelineRunResult from crewai.process import Process from crewai.task import Task from crewai.tasks.task_output import TaskOutput +from pydantic import BaseModel, ValidationError + +DEFAULT_TOKEN_USAGE = { + "total_tokens": 100, + "prompt_tokens": 50, + "completion_tokens": 50, +} @pytest.fixture def mock_crew_factory(): - def _create_mock_crew(output_json_dict=None): + def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None): crew = MagicMock(spec=Crew) task_output = TaskOutput( description="Test task", raw="Task output", agent="Test Agent" @@ -21,12 +29,9 @@ def mock_crew_factory(): crew_output = CrewOutput( raw="Test output", tasks_output=[task_output], - token_usage={ - "total_tokens": 100, - "prompt_tokens": 50, - "completion_tokens": 50, - }, - json_dict=output_json_dict if output_json_dict else {"key": "value"}, + token_usage=DEFAULT_TOKEN_USAGE, + json_dict=output_json_dict if output_json_dict else None, + pydantic=pydantic_output, ) async def async_kickoff(inputs=None): @@ -43,6 +48,7 @@ def mock_crew_factory(): crew.process = Process.sequential crew.config = None crew.cache = True + crew.name = name # Add non-empty agents and tasks mock_agent = MagicMock(spec=Agent) @@ -63,8 +69,8 @@ def test_pipeline_initialization(mock_crew_factory): """ Test that a Pipeline is correctly initialized with the given stages. """ - crew1 = mock_crew_factory() - crew2 = mock_crew_factory() + crew1 = mock_crew_factory(name="Crew 1") + crew2 = mock_crew_factory(name="Crew 2") pipeline = Pipeline(stages=[crew1, crew2]) assert len(pipeline.stages) == 2 @@ -72,32 +78,165 @@ def test_pipeline_initialization(mock_crew_factory): assert pipeline.stages[1] == crew2 +@pytest.mark.asyncio +async def test_pipeline_with_empty_input(mock_crew_factory): + """ + Ensure the pipeline handles an empty input list correctly. + """ + crew = mock_crew_factory(name="Test Crew") + pipeline = Pipeline(stages=[crew]) + + input_data = [] + pipeline_results = await pipeline.process_runs(input_data) + + assert ( + len(pipeline_results) == 0 + ), "Pipeline should return empty results for empty input" + + @pytest.mark.asyncio async def test_pipeline_process_streams_single_input(mock_crew_factory): """ Test that Pipeline.process_streams() correctly processes a single input and returns the expected CrewOutput. """ - mock_crew = mock_crew_factory() + crew_name = "Test Crew" + mock_crew = mock_crew_factory(name="Test Crew") pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key": "value"}] - pipeline_result = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.process_runs(input_data) mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"}) - for pipeline_line_result in pipeline_result: - assert isinstance(pipeline_line_result, PipelineRunResult) + for pipeline_result in pipeline_results: + assert isinstance(pipeline_result, PipelineRunResult) + assert pipeline_result.raw == "Test output" + assert len(pipeline_result.crews_outputs) == 1 + print("pipeline_result.token_usage", pipeline_result.token_usage) + assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE} + assert pipeline_result.trace == [input_data[0], "Test Crew"] - # for stream_result in pipeline_result: - # assert isinstance(stream_result[0], CrewOutput) - # assert stream_result[0].raw == "Test output" - # assert len(stream_result[0].tasks_output) == 1 - # assert stream_result[0].tasks_output[0].raw == "Task output" - # assert stream_result[0].token_usage == { - # "total_tokens": 100, - # "prompt_tokens": 50, - # "completion_tokens": 50, - # } + +@pytest.mark.asyncio +async def test_pipeline_result_ordering(mock_crew_factory): + """ + Ensure that results are returned in the same order as the inputs, especially with parallel processing. + """ + crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output": "crew1"}) + crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output": "crew2"}) + crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output": "crew3"}) + + pipeline = Pipeline( + stages=[crew1, [crew2, crew3]] + ) # Parallel stage to test ordering + + input_data = [{"id": 1}, {"id": 2}, {"id": 3}] + pipeline_results = await pipeline.process_runs(input_data) + + assert ( + len(pipeline_results) == 6 + ), "Should have 2 results for each input due to the parallel final stage" + + # Group results by their original input id + grouped_results = {} + for result in pipeline_results: + input_id = result.trace[0]["id"] + if input_id not in grouped_results: + grouped_results[input_id] = [] + grouped_results[input_id].append(result) + + # Check that we have the correct number of groups and results per group + assert len(grouped_results) == 3, "Should have results for each of the 3 inputs" + for input_id, results in grouped_results.items(): + assert ( + len(results) == 2 + ), f"Each input should have 2 results, but input {input_id} has {len(results)}" + + # Check the ordering and content of the results + for input_id in range(1, 4): + group = grouped_results[input_id] + assert group[0].trace == [ + {"id": input_id}, + "Crew 1", + "Crew 2", + ], f"Unexpected trace for first result of input {input_id}" + assert group[1].trace == [ + {"id": input_id}, + "Crew 1", + "Crew 3", + ], f"Unexpected trace for second result of input {input_id}" + assert ( + group[0].json_dict["output"] == "crew2" + ), f"Unexpected output for first result of input {input_id}" + assert ( + group[1].json_dict["output"] == "crew3" + ), f"Unexpected output for second result of input {input_id}" + + +class TestPydanticOutput(BaseModel): + key: str + value: int + + +@pytest.mark.asyncio +async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_factory): + crew_name = "Test Crew" + mock_crew = mock_crew_factory( + name=crew_name, + output_json_dict=None, + pydantic_output=TestPydanticOutput(key="test", value=42), + ) + pipeline = Pipeline(stages=[mock_crew]) + input_data = [{"key": "value"}] + pipeline_results = await pipeline.process_runs(input_data) + + assert len(pipeline_results) == 1 + pipeline_result = pipeline_results[0] + + print("pipeline_result.trace", pipeline_result.trace) + + assert isinstance(pipeline_result, PipelineRunResult) + assert pipeline_result.raw == "Test output" + assert len(pipeline_result.crews_outputs) == 1 + assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE} + print("INPUT DATA POST PROCESS", input_data) + assert pipeline_result.trace == [input_data[0], "Test Crew"] + + assert isinstance(pipeline_result.pydantic, TestPydanticOutput) + assert pipeline_result.pydantic.key == "test" + assert pipeline_result.pydantic.value == 42 + assert pipeline_result.json_dict is None + + +@pytest.mark.asyncio +async def test_pipeline_preserves_original_input(mock_crew_factory): + crew_name = "Test Crew" + mock_crew = mock_crew_factory( + name=crew_name, + output_json_dict={"new_key": "new_value"}, + ) + pipeline = Pipeline(stages=[mock_crew]) + + # Create a deep copy of the input data to ensure we're not comparing references + original_input_data = [{"key": "value", "nested": {"a": 1}}] + input_data = json.loads(json.dumps(original_input_data)) + + await pipeline.process_runs(input_data) + + # Assert that the original input hasn't been modified + assert ( + input_data == original_input_data + ), "The original input data should not be modified" + + # Ensure that even nested structures haven't been modified + assert ( + input_data[0]["nested"] == original_input_data[0]["nested"] + ), "Nested structures should not be modified" + + # Verify that adding new keys to the crew output doesn't affect the original input + assert ( + "new_key" not in input_data[0] + ), "New keys from crew output should not be added to the original input" @pytest.mark.asyncio @@ -106,15 +245,19 @@ async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory): Test that Pipeline.process_streams() correctly processes multiple inputs and returns the expected CrewOutputs. """ - mock_crew = mock_crew_factory() + mock_crew = mock_crew_factory(name="Test Crew") pipeline = Pipeline(stages=[mock_crew]) input_data = [{"key1": "value1"}, {"key2": "value2"}] - pipeline_result = await pipeline.process_runs(input_data) + pipeline_results = await pipeline.process_runs(input_data) assert mock_crew.kickoff_async.call_count == 2 - assert len(pipeline_result) == 2 - for run_result in pipeline_result: - assert all(isinstance(run_output, CrewOutput) for run_output in run_result) + assert len(pipeline_results) == 2 + for pipeline_result in pipeline_results: + print("pipeline_result,", pipeline_result) + assert all( + isinstance(crew_output, CrewOutput) + for crew_output in pipeline_result.crews_outputs + ) @pytest.mark.asyncio @@ -122,37 +265,89 @@ async def test_pipeline_with_parallel_stages(mock_crew_factory): """ Test that Pipeline correctly handles parallel stages. """ - crew1 = mock_crew_factory() - crew2 = mock_crew_factory() - crew3 = mock_crew_factory() + crew1 = mock_crew_factory(name="Crew 1") + crew2 = mock_crew_factory(name="Crew 2") + crew3 = mock_crew_factory(name="Crew 3") pipeline = Pipeline(stages=[crew1, [crew2, crew3]]) input_data = [{"initial": "data"}] pipeline_result = await pipeline.process_runs(input_data) - crew1.kickoff_async.assert_called_once_with( - inputs={"initial": "data", "key": "value"} - ) - crew2.kickoff_async.assert_called_once_with( - inputs={"initial": "data", "key": "value"} - ) - crew3.kickoff_async.assert_called_once_with( - inputs={"initial": "data", "key": "value"} - ) + crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) + + assert len(pipeline_result) == 2 + pipeline_result_1, pipeline_result_2 = pipeline_result + + pipeline_result_1.trace = [ + "Crew 1", + "Crew 2", + ] + pipeline_result_2.trace = [ + "Crew 1", + "Crew 3", + ] + + expected_token_usage = { + "Crew 1": DEFAULT_TOKEN_USAGE, + "Crew 2": DEFAULT_TOKEN_USAGE, + "Crew 3": DEFAULT_TOKEN_USAGE, + } + + assert pipeline_result_1.token_usage == expected_token_usage + assert pipeline_result_2.token_usage == expected_token_usage + + +@pytest.mark.asyncio +async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_factory): + """ + Test that Pipeline correctly handles parallel stages. + """ + crew1 = mock_crew_factory(name="Crew 1") + crew2 = mock_crew_factory(name="Crew 2") + crew3 = mock_crew_factory(name="Crew 3") + crew4 = mock_crew_factory(name="Crew 4") + + pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4]) + input_data = [{"initial": "data"}] + + pipeline_result = await pipeline.process_runs(input_data) + + crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) assert len(pipeline_result) == 1 - for stage_result in pipeline_result: - assert isinstance(stage_result[0], CrewOutput) + pipeline_result_1 = pipeline_result[0] + + pipeline_result_1.trace = [ + input_data[0], + "Crew 1", + ["Crew 2", "Crew 3"], + "Crew 4", + ] + + expected_token_usage = { + "Crew 1": DEFAULT_TOKEN_USAGE, + "Crew 2": DEFAULT_TOKEN_USAGE, + "Crew 3": DEFAULT_TOKEN_USAGE, + "Crew 4": DEFAULT_TOKEN_USAGE, + } + + assert pipeline_result_1.token_usage == expected_token_usage + + +@pytest.mark.asyncio +async def test_pipeline_with_parallel_stages_multiple_inputs(mock_crew_factory): + # TODO: implement + pass def test_pipeline_rshift_operator(mock_crew_factory): """ Test that the >> operator correctly creates a Pipeline from Crews and lists of Crews. """ - crew1 = mock_crew_factory() - crew2 = mock_crew_factory() - crew3 = mock_crew_factory() + crew1 = mock_crew_factory(name="Crew 1") + crew2 = mock_crew_factory(name="Crew 2") + crew3 = mock_crew_factory(name="Crew 3") # Test single crew addition pipeline = Pipeline(stages=[]) >> crew1 @@ -171,6 +366,75 @@ def test_pipeline_rshift_operator(mock_crew_factory): pipeline >> "not a crew" +@pytest.mark.asyncio +async def test_pipeline_parallel_crews_to_parallel_crews(mock_crew_factory): + """ + Test that feeding parallel crews to parallel crews works correctly. + """ + crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"output1": "crew1"}) + crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"output2": "crew2"}) + crew3 = mock_crew_factory(name="Crew 3", output_json_dict={"output3": "crew3"}) + crew4 = mock_crew_factory(name="Crew 4", output_json_dict={"output4": "crew4"}) + + pipeline = Pipeline(stages=[[crew1, crew2], [crew3, crew4]]) + + input_data = [{"input": "test"}] + pipeline_results = await pipeline.process_runs(input_data) + + assert len(pipeline_results) == 2, "Should have 2 results for final parallel stage" + + pipeline_result_1, pipeline_result_2 = pipeline_results + + # Check the outputs + assert pipeline_result_1.json_dict == {"output3": "crew3"} + assert pipeline_result_2.json_dict == {"output4": "crew4"} + + # Check the traces + expected_traces = [ + [{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 3"], + [{"input": "test"}, ["Crew 1", "Crew 2"], "Crew 4"], + ] + + for result, expected_trace in zip(pipeline_results, expected_traces): + assert result.trace == expected_trace, f"Unexpected trace: {result.trace}" + + +def test_pipeline_double_nesting_not_allowed(mock_crew_factory): + """ + Test that double nesting in pipeline stages is not allowed. + """ + crew1 = mock_crew_factory(name="Crew 1") + crew2 = mock_crew_factory(name="Crew 2") + crew3 = mock_crew_factory(name="Crew 3") + crew4 = mock_crew_factory(name="Crew 4") + + with pytest.raises(ValidationError) as exc_info: + Pipeline(stages=[crew1, [[crew2, crew3], crew4]]) + + error_msg = str(exc_info.value) + print(f"Full error message: {error_msg}") # For debugging + assert ( + "Double nesting is not allowed in pipeline stages" in error_msg + ), f"Unexpected error message: {error_msg}" + + +def test_pipeline_invalid_crew(mock_crew_factory): + """ + Test that non-Crew objects are not allowed in pipeline stages. + """ + crew1 = mock_crew_factory(name="Crew 1") + not_a_crew = "This is not a crew" + + with pytest.raises(ValidationError) as exc_info: + Pipeline(stages=[crew1, not_a_crew]) + + error_msg = str(exc_info.value) + print(f"Full error message: {error_msg}") # For debugging + assert ( + "Expected Crew instance or list of Crews, got " in error_msg + ), f"Unexpected error message: {error_msg}" + + """ TODO: Figure out what is the proper output for a pipeline with multiple stages @@ -178,27 +442,37 @@ Options: - Should the final output only include the last stage's output? - Should the final output include the accumulation of previous stages' outputs? + """ +# TODO: GET HELP FROM TEAM ON THIS ONE @pytest.mark.asyncio async def test_pipeline_data_accumulation(mock_crew_factory): - """ - Test that data is correctly accumulated through the pipeline stages. - """ - crew1 = mock_crew_factory(output_json_dict={"key1": "value1"}) - crew2 = mock_crew_factory(output_json_dict={"key2": "value2"}) + crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"}) + crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"}) pipeline = Pipeline(stages=[crew1, crew2]) input_data = [{"initial": "data"}] - pipeline_result = await pipeline.process_runs(input_data) + results = await pipeline.process_runs(input_data) - assert len(pipeline_result) == 1 - print("RESULT: ", pipeline_result) - for run_result in pipeline_result: - print("RUN RESULT: ", run_result) - assert run_result[0].json_dict == { - "initial": "data", - "key1": "value1", - "key2": "value2", - } + # Check that crew1 was called with only the initial input + crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) + + # Check that crew2 was called with the combined input from the initial data and crew1's output + crew2.kickoff_async.assert_called_once_with( + inputs={"initial": "data", "key1": "value1"} + ) + + # Check the final output + assert len(results) == 1 + final_result = results[0] + assert final_result.json_dict == {"key2": "value2"} + + # Check that the trace includes all stages + assert final_result.trace == [{"initial": "data"}, "Crew 1", "Crew 2"] + + # Check that crews_outputs contain the correct information + assert len(final_result.crews_outputs) == 2 + assert final_result.crews_outputs[0].json_dict == {"key1": "value1"} + assert final_result.crews_outputs[1].json_dict == {"key2": "value2"}