fix: use contextvars for flow execution context

This commit is contained in:
Greyson LaLonde
2026-02-02 11:24:02 -05:00
committed by GitHub
parent 536447ab0e
commit 9d7f45376a
14 changed files with 96 additions and 76 deletions

View File

@@ -8,11 +8,13 @@ Example:
from crewai.flow import Flow, start, human_feedback from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider): class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow): def request_feedback(self, context, flow):
self.send_slack_notification(context) self.send_slack_notification(context)
raise HumanFeedbackPending(context=context) raise HumanFeedbackPending(context=context)
class MyFlow(Flow): class MyFlow(Flow):
@start() @start()
@human_feedback( @human_feedback(
@@ -26,12 +28,13 @@ Example:
``` ```
""" """
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import ( from crewai.flow.async_feedback.types import (
HumanFeedbackPending, HumanFeedbackPending,
HumanFeedbackProvider, HumanFeedbackProvider,
PendingFeedbackContext, PendingFeedbackContext,
) )
from crewai.flow.async_feedback.providers import ConsoleProvider
__all__ = [ __all__ = [
"ConsoleProvider", "ConsoleProvider",

View File

@@ -6,10 +6,11 @@ provider that collects feedback via console input.
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Any
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.flow import Flow from crewai.flow.flow import Flow
@@ -27,6 +28,7 @@ class ConsoleProvider:
```python ```python
from crewai.flow.async_feedback import ConsoleProvider from crewai.flow.async_feedback import ConsoleProvider
# Explicitly use console provider # Explicitly use console provider
@human_feedback( @human_feedback(
message="Review this:", message="Review this:",
@@ -49,7 +51,7 @@ class ConsoleProvider:
def request_feedback( def request_feedback(
self, self,
context: PendingFeedbackContext, context: PendingFeedbackContext,
flow: Flow, flow: Flow[Any],
) -> str: ) -> str:
"""Request feedback via console input (blocking). """Request feedback via console input (blocking).

View File

@@ -10,6 +10,7 @@ from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.flow import Flow from crewai.flow.flow import Flow
@@ -155,7 +156,7 @@ class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control f
callback_info={ callback_info={
"slack_channel": "#reviews", "slack_channel": "#reviews",
"thread_id": ticket_id, "thread_id": ticket_id,
} },
) )
``` ```
""" """
@@ -232,7 +233,7 @@ class HumanFeedbackProvider(Protocol):
callback_info={ callback_info={
"channel": self.channel, "channel": self.channel,
"thread_id": thread_id, "thread_id": thread_id,
} },
) )
``` ```
""" """
@@ -240,7 +241,7 @@ class HumanFeedbackProvider(Protocol):
def request_feedback( def request_feedback(
self, self,
context: PendingFeedbackContext, context: PendingFeedbackContext,
flow: Flow, flow: Flow[Any],
) -> str: ) -> str:
"""Request feedback from a human. """Request feedback from a human.

View File

@@ -1,4 +1,5 @@
from typing import Final, Literal from typing import Final, Literal
AND_CONDITION: Final[Literal["AND"]] = "AND" AND_CONDITION: Final[Literal["AND"]] = "AND"
OR_CONDITION: Final[Literal["OR"]] = "OR" OR_CONDITION: Final[Literal["OR"]] = "OR"

View File

@@ -58,6 +58,7 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent, MethodExecutionStartedEvent,
) )
from crewai.flow.constants import AND_CONDITION, OR_CONDITION from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_wrappers import ( from crewai.flow.flow_wrappers import (
FlowCondition, FlowCondition,
FlowConditions, FlowConditions,
@@ -1540,6 +1541,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx) ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
flow_token = attach(ctx) flow_token = attach(ctx)
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
try: try:
# Reset flow state for fresh execution unless restoring from persistence # Reset flow state for fresh execution unless restoring from persistence
is_restoring = inputs and "id" in inputs and self._persistence is not None is_restoring = inputs and "id" in inputs and self._persistence is not None
@@ -1717,6 +1725,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output return final_output
finally: finally:
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token) detach(flow_token)
async def akickoff( async def akickoff(

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider from crewai.flow.async_feedback.types import HumanFeedbackProvider

View File

@@ -0,0 +1,16 @@
"""Flow execution context management.
This module provides context variables for tracking flow execution state across
async boundaries and nested function calls.
"""
import contextvars
current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_request_id", default=None
)
current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)

View File

