mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-07 19:48:13 +00:00
fix: propagate contextvars.Context in execute_async() thread
Use contextvars.copy_context() to capture the calling thread's context and run the worker thread target via ctx.run() so that ContextVar values (used by OpenTelemetry, Langfuse, and other tracing libraries) are preserved inside async task execution threads. Previously, threading.Thread() was used without copying context, causing all ContextVar values to silently reset to defaults in worker threads. Fixes #4822 Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import Future
|
||||
import contextvars
|
||||
from copy import copy as shallow_copy
|
||||
import datetime
|
||||
from hashlib import md5
|
||||
@@ -524,10 +525,11 @@ class Task(BaseModel):
|
||||
) -> Future[TaskOutput]:
|
||||
"""Execute the task asynchronously."""
|
||||
future: Future[TaskOutput] = Future()
|
||||
ctx = contextvars.copy_context()
|
||||
threading.Thread(
|
||||
daemon=True,
|
||||
target=self._execute_task_async,
|
||||
args=(agent, context, tools, future),
|
||||
target=ctx.run,
|
||||
args=(self._execute_task_async, agent, context, tools, future),
|
||||
).start()
|
||||
return future
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Tests for async task execution."""
|
||||
|
||||
import contextvars
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
@@ -383,4 +384,139 @@ class TestAsyncTaskOutput:
|
||||
assert result.description == "Test description"
|
||||
assert result.expected_output == "Test expected"
|
||||
assert result.raw == "Test result"
|
||||
assert result.agent == "Test Agent"
|
||||
assert result.agent == "Test Agent"
|
||||
|
||||
|
||||
class TestAsyncContextVarPropagation:
|
||||
"""Tests for ContextVar propagation in threaded async task execution.
|
||||
|
||||
Verifies that execute_async() copies the calling thread's contextvars.Context
|
||||
into the worker thread so that ContextVar values (used by OpenTelemetry,
|
||||
Langfuse, and other tracing libraries) are preserved.
|
||||
|
||||
See: https://github.com/crewAIInc/crewAI/issues/4822
|
||||
"""
|
||||
|
||||
def test_execute_async_preserves_contextvar(self, test_agent: Agent) -> None:
|
||||
"""ContextVar set before execute_async() must be visible inside the worker thread."""
|
||||
test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"test_var", default=None
|
||||
)
|
||||
test_var.set("parent_value")
|
||||
|
||||
captured: list[str | None] = []
|
||||
|
||||
original_execute_core = Task._execute_core
|
||||
|
||||
def patched_execute_core(self_task, agent, context, tools):
|
||||
captured.append(test_var.get())
|
||||
return original_execute_core(self_task, agent, context, tools)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=test_agent,
|
||||
)
|
||||
|
||||
with patch.object(Task, "_execute_core", patched_execute_core):
|
||||
future = task.execute_async(agent=test_agent)
|
||||
try:
|
||||
future.result(timeout=10)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
assert len(captured) == 1, "patched _execute_core should have been called once"
|
||||
assert captured[0] == "parent_value", (
|
||||
f"ContextVar should be 'parent_value' inside worker thread, got {captured[0]!r}"
|
||||
)
|
||||
|
||||
def test_execute_async_preserves_multiple_contextvars(self, test_agent: Agent) -> None:
|
||||
"""Multiple ContextVars set before execute_async() must all be visible."""
|
||||
var_a: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"var_a", default=None
|
||||
)
|
||||
var_b: contextvars.ContextVar[int | None] = contextvars.ContextVar(
|
||||
"var_b", default=None
|
||||
)
|
||||
var_a.set("alpha")
|
||||
var_b.set(42)
|
||||
|
||||
captured_a: list[str | None] = []
|
||||
captured_b: list[int | None] = []
|
||||
|
||||
original_execute_core = Task._execute_core
|
||||
|
||||
def patched_execute_core(self_task, agent, context, tools):
|
||||
captured_a.append(var_a.get())
|
||||
captured_b.append(var_b.get())
|
||||
return original_execute_core(self_task, agent, context, tools)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=test_agent,
|
||||
)
|
||||
|
||||
with patch.object(Task, "_execute_core", patched_execute_core):
|
||||
future = task.execute_async(agent=test_agent)
|
||||
try:
|
||||
future.result(timeout=10)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
assert captured_a[0] == "alpha"
|
||||
assert captured_b[0] == 42
|
||||
|
||||
def test_execute_async_context_is_isolated_copy(self, test_agent: Agent) -> None:
|
||||
"""Changes to ContextVar inside the worker thread must not leak back to the parent."""
|
||||
test_var: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"test_var", default=None
|
||||
)
|
||||
test_var.set("original")
|
||||
|
||||
original_execute_core = Task._execute_core
|
||||
|
||||
def patched_execute_core(self_task, agent, context, tools):
|
||||
test_var.set("modified_in_worker")
|
||||
return original_execute_core(self_task, agent, context, tools)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=test_agent,
|
||||
)
|
||||
|
||||
with patch.object(Task, "_execute_core", patched_execute_core):
|
||||
future = task.execute_async(agent=test_agent)
|
||||
try:
|
||||
future.result(timeout=10)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
assert test_var.get() == "original", (
|
||||
"ContextVar in parent thread should remain 'original' after worker modifies it"
|
||||
)
|
||||
|
||||
def test_execute_async_without_contextvar_still_works(self, test_agent: Agent) -> None:
|
||||
"""execute_async() must still work correctly when no ContextVars are set."""
|
||||
original_execute_core = Task._execute_core
|
||||
called = []
|
||||
|
||||
def patched_execute_core(self_task, agent, context, tools):
|
||||
called.append(True)
|
||||
return original_execute_core(self_task, agent, context, tools)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=test_agent,
|
||||
)
|
||||
|
||||
with patch.object(Task, "_execute_core", patched_execute_core):
|
||||
future = task.execute_async(agent=test_agent)
|
||||
try:
|
||||
future.result(timeout=10)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
assert len(called) == 1, "_execute_core should have been called"
|
||||
|
||||
Reference in New Issue
Block a user