Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
00e963aeb3 fix: propagate contextvars.Context in execute_async() thread
Use contextvars.copy_context() to capture the calling thread's context
and run the worker thread target via ctx.run() so that ContextVar values
(used by OpenTelemetry, Langfuse, and other tracing libraries) are
preserved inside async task execution threads.

Previously, threading.Thread() was used without copying context, causing
all ContextVar values to silently reset to defaults in worker threads.

Fixes #4822

Co-Authored-By: João <joao@crewai.com>
2026-03-12 06:18:25 +00:00
2 changed files with 141 additions and 3 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
from concurrent.futures import Future
import contextvars
from copy import copy as shallow_copy
import datetime
from hashlib import md5
@@ -524,10 +525,11 @@ class Task(BaseModel):
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
ctx = contextvars.copy_context()
threading.Thread(
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future),
target=ctx.run,
args=(self._execute_task_async, agent, context, tools, future),
).start()
return future

View File

@@ -1,5 +1,6 @@
"""Tests for async task execution."""
import contextvars
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
@@ -383,4 +384,139 @@ class TestAsyncTaskOutput:
assert result.description == "Test description"
assert result.expected_output == "Test expected"
assert result.raw == "Test result"
assert result.agent == "Test Agent"
assert result.agent == "Test Agent"
class TestAsyncContextVarPropagation:
"""Tests for ContextVar propagation in threaded async task execution.
Verifies that execute_async() copies the calling thread's contextvars.Context
into the worker thread so that ContextVar values (used by OpenTelemetry,
Langfuse, and other tracing libraries) are preserved.
See: https://github.com/crewAIInc/crewAI/issues/4822
"""
def test_execute_async_preserves_contextvar(self, test_agent: Agent) -> None:
"""ContextVar set before execute_async() must be visible inside the worker thread."""
test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"test_var", default=None
)
test_var.set("parent_value")
captured: list[str | None] = []
original_execute_core = Task._execute_core
def patched_execute_core(self_task, agent, context, tools):
captured.append(test_var.get())
return original_execute_core(self_task, agent, context, tools)
task = Task(
description="Test task",
expected_output="Test output",
agent=test_agent,
)
with patch.object(Task, "_execute_core", patched_execute_core):
future = task.execute_async(agent=test_agent)
try:
future.result(timeout=10)
except Exception:
pass
assert len(captured) == 1, "patched _execute_core should have been called once"
assert captured[0] == "parent_value", (
f"ContextVar should be 'parent_value' inside worker thread, got {captured[0]!r}"
)
def test_execute_async_preserves_multiple_contextvars(self, test_agent: Agent) -> None:
"""Multiple ContextVars set before execute_async() must all be visible."""
var_a: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"var_a", default=None
)
var_b: contextvars.ContextVar[int | None] = contextvars.ContextVar(
"var_b", default=None
)
var_a.set("alpha")
var_b.set(42)
captured_a: list[str | None] = []
captured_b: list[int | None] = []
original_execute_core = Task._execute_core
def patched_execute_core(self_task, agent, context, tools):
captured_a.append(var_a.get())
captured_b.append(var_b.get())
return original_execute_core(self_task, agent, context, tools)
task = Task(
description="Test task",
expected_output="Test output",
agent=test_agent,
)
with patch.object(Task, "_execute_core", patched_execute_core):
future = task.execute_async(agent=test_agent)
try:
future.result(timeout=10)
except Exception:
pass
assert captured_a[0] == "alpha"
assert captured_b[0] == 42
def test_execute_async_context_is_isolated_copy(self, test_agent: Agent) -> None:
"""Changes to ContextVar inside the worker thread must not leak back to the parent."""
test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"test_var", default=None
)
test_var.set("original")
original_execute_core = Task._execute_core
def patched_execute_core(self_task, agent, context, tools):
test_var.set("modified_in_worker")
return original_execute_core(self_task, agent, context, tools)
task = Task(
description="Test task",
expected_output="Test output",
agent=test_agent,
)
with patch.object(Task, "_execute_core", patched_execute_core):
future = task.execute_async(agent=test_agent)
try:
future.result(timeout=10)
except Exception:
pass
assert test_var.get() == "original", (
"ContextVar in parent thread should remain 'original' after worker modifies it"
)
def test_execute_async_without_contextvar_still_works(self, test_agent: Agent) -> None:
"""execute_async() must still work correctly when no ContextVars are set."""
original_execute_core = Task._execute_core
called = []
def patched_execute_core(self_task, agent, context, tools):
called.append(True)
return original_execute_core(self_task, agent, context, tools)
task = Task(
description="Test task",
expected_output="Test output",
agent=test_agent,
)
with patch.object(Task, "_execute_core", patched_execute_core):
future = task.execute_async(agent=test_agent)
try:
future.result(timeout=10)
except Exception:
pass
assert len(called) == 1, "_execute_core should have been called"