From d8e38f2f0b497d92659b4e577b0647ce97acc891 Mon Sep 17 00:00:00 2001 From: danglies007 Date: Thu, 12 Mar 2026 21:33:58 +0200 Subject: [PATCH] fix: propagate ContextVars into async task threads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit threading.Thread() does not inherit the parent's contextvars.Context, causing ContextVar-based state (OpenTelemetry spans, Langfuse trace IDs, and any other request-scoped vars) to be silently dropped in async tasks. Fix by calling contextvars.copy_context() before spawning each thread and using ctx.run() as the thread target, which runs the function inside the captured context. Affected locations: - task.py: execute_async() — the primary async task execution path - utilities/streaming.py: create_chunk_generator() — streaming execution path Fixes: #4822 Related: #4168, #4286 Co-authored-by: Claude --- lib/crewai/src/crewai/task.py | 6 ++++-- lib/crewai/src/crewai/utilities/streaming.py | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index cfcb01799..fb0275364 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import contextvars from concurrent.futures import Future from copy import copy as shallow_copy import datetime @@ -524,10 +525,11 @@ class Task(BaseModel): ) -> Future[TaskOutput]: """Execute the task asynchronously.""" future: Future[TaskOutput] = Future() + ctx = contextvars.copy_context() threading.Thread( daemon=True, - target=self._execute_task_async, - args=(agent, context, tools, future), + target=ctx.run, + args=(self._execute_task_async, agent, context, tools, future), ).start() return future diff --git a/lib/crewai/src/crewai/utilities/streaming.py b/lib/crewai/src/crewai/utilities/streaming.py index 8f43e8ef0..ded67527d 100644 --- a/lib/crewai/src/crewai/utilities/streaming.py +++ b/lib/crewai/src/crewai/utilities/streaming.py @@ -2,6 +2,7 @@ import asyncio from collections.abc import AsyncIterator, Callable, Iterator +import contextvars import queue import threading from typing import Any, NamedTuple @@ -240,7 +241,8 @@ def create_chunk_generator( Yields: StreamChunk objects as they arrive. """ - thread = threading.Thread(target=run_func, daemon=True) + ctx = contextvars.copy_context() + thread = threading.Thread(target=ctx.run, args=(run_func,), daemon=True) thread.start() try: