diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index fa93203ff..a83e438de 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -12,12 +12,44 @@ Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]] """ +Developer Notes: + +This module defines a Pipeline class that represents a sequence of operations (stages) +to process inputs. Each stage can be either sequential or parallel, and the pipeline +can process multiple runs concurrently. + +Core Loop Explanation: +1. The `process_runs` method processes multiple runs in parallel, each going through + all pipeline stages. +2. The `process_single_run` method handles the processing of a single run through + all stages, updating metrics and input data along the way. +3. The `_process_stage` method determines whether a stage is sequential or parallel + and processes it accordingly. +4. The `_process_single_crew` and `_process_parallel_crews` methods handle the + execution of single and parallel crew stages. +5. The `_update_metrics_and_input` method updates usage metrics and the current input + with the outputs from a stage. +6. The `_build_pipeline_run_results` method constructs the final results of the + pipeline run, including traces and outputs. + +Handling Traces and Crew Outputs: +- During the processing of stages, the results (traces and crew outputs) of the + intermediate stages are handled differently from those of the final stage. +- For intermediate stages, the primary focus is on passing the input data between stages. + This involves merging the output dictionaries from all crews in a stage into a single + dictionary and passing it to the next stage. This merged dictionary allows for smooth + data flow between stages. +- For the final stage, in addition to passing the input data, we also need to prepare + the final outputs and traces to be returned as the overall result of the pipeline run. 
+ In this case, we do not merge the results, as each result needs to be included + separately in its own pipeline run result. + Pipeline Terminology: -Pipeline: The overall structure that defines a sequence of operations. -Stage: A distinct part of the pipeline, which can be either sequential or parallel. -Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline. -Branch: Parallel executions within a stage (e.g., concurrent crew operations). -Trace: The journey of an individual input through the entire pipeline. +- Pipeline: The overall structure that defines a sequence of operations. +- Stage: A distinct part of the pipeline, which can be either sequential or parallel. +- Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline. +- Branch: Parallel executions within a stage (e.g., concurrent crew operations). +- Trace: The journey of an individual input through the entire pipeline. Example pipeline structure: crew1 >> crew2 >> crew3 @@ -219,10 +251,3 @@ class Pipeline(BaseModel): raise TypeError( f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'" ) - - -# Helper function to run the pipeline -async def run_pipeline( - pipeline: Pipeline, inputs: List[Dict[str, Any]] -) -> List[PipelineRunResult]: - return await pipeline.process_runs(inputs)