From 00e963aeb34eb96b202c19cbc932cacf326e4638 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 06:18:25 +0000 Subject: [PATCH] fix: propagate contextvars.Context in execute_async() thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use contextvars.copy_context() to capture the calling thread's context and run the worker thread target via ctx.run() so that ContextVar values (used by OpenTelemetry, Langfuse, and other tracing libraries) are preserved inside async task execution threads. Previously, threading.Thread() was used without copying context, causing all ContextVar values to silently reset to defaults in worker threads. Fixes #4822 Co-Authored-By: João --- lib/crewai/src/crewai/task.py | 6 +- lib/crewai/tests/task/test_async_task.py | 138 ++++++++++++++++++++++- 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index cfcb01799..6977eb638 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio from concurrent.futures import Future +import contextvars from copy import copy as shallow_copy import datetime from hashlib import md5 @@ -524,10 +525,11 @@ class Task(BaseModel): ) -> Future[TaskOutput]: """Execute the task asynchronously.""" future: Future[TaskOutput] = Future() + ctx = contextvars.copy_context() threading.Thread( daemon=True, - target=self._execute_task_async, - args=(agent, context, tools, future), + target=ctx.run, + args=(self._execute_task_async, agent, context, tools, future), ).start() return future diff --git a/lib/crewai/tests/task/test_async_task.py b/lib/crewai/tests/task/test_async_task.py index 70fec377d..fc68af119 100644 --- a/lib/crewai/tests/task/test_async_task.py +++ b/lib/crewai/tests/task/test_async_task.py @@ -1,5 +1,6 @@ """Tests for async task execution.""" +import contextvars import pytest from unittest.mock import AsyncMock, MagicMock, patch @@ -383,4 +384,139 @@ class TestAsyncTaskOutput: assert result.description == "Test description" assert result.expected_output == "Test expected" assert result.raw == "Test result" - assert result.agent == "Test Agent" \ No newline at end of file + assert result.agent == "Test Agent" + + +class TestAsyncContextVarPropagation: + """Tests for ContextVar propagation in threaded async task execution. + + Verifies that execute_async() copies the calling thread's contextvars.Context + into the worker thread so that ContextVar values (used by OpenTelemetry, + Langfuse, and other tracing libraries) are preserved. + + See: https://github.com/crewAIInc/crewAI/issues/4822 + """ + + def test_execute_async_preserves_contextvar(self, test_agent: Agent) -> None: + """ContextVar set before execute_async() must be visible inside the worker thread.""" + test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "test_var", default=None + ) + test_var.set("parent_value") + + captured: list[str | None] = [] + + original_execute_core = Task._execute_core + + def patched_execute_core(self_task, agent, context, tools): + captured.append(test_var.get()) + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert len(captured) == 1, "patched _execute_core should have been called once" + assert captured[0] == "parent_value", ( + f"ContextVar should be 'parent_value' inside worker thread, got {captured[0]!r}" + ) + + def test_execute_async_preserves_multiple_contextvars(self, test_agent: Agent) -> None: + """Multiple ContextVars set before execute_async() must all be visible.""" + var_a: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "var_a", default=None + ) + var_b: contextvars.ContextVar[int | None] = contextvars.ContextVar( + "var_b", default=None + ) + var_a.set("alpha") + var_b.set(42) + + captured_a: list[str | None] = [] + captured_b: list[int | None] = [] + + original_execute_core = Task._execute_core + + def patched_execute_core(self_task, agent, context, tools): + captured_a.append(var_a.get()) + captured_b.append(var_b.get()) + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert captured_a[0] == "alpha" + assert captured_b[0] == 42 + + def test_execute_async_context_is_isolated_copy(self, test_agent: Agent) -> None: + """Changes to ContextVar inside the worker thread must not leak back to the parent.""" + test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "test_var", default=None + ) + test_var.set("original") + + original_execute_core = Task._execute_core + + def patched_execute_core(self_task, agent, context, tools): + test_var.set("modified_in_worker") + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert test_var.get() == "original", ( + "ContextVar in parent thread should remain 'original' after worker modifies it" + ) + + def test_execute_async_without_contextvar_still_works(self, test_agent: Agent) -> None: + """execute_async() must still work correctly when no ContextVars are set.""" + original_execute_core = Task._execute_core + called = [] + + def patched_execute_core(self_task, agent, context, tools): + called.append(True) + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert len(called) == 1, "_execute_core should have been called"