diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index 59f15b8b5..daca7a51d 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -1,11 +1,12 @@ import asyncio +from collections import deque from typing import Any, Dict, List, Union from pydantic import BaseModel, Field from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput -from crewai.pipeline.pipeline_output import PipelineOutput +from crewai.pipeline.pipeline_run_result import PipelineRunResult """ Pipeline Terminology: @@ -38,6 +39,8 @@ Each input creates its own run, flowing through all stages of the pipeline. Multiple runs can be processed concurrently, each following the defined pipeline structure. """ +Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]] + class Pipeline(BaseModel): stages: List[Union[Crew, List[Crew]]] = Field( @@ -46,43 +49,130 @@ class Pipeline(BaseModel): async def process_runs( self, run_inputs: List[Dict[str, Any]] - ) -> List[List[CrewOutput]]: + ) -> List[PipelineRunResult]: """ Process multiple runs in parallel, with each run going through all stages. """ - pipeline_output = PipelineOutput() + pipeline_results = [] - async def process_single_run(run_input: Dict[str, Any]) -> List[CrewOutput]: - print("current_input in run", run_input) - stage_outputs = [] + def format_traces( + traces: List[List[Union[str, Dict[str, Any]]]], + ) -> List[List[Trace]]: + formatted_traces: List[Trace] = [] + + # Process all traces except the last one + for trace in traces[:-1]: + if len(trace) == 1: + formatted_traces.append(trace[0]) + else: + formatted_traces.append(trace) + + # Handle the final stage trace + traces_to_return: List[List[Trace]] = [] + + final_trace = traces[-1] + if len(final_trace) == 1: + formatted_traces.append(final_trace) + traces_to_return.append(formatted_traces) + else: + for trace in final_trace: + copied_traces = formatted_traces.copy() + copied_traces.append(trace) + traces_to_return.append(copied_traces) + + return traces_to_return + + def build_pipeline_run_results( + final_stage_outputs: List[CrewOutput], + traces: List[List[Union[str, Dict[str, Any]]]], + token_usage: Dict[str, Any], + ) -> List[PipelineRunResult]: + """ + Build PipelineRunResult objects from the final stage outputs and traces. + """ + + pipeline_run_results: List[PipelineRunResult] = [] + + # Format traces + formatted_traces = format_traces(traces) + + for output, formatted_trace in zip(final_stage_outputs, formatted_traces): + # FORMAT TRACE + + new_pipeline_run_result = PipelineRunResult( + final_output=output, + token_usage=token_usage, + trace=formatted_trace, + ) + + pipeline_run_results.append(new_pipeline_run_result) + + return pipeline_run_results + + async def process_single_run( + run_input: Dict[str, Any] + ) -> List[PipelineRunResult]: + stages_queue = deque(self.stages) + usage_metrics = {} + stage_outputs: List[CrewOutput] = [] + traces: List[List[Union[str, Dict[str, Any]]]] = [[run_input]] + + stage = None + while stages_queue: + stage = stages_queue.popleft() - for stage in self.stages: if isinstance(stage, Crew): # Process single crew - stage_output = await stage.kickoff_async(inputs=run_input) - stage_outputs = [stage_output] + output = await stage.kickoff_async(inputs=run_input) + # Update usage metrics and setup inputs for next stage + usage_metrics[stage.name] = output.token_usage + run_input.update(output.to_dict()) + # Update traces for single crew stage + traces.append([stage.name or "No name"]) + # Store output for final results + stage_outputs = [output] + else: # Process each crew in parallel parallel_outputs = await asyncio.gather( *[crew.kickoff_async(inputs=run_input) for crew in stage] ) + # Update usage metrics and setup inputs for next stage + for crew, output in zip(stage, parallel_outputs): + usage_metrics[crew.name] = output.token_usage + run_input.update(output.to_dict()) + # Update traces for parallel stage + traces.append([crew.name or "No name" for crew in stage]) + # Store output for final results stage_outputs = parallel_outputs - # Convert all CrewOutputs from stage into a dictionary for next stage - # and update original run_input dictionary with new values - stage_output_dicts = [output.to_dict() for output in stage_outputs] - for stage_dict in stage_output_dicts: - run_input.update(stage_dict) - print("UPDATING run_input - new values:", run_input) + print("STAGE OUTPUTS: ", stage_outputs) + print("TRACES: ", traces) + print("TOKEN USAGE: ", usage_metrics) - # Return all CrewOutputs from this run - return stage_outputs + # Build final pipeline run results + final_results = build_pipeline_run_results( + final_stage_outputs=stage_outputs, + traces=traces, + token_usage=usage_metrics, + ) + print("FINAL RESULTS: ", final_results) + + # prepare traces for final results + return final_results # Process all runs in parallel - return await asyncio.gather( + all_run_results = await asyncio.gather( *(process_single_run(input_data) for input_data in run_inputs) ) + # Flatten the list of lists into a single list of results + pipeline_results.extend( + result for run_result in all_run_results for result in run_result + ) + + return pipeline_results + def __rshift__(self, other: Any) -> "Pipeline": """ Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline. @@ -100,5 +190,5 @@ class Pipeline(BaseModel): # Helper function to run the pipeline async def run_pipeline( pipeline: Pipeline, inputs: List[Dict[str, Any]] -) -> List[List[CrewOutput]]: +) -> List[PipelineRunResult]: return await pipeline.process_runs(inputs) diff --git a/src/crewai/pipeline/pipeline_output.py b/src/crewai/pipeline/pipeline_output.py index 242799a12..06947573b 100644 --- a/src/crewai/pipeline/pipeline_output.py +++ b/src/crewai/pipeline/pipeline_output.py @@ -1,21 +1,14 @@ -from typing import Any, Dict, List +from typing import List from pydantic import BaseModel, Field -from crewai.crews.crew_output import CrewOutput +from crewai.pipeline.pipeline_run_result import PipelineRunResult class PipelineOutput(BaseModel): - final_outputs: List[CrewOutput] = Field( - description="List of final outputs from the last crew in the pipeline", - default=[], - ) - token_usage: List[List[Dict[str, Any]]] = Field( - description="Token usage for each crew in each stream", default=[] + run_results: List[PipelineRunResult] = Field( + description="List of results for each run through the pipeline", default=[] ) - def add_final_output(self, output: CrewOutput): - self.final_outputs.append(output) - - def add_token_usage(self, usage: List[Dict[str, Any]]): - self.token_usage.append(usage) + def add_run_result(self, result: PipelineRunResult): + self.run_results.append(result) diff --git a/src/crewai/pipeline/pipeline_run_result.py b/src/crewai/pipeline/pipeline_run_result.py new file mode 100644 index 000000000..38c64bbc8 --- /dev/null +++ b/src/crewai/pipeline/pipeline_run_result.py @@ -0,0 +1,22 @@ +from typing import Any, Dict, List + +from pydantic import BaseModel, Field + +from crewai.crews.crew_output import CrewOutput + + +class PipelineRunResult(BaseModel): + final_output: CrewOutput = Field( + description="Final output from the last crew in the run" + ) + token_usage: Dict[str, Any] = Field( + description="Token usage for each crew in the run" + ) + trace: List[Any] = Field( + description="Trace of the journey of inputs through the run" + ) + # TODO: Should we store all outputs from crews along the way? + crews_output: List[CrewOutput] = Field( + description="Output from each crew in the run", + default=[], + ) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 1d4047d64..8615391ac 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -5,6 +5,7 @@ from crewai.agent import Agent from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput from crewai.pipeline.pipeline import Pipeline +from crewai.pipeline.pipeline_run_result import PipelineRunResult from crewai.process import Process from crewai.task import Task from crewai.tasks.task_output import TaskOutput @@ -83,16 +84,20 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory): pipeline_result = await pipeline.process_runs(input_data) mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"}) - for stream_result in pipeline_result: - assert isinstance(stream_result[0], CrewOutput) - assert stream_result[0].raw == "Test output" - assert len(stream_result[0].tasks_output) == 1 - assert stream_result[0].tasks_output[0].raw == "Task output" - assert stream_result[0].token_usage == { - "total_tokens": 100, - "prompt_tokens": 50, - "completion_tokens": 50, - } + + for pipeline_line_result in pipeline_result: + assert isinstance(pipeline_line_result, PipelineRunResult) + + # for stream_result in pipeline_result: + # assert isinstance(stream_result[0], CrewOutput) + # assert stream_result[0].raw == "Test output" + # assert len(stream_result[0].tasks_output) == 1 + # assert stream_result[0].tasks_output[0].raw == "Task output" + # assert stream_result[0].token_usage == { + # "total_tokens": 100, + # "prompt_tokens": 50, + # "completion_tokens": 50, + # } @pytest.mark.asyncio