mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-09 04:28:16 +00:00
fix: use contextvars for flow execution context
This commit is contained in:
@@ -8,11 +8,13 @@ Example:
|
||||
from crewai.flow import Flow, start, human_feedback
|
||||
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
|
||||
|
||||
|
||||
class SlackProvider(HumanFeedbackProvider):
|
||||
def request_feedback(self, context, flow):
|
||||
self.send_slack_notification(context)
|
||||
raise HumanFeedbackPending(context=context)
|
||||
|
||||
|
||||
class MyFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
@@ -26,12 +28,13 @@ Example:
|
||||
```
|
||||
"""
|
||||
|
||||
from crewai.flow.async_feedback.providers import ConsoleProvider
|
||||
from crewai.flow.async_feedback.types import (
|
||||
HumanFeedbackPending,
|
||||
HumanFeedbackProvider,
|
||||
PendingFeedbackContext,
|
||||
)
|
||||
from crewai.flow.async_feedback.providers import ConsoleProvider
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ConsoleProvider",
|
||||
|
||||
@@ -6,10 +6,11 @@ provider that collects feedback via console input.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.flow import Flow
|
||||
|
||||
@@ -27,6 +28,7 @@ class ConsoleProvider:
|
||||
```python
|
||||
from crewai.flow.async_feedback import ConsoleProvider
|
||||
|
||||
|
||||
# Explicitly use console provider
|
||||
@human_feedback(
|
||||
message="Review this:",
|
||||
@@ -49,7 +51,7 @@ class ConsoleProvider:
|
||||
def request_feedback(
|
||||
self,
|
||||
context: PendingFeedbackContext,
|
||||
flow: Flow,
|
||||
flow: Flow[Any],
|
||||
) -> str:
|
||||
"""Request feedback via console input (blocking).
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.flow import Flow
|
||||
|
||||
@@ -155,7 +156,7 @@ class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control f
|
||||
callback_info={
|
||||
"slack_channel": "#reviews",
|
||||
"thread_id": ticket_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
```
|
||||
"""
|
||||
@@ -232,7 +233,7 @@ class HumanFeedbackProvider(Protocol):
|
||||
callback_info={
|
||||
"channel": self.channel,
|
||||
"thread_id": thread_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
```
|
||||
"""
|
||||
@@ -240,7 +241,7 @@ class HumanFeedbackProvider(Protocol):
|
||||
def request_feedback(
|
||||
self,
|
||||
context: PendingFeedbackContext,
|
||||
flow: Flow,
|
||||
flow: Flow[Any],
|
||||
) -> str:
|
||||
"""Request feedback from a human.
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from typing import Final, Literal
|
||||
|
||||
|
||||
AND_CONDITION: Final[Literal["AND"]] = "AND"
|
||||
OR_CONDITION: Final[Literal["OR"]] = "OR"
|
||||
|
||||
@@ -58,6 +58,7 @@ from crewai.events.types.flow_events import (
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowCondition,
|
||||
FlowConditions,
|
||||
@@ -1540,6 +1541,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
|
||||
flow_token = attach(ctx)
|
||||
|
||||
flow_id_token = None
|
||||
request_id_token = None
|
||||
if current_flow_id.get() is None:
|
||||
flow_id_token = current_flow_id.set(self.flow_id)
|
||||
if current_flow_request_id.get() is None:
|
||||
request_id_token = current_flow_request_id.set(self.flow_id)
|
||||
|
||||
try:
|
||||
# Reset flow state for fresh execution unless restoring from persistence
|
||||
is_restoring = inputs and "id" in inputs and self._persistence is not None
|
||||
@@ -1717,6 +1725,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
|
||||
return final_output
|
||||
finally:
|
||||
if request_id_token is not None:
|
||||
current_flow_request_id.reset(request_id_token)
|
||||
if flow_id_token is not None:
|
||||
current_flow_id.reset(flow_id_token)
|
||||
detach(flow_token)
|
||||
|
||||
async def akickoff(
|
||||
|
||||
@@ -8,6 +8,7 @@ from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackProvider
|
||||
|
||||
|
||||
16
lib/crewai/src/crewai/flow/flow_context.py
Normal file
16
lib/crewai/src/crewai/flow/flow_context.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Flow execution context management.
|
||||
|
||||
This module provides context variables for tracking flow execution state across
|
||||
async boundaries and nested function calls.
|
||||
"""
|
||||
|
||||
import contextvars
|
||||
|
||||
|
||||
current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"flow_request_id", default=None
|
||||
)
|
||||
|
||||
current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"flow_id", default=None
|
||||
)
|
||||
@@ -1,46 +1,22 @@
|
||||
import inspect
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field, InstanceOf, model_validator
|
||||
from pydantic import BaseModel, model_validator
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
|
||||
|
||||
|
||||
class FlowTrackable(BaseModel):
|
||||
"""Mixin that tracks the Flow instance that instantiated the object, e.g. a
|
||||
Flow instance that created a Crew or Agent.
|
||||
"""Mixin that tracks flow execution context for objects created within flows.
|
||||
|
||||
Automatically finds and stores a reference to the parent Flow instance by
|
||||
inspecting the call stack.
|
||||
When a Crew or Agent is instantiated inside a flow execution, this mixin
|
||||
automatically captures the flow ID and request ID from context variables,
|
||||
enabling proper tracking and association with the parent flow execution.
|
||||
"""
|
||||
|
||||
parent_flow: InstanceOf[Flow[Any]] | None = Field(
|
||||
default=None,
|
||||
description="The parent flow of the instance, if it was created inside a flow.",
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _set_parent_flow(self) -> Self:
|
||||
max_depth = 8
|
||||
frame = inspect.currentframe()
|
||||
|
||||
try:
|
||||
if frame is None:
|
||||
return self
|
||||
|
||||
frame = frame.f_back
|
||||
for _ in range(max_depth):
|
||||
if frame is None:
|
||||
break
|
||||
|
||||
candidate = frame.f_locals.get("self")
|
||||
if isinstance(candidate, Flow):
|
||||
self.parent_flow = candidate
|
||||
break
|
||||
|
||||
frame = frame.f_back
|
||||
finally:
|
||||
del frame
|
||||
def _set_flow_context(self) -> Self:
|
||||
request_id = current_flow_request_id.get()
|
||||
if request_id:
|
||||
self._request_id = request_id
|
||||
self._flow_id = current_flow_id.get()
|
||||
|
||||
return self
|
||||
|
||||
@@ -11,6 +11,7 @@ Example (synchronous, default):
|
||||
```python
|
||||
from crewai.flow import Flow, start, listen, human_feedback
|
||||
|
||||
|
||||
class ReviewFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
@@ -32,11 +33,13 @@ Example (asynchronous with custom provider):
|
||||
from crewai.flow import Flow, start, human_feedback
|
||||
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
|
||||
|
||||
|
||||
class SlackProvider(HumanFeedbackProvider):
|
||||
def request_feedback(self, context, flow):
|
||||
self.send_notification(context)
|
||||
raise HumanFeedbackPending(context=context)
|
||||
|
||||
|
||||
class ReviewFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
@@ -229,6 +232,7 @@ def human_feedback(
|
||||
def review_document(self):
|
||||
return document_content
|
||||
|
||||
|
||||
@listen("approved")
|
||||
def publish(self):
|
||||
print(f"Publishing: {self.last_human_feedback.output}")
|
||||
@@ -265,7 +269,7 @@ def human_feedback(
|
||||
def decorator(func: F) -> F:
|
||||
"""Inner decorator that wraps the function."""
|
||||
|
||||
def _request_feedback(flow_instance: Flow, method_output: Any) -> str:
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
"""Request feedback using provider or default console."""
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
@@ -291,19 +295,16 @@ def human_feedback(
|
||||
effective_provider = flow_config.hitl_provider
|
||||
|
||||
if effective_provider is not None:
|
||||
# Use provider (may raise HumanFeedbackPending for async providers)
|
||||
return effective_provider.request_feedback(context, flow_instance)
|
||||
else:
|
||||
# Use default console input (local development)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
def _process_feedback(
|
||||
flow_instance: Flow,
|
||||
flow_instance: Flow[Any],
|
||||
method_output: Any,
|
||||
raw_feedback: str,
|
||||
) -> HumanFeedbackResult | str:
|
||||
@@ -319,12 +320,14 @@ def human_feedback(
|
||||
# No default and no feedback - use first outcome
|
||||
collapsed_outcome = emit[0]
|
||||
elif emit:
|
||||
# Collapse feedback to outcome using LLM
|
||||
collapsed_outcome = flow_instance._collapse_to_outcome(
|
||||
feedback=raw_feedback,
|
||||
outcomes=emit,
|
||||
llm=llm,
|
||||
)
|
||||
if llm is not None:
|
||||
collapsed_outcome = flow_instance._collapse_to_outcome(
|
||||
feedback=raw_feedback,
|
||||
outcomes=emit,
|
||||
llm=llm,
|
||||
)
|
||||
else:
|
||||
collapsed_outcome = emit[0]
|
||||
|
||||
# Create result
|
||||
result = HumanFeedbackResult(
|
||||
@@ -349,7 +352,7 @@ def human_feedback(
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
# Async wrapper
|
||||
@wraps(func)
|
||||
async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
|
||||
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
|
||||
# Execute the original method
|
||||
method_output = await func(self, *args, **kwargs)
|
||||
|
||||
@@ -363,7 +366,7 @@ def human_feedback(
|
||||
else:
|
||||
# Sync wrapper
|
||||
@wraps(func)
|
||||
def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
|
||||
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
|
||||
# Execute the original method
|
||||
method_output = func(self, *args, **kwargs)
|
||||
|
||||
@@ -397,11 +400,10 @@ def human_feedback(
|
||||
)
|
||||
wrapper.__is_flow_method__ = True
|
||||
|
||||
# Make it a router if emit specified
|
||||
if emit:
|
||||
wrapper.__is_router__ = True
|
||||
wrapper.__router_paths__ = list(emit)
|
||||
|
||||
return wrapper # type: ignore[return-value]
|
||||
return wrapper # type: ignore[no-any-return]
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
@@ -103,4 +104,3 @@ class FlowPersistence(ABC):
|
||||
Args:
|
||||
flow_uuid: Unique identifier for the flow instance
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -15,6 +15,7 @@ from pydantic import BaseModel
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.utilities.paths import db_storage_path
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
@@ -176,7 +177,8 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row:
|
||||
return json.loads(row[0])
|
||||
result = json.loads(row[0])
|
||||
return result if isinstance(result, dict) else None
|
||||
return None
|
||||
|
||||
def save_pending_feedback(
|
||||
@@ -196,7 +198,6 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
state_data: Current state data
|
||||
"""
|
||||
# Import here to avoid circular imports
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
# Convert state_data to dict
|
||||
if isinstance(state_data, BaseModel):
|
||||
|
||||
@@ -299,14 +299,16 @@ class TestFlow(Flow):
|
||||
return agent.kickoff("Test query")
|
||||
|
||||
|
||||
def verify_agent_parent_flow(result, agent, flow):
|
||||
"""Verify that both the result and agent have the correct parent flow."""
|
||||
assert result.parent_flow is flow
|
||||
def verify_agent_flow_context(result, agent, flow):
|
||||
"""Verify that both the result and agent have the correct flow context."""
|
||||
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert agent is not None
|
||||
assert agent.parent_flow is flow
|
||||
assert agent._flow_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert agent._request_id == flow.flow_id # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def test_sets_parent_flow_when_inside_flow():
|
||||
def test_sets_flow_context_when_inside_flow():
|
||||
"""Test that an Agent can be created and executed inside a Flow context."""
|
||||
captured_event = None
|
||||
|
||||
|
||||
@@ -4520,7 +4520,7 @@ def test_crew_copy_with_memory():
|
||||
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
|
||||
|
||||
|
||||
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
|
||||
def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
|
||||
@CrewBase
|
||||
class TestCrew:
|
||||
agents_config = None
|
||||
@@ -4582,10 +4582,11 @@ def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_crew is not None
|
||||
assert captured_crew.parent_flow is flow
|
||||
assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def test_sets_parent_flow_when_outside_flow(researcher, writer):
|
||||
def test_sets_flow_context_when_outside_flow(researcher, writer):
|
||||
crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
process=Process.sequential,
|
||||
@@ -4594,11 +4595,12 @@ def test_sets_parent_flow_when_outside_flow(researcher, writer):
|
||||
Task(description="Task 2", expected_output="output", agent=writer),
|
||||
],
|
||||
)
|
||||
assert crew.parent_flow is None
|
||||
assert not hasattr(crew, "_flow_id")
|
||||
assert not hasattr(crew, "_request_id")
|
||||
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_sets_parent_flow_when_inside_flow(researcher, writer):
|
||||
def test_sets_flow_context_when_inside_flow(researcher, writer):
|
||||
class MyFlow(Flow):
|
||||
@start()
|
||||
def start(self):
|
||||
@@ -4615,7 +4617,8 @@ def test_sets_parent_flow_when_inside_flow(researcher, writer):
|
||||
|
||||
flow = MyFlow()
|
||||
result = flow.kickoff()
|
||||
assert result.parent_flow is flow
|
||||
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):
|
||||
|
||||
Reference in New Issue
Block a user