From 910c8df1a76d8f2033c10dd00409b66d211a6466 Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Mon, 29 Jul 2024 16:00:57 -0400 Subject: [PATCH] Update pipeline to use UsageMetric --- src/crewai/pipeline/pipeline.py | 11 ++++++----- src/crewai/pipeline/pipeline_run_result.py | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index 6d2811774..9065de902 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -7,6 +7,7 @@ from pydantic import BaseModel, Field, model_validator from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput from crewai.pipeline.pipeline_run_result import PipelineRunResult +from crewai.types.usage_metrics import UsageMetrics Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]] @@ -149,7 +150,7 @@ class Pipeline(BaseModel): """ initial_input = copy.deepcopy(run_input) current_input = copy.deepcopy(run_input) - usage_metrics = {} + pipeline_usage_metrics: Dict[str, UsageMetrics] = {} all_stage_outputs: List[List[CrewOutput]] = [] traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]] @@ -158,13 +159,13 @@ class Pipeline(BaseModel): stage_outputs, stage_trace = await self._process_stage(stage, stage_input) self._update_metrics_and_input( - usage_metrics, current_input, stage, stage_outputs + pipeline_usage_metrics, current_input, stage, stage_outputs ) traces.append(stage_trace) all_stage_outputs.append(stage_outputs) return self._build_pipeline_run_results( - all_stage_outputs, traces, usage_metrics + all_stage_outputs, traces, pipeline_usage_metrics ) async def _process_stage( @@ -221,7 +222,7 @@ class Pipeline(BaseModel): def _update_metrics_and_input( self, - usage_metrics: Dict[str, Any], + usage_metrics: Dict[str, UsageMetrics], current_input: Dict[str, Any], stage: Union[Crew, List[Crew]], outputs: List[CrewOutput], @@ -243,7 +244,7 @@ class Pipeline(BaseModel): self, all_stage_outputs: List[List[CrewOutput]], traces: List[List[Union[str, Dict[str, Any]]]], - token_usage: Dict[str, Any], + token_usage: Dict[str, UsageMetrics], ) -> List[PipelineRunResult]: """ Builds the results of a pipeline run. diff --git a/src/crewai/pipeline/pipeline_run_result.py b/src/crewai/pipeline/pipeline_run_result.py index 172a8f545..576cd9949 100644 --- a/src/crewai/pipeline/pipeline_run_result.py +++ b/src/crewai/pipeline/pipeline_run_result.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional, Union from pydantic import UUID4, BaseModel, Field from crewai.crews.crew_output import CrewOutput +from crewai.types.usage_metrics import UsageMetrics class PipelineRunResult(BaseModel): @@ -23,7 +24,7 @@ class PipelineRunResult(BaseModel): description="JSON dict output of the pipeline run", default={} ) - token_usage: Dict[str, Any] = Field( + token_usage: Dict[str, UsageMetrics] = Field( description="Token usage for each crew in the run" ) trace: List[Any] = Field(