@@ -1,46 +1,22 @@
import inspect from pydantic import BaseModel, model_validator
from typing import Any
from pydantic import BaseModel, Field, InstanceOf, model_validator
from typing_extensions import Self from typing_extensions import Self
from crewai.flow.flow import Flow from crewai.flow.flow_context import current_flow_id, current_flow_request_id
class FlowTrackable(BaseModel): class FlowTrackable(BaseModel):
"""Mixin that tracks the Flow instance that instantiated the object, e.g. a """Mixin that tracks flow execution context for objects created within flows.
Flow instance that created a Crew or Agent.
Automatically finds and stores a reference to the parent Flow instance by When a Crew or Agent is instantiated inside a flow execution, this mixin
inspecting the call stack. automatically captures the flow ID and request ID from context variables,
enabling proper tracking and association with the parent flow execution.
""" """
parent_flow: InstanceOf[Flow[Any]] | None = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after") @model_validator(mode="after")
def _set_parent_flow(self) -> Self: def _set_flow_context(self) -> Self:
max_depth = 8 request_id = current_flow_request_id.get()
frame = inspect.currentframe() if request_id:
self._request_id = request_id
try: self._flow_id = current_flow_id.get()
if frame is None:
return self
frame = frame.f_back
for _ in range(max_depth):
if frame is None:
break
candidate = frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
frame = frame.f_back
finally:
del frame
return self return self

View File

