mirror of https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
feat: add detailed token metrics tracking for agents and tasks
@@ -203,6 +203,10 @@ class Crew(FlowTrackable, BaseModel):
         default=None,
         description="Metrics for the LLM usage during all tasks execution.",
     )
+    workflow_token_metrics: Any | None = Field(
+        default=None,
+        description="Detailed per-agent and per-task token metrics.",
+    )
     manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         description="Language model that will run the agent.", default=None
     )
@@ -1155,12 +1159,22 @@ class Crew(FlowTrackable, BaseModel):
                     task_outputs = self._process_async_tasks(futures, was_replayed)
                     futures.clear()

+                # Capture token usage before task execution
+                tokens_before = self._get_agent_token_usage(exec_data.agent)
+
                 context = self._get_context(task, task_outputs)
                 task_output = task.execute_sync(
                     agent=exec_data.agent,
                     context=context,
                     tools=exec_data.tools,
                 )
+
+                # Capture token usage after task execution and attach to task output
+                tokens_after = self._get_agent_token_usage(exec_data.agent)
+                task_output = self._attach_task_token_metrics(
+                    task_output, task, exec_data.agent, tokens_before, tokens_after
+                )
+
                 task_outputs.append(task_output)
                 self._process_task_result(task, task_output)
                 self._store_execution_log(task, task_output, task_index, was_replayed)
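
Note: the per-task accounting in the hunk above relies on a snapshot-delta pattern: an agent's LLM token counters are cumulative, so the cost of one task is the field-wise difference between the snapshot taken after execute_sync and the one taken before. A minimal sketch of that arithmetic with plain UsageMetrics values (illustrative numbers, assuming the default-zero fields shown in the usage_metrics hunk at the end of this commit):

from crewai.types.usage_metrics import UsageMetrics

# Cumulative counters as they might look before and after one task runs.
tokens_before = UsageMetrics(
    total_tokens=500, prompt_tokens=400, completion_tokens=100, successful_requests=2
)
tokens_after = UsageMetrics(
    total_tokens=820, prompt_tokens=640, completion_tokens=180, successful_requests=3
)

# The task's own cost is the difference between the two cumulative snapshots.
task_total = tokens_after.total_tokens - tokens_before.total_tokens
task_requests = tokens_after.successful_requests - tokens_before.successful_requests
assert (task_total, task_requests) == (320, 1)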
@@ -1401,6 +1415,7 @@ class Crew(FlowTrackable, BaseModel):
             json_dict=final_task_output.json_dict,
             tasks_output=task_outputs,
             token_usage=self.token_usage,
+            token_metrics=getattr(self, 'workflow_token_metrics', None),
         )

     def _process_async_tasks(
@@ -1616,35 +1631,115 @@ class Crew(FlowTrackable, BaseModel):

     def calculate_usage_metrics(self) -> UsageMetrics:
         """Calculates and returns the usage metrics."""
+        from crewai.types.usage_metrics import (
+            AgentTokenMetrics,
+            WorkflowTokenMetrics,
+        )
+
         total_usage_metrics = UsageMetrics()

+        # Preserve existing workflow_token_metrics if it exists (has per_task data)
+        if hasattr(self, 'workflow_token_metrics') and self.workflow_token_metrics:
+            workflow_metrics = self.workflow_token_metrics
+        else:
+            workflow_metrics = WorkflowTokenMetrics()
+
         for agent in self.agents:
+            agent_role = getattr(agent, 'role', 'Unknown Agent')
+            agent_id = str(getattr(agent, 'id', ''))
+
             if isinstance(agent.llm, BaseLLM):
                 llm_usage = agent.llm.get_token_usage_summary()
                 total_usage_metrics.add_usage_metrics(llm_usage)
+
+                # Create per-agent metrics
+                agent_metrics = AgentTokenMetrics(
+                    agent_name=agent_role,
+                    agent_id=agent_id,
+                    total_tokens=llm_usage.total_tokens,
+                    prompt_tokens=llm_usage.prompt_tokens,
+                    cached_prompt_tokens=llm_usage.cached_prompt_tokens,
+                    completion_tokens=llm_usage.completion_tokens,
+                    successful_requests=llm_usage.successful_requests
+                )
+                workflow_metrics.per_agent[agent_role] = agent_metrics
             else:
                 # fallback litellm
                 if hasattr(agent, "_token_process"):
                     token_sum = agent._token_process.get_summary()
                     total_usage_metrics.add_usage_metrics(token_sum)
+
+                    # Create per-agent metrics from litellm
+                    agent_metrics = AgentTokenMetrics(
+                        agent_name=agent_role,
+                        agent_id=agent_id,
+                        total_tokens=token_sum.total_tokens,
+                        prompt_tokens=token_sum.prompt_tokens,
+                        cached_prompt_tokens=token_sum.cached_prompt_tokens,
+                        completion_tokens=token_sum.completion_tokens,
+                        successful_requests=token_sum.successful_requests
+                    )
+                    workflow_metrics.per_agent[agent_role] = agent_metrics

-        if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
-            token_sum = self.manager_agent._token_process.get_summary()
-            total_usage_metrics.add_usage_metrics(token_sum)
+        if self.manager_agent:
+            manager_role = getattr(self.manager_agent, 'role', 'Manager Agent')
+            manager_id = str(getattr(self.manager_agent, 'id', ''))
+
+            if hasattr(self.manager_agent, "_token_process"):
+                token_sum = self.manager_agent._token_process.get_summary()
+                total_usage_metrics.add_usage_metrics(token_sum)
+
+                # Create per-agent metrics for manager
+                manager_metrics = AgentTokenMetrics(
+                    agent_name=manager_role,
+                    agent_id=manager_id,
+                    total_tokens=token_sum.total_tokens,
+                    prompt_tokens=token_sum.prompt_tokens,
+                    cached_prompt_tokens=token_sum.cached_prompt_tokens,
+                    completion_tokens=token_sum.completion_tokens,
+                    successful_requests=token_sum.successful_requests
+                )
+                workflow_metrics.per_agent[manager_role] = manager_metrics

-        if (
-            self.manager_agent
-            and hasattr(self.manager_agent, "llm")
-            and hasattr(self.manager_agent.llm, "get_token_usage_summary")
-        ):
-            if isinstance(self.manager_agent.llm, BaseLLM):
-                llm_usage = self.manager_agent.llm.get_token_usage_summary()
-            else:
-                llm_usage = self.manager_agent.llm._token_process.get_summary()
-
-            total_usage_metrics.add_usage_metrics(llm_usage)
+            if (
+                hasattr(self.manager_agent, "llm")
+                and hasattr(self.manager_agent.llm, "get_token_usage_summary")
+            ):
+                if isinstance(self.manager_agent.llm, BaseLLM):
+                    llm_usage = self.manager_agent.llm.get_token_usage_summary()
+                else:
+                    llm_usage = self.manager_agent.llm._token_process.get_summary()
+
+                total_usage_metrics.add_usage_metrics(llm_usage)
+
+                # Update or create manager metrics
+                if manager_role in workflow_metrics.per_agent:
+                    workflow_metrics.per_agent[manager_role].total_tokens += llm_usage.total_tokens
+                    workflow_metrics.per_agent[manager_role].prompt_tokens += llm_usage.prompt_tokens
+                    workflow_metrics.per_agent[manager_role].cached_prompt_tokens += llm_usage.cached_prompt_tokens
+                    workflow_metrics.per_agent[manager_role].completion_tokens += llm_usage.completion_tokens
+                    workflow_metrics.per_agent[manager_role].successful_requests += llm_usage.successful_requests
+                else:
+                    manager_metrics = AgentTokenMetrics(
+                        agent_name=manager_role,
+                        agent_id=manager_id,
+                        total_tokens=llm_usage.total_tokens,
+                        prompt_tokens=llm_usage.prompt_tokens,
+                        cached_prompt_tokens=llm_usage.cached_prompt_tokens,
+                        completion_tokens=llm_usage.completion_tokens,
+                        successful_requests=llm_usage.successful_requests
+                    )
+                    workflow_metrics.per_agent[manager_role] = manager_metrics
+
+        # Set workflow-level totals
+        workflow_metrics.total_tokens = total_usage_metrics.total_tokens
+        workflow_metrics.prompt_tokens = total_usage_metrics.prompt_tokens
+        workflow_metrics.cached_prompt_tokens = total_usage_metrics.cached_prompt_tokens
+        workflow_metrics.completion_tokens = total_usage_metrics.completion_tokens
+        workflow_metrics.successful_requests = total_usage_metrics.successful_requests
+
+        # Store workflow metrics (preserving per_task data)
+        self.workflow_token_metrics = workflow_metrics
+
         self.usage_metrics = total_usage_metrics
         return total_usage_metrics

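Note: because the manager's usage can arrive from two sources (a litellm _token_process counter and a BaseLLM summary), the same role may be written twice, and the "update or create" branch above merges both into a single rolled-up entry per role. An isolated sketch of that merge using only the models this commit adds; the merge_agent_usage helper and all numbers are illustrative, not part of the commit:

from crewai.types.usage_metrics import AgentTokenMetrics, WorkflowTokenMetrics

workflow_metrics = WorkflowTokenMetrics()
manager_role = "Crew Manager"

def merge_agent_usage(metrics: WorkflowTokenMetrics, role: str,
                      total: int, prompt: int, completion: int, requests: int) -> None:
    """Add usage to an existing per-agent entry, or create one (hypothetical helper)."""
    if role in metrics.per_agent:
        entry = metrics.per_agent[role]
        entry.total_tokens += total
        entry.prompt_tokens += prompt
        entry.completion_tokens += completion
        entry.successful_requests += requests
    else:
        metrics.per_agent[role] = AgentTokenMetrics(
            agent_name=role, total_tokens=total, prompt_tokens=prompt,
            completion_tokens=completion, successful_requests=requests,
        )

merge_agent_usage(workflow_metrics, manager_role, 400, 300, 100, 1)  # first source
merge_agent_usage(workflow_metrics, manager_role, 200, 150, 50, 1)   # second source
assert workflow_metrics.per_agent[manager_role].total_tokens == 600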
@@ -1918,3 +2013,55 @@ To enable tracing, do any one of these:
             padding=(1, 2),
         )
         console.print(panel)
+
+    def _get_agent_token_usage(self, agent: BaseAgent | None) -> UsageMetrics:
+        """Get current token usage for an agent."""
+        if not agent:
+            return UsageMetrics()
+
+        if isinstance(agent.llm, BaseLLM):
+            return agent.llm.get_token_usage_summary()
+        elif hasattr(agent, "_token_process"):
+            return agent._token_process.get_summary()
+
+        return UsageMetrics()
+
+    def _attach_task_token_metrics(
+        self,
+        task_output: TaskOutput,
+        task: Task,
+        agent: BaseAgent | None,
+        tokens_before: UsageMetrics,
+        tokens_after: UsageMetrics
+    ) -> TaskOutput:
+        """Attach per-task token metrics to the task output."""
+        from crewai.types.usage_metrics import TaskTokenMetrics
+
+        if not agent:
+            return task_output
+
+        # Calculate the delta (tokens used by this specific task)
+        task_tokens = TaskTokenMetrics(
+            task_name=getattr(task, 'name', None) or task.description[:50],
+            task_id=str(getattr(task, 'id', '')),
+            agent_name=getattr(agent, 'role', 'Unknown Agent'),
+            total_tokens=tokens_after.total_tokens - tokens_before.total_tokens,
+            prompt_tokens=tokens_after.prompt_tokens - tokens_before.prompt_tokens,
+            cached_prompt_tokens=tokens_after.cached_prompt_tokens - tokens_before.cached_prompt_tokens,
+            completion_tokens=tokens_after.completion_tokens - tokens_before.completion_tokens,
+            successful_requests=tokens_after.successful_requests - tokens_before.successful_requests
+        )
+
+        # Attach to task output
+        task_output.usage_metrics = task_tokens
+
+        # Store in workflow metrics
+        if not hasattr(self, 'workflow_token_metrics') or self.workflow_token_metrics is None:
+            from crewai.types.usage_metrics import WorkflowTokenMetrics
+            self.workflow_token_metrics = WorkflowTokenMetrics()
+
+        task_key = f"{task_tokens.task_name}_{task_tokens.agent_name}"
+        self.workflow_token_metrics.per_task[task_key] = task_tokens
+
+        return task_output
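Note: per-task entries are keyed by the f"{task_name}_{agent_name}" string built above, so the same task name executed by two different agents still produces two distinct entries. A small self-contained sketch against the new models (values are illustrative):

from crewai.types.usage_metrics import TaskTokenMetrics, WorkflowTokenMetrics

workflow = WorkflowTokenMetrics()

for agent_name in ("Researcher", "Writer"):
    task_tokens = TaskTokenMetrics(
        task_name="Summarize findings",
        agent_name=agent_name,
        total_tokens=350, prompt_tokens=250, completion_tokens=100,
        successful_requests=1,
    )
    # Same task name, different agents -> two distinct per_task entries.
    task_key = f"{task_tokens.task_name}_{task_tokens.agent_name}"
    workflow.per_task[task_key] = task_tokens

assert len(workflow.per_task) == 2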
@@ -7,7 +7,7 @@ from pydantic import BaseModel, Field

 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
-from crewai.types.usage_metrics import UsageMetrics
+from crewai.types.usage_metrics import UsageMetrics, WorkflowTokenMetrics


 class CrewOutput(BaseModel):
@@ -26,6 +26,10 @@ class CrewOutput(BaseModel):
     token_usage: UsageMetrics = Field(
         description="Processed token summary", default_factory=UsageMetrics
     )
+    token_metrics: WorkflowTokenMetrics | None = Field(
+        description="Detailed per-agent and per-task token metrics",
+        default=None
+    )

     @property
     def json(self) -> str | None:  # type: ignore[override]
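Note: on the consumer side the new field sits next to the existing flat summary. A hypothetical read-side sketch, assuming crew is an already-configured Crew instance (not shown here):

result = crew.kickoff()  # crew is assumed to exist; see note above

print(result.token_usage.total_tokens)  # existing flat summary

# New detailed breakdown; None when no metrics were collected.
if result.token_metrics is not None:
    for role, agent_metrics in result.token_metrics.per_agent.items():
        print(role, agent_metrics.total_tokens)
    for key, task_metrics in result.token_metrics.per_task.items():
        print(key, task_metrics.completion_tokens)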
@@ -6,6 +6,7 @@ from typing import Any
 from pydantic import BaseModel, Field, model_validator

 from crewai.tasks.output_format import OutputFormat
+from crewai.types.usage_metrics import TaskTokenMetrics
 from crewai.utilities.types import LLMMessage


@@ -22,6 +23,7 @@ class TaskOutput(BaseModel):
         json_dict: JSON dictionary output of the task
         agent: Agent that executed the task
         output_format: Output format of the task (JSON, PYDANTIC, or RAW)
+        usage_metrics: Token usage metrics for this specific task
     """

     description: str = Field(description="Description of the task")
@@ -42,6 +44,10 @@ class TaskOutput(BaseModel):
         description="Output format of the task", default=OutputFormat.RAW
     )
     messages: list[LLMMessage] = Field(description="Messages of the task", default=[])
+    usage_metrics: TaskTokenMetrics | None = Field(
+        description="Token usage metrics for this task",
+        default=None
+    )

     @model_validator(mode="after")
     def set_summary(self):
@@ -44,3 +44,74 @@ class UsageMetrics(BaseModel):
         self.cached_prompt_tokens += usage_metrics.cached_prompt_tokens
         self.completion_tokens += usage_metrics.completion_tokens
         self.successful_requests += usage_metrics.successful_requests
+
+
+class AgentTokenMetrics(BaseModel):
+    """Token usage metrics for a specific agent.
+
+    Attributes:
+        agent_name: Name/role of the agent
+        agent_id: Unique identifier for the agent
+        total_tokens: Total tokens used by this agent
+        prompt_tokens: Prompt tokens used by this agent
+        cached_prompt_tokens: Cached prompt tokens used by this agent
+        completion_tokens: Completion tokens used by this agent
+        successful_requests: Number of successful LLM requests
+    """
+
+    agent_name: str = Field(description="Name/role of the agent")
+    agent_id: str | None = Field(default=None, description="Unique identifier for the agent")
+    total_tokens: int = Field(default=0, description="Total tokens used by this agent")
+    prompt_tokens: int = Field(default=0, description="Prompt tokens used by this agent")
+    cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used by this agent")
+    completion_tokens: int = Field(default=0, description="Completion tokens used by this agent")
+    successful_requests: int = Field(default=0, description="Number of successful LLM requests")
+
+
+class TaskTokenMetrics(BaseModel):
+    """Token usage metrics for a specific task.
+
+    Attributes:
+        task_name: Name of the task
+        task_id: Unique identifier for the task
+        agent_name: Name of the agent that executed the task
+        total_tokens: Total tokens used for this task
+        prompt_tokens: Prompt tokens used for this task
+        cached_prompt_tokens: Cached prompt tokens used for this task
+        completion_tokens: Completion tokens used for this task
+        successful_requests: Number of successful LLM requests
+    """
+
+    task_name: str = Field(description="Name of the task")
+    task_id: str | None = Field(default=None, description="Unique identifier for the task")
+    agent_name: str = Field(description="Name of the agent that executed the task")
+    total_tokens: int = Field(default=0, description="Total tokens used for this task")
+    prompt_tokens: int = Field(default=0, description="Prompt tokens used for this task")
+    cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used for this task")
+    completion_tokens: int = Field(default=0, description="Completion tokens used for this task")
+    successful_requests: int = Field(default=0, description="Number of successful LLM requests")
+
+
+class WorkflowTokenMetrics(BaseModel):
+    """Complete token usage metrics for a crew workflow.
+
+    Attributes:
+        total_tokens: Total tokens used across entire workflow
+        prompt_tokens: Total prompt tokens used
+        cached_prompt_tokens: Total cached prompt tokens used
+        completion_tokens: Total completion tokens used
+        successful_requests: Total successful requests
+        per_agent: Dictionary mapping agent names to their token metrics
+        per_task: Dictionary mapping task names to their token metrics
+    """
+
+    total_tokens: int = Field(default=0, description="Total tokens used across entire workflow")
+    prompt_tokens: int = Field(default=0, description="Total prompt tokens used")
+    cached_prompt_tokens: int = Field(default=0, description="Total cached prompt tokens used")
+    completion_tokens: int = Field(default=0, description="Total completion tokens used")
+    successful_requests: int = Field(default=0, description="Total successful requests")
+    per_agent: dict[str, AgentTokenMetrics] = Field(
+        default_factory=dict,
+        description="Token metrics per agent"
+    )
+    per_task: dict[str, TaskTokenMetrics] = Field(
+        default_factory=dict,
+        description="Token metrics per task"
+    )
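Note: taken together, the three models form one nested structure: workflow-level totals at the top, with per_agent and per_task breakdowns beneath. A self-contained sketch of how they compose (illustrative numbers; model_dump is the standard Pydantic v2 serializer):

from crewai.types.usage_metrics import (
    AgentTokenMetrics,
    TaskTokenMetrics,
    WorkflowTokenMetrics,
)

metrics = WorkflowTokenMetrics()
metrics.per_agent["Researcher"] = AgentTokenMetrics(
    agent_name="Researcher", total_tokens=1200, prompt_tokens=900,
    completion_tokens=300, successful_requests=3,
)
metrics.per_task["research_task_Researcher"] = TaskTokenMetrics(
    task_name="research_task", agent_name="Researcher",
    total_tokens=1200, prompt_tokens=900, completion_tokens=300,
    successful_requests=3,
)

# Workflow-level totals are set from the crew-wide UsageMetrics sum,
# so they should equal the sum over per_agent entries.
metrics.total_tokens = sum(a.total_tokens for a in metrics.per_agent.values())
print(metrics.model_dump())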