mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 14:52:36 +00:00
fix(executor): propagate contextvars context to parallel tool call threads
ThreadPoolExecutor threads do not inherit the calling thread's contextvars context, causing _event_id_stack and _current_celery_task_id to be empty in worker threads. This broke OTel span parenting for parallel tool calls (missing parent_event_id) and lost the Celery task ID in the enterprise tracking layer ([Task ID: no-task]). Fix by capturing an independent context copy per submission via contextvars.copy_context().run in CrewAgentExecutor._handle_native_tool_calls, so each worker thread starts with the correct inherited context without sharing mutable state across threads.
This commit is contained in:
@@ -8,6 +8,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
import contextvars
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import inspect
|
||||
import logging
|
||||
@@ -755,6 +756,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as pool:
|
||||
futures = {
|
||||
pool.submit(
|
||||
contextvars.copy_context().run,
|
||||
self._execute_single_native_tool_call,
|
||||
call_id=call_id,
|
||||
func_name=func_name,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextvars
|
||||
from collections.abc import Callable, Coroutine
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from datetime import datetime
|
||||
@@ -728,7 +729,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
|
||||
max_workers = min(8, len(runnable_tool_calls))
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as pool:
|
||||
future_to_idx = {
|
||||
pool.submit(self._execute_single_native_tool_call, tool_call): idx
|
||||
pool.submit(contextvars.copy_context().run, self._execute_single_native_tool_call, tool_call): idx
|
||||
for idx, tool_call in enumerate(runnable_tool_calls)
|
||||
}
|
||||
ordered_results: list[dict[str, Any] | None] = [None] * len(
|
||||
|
||||
Reference in New Issue
Block a user