diff --git a/lib/crewai/src/crewai/flow/async_feedback/__init__.py b/lib/crewai/src/crewai/flow/async_feedback/__init__.py index 612a54657..286fdaa8d 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/__init__.py +++ b/lib/crewai/src/crewai/flow/async_feedback/__init__.py @@ -8,11 +8,13 @@ Example: from crewai.flow import Flow, start, human_feedback from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending + class SlackProvider(HumanFeedbackProvider): def request_feedback(self, context, flow): self.send_slack_notification(context) raise HumanFeedbackPending(context=context) + class MyFlow(Flow): @start() @human_feedback( @@ -26,12 +28,13 @@ Example: ``` """ +from crewai.flow.async_feedback.providers import ConsoleProvider from crewai.flow.async_feedback.types import ( HumanFeedbackPending, HumanFeedbackProvider, PendingFeedbackContext, ) -from crewai.flow.async_feedback.providers import ConsoleProvider + __all__ = [ "ConsoleProvider", diff --git a/lib/crewai/src/crewai/flow/async_feedback/providers.py b/lib/crewai/src/crewai/flow/async_feedback/providers.py index 19207c8ef..e86c0a747 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/providers.py +++ b/lib/crewai/src/crewai/flow/async_feedback/providers.py @@ -6,10 +6,11 @@ provider that collects feedback via console input. from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from crewai.flow.async_feedback.types import PendingFeedbackContext + if TYPE_CHECKING: from crewai.flow.flow import Flow @@ -27,6 +28,7 @@ class ConsoleProvider: ```python from crewai.flow.async_feedback import ConsoleProvider + # Explicitly use console provider @human_feedback( message="Review this:", @@ -49,7 +51,7 @@ class ConsoleProvider: def request_feedback( self, context: PendingFeedbackContext, - flow: Flow, + flow: Flow[Any], ) -> str: """Request feedback via console input (blocking). diff --git a/lib/crewai/src/crewai/flow/async_feedback/types.py b/lib/crewai/src/crewai/flow/async_feedback/types.py index dc6cd91f7..1d4da47ae 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/types.py +++ b/lib/crewai/src/crewai/flow/async_feedback/types.py @@ -10,6 +10,7 @@ from dataclasses import dataclass, field from datetime import datetime from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable + if TYPE_CHECKING: from crewai.flow.flow import Flow @@ -155,7 +156,7 @@ class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control f callback_info={ "slack_channel": "#reviews", "thread_id": ticket_id, - } + }, ) ``` """ @@ -232,7 +233,7 @@ class HumanFeedbackProvider(Protocol): callback_info={ "channel": self.channel, "thread_id": thread_id, - } + }, ) ``` """ @@ -240,7 +241,7 @@ class HumanFeedbackProvider(Protocol): def request_feedback( self, context: PendingFeedbackContext, - flow: Flow, + flow: Flow[Any], ) -> str: """Request feedback from a human. diff --git a/lib/crewai/src/crewai/flow/constants.py b/lib/crewai/src/crewai/flow/constants.py index c8720d529..e5711c0d0 100644 --- a/lib/crewai/src/crewai/flow/constants.py +++ b/lib/crewai/src/crewai/flow/constants.py @@ -1,4 +1,5 @@ from typing import Final, Literal + AND_CONDITION: Final[Literal["AND"]] = "AND" OR_CONDITION: Final[Literal["OR"]] = "OR" diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index b92d10d2d..1b4665620 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -58,6 +58,7 @@ from crewai.events.types.flow_events import ( MethodExecutionStartedEvent, ) from crewai.flow.constants import AND_CONDITION, OR_CONDITION +from crewai.flow.flow_context import current_flow_id, current_flow_request_id from crewai.flow.flow_wrappers import ( FlowCondition, FlowConditions, @@ -1540,6 +1541,13 @@ class Flow(Generic[T], metaclass=FlowMeta): ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx) flow_token = attach(ctx) + flow_id_token = None + request_id_token = None + if current_flow_id.get() is None: + flow_id_token = current_flow_id.set(self.flow_id) + if current_flow_request_id.get() is None: + request_id_token = current_flow_request_id.set(self.flow_id) + try: # Reset flow state for fresh execution unless restoring from persistence is_restoring = inputs and "id" in inputs and self._persistence is not None @@ -1717,6 +1725,10 @@ class Flow(Generic[T], metaclass=FlowMeta): return final_output finally: + if request_id_token is not None: + current_flow_request_id.reset(request_id_token) + if flow_id_token is not None: + current_flow_id.reset(flow_id_token) detach(flow_token) async def akickoff( diff --git a/lib/crewai/src/crewai/flow/flow_config.py b/lib/crewai/src/crewai/flow/flow_config.py index 308210dc9..8684cc3cf 100644 --- a/lib/crewai/src/crewai/flow/flow_config.py +++ b/lib/crewai/src/crewai/flow/flow_config.py @@ -8,6 +8,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any + if TYPE_CHECKING: from crewai.flow.async_feedback.types import HumanFeedbackProvider diff --git a/lib/crewai/src/crewai/flow/flow_context.py b/lib/crewai/src/crewai/flow/flow_context.py new file mode 100644 index 000000000..ae9bd69f9 --- /dev/null +++ b/lib/crewai/src/crewai/flow/flow_context.py @@ -0,0 +1,16 @@ +"""Flow execution context management. + +This module provides context variables for tracking flow execution state across +async boundaries and nested function calls. +""" + +import contextvars + + +current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "flow_request_id", default=None +) + +current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "flow_id", default=None +) diff --git a/lib/crewai/src/crewai/flow/flow_trackable.py b/lib/crewai/src/crewai/flow/flow_trackable.py index 974895bc1..e247f5c52 100644 --- a/lib/crewai/src/crewai/flow/flow_trackable.py +++ b/lib/crewai/src/crewai/flow/flow_trackable.py @@ -1,46 +1,22 @@ -import inspect -from typing import Any - -from pydantic import BaseModel, Field, InstanceOf, model_validator +from pydantic import BaseModel, model_validator from typing_extensions import Self -from crewai.flow.flow import Flow +from crewai.flow.flow_context import current_flow_id, current_flow_request_id class FlowTrackable(BaseModel): - """Mixin that tracks the Flow instance that instantiated the object, e.g. a - Flow instance that created a Crew or Agent. + """Mixin that tracks flow execution context for objects created within flows. - Automatically finds and stores a reference to the parent Flow instance by - inspecting the call stack. + When a Crew or Agent is instantiated inside a flow execution, this mixin + automatically captures the flow ID and request ID from context variables, + enabling proper tracking and association with the parent flow execution. """ - parent_flow: InstanceOf[Flow[Any]] | None = Field( - default=None, - description="The parent flow of the instance, if it was created inside a flow.", - ) - @model_validator(mode="after") - def _set_parent_flow(self) -> Self: - max_depth = 8 - frame = inspect.currentframe() - - try: - if frame is None: - return self - - frame = frame.f_back - for _ in range(max_depth): - if frame is None: - break - - candidate = frame.f_locals.get("self") - if isinstance(candidate, Flow): - self.parent_flow = candidate - break - - frame = frame.f_back - finally: - del frame + def _set_flow_context(self) -> Self: + request_id = current_flow_request_id.get() + if request_id: + self._request_id = request_id + self._flow_id = current_flow_id.get() return self diff --git a/lib/crewai/src/crewai/flow/human_feedback.py b/lib/crewai/src/crewai/flow/human_feedback.py index 568aaf060..f5f2c9a14 100644 --- a/lib/crewai/src/crewai/flow/human_feedback.py +++ b/lib/crewai/src/crewai/flow/human_feedback.py @@ -11,6 +11,7 @@ Example (synchronous, default): ```python from crewai.flow import Flow, start, listen, human_feedback + class ReviewFlow(Flow): @start() @human_feedback( @@ -32,11 +33,13 @@ Example (asynchronous with custom provider): from crewai.flow import Flow, start, human_feedback from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending + class SlackProvider(HumanFeedbackProvider): def request_feedback(self, context, flow): self.send_notification(context) raise HumanFeedbackPending(context=context) + class ReviewFlow(Flow): @start() @human_feedback( @@ -229,6 +232,7 @@ def human_feedback( def review_document(self): return document_content + @listen("approved") def publish(self): print(f"Publishing: {self.last_human_feedback.output}") @@ -265,7 +269,7 @@ def human_feedback( def decorator(func: F) -> F: """Inner decorator that wraps the function.""" - def _request_feedback(flow_instance: Flow, method_output: Any) -> str: + def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str: """Request feedback using provider or default console.""" from crewai.flow.async_feedback.types import PendingFeedbackContext @@ -291,19 +295,16 @@ def human_feedback( effective_provider = flow_config.hitl_provider if effective_provider is not None: - # Use provider (may raise HumanFeedbackPending for async providers) return effective_provider.request_feedback(context, flow_instance) - else: - # Use default console input (local development) - return flow_instance._request_human_feedback( - message=message, - output=method_output, - metadata=metadata, - emit=emit, - ) + return flow_instance._request_human_feedback( + message=message, + output=method_output, + metadata=metadata, + emit=emit, + ) def _process_feedback( - flow_instance: Flow, + flow_instance: Flow[Any], method_output: Any, raw_feedback: str, ) -> HumanFeedbackResult | str: @@ -319,12 +320,14 @@ def human_feedback( # No default and no feedback - use first outcome collapsed_outcome = emit[0] elif emit: - # Collapse feedback to outcome using LLM - collapsed_outcome = flow_instance._collapse_to_outcome( - feedback=raw_feedback, - outcomes=emit, - llm=llm, - ) + if llm is not None: + collapsed_outcome = flow_instance._collapse_to_outcome( + feedback=raw_feedback, + outcomes=emit, + llm=llm, + ) + else: + collapsed_outcome = emit[0] # Create result result = HumanFeedbackResult( @@ -349,7 +352,7 @@ def human_feedback( if asyncio.iscoroutinefunction(func): # Async wrapper @wraps(func) - async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any: + async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any: # Execute the original method method_output = await func(self, *args, **kwargs) @@ -363,7 +366,7 @@ def human_feedback( else: # Sync wrapper @wraps(func) - def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any: + def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any: # Execute the original method method_output = func(self, *args, **kwargs) @@ -397,11 +400,10 @@ def human_feedback( ) wrapper.__is_flow_method__ = True - # Make it a router if emit specified if emit: wrapper.__is_router__ = True wrapper.__router_paths__ = list(emit) - return wrapper # type: ignore[return-value] + return wrapper # type: ignore[no-any-return] return decorator diff --git a/lib/crewai/src/crewai/flow/persistence/base.py b/lib/crewai/src/crewai/flow/persistence/base.py index a2f66c7a9..376c9352b 100644 --- a/lib/crewai/src/crewai/flow/persistence/base.py +++ b/lib/crewai/src/crewai/flow/persistence/base.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any from pydantic import BaseModel + if TYPE_CHECKING: from crewai.flow.async_feedback.types import PendingFeedbackContext @@ -103,4 +104,3 @@ class FlowPersistence(ABC): Args: flow_uuid: Unique identifier for the flow instance """ - pass diff --git a/lib/crewai/src/crewai/flow/persistence/sqlite.py b/lib/crewai/src/crewai/flow/persistence/sqlite.py index 6189e2043..8130c111c 100644 --- a/lib/crewai/src/crewai/flow/persistence/sqlite.py +++ b/lib/crewai/src/crewai/flow/persistence/sqlite.py @@ -15,6 +15,7 @@ from pydantic import BaseModel from crewai.flow.persistence.base import FlowPersistence from crewai.utilities.paths import db_storage_path + if TYPE_CHECKING: from crewai.flow.async_feedback.types import PendingFeedbackContext @@ -176,7 +177,8 @@ class SQLiteFlowPersistence(FlowPersistence): row = cursor.fetchone() if row: - return json.loads(row[0]) + result = json.loads(row[0]) + return result if isinstance(result, dict) else None return None def save_pending_feedback( @@ -196,7 +198,6 @@ class SQLiteFlowPersistence(FlowPersistence): state_data: Current state data """ # Import here to avoid circular imports - from crewai.flow.async_feedback.types import PendingFeedbackContext # Convert state_data to dict if isinstance(state_data, BaseModel): diff --git a/lib/crewai/tests/agents/test_lite_agent.py b/lib/crewai/tests/agents/test_lite_agent.py index 80e3dcda5..32a7c0ef1 100644 --- a/lib/crewai/tests/agents/test_lite_agent.py +++ b/lib/crewai/tests/agents/test_lite_agent.py @@ -299,14 +299,16 @@ class TestFlow(Flow): return agent.kickoff("Test query") -def verify_agent_parent_flow(result, agent, flow): - """Verify that both the result and agent have the correct parent flow.""" - assert result.parent_flow is flow +def verify_agent_flow_context(result, agent, flow): + """Verify that both the result and agent have the correct flow context.""" + assert result._flow_id == flow.flow_id # type: ignore[attr-defined] + assert result._request_id == flow.flow_id # type: ignore[attr-defined] assert agent is not None - assert agent.parent_flow is flow + assert agent._flow_id == flow.flow_id # type: ignore[attr-defined] + assert agent._request_id == flow.flow_id # type: ignore[attr-defined] -def test_sets_parent_flow_when_inside_flow(): +def test_sets_flow_context_when_inside_flow(): """Test that an Agent can be created and executed inside a Flow context.""" captured_event = None diff --git a/lib/crewai/tests/cassettes/test_sets_parent_flow_when_inside_flow.yaml b/lib/crewai/tests/cassettes/test_sets_flow_context_when_inside_flow.yaml similarity index 100% rename from lib/crewai/tests/cassettes/test_sets_parent_flow_when_inside_flow.yaml rename to lib/crewai/tests/cassettes/test_sets_flow_context_when_inside_flow.yaml diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 911bd676c..d2eeb531d 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4520,7 +4520,7 @@ def test_crew_copy_with_memory(): pytest.fail(f"Copying crew raised an unexpected exception: {e}") -def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow(): +def test_sets_flow_context_when_using_crewbase_pattern_inside_flow(): @CrewBase class TestCrew: agents_config = None @@ -4582,10 +4582,11 @@ def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow(): flow.kickoff() assert captured_crew is not None - assert captured_crew.parent_flow is flow + assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined] + assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined] -def test_sets_parent_flow_when_outside_flow(researcher, writer): +def test_sets_flow_context_when_outside_flow(researcher, writer): crew = Crew( agents=[researcher, writer], process=Process.sequential, @@ -4594,11 +4595,12 @@ def test_sets_parent_flow_when_outside_flow(researcher, writer): Task(description="Task 2", expected_output="output", agent=writer), ], ) - assert crew.parent_flow is None + assert not hasattr(crew, "_flow_id") + assert not hasattr(crew, "_request_id") @pytest.mark.vcr() -def test_sets_parent_flow_when_inside_flow(researcher, writer): +def test_sets_flow_context_when_inside_flow(researcher, writer): class MyFlow(Flow): @start() def start(self): @@ -4615,7 +4617,8 @@ def test_sets_parent_flow_when_inside_flow(researcher, writer): flow = MyFlow() result = flow.kickoff() - assert result.parent_flow is flow + assert result._flow_id == flow.flow_id # type: ignore[attr-defined] + assert result._request_id == flow.flow_id # type: ignore[attr-defined] def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):