From 059cb93aeb017b4e1bec5c60cd1a22e040988f44 Mon Sep 17 00:00:00 2001 From: Tiago Freire Date: Thu, 5 Mar 2026 08:20:09 -0500 Subject: [PATCH] fix(executor): propagate contextvars context to parallel tool call threads ThreadPoolExecutor threads do not inherit the calling thread's contextvars context, causing _event_id_stack and _current_celery_task_id to be empty in worker threads. This broke OTel span parenting for parallel tool calls (missing parent_event_id) and lost the Celery task ID in the enterprise tracking layer ([Task ID: no-task]). Fix by capturing an independent context copy per submission via contextvars.copy_context().run in CrewAgentExecutor._handle_native_tool_calls, so each worker thread starts with the correct inherited context without sharing mutable state across threads. --- lib/crewai/src/crewai/agents/crew_agent_executor.py | 2 ++ lib/crewai/src/crewai/experimental/agent_executor.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index ff40489d9..ac1cccbeb 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -8,6 +8,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable +import contextvars from concurrent.futures import ThreadPoolExecutor, as_completed import inspect import logging @@ -755,6 +756,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): with ThreadPoolExecutor(max_workers=max_workers) as pool: futures = { pool.submit( + contextvars.copy_context().run, self._execute_single_native_tool_call, call_id=call_id, func_name=func_name, diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 4f2a92681..b0662f6c6 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import contextvars from collections.abc import Callable, Coroutine from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime @@ -728,7 +729,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): max_workers = min(8, len(runnable_tool_calls)) with ThreadPoolExecutor(max_workers=max_workers) as pool: future_to_idx = { - pool.submit(self._execute_single_native_tool_call, tool_call): idx + pool.submit(contextvars.copy_context().run, self._execute_single_native_tool_call, tool_call): idx for idx, tool_call in enumerate(runnable_tool_calls) } ordered_results: list[dict[str, Any] | None] = [None] * len(