Update pipeline to use UsageMetric

This commit is contained in:
Brandon Hancock
2024-07-29 16:00:57 -04:00
parent b9177f2d04
commit 910c8df1a7
2 changed files with 8 additions and 6 deletions

View File

@@ -7,6 +7,7 @@ from pydantic import BaseModel, Field, model_validator
from crewai.crew import Crew from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline_run_result import PipelineRunResult from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.types.usage_metrics import UsageMetrics
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]] Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
@@ -149,7 +150,7 @@ class Pipeline(BaseModel):
""" """
initial_input = copy.deepcopy(run_input) initial_input = copy.deepcopy(run_input)
current_input = copy.deepcopy(run_input) current_input = copy.deepcopy(run_input)
usage_metrics = {} pipeline_usage_metrics: Dict[str, UsageMetrics] = {}
all_stage_outputs: List[List[CrewOutput]] = [] all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]] traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
@@ -158,13 +159,13 @@ class Pipeline(BaseModel):
stage_outputs, stage_trace = await self._process_stage(stage, stage_input) stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
self._update_metrics_and_input( self._update_metrics_and_input(
usage_metrics, current_input, stage, stage_outputs pipeline_usage_metrics, current_input, stage, stage_outputs
) )
traces.append(stage_trace) traces.append(stage_trace)
all_stage_outputs.append(stage_outputs) all_stage_outputs.append(stage_outputs)
return self._build_pipeline_run_results( return self._build_pipeline_run_results(
all_stage_outputs, traces, usage_metrics all_stage_outputs, traces, pipeline_usage_metrics
) )
async def _process_stage( async def _process_stage(
@@ -221,7 +222,7 @@ class Pipeline(BaseModel):
def _update_metrics_and_input( def _update_metrics_and_input(
self, self,
usage_metrics: Dict[str, Any], usage_metrics: Dict[str, UsageMetrics],
current_input: Dict[str, Any], current_input: Dict[str, Any],
stage: Union[Crew, List[Crew]], stage: Union[Crew, List[Crew]],
outputs: List[CrewOutput], outputs: List[CrewOutput],
@@ -243,7 +244,7 @@ class Pipeline(BaseModel):
self, self,
all_stage_outputs: List[List[CrewOutput]], all_stage_outputs: List[List[CrewOutput]],
traces: List[List[Union[str, Dict[str, Any]]]], traces: List[List[Union[str, Dict[str, Any]]]],
token_usage: Dict[str, Any], token_usage: Dict[str, UsageMetrics],
) -> List[PipelineRunResult]: ) -> List[PipelineRunResult]:
""" """
Builds the results of a pipeline run. Builds the results of a pipeline run.

View File

@@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional, Union
from pydantic import UUID4, BaseModel, Field from pydantic import UUID4, BaseModel, Field
from crewai.crews.crew_output import CrewOutput from crewai.crews.crew_output import CrewOutput
from crewai.types.usage_metrics import UsageMetrics
class PipelineRunResult(BaseModel): class PipelineRunResult(BaseModel):
@@ -23,7 +24,7 @@ class PipelineRunResult(BaseModel):
description="JSON dict output of the pipeline run", default={} description="JSON dict output of the pipeline run", default={}
) )
token_usage: Dict[str, Any] = Field( token_usage: Dict[str, UsageMetrics] = Field(
description="Token usage for each crew in the run" description="Token usage for each crew in the run"
) )
trace: List[Any] = Field( trace: List[Any] = Field(