@@ -11,6 +11,7 @@ Example (synchronous, default):
```python ```python
from crewai.flow import Flow, start, listen, human_feedback from crewai.flow import Flow, start, listen, human_feedback
class ReviewFlow(Flow): class ReviewFlow(Flow):
@start() @start()
@human_feedback( @human_feedback(
@@ -32,11 +33,13 @@ Example (asynchronous with custom provider):
from crewai.flow import Flow, start, human_feedback from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider): class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow): def request_feedback(self, context, flow):
self.send_notification(context) self.send_notification(context)
raise HumanFeedbackPending(context=context) raise HumanFeedbackPending(context=context)
class ReviewFlow(Flow): class ReviewFlow(Flow):
@start() @start()
@human_feedback( @human_feedback(
@@ -229,6 +232,7 @@ def human_feedback(
def review_document(self): def review_document(self):
return document_content return document_content
@listen("approved") @listen("approved")
def publish(self): def publish(self):
print(f"Publishing: {self.last_human_feedback.output}") print(f"Publishing: {self.last_human_feedback.output}")
@@ -265,7 +269,7 @@ def human_feedback(
def decorator(func: F) -> F: def decorator(func: F) -> F:
"""Inner decorator that wraps the function.""" """Inner decorator that wraps the function."""
def _request_feedback(flow_instance: Flow, method_output: Any) -> str: def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console.""" """Request feedback using provider or default console."""
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -291,19 +295,16 @@ def human_feedback(
effective_provider = flow_config.hitl_provider effective_provider = flow_config.hitl_provider
if effective_provider is not None: if effective_provider is not None:
# Use provider (may raise HumanFeedbackPending for async providers)
return effective_provider.request_feedback(context, flow_instance) return effective_provider.request_feedback(context, flow_instance)
else: return flow_instance._request_human_feedback(
# Use default console input (local development) message=message,
return flow_instance._request_human_feedback( output=method_output,
message=message, metadata=metadata,
output=method_output, emit=emit,
metadata=metadata, )
emit=emit,
)
def _process_feedback( def _process_feedback(
flow_instance: Flow, flow_instance: Flow[Any],
method_output: Any, method_output: Any,
raw_feedback: str, raw_feedback: str,
) -> HumanFeedbackResult | str: ) -> HumanFeedbackResult | str:
@@ -319,12 +320,14 @@ def human_feedback(
# No default and no feedback - use first outcome # No default and no feedback - use first outcome
collapsed_outcome = emit[0] collapsed_outcome = emit[0]
elif emit: elif emit:
# Collapse feedback to outcome using LLM if llm is not None:
collapsed_outcome = flow_instance._collapse_to_outcome( collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback, feedback=raw_feedback,
outcomes=emit, outcomes=emit,
llm=llm, llm=llm,
) )
else:
collapsed_outcome = emit[0]
# Create result # Create result
result = HumanFeedbackResult( result = HumanFeedbackResult(
@@ -349,7 +352,7 @@ def human_feedback(
if asyncio.iscoroutinefunction(func): if asyncio.iscoroutinefunction(func):
# Async wrapper # Async wrapper
@wraps(func) @wraps(func)
async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any: async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
# Execute the original method # Execute the original method
method_output = await func(self, *args, **kwargs) method_output = await func(self, *args, **kwargs)
@@ -363,7 +366,7 @@ def human_feedback(
else: else:
# Sync wrapper # Sync wrapper
@wraps(func) @wraps(func)
def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any: def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
# Execute the original method # Execute the original method
method_output = func(self, *args, **kwargs) method_output = func(self, *args, **kwargs)
@@ -397,11 +400,10 @@ def human_feedback(
) )
wrapper.__is_flow_method__ = True wrapper.__is_flow_method__ = True
# Make it a router if emit specified
if emit: if emit:
wrapper.__is_router__ = True wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit) wrapper.__router_paths__ = list(emit)
return wrapper # type: ignore[return-value] return wrapper # type: ignore[no-any-return]
return decorator return decorator

View File

@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any
from pydantic import BaseModel from pydantic import BaseModel
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -103,4 +104,3 @@ class FlowPersistence(ABC):
Args: Args:
flow_uuid: Unique identifier for the flow instance flow_uuid: Unique identifier for the flow instance
""" """
pass

View File

@@ -15,6 +15,7 @@ from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence from crewai.flow.persistence.base import FlowPersistence
from crewai.utilities.paths import db_storage_path from crewai.utilities.paths import db_storage_path
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -176,7 +177,8 @@ class SQLiteFlowPersistence(FlowPersistence):
row = cursor.fetchone() row = cursor.fetchone()
if row: if row:
return json.loads(row[0]) result = json.loads(row[0])
return result if isinstance(result, dict) else None
return None return None
def save_pending_feedback( def save_pending_feedback(
@@ -196,7 +198,6 @@ class SQLiteFlowPersistence(FlowPersistence):
state_data: Current state data state_data: Current state data
""" """
# Import here to avoid circular imports # Import here to avoid circular imports
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Convert state_data to dict # Convert state_data to dict
if isinstance(state_data, BaseModel): if isinstance(state_data, BaseModel):

View File

@@ -299,14 +299,16 @@ class TestFlow(Flow):
return agent.kickoff("Test query") return agent.kickoff("Test query")
def verify_agent_parent_flow(result, agent, flow): def verify_agent_flow_context(result, agent, flow):
"""Verify that both the result and agent have the correct parent flow.""" """Verify that both the result and agent have the correct flow context."""
assert result.parent_flow is flow assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
assert agent is not None assert agent is not None
assert agent.parent_flow is flow assert agent._flow_id == flow.flow_id # type: ignore[attr-defined]
assert agent._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_parent_flow_when_inside_flow(): def test_sets_flow_context_when_inside_flow():
"""Test that an Agent can be created and executed inside a Flow context.""" """Test that an Agent can be created and executed inside a Flow context."""
captured_event = None captured_event = None

View File

@@ -4520,7 +4520,7 @@ def test_crew_copy_with_memory():
pytest.fail(f"Copying crew raised an unexpected exception: {e}") pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow(): def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
@CrewBase @CrewBase
class TestCrew: class TestCrew:
agents_config = None agents_config = None
@@ -4582,10 +4582,11 @@ def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
flow.kickoff() flow.kickoff()
assert captured_crew is not None assert captured_crew is not None
assert captured_crew.parent_flow is flow assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_parent_flow_when_outside_flow(researcher, writer): def test_sets_flow_context_when_outside_flow(researcher, writer):
crew = Crew( crew = Crew(
agents=[researcher, writer], agents=[researcher, writer],
process=Process.sequential, process=Process.sequential,
@@ -4594,11 +4595,12 @@ def test_sets_parent_flow_when_outside_flow(researcher, writer):
Task(description="Task 2", expected_output="output", agent=writer), Task(description="Task 2", expected_output="output", agent=writer),
], ],
) )
assert crew.parent_flow is None assert not hasattr(crew, "_flow_id")
assert not hasattr(crew, "_request_id")
@pytest.mark.vcr() @pytest.mark.vcr()
def test_sets_parent_flow_when_inside_flow(researcher, writer): def test_sets_flow_context_when_inside_flow(researcher, writer):
class MyFlow(Flow): class MyFlow(Flow):
@start() @start()
def start(self): def start(self):
@@ -4615,7 +4617,8 @@ def test_sets_parent_flow_when_inside_flow(researcher, writer):
flow = MyFlow() flow = MyFlow()
result = flow.kickoff() result = flow.kickoff()
assert result.parent_flow is flow assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer): def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):