Split flow.py into DSL, definition, and runtime (#5997)

This commit separates the monolithic `flow.py` into three modules, each
with one job:

- `dsl.py` - the Python DSL for flows (@start/@listen/@router, or_/and_)
- `flow_definition.py` - the structural model extracted from the DSL
- `runtime.py` - the execution engine and state for flows

This phase moves code only and should not have any breaking changes.
This commit is contained in:
Vini Brasil
2026-06-01 18:37:10 -03:00
committed by GitHub
parent 4dafb05735
commit 1aba9fe415
6 changed files with 4720 additions and 4579 deletions

View File

@@ -0,0 +1,320 @@
"""Flow authoring DSL: the ``@start`` / ``@listen`` / ``@router`` decorators
plus the ``or_`` / ``and_`` condition combinators.
These decorators wrap user methods into the typed wrappers defined in
``flow_wrappers`` and record their trigger conditions. The structural model
those conditions feed is built in ``flow_definition``; execution happens in
``runtime``.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any, ParamSpec, TypeVar
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_definition import (
_extract_all_methods,
is_flow_condition_dict,
is_flow_method_callable,
is_flow_method_name,
)
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
ListenMethod,
RouterMethod,
StartMethod,
)
P = ParamSpec("P")
R = TypeVar("R")
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
"""Marks a method as a flow's starting point.
This decorator designates a method as an entry point for the flow execution.
It can optionally specify conditions that trigger the start based on other
method executions.
Args:
condition: Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this start
Default is None, meaning unconditional start.
Returns:
A decorator function that wraps the method as a flow start point and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @start() # Unconditional start
>>> def begin_flow(self):
... pass
>>> @start("method_name") # Start after specific method
>>> def conditional_start(self):
... pass
>>> @start(and_("method1", "method2")) # Start after multiple methods
>>> def complex_start(self):
... pass
"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
"""Decorator that wraps a function as a start method.
Args:
func: The function to wrap as a start method.
Returns:
A StartMethod wrapper around the function.
"""
wrapper = StartMethod(func)
if condition is not None:
if is_flow_method_name(condition):
wrapper.__trigger_methods__ = [condition]
wrapper.__condition_type__ = OR_CONDITION
elif is_flow_condition_dict(condition):
if "conditions" in condition:
wrapper.__trigger_condition__ = condition
wrapper.__trigger_methods__ = _extract_all_methods(condition)
wrapper.__condition_type__ = condition["type"]
elif "methods" in condition:
wrapper.__trigger_methods__ = condition["methods"]
wrapper.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif is_flow_method_callable(condition):
wrapper.__trigger_methods__ = [condition.__name__]
wrapper.__condition_type__ = OR_CONDITION
else:
raise ValueError(
"Condition must be a method, string, or a result of or_() or and_()"
)
return wrapper
return decorator
def listen(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
executions in the flow. It supports both simple and complex triggering
conditions.
Args:
condition: Specifies when the listener should execute.
Returns:
A decorator function that wraps the method as a flow listener and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @listen("process_data")
>>> def handle_processed_data(self):
... pass
>>> @listen("method_name")
>>> def handle_completion(self):
... pass
"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
"""Decorator that wraps a function as a listener method.
Args:
func: The function to wrap as a listener method.
Returns:
A ListenMethod wrapper around the function.
"""
wrapper = ListenMethod(func)
if is_flow_method_name(condition):
wrapper.__trigger_methods__ = [condition]
wrapper.__condition_type__ = OR_CONDITION
elif is_flow_condition_dict(condition):
if "conditions" in condition:
wrapper.__trigger_condition__ = condition
wrapper.__trigger_methods__ = _extract_all_methods(condition)
wrapper.__condition_type__ = condition["type"]
elif "methods" in condition:
wrapper.__trigger_methods__ = condition["methods"]
wrapper.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif is_flow_method_callable(condition):
wrapper.__trigger_methods__ = [condition.__name__]
wrapper.__condition_type__ = OR_CONDITION
else:
raise ValueError(
"Condition must be a method, string, or a result of or_() or and_()"
)
return wrapper
return decorator
def router(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
"""Creates a routing method that directs flow execution based on conditions.
This decorator marks a method as a router, which can dynamically determine
the next steps in the flow based on its return value. Routers are triggered
by specified conditions and can return constants that determine which path
the flow should take.
Args:
condition: Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this router
Returns:
A decorator function that wraps the method as a router and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @router("check_status")
>>> def route_based_on_status(self):
... if self.state.status == "success":
... return "SUCCESS"
... return "FAILURE"
>>> @router(and_("validate", "process"))
>>> def complex_routing(self):
... if all([self.state.valid, self.state.processed]):
... return "CONTINUE"
... return "STOP"
"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
"""Decorator that wraps a function as a router method.
Args:
func: The function to wrap as a router method.
Returns:
A RouterMethod wrapper around the function.
"""
wrapper = RouterMethod(func)
if is_flow_method_name(condition):
wrapper.__trigger_methods__ = [condition]
wrapper.__condition_type__ = OR_CONDITION
elif is_flow_condition_dict(condition):
if "conditions" in condition:
wrapper.__trigger_condition__ = condition
wrapper.__trigger_methods__ = _extract_all_methods(condition)
wrapper.__condition_type__ = condition["type"]
elif "methods" in condition:
wrapper.__trigger_methods__ = condition["methods"]
wrapper.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif is_flow_method_callable(condition):
wrapper.__trigger_methods__ = [condition.__name__]
wrapper.__condition_type__ = OR_CONDITION
else:
raise ValueError(
"Condition must be a method, string, or a result of or_() or and_()"
)
return wrapper
return decorator
def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with OR logic for flow control.
Creates a condition that is satisfied when any of the specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If condition format is invalid.
Examples:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_conditions: FlowConditions = []
for condition in conditions:
if is_flow_condition_dict(condition) or is_flow_method_name(condition):
processed_conditions.append(condition)
elif is_flow_method_callable(condition):
processed_conditions.append(condition.__name__)
else:
raise ValueError("Invalid condition in or_()")
return {"type": OR_CONDITION, "conditions": processed_conditions}
def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with AND logic for flow control.
Creates a condition that is satisfied only when all specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
*conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If any condition is invalid.
Examples:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_conditions: FlowConditions = []
for condition in conditions:
if is_flow_condition_dict(condition) or is_flow_method_name(condition):
processed_conditions.append(condition)
elif is_flow_method_callable(condition):
processed_conditions.append(condition.__name__)
else:
raise ValueError("Invalid condition in and_()")
return {"type": AND_CONDITION, "conditions": processed_conditions}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -569,13 +569,13 @@ class TestFlowResumeWithFeedback:
flow = TestFlow.from_pending("async-direct-test", persistence)
with patch("crewai.flow.flow.crewai_event_bus.emit"):
with patch("crewai.flow.runtime.crewai_event_bus.emit"):
result = await flow.resume_async("async feedback")
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.feedback == "async feedback"
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_basic(self, mock_emit: MagicMock) -> None:
"""Test basic resume functionality."""
with tempfile.TemporaryDirectory() as tmpdir:
@@ -615,7 +615,7 @@ class TestFlowResumeWithFeedback:
assert persistence.load_pending_feedback("resume-test-123") is None
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_routing(self, mock_emit: MagicMock) -> None:
"""Test resume with routing."""
with tempfile.TemporaryDirectory() as tmpdir:
@@ -697,7 +697,7 @@ class TestAsyncHumanFeedbackIntegration:
assert hasattr(method, "__human_feedback_config__")
assert method.__human_feedback_config__.provider is not None
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_async_provider_pauses_flow(self, mock_emit: MagicMock) -> None:
"""Test that async provider pauses flow execution."""
with tempfile.TemporaryDirectory() as tmpdir:
@@ -743,7 +743,7 @@ class TestAsyncHumanFeedbackIntegration:
persisted = persistence.load_pending_feedback(flow_id)
assert persisted is not None
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_full_async_flow_cycle(self, mock_emit: MagicMock) -> None:
"""Test complete async flow: start -> pause -> resume."""
with tempfile.TemporaryDirectory() as tmpdir:
@@ -804,7 +804,7 @@ class TestAsyncHumanFeedbackIntegration:
class TestAutoPersistence:
"""Tests for automatic persistence when no persistence is provided."""
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_auto_persistence_when_none_provided(self, mock_emit: MagicMock) -> None:
"""Test that persistence is auto-created when HumanFeedbackPending is raised."""
@@ -925,7 +925,7 @@ class TestCollapseToOutcomeJsonParsing:
class TestLLMObjectPreservedInContext:
"""Tests that BaseLLM objects have their model string preserved in PendingFeedbackContext."""
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_basellm_object_model_string_survives_roundtrip(self, mock_emit: MagicMock) -> None:
"""Test that when llm is a BaseLLM object, its model string is stored in context
so that outcome collapsing works after async pause/resume.
@@ -1125,7 +1125,7 @@ class TestAsyncHumanFeedbackEdgeCases:
flow = TestFlow.from_pending("default-test", persistence)
with patch("crewai.flow.flow.crewai_event_bus.emit"):
with patch("crewai.flow.runtime.crewai_event_bus.emit"):
result = flow.resume("")
assert flow.last_human_feedback.outcome == "approved"
@@ -1159,7 +1159,7 @@ class TestAsyncHumanFeedbackEdgeCases:
flow = TestFlow.from_pending("no-feedback-test", persistence)
with patch("crewai.flow.flow.crewai_event_bus.emit"):
with patch("crewai.flow.runtime.crewai_event_bus.emit"):
result = flow.resume()
assert flow.last_human_feedback.outcome == "approved"
@@ -1213,7 +1213,7 @@ class TestLiveLLMPreservationOnResume:
assert hasattr(method, "_hf_llm")
assert method._hf_llm == "gpt-4o-mini"
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_uses_live_basellm_over_serialized_string(
self, mock_emit: MagicMock
) -> None:
@@ -1286,7 +1286,7 @@ class TestLiveLLMPreservationOnResume:
# And verify it's a BaseLLM instance, not a string
assert isinstance(captured_llm[0], BaseLLM)
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
self, mock_emit: MagicMock
) -> None:
@@ -1344,7 +1344,7 @@ class TestLiveLLMPreservationOnResume:
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
@patch("crewai.flow.flow.crewai_event_bus.emit")
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
self, mock_emit: MagicMock
) -> None: