Merge branch 'brandon/cre-117-create-a-type-for-usage-metrics' into feature/procedure_v2

Brandon Hancock
2024-07-29 15:55:11 -04:00
7 changed files with 77 additions and 75 deletions

View File

@@ -1,4 +1,4 @@
-from typing import Any, Dict
+from crewai.types.usage_metrics import UsageMetrics


 class TokenProcess:
@@ -18,10 +18,10 @@ class TokenProcess:
     def sum_successful_requests(self, requests: int):
        self.successful_requests = self.successful_requests + requests

-    def get_summary(self) -> Dict[str, Any]:
-        return {
-            "total_tokens": self.total_tokens,
-            "prompt_tokens": self.prompt_tokens,
-            "completion_tokens": self.completion_tokens,
-            "successful_requests": self.successful_requests,
-        }
+    def get_summary(self) -> UsageMetrics:
+        return UsageMetrics(
+            total_tokens=self.total_tokens,
+            prompt_tokens=self.prompt_tokens,
+            completion_tokens=self.completion_tokens,
+            successful_requests=self.successful_requests,
+        )

View File

@@ -32,6 +32,7 @@ from crewai.tasks.conditional_task import ConditionalTask
 from crewai.tasks.task_output import TaskOutput
 from crewai.telemetry import Telemetry
 from crewai.tools.agent_tools import AgentTools
+from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import I18N, FileHandler, Logger, RPMController
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
 from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
@@ -112,7 +113,7 @@ class Crew(BaseModel):
         default={"provider": "openai"},
         description="Configuration for the embedder to be used for the crew.",
     )
-    usage_metrics: Optional[dict] = Field(
+    usage_metrics: Optional[UsageMetrics] = Field(
         default=None,
         description="Metrics for the LLM usage during all tasks execution.",
     )
@@ -454,7 +455,7 @@ class Crew(BaseModel):
         if self.planning:
             self._handle_crew_planning()

-        metrics = []
+        metrics: List[UsageMetrics] = []

         if self.process == Process.sequential:
             result = self._run_sequential_process()
@@ -464,11 +465,12 @@ class Crew(BaseModel):
            raise NotImplementedError(
                f"The process '{self.process}' is not implemented yet."
            )

        metrics += [agent._token_process.get_summary() for agent in self.agents]
-        self.usage_metrics = {
-            key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
-        }
+        self.usage_metrics = UsageMetrics()
+        for metric in metrics:
+            self.usage_metrics.add_usage_metrics(metric)

        return result
@@ -477,12 +479,7 @@ class Crew(BaseModel):
        results: List[CrewOutput] = []

        # Initialize the parent crew's usage metrics
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
+        total_usage_metrics = UsageMetrics()

        for input_data in inputs:
            crew = self.copy()
@@ -490,8 +487,7 @@ class Crew(BaseModel):
            output = crew.kickoff(inputs=input_data)

            if crew.usage_metrics:
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
+                total_usage_metrics.add_usage_metrics(crew.usage_metrics)

            results.append(output)
@@ -520,29 +516,10 @@ class Crew(BaseModel):
        results = await asyncio.gather(*tasks)

-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
+        total_usage_metrics = UsageMetrics()

        for crew in crew_copies:
            if crew.usage_metrics:
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
-
-        self.usage_metrics = total_usage_metrics
-
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
-
-        for crew in crew_copies:
-            if crew.usage_metrics:
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
+                total_usage_metrics.add_usage_metrics(crew.usage_metrics)

        self.usage_metrics = total_usage_metrics
        self._task_output_handler.reset()
@@ -933,25 +910,18 @@ class Crew(BaseModel):
        )
        self._telemetry.end_crew(self, final_string_output)

-    def calculate_usage_metrics(self) -> Dict[str, int]:
+    def calculate_usage_metrics(self) -> UsageMetrics:
        """Calculates and returns the usage metrics."""
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
+        total_usage_metrics = UsageMetrics()

        for agent in self.agents:
            if hasattr(agent, "_token_process"):
                token_sum = agent._token_process.get_summary()
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += token_sum.get(key, 0)
+                total_usage_metrics.add_usage_metrics(token_sum)

        if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
            token_sum = self.manager_agent._token_process.get_summary()
-            for key in total_usage_metrics:
-                total_usage_metrics[key] += token_sum.get(key, 0)
+            total_usage_metrics.add_usage_metrics(token_sum)

        return total_usage_metrics

View File

@@ -5,6 +5,7 @@ from pydantic import BaseModel, Field

 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
+from crewai.types.usage_metrics import UsageMetrics


 class CrewOutput(BaseModel):
@@ -20,9 +21,7 @@ class CrewOutput(BaseModel):
     tasks_output: list[TaskOutput] = Field(
         description="Output of each task", default=[]
     )
-    token_usage: Dict[str, Any] = Field(
-        description="Processed token summary", default={}
-    )
+    token_usage: UsageMetrics = Field(description="Processed token summary", default={})

     @property
     def json(self) -> Optional[str]:

View File

View File

@@ -0,0 +1,36 @@
+from pydantic import BaseModel, Field
+
+
+class UsageMetrics(BaseModel):
+    """
+    Model to track usage metrics for the crew's execution.
+
+    Attributes:
+        total_tokens: Total number of tokens used.
+        prompt_tokens: Number of tokens used in prompts.
+        completion_tokens: Number of tokens used in completions.
+        successful_requests: Number of successful requests made.
+    """
+
+    total_tokens: int = Field(default=0, description="Total number of tokens used.")
+    prompt_tokens: int = Field(
+        default=0, description="Number of tokens used in prompts."
+    )
+    completion_tokens: int = Field(
+        default=0, description="Number of tokens used in completions."
+    )
+    successful_requests: int = Field(
+        default=0, description="Number of successful requests made."
+    )
+
+    def add_usage_metrics(self, usage_metrics: "UsageMetrics"):
+        """
+        Add the usage metrics from another UsageMetrics object.
+
+        Args:
+            usage_metrics (UsageMetrics): The usage metrics to add.
+        """
+        self.total_tokens += usage_metrics.total_tokens
+        self.prompt_tokens += usage_metrics.prompt_tokens
+        self.completion_tokens += usage_metrics.completion_tokens
+        self.successful_requests += usage_metrics.successful_requests
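For illustration only, a minimal standalone sketch of the aggregation pattern this merge introduces, using nothing beyond the UsageMetrics class added above (the numeric values are made-up example data): per-agent summaries are accumulated with add_usage_metrics instead of summing dictionary keys.

from crewai.types.usage_metrics import UsageMetrics

# Two per-agent summaries, as TokenProcess.get_summary() now returns them.
first = UsageMetrics(
    total_tokens=120, prompt_tokens=90, completion_tokens=30, successful_requests=2
)
second = UsageMetrics(
    total_tokens=80, prompt_tokens=60, completion_tokens=20, successful_requests=1
)

# Accumulate in place, mirroring what Crew.calculate_usage_metrics does above.
total = UsageMetrics()
total.add_usage_metrics(first)
total.add_usage_metrics(second)

assert total.total_tokens == 200
assert total.successful_requests == 3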

View File

@@ -18,6 +18,7 @@ from crewai.task import Task
 from crewai.tasks.conditional_task import ConditionalTask
 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
+from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import Logger, RPMController
 from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
@@ -597,14 +598,10 @@ def test_crew_kickoff_usage_metrics():
     assert len(results) == len(inputs)
     for result in results:
         # Assert that all required keys are in usage_metrics and their values are not None
-        for key in [
-            "total_tokens",
-            "prompt_tokens",
-            "completion_tokens",
-            "successful_requests",
-        ]:
-            assert key in result.token_usage
-            assert result.token_usage[key] > 0
+        assert result.token_usage.total_tokens > 0
+        assert result.token_usage.prompt_tokens > 0
+        assert result.token_usage.completion_tokens > 0
+        assert result.token_usage.successful_requests > 0


 def test_agents_rpm_is_never_set_if_crew_max_RPM_is_not_set():
@@ -1318,12 +1315,12 @@ def test_agent_usage_metrics_are_captured_for_hierarchical_process():
     print(crew.usage_metrics)

-    assert crew.usage_metrics == {
-        "total_tokens": 219,
-        "prompt_tokens": 201,
-        "completion_tokens": 18,
-        "successful_requests": 1,
-    }
+    assert crew.usage_metrics == UsageMetrics(
+        total_tokens=219,
+        prompt_tokens=201,
+        completion_tokens=18,
+        successful_requests=1,
+    )


 @pytest.mark.vcr(filter_headers=["authorization"])

View File

@@ -10,13 +10,12 @@ from crewai.pipeline.pipeline_run_result import PipelineRunResult
 from crewai.process import Process
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
+from crewai.types.usage_metrics import UsageMetrics
 from pydantic import BaseModel, ValidationError

-DEFAULT_TOKEN_USAGE = {
-    "total_tokens": 100,
-    "prompt_tokens": 50,
-    "completion_tokens": 50,
-}
+DEFAULT_TOKEN_USAGE = UsageMetrics(
+    total_tokens=100, prompt_tokens=50, completion_tokens=50, successful_requests=3
+)


 @pytest.fixture
@@ -443,6 +442,7 @@ Options:
 - Should the final output include the accumulation of previous stages' outputs?
 """


 @pytest.mark.asyncio
 async def test_pipeline_data_accumulation(mock_crew_factory):
     crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})