diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py
index d38720dcc..04cf3537a 100644
--- a/lib/crewai/src/crewai/crew.py
+++ b/lib/crewai/src/crewai/crew.py
@@ -948,6 +948,9 @@ class Crew(FlowTrackable, BaseModel):
                 continue
 
             if task.async_execution:
+                # Capture token usage before async task execution
+                tokens_before = self._get_agent_token_usage(exec_data.agent)
+
                 context = self._get_context(
                     task, [last_sync_output] if last_sync_output else []
                 )
@@ -958,7 +961,7 @@ class Crew(FlowTrackable, BaseModel):
                         tools=exec_data.tools,
                     )
                 )
-                pending_tasks.append((task, async_task, task_index))
+                pending_tasks.append((task, async_task, task_index, exec_data.agent, tokens_before))
             else:
                 if pending_tasks:
                     task_outputs = await self._aprocess_async_tasks(
@@ -966,12 +969,22 @@ class Crew(FlowTrackable, BaseModel):
                     )
                     pending_tasks.clear()
 
+                # Capture token usage before task execution
+                tokens_before = self._get_agent_token_usage(exec_data.agent)
+
                 context = self._get_context(task, task_outputs)
                 task_output = await task.aexecute_sync(
                     agent=exec_data.agent,
                     context=context,
                     tools=exec_data.tools,
                 )
+
+                # Capture token usage after task execution and attach to task output
+                tokens_after = self._get_agent_token_usage(exec_data.agent)
+                task_output = self._attach_task_token_metrics(
+                    task_output, task, exec_data.agent, tokens_before, tokens_after
+                )
+
                 task_outputs.append(task_output)
                 self._process_task_result(task, task_output)
                 self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -985,7 +998,7 @@ class Crew(FlowTrackable, BaseModel):
         self,
         task: ConditionalTask,
         task_outputs: list[TaskOutput],
-        pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
+        pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]],
        task_index: int,
         was_replayed: bool,
     ) -> TaskOutput | None:
@@ -1000,13 +1013,20 @@ class Crew(FlowTrackable, BaseModel):
 
     async def _aprocess_async_tasks(
         self,
-        pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
+        pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]],
         was_replayed: bool = False,
     ) -> list[TaskOutput]:
         """Process pending async tasks and return their outputs."""
         task_outputs: list[TaskOutput] = []
-        for future_task, async_task, task_index in pending_tasks:
+        for future_task, async_task, task_index, agent, tokens_before in pending_tasks:
             task_output = await async_task
+
+            # Capture token usage after async task execution and attach to task output
+            tokens_after = self._get_agent_token_usage(agent)
+            task_output = self._attach_task_token_metrics(
+                task_output, future_task, agent, tokens_before, tokens_after
+            )
+
             task_outputs.append(task_output)
             self._process_task_result(future_task, task_output)
             self._store_execution_log(
@@ -1145,6 +1165,9 @@ class Crew(FlowTrackable, BaseModel):
                 continue
 
             if task.async_execution:
+                # Capture token usage before async task execution
+                tokens_before = self._get_agent_token_usage(exec_data.agent)
+
                 context = self._get_context(
                     task, [last_sync_output] if last_sync_output else []
                 )
@@ -1153,7 +1176,7 @@ class Crew(FlowTrackable, BaseModel):
                     context=context,
                     tools=exec_data.tools,
                 )
-                futures.append((task, future, task_index))
+                futures.append((task, future, task_index, exec_data.agent, tokens_before))
             else:
                 if futures:
                     task_outputs = self._process_async_tasks(futures, was_replayed)
@@ -1188,7 +1211,7 @@ class Crew(FlowTrackable, BaseModel):
         self,
         task: ConditionalTask,
         task_outputs: list[TaskOutput],
-        futures: list[tuple[Task, Future[TaskOutput], int]],
+        futures: list[tuple[Task, Future[TaskOutput], int, Any, Any]],
         task_index: int,
         was_replayed: bool,
     ) -> TaskOutput | None:
@@ -1420,12 +1443,19 @@ class Crew(FlowTrackable, BaseModel):
 
     def _process_async_tasks(
         self,
-        futures: list[tuple[Task, Future[TaskOutput], int]],
+        futures: list[tuple[Task, Future[TaskOutput], int, Any, Any]],
         was_replayed: bool = False,
     ) -> list[TaskOutput]:
         task_outputs: list[TaskOutput] = []
-        for future_task, future, task_index in futures:
+        for future_task, future, task_index, agent, tokens_before in futures:
             task_output = future.result()
+
+            # Capture token usage after async task execution and attach to task output
+            tokens_after = self._get_agent_token_usage(agent)
+            task_output = self._attach_task_token_metrics(
+                task_output, future_task, agent, tokens_before, tokens_after
+            )
+
             task_outputs.append(task_output)
             self._process_task_result(future_task, task_output)
             self._store_execution_log(
@@ -1646,34 +1676,53 @@ class Crew(FlowTrackable, BaseModel):
         # Build per-agent metrics from per-task data (more accurate)
         # This avoids the cumulative token issue where all agents show the same total
+        # Key by agent_id to handle multiple agents with the same role
         agent_token_sums = {}
+        agent_info_map = {}  # Map agent_id to (agent_name, agent_id)
+
+        # First, build a map of all agents by their ID
+        for agent in self.agents:
+            agent_role = getattr(agent, 'role', 'Unknown Agent')
+            agent_id = str(getattr(agent, 'id', ''))
+            agent_info_map[agent_id] = (agent_role, agent_id)
 
         if workflow_metrics.per_task:
             # Sum up tokens for each agent from their tasks
+            # We need to find which agent_id corresponds to each task's agent_name
             for task_name, task_metrics in workflow_metrics.per_task.items():
                 agent_name = task_metrics.agent_name
-                if agent_name not in agent_token_sums:
-                    agent_token_sums[agent_name] = {
-                        'total_tokens': 0,
-                        'prompt_tokens': 0,
-                        'cached_prompt_tokens': 0,
-                        'completion_tokens': 0,
-                        'successful_requests': 0
-                    }
-                agent_token_sums[agent_name]['total_tokens'] += task_metrics.total_tokens
-                agent_token_sums[agent_name]['prompt_tokens'] += task_metrics.prompt_tokens
-                agent_token_sums[agent_name]['cached_prompt_tokens'] += task_metrics.cached_prompt_tokens
-                agent_token_sums[agent_name]['completion_tokens'] += task_metrics.completion_tokens
-                agent_token_sums[agent_name]['successful_requests'] += task_metrics.successful_requests
+                # Find the agent_id for this agent_name from agent_info_map
+                # TODO: Store agent_id in TaskTokenMetrics to avoid this lookup
+                matching_agent_ids = [aid for aid, (name, _) in agent_info_map.items() if name == agent_name]
+
+                # Attribute the task's tokens to the first matching agent_id
+                # (limitation: same-role agents cannot be distinguished until
+                # TaskTokenMetrics carries the agent_id)
+                for agent_id in matching_agent_ids:
+                    if agent_id not in agent_token_sums:
+                        agent_token_sums[agent_id] = {
+                            'total_tokens': 0,
+                            'prompt_tokens': 0,
+                            'cached_prompt_tokens': 0,
+                            'completion_tokens': 0,
+                            'successful_requests': 0
+                        }
+                    agent_token_sums[agent_id]['total_tokens'] += task_metrics.total_tokens
+                    agent_token_sums[agent_id]['prompt_tokens'] += task_metrics.prompt_tokens
+                    agent_token_sums[agent_id]['cached_prompt_tokens'] += task_metrics.cached_prompt_tokens
+                    agent_token_sums[agent_id]['completion_tokens'] += task_metrics.completion_tokens
+                    agent_token_sums[agent_id]['successful_requests'] += task_metrics.successful_requests
+                    break  # Only add to the first matching agent
 
-        # Create per-agent metrics from the summed task data
+        # Create per-agent metrics from the summed task data, keyed by agent_id
         for agent in self.agents:
             agent_role = getattr(agent, 'role', 'Unknown Agent')
             agent_id = str(getattr(agent, 'id', ''))
-            if agent_role in agent_token_sums:
+            if agent_id in agent_token_sums:
                 # Use accurate per-task summed data
-                sums = agent_token_sums[agent_role]
+                sums = agent_token_sums[agent_id]
                 agent_metrics = AgentTokenMetrics(
                     agent_name=agent_role,
                     agent_id=agent_id,
@@ -1683,7 +1732,8 @@ class Crew(FlowTrackable, BaseModel):
                     completion_tokens=sums['completion_tokens'],
                     successful_requests=sums['successful_requests']
                 )
-                workflow_metrics.per_agent[agent_role] = agent_metrics
+                # Key by agent_id to avoid collision for agents with same role
+                workflow_metrics.per_agent[agent_id] = agent_metrics
 
             # Still get total usage for overall metrics
             if isinstance(agent.llm, BaseLLM):
@@ -2074,7 +2124,8 @@ To enable tracing, do any one of these:
             from crewai.types.usage_metrics import WorkflowTokenMetrics
             self.workflow_token_metrics = WorkflowTokenMetrics()
 
-        task_key = f"{task_tokens.task_name}_{task_tokens.agent_name}"
+        # Use task_id in the key to prevent collision when multiple tasks have the same name
+        task_key = f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}"
        self.workflow_token_metrics.per_task[task_key] = task_tokens
 
         return task_output
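
The hunks above lean on two helpers, `_get_agent_token_usage` and `_attach_task_token_metrics`, that are defined outside this diff. Below is a minimal sketch of the snapshot/delta contract they appear to implement; the metric field names are taken from the sums in this diff, while `UsageSnapshot` and `token_delta` are hypothetical stand-ins, not the actual crew.py implementation.

```python
from dataclasses import dataclass


@dataclass
class UsageSnapshot:
    # Stand-in for the usage object returned by _get_agent_token_usage;
    # field names mirror the metric keys summed in the diff above.
    total_tokens: int = 0
    prompt_tokens: int = 0
    cached_prompt_tokens: int = 0
    completion_tokens: int = 0
    successful_requests: int = 0


def token_delta(before: UsageSnapshot, after: UsageSnapshot) -> UsageSnapshot:
    # Per-task usage is the difference between the post- and pre-execution
    # snapshots of the agent's cumulative counters; max(..., 0) guards
    # against a counter reset between the two snapshots.
    return UsageSnapshot(
        total_tokens=max(after.total_tokens - before.total_tokens, 0),
        prompt_tokens=max(after.prompt_tokens - before.prompt_tokens, 0),
        cached_prompt_tokens=max(after.cached_prompt_tokens - before.cached_prompt_tokens, 0),
        completion_tokens=max(after.completion_tokens - before.completion_tokens, 0),
        successful_requests=max(after.successful_requests - before.successful_requests, 0),
    )


before = UsageSnapshot(total_tokens=100, prompt_tokens=80, completion_tokens=20, successful_requests=2)
after = UsageSnapshot(total_tokens=180, prompt_tokens=140, completion_tokens=40, successful_requests=3)
assert token_delta(before, after).total_tokens == 80
```

One consequence of this design worth noting: when the same agent runs several async tasks concurrently, the before/after snapshots of its cumulative counters overlap, so the per-task deltas can double-count tokens.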
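The reverse lookup from `task_metrics.agent_name` to an agent id carries exactly the limitation its TODO describes. A self-contained repro (made-up ids and roles, mirroring the diff's first-match-then-break logic) shows all tokens landing on whichever id matches first:

```python
# Two agents that share the role "Researcher": the name-based lookup can
# only resolve to the first matching id, which is why the TODO asks for
# agent_id on TaskTokenMetrics.
agent_info_map = {
    "id-1": ("Researcher", "id-1"),
    "id-2": ("Researcher", "id-2"),
}

task_metrics = [
    {"agent_name": "Researcher", "total_tokens": 100},  # actually ran on id-1
    {"agent_name": "Researcher", "total_tokens": 250},  # actually ran on id-2
]

agent_token_sums: dict[str, dict[str, int]] = {}
for tm in task_metrics:
    matching = [aid for aid, (name, _) in agent_info_map.items() if name == tm["agent_name"]]
    for agent_id in matching:
        sums = agent_token_sums.setdefault(agent_id, {"total_tokens": 0})
        sums["total_tokens"] += tm["total_tokens"]
        break  # first match only, mirroring the diff

assert agent_token_sums == {"id-1": {"total_tokens": 350}}  # id-2 gets nothing
```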
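The `task_key` change in the last hunk is easiest to verify with concrete values; this snippet (hypothetical task and agent names) shows the old `task_name`/`agent_name` key colliding for same-named tasks, and the new `task_id`-prefixed key staying unique:

```python
import uuid

# Two distinct tasks that happen to share a name and an agent role.
tasks = [
    {"task_id": str(uuid.uuid4()), "task_name": "research", "agent_name": "Researcher"},
    {"task_id": str(uuid.uuid4()), "task_name": "research", "agent_name": "Researcher"},
]

# Old key: both tasks collapse to "research_Researcher", so the second
# per_task entry silently overwrites the first.
old_keys = {f"{t['task_name']}_{t['agent_name']}" for t in tasks}
assert len(old_keys) == 1

# New key: the task_id prefix keeps one per_task entry per task.
new_keys = {f"{t['task_id']}_{t['task_name']}_{t['agent_name']}" for t in tasks}
assert len(new_keys) == 2
```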