mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Add developer notes to explain what is going on in pipelines.
@@ -12,12 +12,44 @@ Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
 
 
 """
+Developer Notes:
+
+This module defines a Pipeline class that represents a sequence of operations (stages)
+to process inputs. Each stage can be either sequential or parallel, and the pipeline
+can process multiple runs concurrently.
+
+Core Loop Explanation:
+1. The `process_runs` method processes multiple runs in parallel, each going through
+   all pipeline stages.
+2. The `process_single_run` method handles the processing of a single run through
+   all stages, updating metrics and input data along the way.
+3. The `_process_stage` method determines whether a stage is sequential or parallel
+   and processes it accordingly.
+4. The `_process_single_crew` and `_process_parallel_crews` methods handle the
+   execution of single and parallel crew stages.
+5. The `_update_metrics_and_input` method updates usage metrics and the current input
+   with the outputs from a stage.
+6. The `_build_pipeline_run_results` method constructs the final results of the
+   pipeline run, including traces and outputs.
+
+Handling Traces and Crew Outputs:
+- During the processing of stages, we handle the results (traces and crew outputs)
+  for all stages except the last one differently from the final stage.
+- For intermediate stages, the primary focus is on passing the input data between stages.
+  This involves merging the output dictionaries from all crews in a stage into a single
+  dictionary and passing it to the next stage. This merged dictionary allows for smooth
+  data flow between stages.
+- For the final stage, in addition to passing the input data, we also need to prepare
+  the final outputs and traces to be returned as the overall result of the pipeline run.
+  In this case, we do not merge the results, as each result needs to be included
+  separately in its own pipeline run result.
+
 Pipeline Terminology:
-Pipeline: The overall structure that defines a sequence of operations.
-Stage: A distinct part of the pipeline, which can be either sequential or parallel.
-Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
-Branch: Parallel executions within a stage (e.g., concurrent crew operations).
-Trace: The journey of an individual input through the entire pipeline.
+- Pipeline: The overall structure that defines a sequence of operations.
+- Stage: A distinct part of the pipeline, which can be either sequential or parallel.
+- Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
+- Branch: Parallel executions within a stage (e.g., concurrent crew operations).
+- Trace: The journey of an individual input through the entire pipeline.
 
 Example pipeline structure:
 crew1 >> crew2 >> crew3
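Review note: the core loop and the intermediate/final-stage distinction described in the docstring can be illustrated with a minimal, self-contained sketch. It does not use the crewAI classes. A "crew" here is assumed to be a plain async function from a dict to a dict, and `process_stage`, `process_single_run`, `process_runs`, `research`, `write_short`, and `write_long` are hypothetical stand-ins rather than the module's actual methods. Metrics and traces are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Union

# Hypothetical stand-ins: a "crew" is an async function from dict to dict.
Crew = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
Stage = Union[Crew, List[Crew]]  # one crew, or a list of parallel branches


async def process_stage(stage: Stage, current_input: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run one stage and return one output dict per crew in it."""
    if isinstance(stage, list):
        # Parallel stage: every branch receives the same input.
        return list(await asyncio.gather(*(crew(current_input) for crew in stage)))
    return [await stage(current_input)]


async def process_single_run(stages: List[Stage], run_input: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Push one input through every stage, in order."""
    current_input = dict(run_input)
    outputs: List[Dict[str, Any]] = []
    for index, stage in enumerate(stages):
        outputs = await process_stage(stage, current_input)
        if index < len(stages) - 1:
            # Intermediate stage: merge all crew outputs into the next input.
            for output in outputs:
                current_input.update(output)
    # Final stage: outputs stay separate, one result per branch.
    return outputs


async def process_runs(stages: List[Stage], run_inputs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Process all runs concurrently and flatten the per-run results."""
    per_run = await asyncio.gather(*(process_single_run(stages, i) for i in run_inputs))
    return [result for results in per_run for result in results]


async def research(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"notes": f"notes on {data['topic']}"}


async def write_short(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"draft": f"short: {data['notes']}"}


async def write_long(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"draft": f"long: {data['notes']}"}


if __name__ == "__main__":
    # Mirrors a structure like: research >> [write_short, write_long]
    stages: List[Stage] = [research, [write_short, write_long]]
    for result in asyncio.run(process_runs(stages, [{"topic": "a"}, {"topic": "b"}])):
        print(result)
```

With two inputs and a final stage of two branches, the sketch yields four separate results, which matches the docstring's point that final-stage results are not merged.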
@@ -219,10 +251,3 @@ class Pipeline(BaseModel):
             raise TypeError(
                 f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
             )
-
-
-# Helper function to run the pipeline
-async def run_pipeline(
-    pipeline: Pipeline, inputs: List[Dict[str, Any]]
-) -> List[PipelineRunResult]:
-    return await pipeline.process_runs(inputs)
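Review note: the removed `run_pipeline` helper only forwarded to `pipeline.process_runs(inputs)`, so callers can presumably await the method directly. The sketch below shows that call pattern with a hypothetical `StubPipeline` that returns plain dicts. The real method returns `List[PipelineRunResult]`, and the stub's `processed` marker is an illustration only.

```python
import asyncio
from typing import Any, Dict, List


class StubPipeline:
    """Hypothetical stand-in exposing only an async `process_runs` method."""

    async def process_runs(self, run_inputs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Echo each input with a marker so the call is observable.
        return [{**run_input, "processed": True} for run_input in run_inputs]


async def main() -> List[Dict[str, Any]]:
    pipeline = StubPipeline()
    # Before this commit: results = await run_pipeline(pipeline, inputs)
    # After it, callers await the method on the pipeline itself.
    return await pipeline.process_runs([{"topic": "a"}, {"topic": "b"}])


if __name__ == "__main__":
    print(asyncio.run(main()))
```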