diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index cfcb01799..6977eb638 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio from concurrent.futures import Future +import contextvars from copy import copy as shallow_copy import datetime from hashlib import md5 @@ -524,10 +525,11 @@ class Task(BaseModel): ) -> Future[TaskOutput]: """Execute the task asynchronously.""" future: Future[TaskOutput] = Future() + ctx = contextvars.copy_context() threading.Thread( daemon=True, - target=self._execute_task_async, - args=(agent, context, tools, future), + target=ctx.run, + args=(self._execute_task_async, agent, context, tools, future), ).start() return future diff --git a/lib/crewai/tests/task/test_async_task.py b/lib/crewai/tests/task/test_async_task.py index 70fec377d..fc68af119 100644 --- a/lib/crewai/tests/task/test_async_task.py +++ b/lib/crewai/tests/task/test_async_task.py @@ -1,5 +1,6 @@ """Tests for async task execution.""" +import contextvars import pytest from unittest.mock import AsyncMock, MagicMock, patch @@ -383,4 +384,139 @@ class TestAsyncTaskOutput: assert result.description == "Test description" assert result.expected_output == "Test expected" assert result.raw == "Test result" - assert result.agent == "Test Agent" \ No newline at end of file + assert result.agent == "Test Agent" + + +class TestAsyncContextVarPropagation: + """Tests for ContextVar propagation in threaded async task execution. + + Verifies that execute_async() copies the calling thread's contextvars.Context + into the worker thread so that ContextVar values (used by OpenTelemetry, + Langfuse, and other tracing libraries) are preserved. + + See: https://github.com/crewAIInc/crewAI/issues/4822 + """ + + def test_execute_async_preserves_contextvar(self, test_agent: Agent) -> None: + """ContextVar set before execute_async() must be visible inside the worker thread.""" + test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "test_var", default=None + ) + test_var.set("parent_value") + + captured: list[str | None] = [] + + original_execute_core = Task._execute_core + + def patched_execute_core(self_task, agent, context, tools): + captured.append(test_var.get()) + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert len(captured) == 1, "patched _execute_core should have been called once" + assert captured[0] == "parent_value", ( + f"ContextVar should be 'parent_value' inside worker thread, got {captured[0]!r}" + ) + + def test_execute_async_preserves_multiple_contextvars(self, test_agent: Agent) -> None: + """Multiple ContextVars set before execute_async() must all be visible.""" + var_a: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "var_a", default=None + ) + var_b: contextvars.ContextVar[int | None] = contextvars.ContextVar( + "var_b", default=None + ) + var_a.set("alpha") + var_b.set(42) + + captured_a: list[str | None] = [] + captured_b: list[int | None] = [] + + original_execute_core = Task._execute_core + + def patched_execute_core(self_task, agent, context, tools): + captured_a.append(var_a.get()) + captured_b.append(var_b.get()) + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert captured_a[0] == "alpha" + assert captured_b[0] == 42 + + def test_execute_async_context_is_isolated_copy(self, test_agent: Agent) -> None: + """Changes to ContextVar inside the worker thread must not leak back to the parent.""" + test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "test_var", default=None + ) + test_var.set("original") + + original_execute_core = Task._execute_core + + def patched_execute_core(self_task, agent, context, tools): + test_var.set("modified_in_worker") + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert test_var.get() == "original", ( + "ContextVar in parent thread should remain 'original' after worker modifies it" + ) + + def test_execute_async_without_contextvar_still_works(self, test_agent: Agent) -> None: + """execute_async() must still work correctly when no ContextVars are set.""" + original_execute_core = Task._execute_core + called = [] + + def patched_execute_core(self_task, agent, context, tools): + called.append(True) + return original_execute_core(self_task, agent, context, tools) + + task = Task( + description="Test task", + expected_output="Test output", + agent=test_agent, + ) + + with patch.object(Task, "_execute_core", patched_execute_core): + future = task.execute_async(agent=test_agent) + try: + future.result(timeout=10) + except Exception: + pass + + assert len(called) == 1, "_execute_core should have been called"