Rename variables based on Joao's feedback

This commit is contained in:
Brandon Hancock
2024-07-30 09:54:11 -04:00
parent e1a03ad97d
commit 072044c537
4 changed files with 37 additions and 37 deletions

View File

@@ -6,7 +6,7 @@ from pydantic import BaseModel, Field, model_validator
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
-from crewai.pipeline.pipeline_run_result import PipelineRunResult
+from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
from crewai.types.usage_metrics import UsageMetrics
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
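[Editor's note: the Trace alias above accepts either a single item or a list of items, where each item is a string or a dict. A small self-contained illustration; the variable names are ours, not the module's:]

    from typing import Any, Dict, List, Union

    Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]

    single: Trace = "crew1"                               # a bare string
    record: Trace = {"crew": "crew2", "output": "draft"}  # a bare dict
    journey: Trace = ["crew1", {"crew": "crew2", "output": "draft"}]  # a mixed list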
@@ -17,12 +17,12 @@ Developer Notes:
This module defines a Pipeline class that represents a sequence of operations (stages)
to process inputs. Each stage can be either sequential or parallel, and the pipeline
-can process multiple runs concurrently.
+can process multiple kickoffs concurrently.
Core Loop Explanation:
-1. The `process_runs` method processes multiple runs in parallel, each going through
+1. The `process_kickoffs` method processes multiple kickoffs in parallel, each going through
all pipeline stages.
-2. The `process_single_run` method handles the processing of a single run through
+2. The `process_single_kickoff` method handles the processing of a single kickoff through
all stages, updating metrics and input data along the way.
3. The `_process_stage` method determines whether a stage is sequential or parallel
and processes it accordingly.
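[Editor's note: a minimal sketch of the control flow in steps 1-3, assuming stages are plain callables; this illustrates the loop shape only, not the crewAI implementation:]

    import copy
    from typing import Any, Callable, Dict, List

    def single_kickoff_sketch(
        kickoff_input: Dict[str, Any],
        stages: List[Callable[[Dict[str, Any]], Dict[str, Any]]],
    ) -> Dict[str, Any]:
        # One kickoff walks every stage in order; each stage's output is
        # merged into the input handed to the next stage.
        current_input = copy.deepcopy(kickoff_input)
        for stage in stages:
            current_input.update(stage(current_input))
        return current_input

    print(single_kickoff_sketch(
        {"topic": "ai"},
        [lambda d: {"draft": "v1"}, lambda d: {"final": "v2"}],
    ))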
@@ -30,8 +30,8 @@ Core Loop Explanation:
execution of single and parallel crew stages.
5. The `_update_metrics_and_input` method updates usage metrics and the current input
with the outputs from a stage.
-6. The `_build_pipeline_run_results` method constructs the final results of the
-pipeline run, including traces and outputs.
+6. The `_build_pipeline_kickoff_results` method constructs the final results of the
+pipeline kickoff, including traces and outputs.
Handling Traces and Crew Outputs:
- During the processing of stages, we handle the results (traces and crew outputs)
@@ -41,14 +41,14 @@ Handling Traces and Crew Outputs:
dictionary and passing it to the next stage. This merged dictionary allows for smooth
data flow between stages.
- For the final stage, in addition to passing the input data, we also need to prepare
-the final outputs and traces to be returned as the overall result of the pipeline run.
+the final outputs and traces to be returned as the overall result of the pipeline kickoff.
In this case, we do not merge the results, as each result needs to be included
-separately in its own pipeline run result.
+separately in its own pipeline kickoff result.
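[Editor's note: the contrast between the two behaviors looks roughly like the sketch below, written with our own names rather than the module's:]

    from typing import Any, Dict, List

    def merge_between_stages(
        current_input: Dict[str, Any], outputs: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        # Between stages: fold every branch's output into one dict so the
        # next stage receives a single merged input.
        merged = dict(current_input)
        for out in outputs:
            merged.update(out)
        return merged

    # At the final stage the outputs are NOT merged: each one becomes its
    # own kickoff result.
    final_outputs = [{"raw": "result A"}, {"raw": "result B"}]
    kickoff_results = [out for out in final_outputs]  # one result per output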
Pipeline Terminology:
- Pipeline: The overall structure that defines a sequence of operations.
- Stage: A distinct part of the pipeline, which can be either sequential or parallel.
-- Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
+- Kickoff: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
- Branch: Parallel executions within a stage (e.g., concurrent crew operations).
- Trace: The journey of an individual input through the entire pipeline.
@@ -60,8 +60,8 @@ This represents a pipeline with three sequential stages:
2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
-Each input creates its own run, flowing through all stages of the pipeline.
-Multiple runs can be processed concurrently, each following the defined pipeline structure.
+Each input creates its own kickoff, flowing through all stages of the pipeline.
+Multiple kickoffs can be processed concurrently, each following the defined pipeline structure.
Another example pipeline structure:
crew1 >> [crew2, crew3] >> crew4
@@ -71,8 +71,8 @@ This represents a pipeline with three stages:
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
3. Another sequential stage (crew4)
-Each input creates its own run, flowing through all stages of the pipeline.
-Multiple runs can be processed concurrently, each following the defined pipeline structure.
+Each input creates its own kickoff, flowing through all stages of the pipeline.
+Multiple kickoffs can be processed concurrently, each following the defined pipeline structure.
"""
@@ -111,22 +111,22 @@ class Pipeline(BaseModel):
return values
async def kickoff(
-self, run_inputs: List[Dict[str, Any]]
-) -> List[PipelineRunResult]:
+self, inputs: List[Dict[str, Any]]
+) -> List[PipelineKickoffResult]:
"""
Processes multiple runs in parallel, each going through all pipeline stages.
Args:
-run_inputs (List[Dict[str, Any]]): List of inputs for each run.
+inputs (List[Dict[str, Any]]): List of inputs for each run.
Returns:
-List[PipelineRunResult]: List of results from each run.
+List[PipelineKickoffResult]: List of results from each run.
"""
-pipeline_results: List[PipelineRunResult] = []
+pipeline_results: List[PipelineKickoffResult] = []
# Process all runs in parallel
all_run_results = await asyncio.gather(
-*(self.process_single_kickoff(input_data) for input_data in run_inputs)
+*(self.process_single_kickoff(input_data) for input_data in inputs)
)
# Flatten the list of lists into a single list of results
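[Editor's note: the gather-then-flatten pattern in this hunk can be sketched in isolation as follows; fake_single_kickoff is an illustrative stand-in for process_single_kickoff:]

    import asyncio
    from typing import Any, Dict, List

    async def fake_single_kickoff(data: Dict[str, Any]) -> List[str]:
        # Stand-in: a real kickoff returns a list of results.
        return [f"result for {data['key']}"]

    async def demo(inputs: List[Dict[str, Any]]) -> List[str]:
        all_results = await asyncio.gather(
            *(fake_single_kickoff(d) for d in inputs)
        )
        # Flatten the list of lists into a single list of results.
        return [r for kickoff_results in all_results for r in kickoff_results]

    print(asyncio.run(demo([{"key": "a"}, {"key": "b"}])))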
@@ -137,19 +137,19 @@ class Pipeline(BaseModel):
return pipeline_results
async def process_single_kickoff(
-self, run_input: Dict[str, Any]
-) -> List[PipelineRunResult]:
+self, kickoff_input: Dict[str, Any]
+) -> List[PipelineKickoffResult]:
"""
Processes a single run through all pipeline stages.
Args:
-run_input (Dict[str, Any]): The input for the run.
+kickoff_input (Dict[str, Any]): The input for the kickoff.
Returns:
-List[PipelineRunResult]: The results of processing the run.
+List[PipelineKickoffResult]: The results of processing the run.
"""
-initial_input = copy.deepcopy(run_input)
-current_input = copy.deepcopy(run_input)
+initial_input = copy.deepcopy(kickoff_input)
+current_input = copy.deepcopy(kickoff_input)
pipeline_usage_metrics: Dict[str, UsageMetrics] = {}
all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
@@ -164,7 +164,7 @@ class Pipeline(BaseModel):
traces.append(stage_trace)
all_stage_outputs.append(stage_outputs)
-return self._build_pipeline_run_results(
+return self._build_pipeline_kickoff_results(
all_stage_outputs, traces, pipeline_usage_metrics
)
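[Editor's note: the two deepcopy calls above keep the caller's dict and the evolving stage input independent, even for nested values; a minimal illustration:]

    import copy

    kickoff_input = {"topic": "ai", "meta": {"retries": 0}}

    current_input = copy.deepcopy(kickoff_input)
    current_input["meta"]["retries"] = 3  # a stage mutates nested state

    # The caller's original input is untouched; a shallow copy would not
    # have protected the nested dict.
    assert kickoff_input["meta"]["retries"] == 0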
@@ -240,12 +240,12 @@ class Pipeline(BaseModel):
usage_metrics[crew.name or str(crew.id)] = output.token_usage
current_input.update(output.to_dict())
-def _build_pipeline_run_results(
+def _build_pipeline_kickoff_results(
self,
all_stage_outputs: List[List[CrewOutput]],
traces: List[List[Union[str, Dict[str, Any]]]],
token_usage: Dict[str, UsageMetrics],
-) -> List[PipelineRunResult]:
+) -> List[PipelineKickoffResult]:
"""
Builds the results of a pipeline run.
@@ -255,13 +255,13 @@ class Pipeline(BaseModel):
token_usage (Dict[str, Any]): Token usage metrics.
Returns:
-List[PipelineRunResult]: The results of the pipeline run.
+List[PipelineKickoffResult]: The results of the pipeline run.
"""
formatted_traces = self._format_traces(traces)
formatted_crew_outputs = self._format_crew_outputs(all_stage_outputs)
return [
-PipelineRunResult(
+PipelineKickoffResult(
token_usage=token_usage,
trace=formatted_trace,
raw=crews_outputs[-1].raw,
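[Editor's note: the comprehension is truncated by the diff context; its shape is roughly the pairing sketched below, written with plain dicts so it runs standalone. Fields beyond token_usage, trace, and raw are not shown in this diff:]

    from typing import Any, Dict, List

    def build_kickoff_results_sketch(
        formatted_traces: List[Any],
        formatted_crew_outputs: List[List[Dict[str, Any]]],
        token_usage: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        # One result per kickoff: its trace, the raw text of its final
        # crew output, and the shared token-usage metrics.
        return [
            {"token_usage": token_usage, "trace": trace, "raw": crews_outputs[-1]["raw"]}
            for trace, crews_outputs in zip(formatted_traces, formatted_crew_outputs)
        ]

    print(build_kickoff_results_sketch(
        [["input", "crew1"]],
        [[{"raw": "final answer"}]],
        {"crew1": {"total_tokens": 10}},
    ))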

View File

@@ -8,7 +8,7 @@ from crewai.crews.crew_output import CrewOutput
from crewai.types.usage_metrics import UsageMetrics
-class PipelineRunResult(BaseModel):
+class PipelineKickoffResult(BaseModel):
"""Class that represents the result of a pipeline run."""
id: UUID4 = Field(

View File

@@ -3,7 +3,7 @@ from typing import List
from pydantic import UUID4, BaseModel, Field
-from crewai.pipeline.pipeline_run_result import PipelineRunResult
+from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
class PipelineOutput(BaseModel):
@@ -12,9 +12,9 @@ class PipelineOutput(BaseModel):
frozen=True,
description="Unique identifier for the object, not set by user.",
)
-run_results: List[PipelineRunResult] = Field(
+run_results: List[PipelineKickoffResult] = Field(
description="List of results for each run through the pipeline", default=[]
)
-def add_run_result(self, result: PipelineRunResult):
+def add_run_result(self, result: PipelineKickoffResult):
self.run_results.append(result)
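[Editor's note: a short usage sketch of the container above; the pipeline_output module path is assumed from the class name, and kickoff_results is a placeholder for the list a real pipeline kickoff would return:]

    from crewai.pipeline.pipeline_output import PipelineOutput  # module path assumed

    kickoff_results = []  # in practice: the list returned by Pipeline.kickoff(...)

    output = PipelineOutput()
    for result in kickoff_results:
        output.add_run_result(result)  # accumulates PipelineKickoffResult objects

    print(len(output.run_results))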

View File

@@ -6,7 +6,7 @@ from crewai.agent import Agent
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline import Pipeline
-from crewai.pipeline.pipeline_run_result import PipelineRunResult
+from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -108,7 +108,7 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory):
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
for pipeline_result in pipeline_results:
-assert isinstance(pipeline_result, PipelineRunResult)
+assert isinstance(pipeline_result, PipelineKickoffResult)
assert pipeline_result.raw == "Test output"
assert len(pipeline_result.crews_outputs) == 1
print("pipeline_result.token_usage", pipeline_result.token_usage)
@@ -194,7 +194,7 @@ async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_f
print("pipeline_result.trace", pipeline_result.trace)
-assert isinstance(pipeline_result, PipelineRunResult)
+assert isinstance(pipeline_result, PipelineKickoffResult)
assert pipeline_result.raw == "Test output"
assert len(pipeline_result.crews_outputs) == 1
assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}