Fix async task token tracking race condition

Resolved race condition where concurrent async tasks from same agent
would get incorrect token attribution. Solution wraps async task execution
to capture tokens_after immediately upon task completion, before other
concurrent tasks can interfere.

Changes:
- Wrapped async task execution to return (result, tokens_after) tuple
- Updated _aprocess_async_tasks to unwrap and use captured tokens_after
- Updated type hints for pending_tasks to reflect new signature

Note: Threading-based async_execution still has similar race condition
as it's harder to wrap threaded execution. Will track separately.
This commit is contained in:
Devasy Patel
2026-01-03 22:42:51 +05:30
parent 314642f392
commit 0f0538cca7

View File

@@ -954,13 +954,20 @@ class Crew(FlowTrackable, BaseModel):
context = self._get_context( context = self._get_context(
task, [last_sync_output] if last_sync_output else [] task, [last_sync_output] if last_sync_output else []
) )
async_task = asyncio.create_task(
task.aexecute_sync( # Wrap task execution to capture tokens immediately after completion
async def _wrapped_task_execution():
result = await task.aexecute_sync(
agent=exec_data.agent, agent=exec_data.agent,
context=context, context=context,
tools=exec_data.tools, tools=exec_data.tools,
) )
) # Capture tokens immediately after task completes
# This reduces (but doesn't eliminate) race conditions
tokens_after = self._get_agent_token_usage(exec_data.agent)
return result, tokens_after
async_task = asyncio.create_task(_wrapped_task_execution())
pending_tasks.append((task, async_task, task_index, exec_data.agent, tokens_before)) pending_tasks.append((task, async_task, task_index, exec_data.agent, tokens_before))
else: else:
if pending_tasks: if pending_tasks:
@@ -998,7 +1005,7 @@ class Crew(FlowTrackable, BaseModel):
self, self,
task: ConditionalTask, task: ConditionalTask,
task_outputs: list[TaskOutput], task_outputs: list[TaskOutput],
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]], pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
task_index: int, task_index: int,
was_replayed: bool, was_replayed: bool,
) -> TaskOutput | None: ) -> TaskOutput | None:
@@ -1013,16 +1020,16 @@ class Crew(FlowTrackable, BaseModel):
async def _aprocess_async_tasks( async def _aprocess_async_tasks(
self, self,
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]], pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
was_replayed: bool = False, was_replayed: bool = False,
) -> list[TaskOutput]: ) -> list[TaskOutput]:
"""Process pending async tasks and return their outputs.""" """Process pending async tasks and return their outputs."""
task_outputs: list[TaskOutput] = [] task_outputs: list[TaskOutput] = []
for future_task, async_task, task_index, agent, tokens_before in pending_tasks: for future_task, async_task, task_index, agent, tokens_before in pending_tasks:
task_output = await async_task # Unwrap the result which includes both output and tokens_after
task_output, tokens_after = await async_task
# Capture token usage after async task execution and attach to task output # Attach token metrics using the captured tokens_after
tokens_after = self._get_agent_token_usage(agent)
task_output = self._attach_task_token_metrics( task_output = self._attach_task_token_metrics(
task_output, future_task, agent, tokens_before, tokens_after task_output, future_task, agent, tokens_before, tokens_after
) )