diff --git a/src/crewai/agents/agent_builder/utilities/base_token_process.py b/src/crewai/agents/agent_builder/utilities/base_token_process.py
index ce0b446d3..e971d018e 100644
--- a/src/crewai/agents/agent_builder/utilities/base_token_process.py
+++ b/src/crewai/agents/agent_builder/utilities/base_token_process.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict
+from crewai.types.usage_metrics import UsageMetrics
 
 
 class TokenProcess:
@@ -18,10 +18,10 @@ class TokenProcess:
     def sum_successful_requests(self, requests: int):
         self.successful_requests = self.successful_requests + requests
 
-    def get_summary(self) -> Dict[str, Any]:
-        return {
-            "total_tokens": self.total_tokens,
-            "prompt_tokens": self.prompt_tokens,
-            "completion_tokens": self.completion_tokens,
-            "successful_requests": self.successful_requests,
-        }
+    def get_summary(self) -> UsageMetrics:
+        return UsageMetrics(
+            total_tokens=self.total_tokens,
+            prompt_tokens=self.prompt_tokens,
+            completion_tokens=self.completion_tokens,
+            successful_requests=self.successful_requests,
+        )
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 4e306b03b..f47ebc2c3 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -32,6 +32,7 @@ from crewai.tasks.conditional_task import ConditionalTask
 from crewai.tasks.task_output import TaskOutput
 from crewai.telemetry import Telemetry
 from crewai.tools.agent_tools import AgentTools
+from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import I18N, FileHandler, Logger, RPMController
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
 from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
@@ -112,7 +113,7 @@ class Crew(BaseModel):
         default={"provider": "openai"},
         description="Configuration for the embedder to be used for the crew.",
     )
-    usage_metrics: Optional[dict] = Field(
+    usage_metrics: Optional[UsageMetrics] = Field(
         default=None,
         description="Metrics for the LLM usage during all tasks execution.",
     )
@@ -454,7 +455,7 @@ class Crew(BaseModel):
         if self.planning:
             self._handle_crew_planning()
 
-        metrics = []
+        metrics: List[UsageMetrics] = []
 
         if self.process == Process.sequential:
             result = self._run_sequential_process()
@@ -464,11 +465,12 @@ class Crew(BaseModel):
             raise NotImplementedError(
                 f"The process '{self.process}' is not implemented yet."
             )
+
         metrics += [agent._token_process.get_summary() for agent in self.agents]
 
-        self.usage_metrics = {
-            key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
-        }
+        self.usage_metrics = UsageMetrics()
+        for metric in metrics:
+            self.usage_metrics.add_usage_metrics(metric)
 
         return result
 
@@ -477,12 +479,7 @@ class Crew(BaseModel):
         results: List[CrewOutput] = []
 
         # Initialize the parent crew's usage metrics
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
+        total_usage_metrics = UsageMetrics()
 
         for input_data in inputs:
             crew = self.copy()
@@ -490,8 +487,7 @@ class Crew(BaseModel):
             output = crew.kickoff(inputs=input_data)
 
             if crew.usage_metrics:
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
+                total_usage_metrics.add_usage_metrics(crew.usage_metrics)
 
             results.append(output)
 
@@ -520,29 +516,10 @@ class Crew(BaseModel):
 
         results = await asyncio.gather(*tasks)
 
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
+        total_usage_metrics = UsageMetrics()
         for crew in crew_copies:
             if crew.usage_metrics:
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
-
-        self.usage_metrics = total_usage_metrics
-
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
-        for crew in crew_copies:
-            if crew.usage_metrics:
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
+                total_usage_metrics.add_usage_metrics(crew.usage_metrics)
 
         self.usage_metrics = total_usage_metrics
         self._task_output_handler.reset()
@@ -933,25 +910,18 @@ class Crew(BaseModel):
             )
         self._telemetry.end_crew(self, final_string_output)
 
-    def calculate_usage_metrics(self) -> Dict[str, int]:
+    def calculate_usage_metrics(self) -> UsageMetrics:
         """Calculates and returns the usage metrics."""
-        total_usage_metrics = {
-            "total_tokens": 0,
-            "prompt_tokens": 0,
-            "completion_tokens": 0,
-            "successful_requests": 0,
-        }
+        total_usage_metrics = UsageMetrics()
 
         for agent in self.agents:
             if hasattr(agent, "_token_process"):
                 token_sum = agent._token_process.get_summary()
-                for key in total_usage_metrics:
-                    total_usage_metrics[key] += token_sum.get(key, 0)
+                total_usage_metrics.add_usage_metrics(token_sum)
 
         if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
             token_sum = self.manager_agent._token_process.get_summary()
-            for key in total_usage_metrics:
-                total_usage_metrics[key] += token_sum.get(key, 0)
+            total_usage_metrics.add_usage_metrics(token_sum)
 
         return total_usage_metrics
 
diff --git a/src/crewai/crews/crew_output.py b/src/crewai/crews/crew_output.py
index e630c1f3a..64d1f9caf 100644
--- a/src/crewai/crews/crew_output.py
+++ b/src/crewai/crews/crew_output.py
@@ -5,6 +5,7 @@ from pydantic import BaseModel, Field
 
 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
+from crewai.types.usage_metrics import UsageMetrics
 
 
 class CrewOutput(BaseModel):
@@ -20,9 +21,7 @@ class CrewOutput(BaseModel):
     tasks_output: list[TaskOutput] = Field(
         description="Output of each task", default=[]
     )
-    token_usage: Dict[str, Any] = Field(
-        description="Processed token summary", default={}
-    )
+    token_usage: UsageMetrics = Field(description="Processed token summary", default={})
 
     @property
     def json(self) -> Optional[str]:
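
Illustration only, not part of the patch: with the crew.py and crew_output.py changes above, crew.usage_metrics and CrewOutput.token_usage are UsageMetrics instances instead of plain dicts, so consumers read attributes rather than index keys. A minimal sketch, assuming a crewai build that already contains this change and a hypothetical `crew` whose kickoff() has completed:

    # `crew` is assumed to exist and to have finished kickoff(); names here are illustrative.
    metrics = crew.usage_metrics  # now a UsageMetrics model, not a dict

    # Old style (no longer valid after this patch): metrics["total_tokens"]
    print(metrics.total_tokens, metrics.prompt_tokens, metrics.completion_tokens)

    # Callers that still need a plain dict (e.g. for JSON logging) can use pydantic's model_dump().
    as_dict = metrics.model_dump()
    assert as_dict["successful_requests"] == metrics.successful_requests
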
diff --git a/src/crewai/types/__init__.py b/src/crewai/types/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/crewai/types/usage_metrics.py b/src/crewai/types/usage_metrics.py
new file mode 100644
index 000000000..a5cee6a0f
--- /dev/null
+++ b/src/crewai/types/usage_metrics.py
@@ -0,0 +1,36 @@
+from pydantic import BaseModel, Field
+
+
+class UsageMetrics(BaseModel):
+    """
+    Model to track usage metrics for the crew's execution.
+
+    Attributes:
+        total_tokens: Total number of tokens used.
+        prompt_tokens: Number of tokens used in prompts.
+        completion_tokens: Number of tokens used in completions.
+        successful_requests: Number of successful requests made.
+    """
+
+    total_tokens: int = Field(default=0, description="Total number of tokens used.")
+    prompt_tokens: int = Field(
+        default=0, description="Number of tokens used in prompts."
+    )
+    completion_tokens: int = Field(
+        default=0, description="Number of tokens used in completions."
+    )
+    successful_requests: int = Field(
+        default=0, description="Number of successful requests made."
+    )
+
+    def add_usage_metrics(self, usage_metrics: "UsageMetrics"):
+        """
+        Add the usage metrics from another UsageMetrics object.
+
+        Args:
+            usage_metrics (UsageMetrics): The usage metrics to add.
+        """
+        self.total_tokens += usage_metrics.total_tokens
+        self.prompt_tokens += usage_metrics.prompt_tokens
+        self.completion_tokens += usage_metrics.completion_tokens
+        self.successful_requests += usage_metrics.successful_requests
diff --git a/tests/crew_test.py b/tests/crew_test.py
index 141ecfb7c..f3fb27872 100644
--- a/tests/crew_test.py
+++ b/tests/crew_test.py
@@ -18,6 +18,7 @@ from crewai.task import Task
 from crewai.tasks.conditional_task import ConditionalTask
 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
+from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import Logger, RPMController
 from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
 
@@ -597,14 +598,10 @@ def test_crew_kickoff_usage_metrics():
     assert len(results) == len(inputs)
     for result in results:
         # Assert that all required keys are in usage_metrics and their values are not None
-        for key in [
-            "total_tokens",
-            "prompt_tokens",
-            "completion_tokens",
-            "successful_requests",
-        ]:
-            assert key in result.token_usage
-            assert result.token_usage[key] > 0
+        assert result.token_usage.total_tokens > 0
+        assert result.token_usage.prompt_tokens > 0
+        assert result.token_usage.completion_tokens > 0
+        assert result.token_usage.successful_requests > 0
 
 
 def test_agents_rpm_is_never_set_if_crew_max_RPM_is_not_set():
@@ -1318,12 +1315,12 @@ def test_agent_usage_metrics_are_captured_for_hierarchical_process():
 
     print(crew.usage_metrics)
 
-    assert crew.usage_metrics == {
-        "total_tokens": 219,
-        "prompt_tokens": 201,
-        "completion_tokens": 18,
-        "successful_requests": 1,
-    }
+    assert crew.usage_metrics == UsageMetrics(
+        total_tokens=219,
+        prompt_tokens=201,
+        completion_tokens=18,
+        successful_requests=1,
+    )
 
 
 @pytest.mark.vcr(filter_headers=["authorization"])
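
Illustration only, not part of the patch: the aggregation that crew.py now performs boils down to in-place addition on the new model. A standalone sketch that assumes only the UsageMetrics class added above; the per-agent numbers are invented:

    from crewai.types.usage_metrics import UsageMetrics

    # Two hypothetical per-agent summaries, shaped like TokenProcess.get_summary() now returns them.
    agent_a = UsageMetrics(total_tokens=120, prompt_tokens=90, completion_tokens=30, successful_requests=2)
    agent_b = UsageMetrics(total_tokens=80, prompt_tokens=60, completion_tokens=20, successful_requests=1)

    total = UsageMetrics()  # every field defaults to 0
    for summary in (agent_a, agent_b):
        total.add_usage_metrics(summary)  # mutates `total` field by field

    assert total.total_tokens == 200
    assert total.successful_requests == 3
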
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index e7b7a745c..7f3cb4bf9 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -10,13 +10,12 @@ from crewai.pipeline.pipeline_run_result import PipelineRunResult
 from crewai.process import Process
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
+from crewai.types.usage_metrics import UsageMetrics
 from pydantic import BaseModel, ValidationError
 
-DEFAULT_TOKEN_USAGE = {
-    "total_tokens": 100,
-    "prompt_tokens": 50,
-    "completion_tokens": 50,
-}
+DEFAULT_TOKEN_USAGE = UsageMetrics(
+    total_tokens=100, prompt_tokens=50, completion_tokens=50, successful_requests=3
+)
 
 
 @pytest.fixture
@@ -443,6 +442,7 @@ Options:
 - Should the final output include the accumulation of previous stages' outputs?
 """
 
+
 @pytest.mark.asyncio
 async def test_pipeline_data_accumulation(mock_crew_factory):
     crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"})
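
One more illustrative sketch, not part of the patch: the updated tests compare whole metrics objects with ==, which works because pydantic models compare by field values, so expectations can be written as literal UsageMetrics instances (as test_agent_usage_metrics_are_captured_for_hierarchical_process does above). Assuming the same class, with made-up numbers:

    from crewai.types.usage_metrics import UsageMetrics

    observed = UsageMetrics(total_tokens=219, prompt_tokens=201, completion_tokens=18, successful_requests=1)
    expected = UsageMetrics(total_tokens=219, prompt_tokens=201, completion_tokens=18, successful_requests=1)
    assert observed == expected  # field-by-field equality, no key iteration needed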