diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py
index 2c7f583b9..8970af7d5 100644
--- a/lib/crewai/src/crewai/crew.py
+++ b/lib/crewai/src/crewai/crew.py
@@ -203,6 +203,10 @@ class Crew(FlowTrackable, BaseModel):
         default=None,
         description="Metrics for the LLM usage during all tasks execution.",
     )
+    workflow_token_metrics: Any | None = Field(
+        default=None,
+        description="Detailed per-agent and per-task token metrics.",
+    )
     manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         description="Language model that will run the agent.", default=None
     )
@@ -1155,12 +1159,22 @@ class Crew(FlowTrackable, BaseModel):
                     task_outputs = self._process_async_tasks(futures, was_replayed)
                     futures.clear()
 
+                # Capture token usage before task execution
+                tokens_before = self._get_agent_token_usage(exec_data.agent)
+
                 context = self._get_context(task, task_outputs)
                 task_output = task.execute_sync(
                     agent=exec_data.agent,
                     context=context,
                     tools=exec_data.tools,
                 )
+
+                # Capture token usage after task execution and attach to task output
+                tokens_after = self._get_agent_token_usage(exec_data.agent)
+                task_output = self._attach_task_token_metrics(
+                    task_output, task, exec_data.agent, tokens_before, tokens_after
+                )
+
                 task_outputs.append(task_output)
                 self._process_task_result(task, task_output)
                 self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -1401,6 +1415,7 @@ class Crew(FlowTrackable, BaseModel):
             json_dict=final_task_output.json_dict,
             tasks_output=task_outputs,
             token_usage=self.token_usage,
+            token_metrics=getattr(self, 'workflow_token_metrics', None),
         )
 
     def _process_async_tasks(
@@ -1616,35 +1631,115 @@ class Crew(FlowTrackable, BaseModel):
     def calculate_usage_metrics(self) -> UsageMetrics:
         """Calculates and returns the usage metrics."""
+        from crewai.types.usage_metrics import (
+            AgentTokenMetrics,
+            WorkflowTokenMetrics,
+        )
+
         total_usage_metrics = UsageMetrics()
+
+        # Preserve existing workflow_token_metrics if it exists (has per_task data)
+        if hasattr(self, 'workflow_token_metrics') and self.workflow_token_metrics:
+            workflow_metrics = self.workflow_token_metrics
+        else:
+            workflow_metrics = WorkflowTokenMetrics()
 
         for agent in self.agents:
+            agent_role = getattr(agent, 'role', 'Unknown Agent')
+            agent_id = str(getattr(agent, 'id', ''))
+
             if isinstance(agent.llm, BaseLLM):
                 llm_usage = agent.llm.get_token_usage_summary()
                 total_usage_metrics.add_usage_metrics(llm_usage)
+
+                # Create per-agent metrics
+                agent_metrics = AgentTokenMetrics(
+                    agent_name=agent_role,
+                    agent_id=agent_id,
+                    total_tokens=llm_usage.total_tokens,
+                    prompt_tokens=llm_usage.prompt_tokens,
+                    cached_prompt_tokens=llm_usage.cached_prompt_tokens,
+                    completion_tokens=llm_usage.completion_tokens,
+                    successful_requests=llm_usage.successful_requests
+                )
+                workflow_metrics.per_agent[agent_role] = agent_metrics
             else:
                 # fallback litellm
                 if hasattr(agent, "_token_process"):
                     token_sum = agent._token_process.get_summary()
                     total_usage_metrics.add_usage_metrics(token_sum)
+
+                    # Create per-agent metrics from litellm
+                    agent_metrics = AgentTokenMetrics(
+                        agent_name=agent_role,
+                        agent_id=agent_id,
+                        total_tokens=token_sum.total_tokens,
+                        prompt_tokens=token_sum.prompt_tokens,
+                        cached_prompt_tokens=token_sum.cached_prompt_tokens,
+                        completion_tokens=token_sum.completion_tokens,
+                        successful_requests=token_sum.successful_requests
+                    )
+                    workflow_metrics.per_agent[agent_role] = agent_metrics
 
-        if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
-            token_sum = self.manager_agent._token_process.get_summary()
-            total_usage_metrics.add_usage_metrics(token_sum)
+        if self.manager_agent:
+            manager_role = getattr(self.manager_agent, 'role', 'Manager Agent')
+            manager_id = str(getattr(self.manager_agent, 'id', ''))
+
+            if hasattr(self.manager_agent, "_token_process"):
+                token_sum = self.manager_agent._token_process.get_summary()
+                total_usage_metrics.add_usage_metrics(token_sum)
+
+                # Create per-agent metrics for manager
+                manager_metrics = AgentTokenMetrics(
+                    agent_name=manager_role,
+                    agent_id=manager_id,
+                    total_tokens=token_sum.total_tokens,
+                    prompt_tokens=token_sum.prompt_tokens,
+                    cached_prompt_tokens=token_sum.cached_prompt_tokens,
+                    completion_tokens=token_sum.completion_tokens,
+                    successful_requests=token_sum.successful_requests
+                )
+                workflow_metrics.per_agent[manager_role] = manager_metrics
 
-        if (
-            self.manager_agent
-            and hasattr(self.manager_agent, "llm")
-            and hasattr(self.manager_agent.llm, "get_token_usage_summary")
-        ):
-            if isinstance(self.manager_agent.llm, BaseLLM):
-                llm_usage = self.manager_agent.llm.get_token_usage_summary()
-            else:
-                llm_usage = self.manager_agent.llm._token_process.get_summary()
+            if (
+                hasattr(self.manager_agent, "llm")
+                and hasattr(self.manager_agent.llm, "get_token_usage_summary")
+            ):
+                if isinstance(self.manager_agent.llm, BaseLLM):
+                    llm_usage = self.manager_agent.llm.get_token_usage_summary()
+                else:
+                    llm_usage = self.manager_agent.llm._token_process.get_summary()
 
-            total_usage_metrics.add_usage_metrics(llm_usage)
+                total_usage_metrics.add_usage_metrics(llm_usage)
+
+                # Update or create manager metrics
+                if manager_role in workflow_metrics.per_agent:
+                    workflow_metrics.per_agent[manager_role].total_tokens += llm_usage.total_tokens
+                    workflow_metrics.per_agent[manager_role].prompt_tokens += llm_usage.prompt_tokens
+                    workflow_metrics.per_agent[manager_role].cached_prompt_tokens += llm_usage.cached_prompt_tokens
+                    workflow_metrics.per_agent[manager_role].completion_tokens += llm_usage.completion_tokens
+                    workflow_metrics.per_agent[manager_role].successful_requests += llm_usage.successful_requests
+                else:
+                    manager_metrics = AgentTokenMetrics(
+                        agent_name=manager_role,
+                        agent_id=manager_id,
+                        total_tokens=llm_usage.total_tokens,
+                        prompt_tokens=llm_usage.prompt_tokens,
+                        cached_prompt_tokens=llm_usage.cached_prompt_tokens,
+                        completion_tokens=llm_usage.completion_tokens,
+                        successful_requests=llm_usage.successful_requests
+                    )
+                    workflow_metrics.per_agent[manager_role] = manager_metrics
+
+        # Set workflow-level totals
+        workflow_metrics.total_tokens = total_usage_metrics.total_tokens
+        workflow_metrics.prompt_tokens = total_usage_metrics.prompt_tokens
+        workflow_metrics.cached_prompt_tokens = total_usage_metrics.cached_prompt_tokens
+        workflow_metrics.completion_tokens = total_usage_metrics.completion_tokens
+        workflow_metrics.successful_requests = total_usage_metrics.successful_requests
+
+        # Store workflow metrics (preserving per_task data)
+        self.workflow_token_metrics = workflow_metrics
 
         self.usage_metrics = total_usage_metrics
         return total_usage_metrics
@@ -1918,3 +2013,55 @@ To enable tracing, do any one of these:
             padding=(1, 2),
         )
         console.print(panel)
+
+    def _get_agent_token_usage(self, agent: BaseAgent | None) -> UsageMetrics:
+        """Get current token usage for an agent."""
+        if not agent:
+            return UsageMetrics()
+
+        if isinstance(agent.llm, BaseLLM):
+            return agent.llm.get_token_usage_summary()
+        elif hasattr(agent, "_token_process"):
+            return agent._token_process.get_summary()
+
+        return UsageMetrics()
+
+    def _attach_task_token_metrics(
+        self,
+        task_output: TaskOutput,
+        task: Task,
+        agent: BaseAgent | None,
+        tokens_before: UsageMetrics,
+        tokens_after: UsageMetrics
+    ) -> TaskOutput:
+        """Attach per-task token metrics to the task output."""
+        from crewai.types.usage_metrics import TaskTokenMetrics
+
+        if not agent:
+            return task_output
+
+        # Calculate the delta (tokens used by this specific task)
+        task_tokens = TaskTokenMetrics(
+            task_name=getattr(task, 'name', None) or task.description[:50],
+            task_id=str(getattr(task, 'id', '')),
+            agent_name=getattr(agent, 'role', 'Unknown Agent'),
+            total_tokens=tokens_after.total_tokens - tokens_before.total_tokens,
+            prompt_tokens=tokens_after.prompt_tokens - tokens_before.prompt_tokens,
+            cached_prompt_tokens=tokens_after.cached_prompt_tokens - tokens_before.cached_prompt_tokens,
+            completion_tokens=tokens_after.completion_tokens - tokens_before.completion_tokens,
+            successful_requests=tokens_after.successful_requests - tokens_before.successful_requests
+        )
+
+        # Attach to task output
+        task_output.usage_metrics = task_tokens
+
+        # Store in workflow metrics
+        if not hasattr(self, 'workflow_token_metrics') or self.workflow_token_metrics is None:
+            from crewai.types.usage_metrics import WorkflowTokenMetrics
+            self.workflow_token_metrics = WorkflowTokenMetrics()
+
+        task_key = f"{task_tokens.task_name}_{task_tokens.agent_name}"
+        self.workflow_token_metrics.per_task[task_key] = task_tokens
+
+        return task_output
+
diff --git a/lib/crewai/src/crewai/crews/crew_output.py b/lib/crewai/src/crewai/crews/crew_output.py
index 9f2f03185..fef564654 100644
--- a/lib/crewai/src/crewai/crews/crew_output.py
+++ b/lib/crewai/src/crewai/crews/crew_output.py
@@ -7,7 +7,7 @@ from pydantic import BaseModel, Field
 
 from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
-from crewai.types.usage_metrics import UsageMetrics
+from crewai.types.usage_metrics import UsageMetrics, WorkflowTokenMetrics
 
 
 class CrewOutput(BaseModel):
@@ -26,6 +26,10 @@ class CrewOutput(BaseModel):
     token_usage: UsageMetrics = Field(
         description="Processed token summary", default_factory=UsageMetrics
     )
+    token_metrics: WorkflowTokenMetrics | None = Field(
+        description="Detailed per-agent and per-task token metrics",
+        default=None
+    )
 
     @property
     def json(self) -> str | None:  # type: ignore[override]
diff --git a/lib/crewai/src/crewai/tasks/task_output.py b/lib/crewai/src/crewai/tasks/task_output.py
index 901604ac1..1da3ef0c5 100644
--- a/lib/crewai/src/crewai/tasks/task_output.py
+++ b/lib/crewai/src/crewai/tasks/task_output.py
@@ -6,6 +6,7 @@ from typing import Any
 from pydantic import BaseModel, Field, model_validator
 
 from crewai.tasks.output_format import OutputFormat
+from crewai.types.usage_metrics import TaskTokenMetrics
 from crewai.utilities.types import LLMMessage
 
 
@@ -22,6 +23,7 @@ class TaskOutput(BaseModel):
         json_dict: JSON dictionary output of the task
         agent: Agent that executed the task
         output_format: Output format of the task (JSON, PYDANTIC, or RAW)
+        usage_metrics: Token usage metrics for this specific task
     """
 
     description: str = Field(description="Description of the task")
@@ -42,6 +44,10 @@
         description="Output format of the task", default=OutputFormat.RAW
     )
     messages: list[LLMMessage] = Field(description="Messages of the task", default=[])
+    usage_metrics: TaskTokenMetrics | None = Field(
+        description="Token usage metrics for this task",
+        default=None
+    )
 
     @model_validator(mode="after")
     def set_summary(self):
diff --git a/lib/crewai/src/crewai/types/usage_metrics.py b/lib/crewai/src/crewai/types/usage_metrics.py
index 77e9ef598..2bcdbdd76 100644
--- a/lib/crewai/src/crewai/types/usage_metrics.py
+++ b/lib/crewai/src/crewai/types/usage_metrics.py
@@ -44,3 +44,74 @@
         self.cached_prompt_tokens += usage_metrics.cached_prompt_tokens
         self.completion_tokens += usage_metrics.completion_tokens
         self.successful_requests += usage_metrics.successful_requests
+
+
+class AgentTokenMetrics(BaseModel):
+    """Token usage metrics for a specific agent.
+
+    Attributes:
+        agent_name: Name/role of the agent
+        agent_id: Unique identifier for the agent
+        total_tokens: Total tokens used by this agent
+        prompt_tokens: Prompt tokens used by this agent
+        completion_tokens: Completion tokens used by this agent
+        successful_requests: Number of successful LLM requests
+    """
+
+    agent_name: str = Field(description="Name/role of the agent")
+    agent_id: str | None = Field(default=None, description="Unique identifier for the agent")
+    total_tokens: int = Field(default=0, description="Total tokens used by this agent")
+    prompt_tokens: int = Field(default=0, description="Prompt tokens used by this agent")
+    cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used by this agent")
+    completion_tokens: int = Field(default=0, description="Completion tokens used by this agent")
+    successful_requests: int = Field(default=0, description="Number of successful LLM requests")
+
+
+class TaskTokenMetrics(BaseModel):
+    """Token usage metrics for a specific task.
+
+    Attributes:
+        task_name: Name of the task
+        task_id: Unique identifier for the task
+        agent_name: Name of the agent that executed the task
+        total_tokens: Total tokens used for this task
+        prompt_tokens: Prompt tokens used for this task
+        completion_tokens: Completion tokens used for this task
+        successful_requests: Number of successful LLM requests
+    """
+
+    task_name: str = Field(description="Name of the task")
+    task_id: str | None = Field(default=None, description="Unique identifier for the task")
+    agent_name: str = Field(description="Name of the agent that executed the task")
+    total_tokens: int = Field(default=0, description="Total tokens used for this task")
+    prompt_tokens: int = Field(default=0, description="Prompt tokens used for this task")
+    cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used for this task")
+    completion_tokens: int = Field(default=0, description="Completion tokens used for this task")
+    successful_requests: int = Field(default=0, description="Number of successful LLM requests")
+
+
+class WorkflowTokenMetrics(BaseModel):
+    """Complete token usage metrics for a crew workflow.
+
+    Attributes:
+        total_tokens: Total tokens used across entire workflow
+        prompt_tokens: Total prompt tokens used
+        completion_tokens: Total completion tokens used
+        successful_requests: Total successful requests
+        per_agent: Dictionary mapping agent names to their token metrics
+        per_task: Dictionary mapping task names to their token metrics
+    """
+
+    total_tokens: int = Field(default=0, description="Total tokens used across entire workflow")
+    prompt_tokens: int = Field(default=0, description="Total prompt tokens used")
+    cached_prompt_tokens: int = Field(default=0, description="Total cached prompt tokens used")
+    completion_tokens: int = Field(default=0, description="Total completion tokens used")
+    successful_requests: int = Field(default=0, description="Total successful requests")
+    per_agent: dict[str, AgentTokenMetrics] = Field(
+        default_factory=dict,
+        description="Token metrics per agent"
+    )
+    per_task: dict[str, TaskTokenMetrics] = Field(
+        default_factory=dict,
+        description="Token metrics per task"
+    )
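
For reviewers, here is a minimal usage sketch of the surface this patch adds. The `Agent`/`Task`/`Crew` construction below is illustrative only (placeholder roles and descriptions, not part of this change); the fields being read — `CrewOutput.token_metrics`, `WorkflowTokenMetrics.per_agent` / `per_task`, and `TaskOutput.usage_metrics` — are the ones introduced in this diff.

```python
from crewai import Agent, Crew, Task

# Illustrative setup; roles, goals, and task text are placeholders.
researcher = Agent(role="Researcher", goal="Summarize a topic", backstory="...")
summary_task = Task(
    description="Summarize recent LLM papers",
    expected_output="A short summary",
    agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[summary_task])

result = crew.kickoff()

# Existing aggregate usage, unchanged by this patch.
print(result.token_usage.total_tokens)

# New: per-agent and per-task breakdown populated by calculate_usage_metrics()
# and _attach_task_token_metrics().
if result.token_metrics is not None:
    for role, agent_metrics in result.token_metrics.per_agent.items():
        print(f"{role}: {agent_metrics.total_tokens} tokens, "
              f"{agent_metrics.successful_requests} requests")
    for task_key, task_metrics in result.token_metrics.per_task.items():
        print(f"{task_key}: {task_metrics.prompt_tokens} prompt / "
              f"{task_metrics.completion_tokens} completion")

# New: each TaskOutput also carries its own TaskTokenMetrics.
for task_output in result.tasks_output:
    if task_output.usage_metrics is not None:
        print(task_output.description[:40], task_output.usage_metrics.total_tokens)
```

One observation on the design as written: per-task numbers are deltas of the agent's cumulative counters taken before and after `execute_sync`, so only the synchronous execution path gets a per-task breakdown; tasks run through `_process_async_tasks` contribute to the per-agent and workflow totals but not to `per_task`.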