mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-01 07:13:00 +00:00
fix: propagate ContextVars into async task threads
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
threading.Thread() does not inherit the parent's contextvars.Context, causing ContextVar-based state (OpenTelemetry spans, Langfuse trace IDs, and any other request-scoped vars) to be silently dropped in async tasks. Fix by calling contextvars.copy_context() before spawning each thread and using ctx.run() as the thread target, which runs the function inside the captured context. Affected locations: - task.py: execute_async() — the primary async task execution path - utilities/streaming.py: create_chunk_generator() — streaming execution path Fixes: #4822 Related: #4168, #4286 Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import asyncio
+import contextvars
 from concurrent.futures import Future
 from copy import copy as shallow_copy
 import datetime
@@ -524,10 +525,11 @@ class Task(BaseModel):
     ) -> Future[TaskOutput]:
         """Execute the task asynchronously."""
         future: Future[TaskOutput] = Future()
+        ctx = contextvars.copy_context()
         threading.Thread(
             daemon=True,
-            target=self._execute_task_async,
-            args=(agent, context, tools, future),
+            target=ctx.run,
+            args=(self._execute_task_async, agent, context, tools, future),
         ).start()
         return future

@@ -2,6 +2,7 @@

 import asyncio
 from collections.abc import AsyncIterator, Callable, Iterator
+import contextvars
 import queue
 import threading
 from typing import Any, NamedTuple
@@ -240,7 +241,8 @@ def create_chunk_generator(
     Yields:
         StreamChunk objects as they arrive.
     """
-    thread = threading.Thread(target=run_func, daemon=True)
+    ctx = contextvars.copy_context()
+    thread = threading.Thread(target=ctx.run, args=(run_func,), daemon=True)
     thread.start()

     try:
Reference in New Issue
Block a user