From 0f0538cca79ffd45bdff6324b8881db6ad430c5b Mon Sep 17 00:00:00 2001 From: Devasy Patel <110348311+Devasy23@users.noreply.github.com> Date: Sat, 3 Jan 2026 22:42:51 +0530 Subject: [PATCH] Fix async task token tracking race condition Resolved race condition where concurrent async tasks from same agent would get incorrect token attribution. Solution wraps async task execution to capture tokens_after immediately upon task completion, before other concurrent tasks can interfere. Changes: - Wrapped async task execution to return (result, tokens_after) tuple - Updated _aprocess_async_tasks to unwrap and use captured tokens_after - Updated type hints for pending_tasks to reflect new signature Note: Threading-based async_execution still has similar race condition as it's harder to wrap threaded execution. Will track separately. --- lib/crewai/src/crewai/crew.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 04cf3537a..b4491bf2a 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -954,13 +954,20 @@ class Crew(FlowTrackable, BaseModel): context = self._get_context( task, [last_sync_output] if last_sync_output else [] ) - async_task = asyncio.create_task( - task.aexecute_sync( + + # Wrap task execution to capture tokens immediately after completion + async def _wrapped_task_execution(): + result = await task.aexecute_sync( agent=exec_data.agent, context=context, tools=exec_data.tools, ) - ) + # Capture tokens immediately after task completes + # This reduces (but doesn't eliminate) race conditions + tokens_after = self._get_agent_token_usage(exec_data.agent) + return result, tokens_after + + async_task = asyncio.create_task(_wrapped_task_execution()) pending_tasks.append((task, async_task, task_index, exec_data.agent, tokens_before)) else: if pending_tasks: @@ -998,7 +1005,7 @@ class Crew(FlowTrackable, BaseModel): self, task: ConditionalTask, task_outputs: list[TaskOutput], - pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]], + pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]], task_index: int, was_replayed: bool, ) -> TaskOutput | None: @@ -1013,16 +1020,16 @@ class Crew(FlowTrackable, BaseModel): async def _aprocess_async_tasks( self, - pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]], + pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]], was_replayed: bool = False, ) -> list[TaskOutput]: """Process pending async tasks and return their outputs.""" task_outputs: list[TaskOutput] = [] for future_task, async_task, task_index, agent, tokens_before in pending_tasks: - task_output = await async_task + # Unwrap the result which includes both output and tokens_after + task_output, tokens_after = await async_task - # Capture token usage after async task execution and attach to task output - tokens_after = self._get_agent_token_usage(agent) + # Attach token metrics using the captured tokens_after task_output = self._attach_task_token_metrics( task_output, future_task, agent, tokens_before, tokens_after )