feat(flow): type DSL triggers as route-aware decorators (#6042)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

Centralize FlowTrigger and FlowMethodDecorator so start/listen/router and the boolean trigger helpers share one authoring contract. This preserves decorated method signatures for static checking while allowing route-label strings in nested FlowCondition data.

Export the shared typing helpers for static analyzers, use an explicit Protocol body, align condition validation with Sequence-backed condition data, and drop the stale call-arg ignore exposed by the signature-preserving decorators.

Update the flow guide to use or_(...) for multi-label listeners.
This commit is contained in:
Vini Brasil
2026-06-04 18:07:49 -03:00
committed by GitHub
parent 14ce97d787
commit 906cd9769d
12 changed files with 115 additions and 59 deletions

View File

@@ -172,7 +172,7 @@ Flows are ideal when:
```python
# Example: Customer Support Flow with structured processing
from crewai.flow.flow import Flow, listen, router, start
from crewai.flow.flow import Flow, listen, or_, router, start
from pydantic import BaseModel
from typing import List, Dict
@@ -238,7 +238,7 @@ class CustomerSupportFlow(Flow[SupportTicketState]):
# Additional category handlers...
@listen("billing", "account_access", "technical_issue", "feature_request", "other")
@listen(or_("billing", "account_access", "technical_issue", "feature_request", "other"))
def resolve_ticket(self, resolution_info):
# Final resolution step
self.state.resolution = f"Issue resolved: {resolution_info}"

View File

@@ -10,12 +10,13 @@ siblings.
from __future__ import annotations
from collections.abc import Callable, Sequence
from collections.abc import Sequence
from typing import Any
from typing_extensions import TypeIs
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl._types import FlowTrigger
from crewai.flow.flow_definition import FlowDefinitionCondition
from crewai.flow.flow_wrappers import (
FlowCondition,
@@ -25,6 +26,10 @@ from crewai.flow.flow_wrappers import (
from crewai.flow.types import FlowMethodName
def _is_non_string_sequence(value: Any) -> bool:
return isinstance(value, Sequence) and not isinstance(value, (str, bytes))
def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]:
"""Check if the object is a ``(condition_type, methods)`` tuple."""
return (
@@ -46,7 +51,7 @@ def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
if "conditions" in obj:
conditions = obj["conditions"]
if not isinstance(conditions, list):
if not _is_non_string_sequence(conditions):
return False
for cond in conditions:
if not (
@@ -57,7 +62,10 @@ def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
if "methods" in obj:
methods = obj["methods"]
if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)):
if not (
_is_non_string_sequence(methods)
and all(isinstance(m, str) for m in methods)
):
return False
allowed_keys = {"type", "conditions", "methods"}
@@ -83,9 +91,12 @@ def _normalize_condition(
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
normalized_methods: list[str | FlowMethodName | FlowCondition] = list(
condition["methods"]
)
return {"type": condition["type"], "conditions": normalized_methods}
return condition
if isinstance(condition, list) and all(
if _is_non_string_sequence(condition) and all(
isinstance(item, str) or is_flow_condition_dict(item) for item in condition
):
return {"type": OR_CONDITION, "conditions": condition}
@@ -141,9 +152,7 @@ def _extract_all_methods(
return []
def _condition_trigger(
condition: str | FlowCondition | Callable[..., Any],
) -> FlowMethodName | FlowCondition:
def _condition_trigger(condition: FlowTrigger) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
@@ -155,7 +164,7 @@ def _condition_trigger(
def _condition_triggers(
conditions: Sequence[str | FlowCondition | Callable[..., Any]],
conditions: Sequence[FlowTrigger],
error_message: str,
) -> FlowConditions:
try:
@@ -184,21 +193,22 @@ def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionConditio
return str(condition)
def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with OR logic for flow control.
def or_(*triggers: FlowTrigger) -> FlowCondition:
"""Combine multiple triggers with OR logic for flow control.
Creates a condition that is satisfied when any of the specified conditions
Creates a condition that is satisfied when any of the specified triggers
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
triggers: Route labels, method references, or existing conditions
returned by or_() / and_().
Returns:
A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict
A condition dictionary with format {"type": "OR", "conditions": list_of_triggers}.
Raises:
ValueError: If condition format is invalid.
ValueError: If a trigger format is invalid.
Examples:
>>> @listen(or_("success", "timeout"))
@@ -209,26 +219,27 @@ def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()")
processed_triggers = _condition_triggers(triggers, "Invalid trigger in or_()")
return {"type": OR_CONDITION, "conditions": processed_triggers}
def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with AND logic for flow control.
def and_(*triggers: FlowTrigger) -> FlowCondition:
"""Combine multiple triggers with AND logic for flow control.
Creates a condition that is satisfied only when all specified conditions
Creates a condition that is satisfied only when all specified triggers
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
*conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
triggers: Route labels, method references, or existing conditions
returned by or_() / and_().
Returns:
A condition dictionary with format {"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
where each condition can be a route label, method name, or nested condition.
Raises:
ValueError: If any condition is invalid.
ValueError: If any trigger is invalid.
Examples:
>>> @listen(and_("validated", "processed"))
@@ -239,7 +250,7 @@ def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()")
processed_triggers = _condition_triggers(triggers, "Invalid trigger in and_()")
return {"type": AND_CONDITION, "conditions": processed_triggers}

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from typing import cast
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
@@ -11,12 +12,10 @@ from crewai.flow.dsl._utils import (
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowCondition, ListenMethod
from crewai.flow.flow_wrappers import ListenMethod
def listen(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
def listen(condition: FlowTrigger) -> FlowMethodDecorator:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
@@ -24,10 +23,11 @@ def listen(
conditions.
Args:
condition: Specifies when the listener should execute.
condition: Route label, method reference, or condition returned by
or_() / and_() that triggers the listener.
Returns:
A decorator function that wraps the method as a flow listener and preserves its signature.
A flow method decorator that preserves the decorated method's static signature.
Raises:
ValueError: If the condition format is invalid.
@@ -52,4 +52,4 @@ def listen(
_set_trigger_metadata(wrapper, condition)
return wrapper
return decorator
return cast(FlowMethodDecorator, decorator)

View File

@@ -8,12 +8,14 @@ from typing import (
Any,
Literal,
Union,
cast,
get_args,
get_origin,
get_type_hints,
)
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
@@ -21,7 +23,7 @@ from crewai.flow.dsl._utils import (
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowCondition, RouterMethod
from crewai.flow.flow_wrappers import RouterMethod
def _unwrap_function(function: Any) -> Any:
@@ -93,10 +95,10 @@ def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]:
def router(
condition: str | FlowCondition | Callable[..., Any],
condition: FlowTrigger,
*,
emit: Sequence[str] | str | None = None,
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
) -> FlowMethodDecorator:
"""Creates a routing method that directs flow execution based on conditions.
This decorator marks a method as a router, which can dynamically determine
@@ -105,15 +107,15 @@ def router(
Args:
condition: Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- str: Route label or method name that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this router
- Flow method reference: A method whose completion triggers this router
emit: Optional explicit router output events for static FlowDefinition
and visualization. If omitted, Literal/Enum return annotations are
used when available.
Returns:
A decorator function that wraps the method as a router and preserves its signature.
A flow method decorator that preserves the decorated method's static signature.
Raises:
ValueError: If the condition format is invalid.
@@ -161,4 +163,4 @@ def router(
wrapper.__router_emit__ = router_events
return wrapper
return decorator
return cast(FlowMethodDecorator, decorator)

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from typing import cast
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
@@ -11,12 +12,12 @@ from crewai.flow.dsl._utils import (
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowCondition, StartMethod
from crewai.flow.flow_wrappers import StartMethod
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
condition: FlowTrigger | None = None,
) -> FlowMethodDecorator:
"""Marks a method as a flow's starting point.
This decorator designates a method as an entry point for the flow execution.
@@ -25,13 +26,13 @@ def start(
Args:
condition: Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- str: Route label or method name that triggers this start
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this start
- Flow method reference: A method whose completion triggers this start
Default is None, meaning unconditional start.
Returns:
A decorator function that wraps the method as a flow start point and preserves its signature.
A flow method decorator that preserves the decorated method's static signature.
Raises:
ValueError: If the condition format is invalid.
@@ -65,4 +66,4 @@ def start(
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
return wrapper
return decorator
return cast(FlowMethodDecorator, decorator)

View File

@@ -0,0 +1,27 @@
"""Private typing helpers for the Python Flow DSL."""
from __future__ import annotations
from collections.abc import Callable
from typing import Any, Protocol, TypeAlias, TypeVar
from crewai.flow.flow_wrappers import FlowCondition
from crewai.flow.types import FlowMethodCallable
__all__ = ["FlowMethodDecorator", "FlowTrigger"]
F = TypeVar("F", bound=Callable[..., Any])
FlowTrigger: TypeAlias = str | FlowMethodCallable[..., Any] | FlowCondition
class FlowMethodDecorator(Protocol):
"""Decorator returned by Flow DSL authoring helpers.
The runtime wraps methods in FlowMethod subclasses, but the authoring
contract preserves the decorated method's static callable type.
"""
def __call__(self, func: F) -> F:
raise NotImplementedError

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
from collections.abc import Sequence
import json
import logging
from typing import Any, ParamSpec, TypeVar
@@ -16,6 +16,7 @@ from crewai.flow.dsl._conditions import (
_runtime_listener_condition_from_definition,
is_flow_condition_dict,
)
from crewai.flow.dsl._types import FlowTrigger
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowDefinition,
@@ -27,7 +28,6 @@ from crewai.flow.flow_definition import (
FlowStateDefinition,
)
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowMethod,
ListenMethod,
RouterMethod,
@@ -67,7 +67,7 @@ def _flow_method_names(values: Sequence[Any]) -> list[FlowMethodName]:
def _set_trigger_metadata(
wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R],
condition: str | FlowCondition | Callable[..., Any],
condition: FlowTrigger,
) -> None:
if isinstance(condition, str):
wrapper.__trigger_methods__ = [FlowMethodName(condition)]

View File

@@ -37,16 +37,16 @@ class FlowCondition(TypedDict, total=False):
Attributes:
type: The type of the condition.
conditions: A list of conditions types.
methods: A list of methods.
conditions: A sequence of route labels, method names, or nested conditions.
methods: A legacy sequence of route labels or method names.
"""
type: Required[FlowConditionType]
conditions: Sequence[FlowMethodName | FlowCondition]
methods: list[FlowMethodName]
conditions: Sequence[str | FlowMethodName | FlowCondition]
methods: Sequence[str | FlowMethodName]
FlowConditions: TypeAlias = list[FlowMethodName | FlowCondition]
FlowConditions: TypeAlias = Sequence[str | FlowMethodName | FlowCondition]
class FlowMethod(Generic[P, R]):

View File

@@ -2832,7 +2832,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
def _evaluate_condition(
self,
condition: FlowMethodName | FlowCondition,
condition: str | FlowMethodName | FlowCondition,
trigger_method: FlowMethodName,
listener_name: FlowMethodName,
) -> bool:

View File

@@ -31,7 +31,7 @@ PendingListenerKey = NewType(
class FlowMethodCallable(Protocol[P, R]):
"""A callable that can be used as a flow method reference."""
__name__: FlowMethodName
__name__: str
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: ...

View File

@@ -337,7 +337,7 @@ class RecallFlow(Flow[RecallState]):
@router(re_search)
def re_decide_depth(self) -> str:
"""Re-evaluate depth after re-search. Same logic as decide_depth."""
return self.decide_depth() # type: ignore[call-arg]
return self.decide_depth()
@listen("synthesize")
def synthesize_results(self) -> list[MemoryMatch]:

View File

@@ -14,6 +14,7 @@ import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
from crewai.flow.dsl._conditions import is_flow_condition_dict
def test_flow_public_exports_are_explicit():
@@ -48,6 +49,20 @@ def test_flow_public_exports_are_explicit():
assert "calculate_node_levels" not in flow_visualization.__all__
def test_flow_condition_dict_accepts_non_string_sequences():
condition = {
"type": "OR",
"conditions": (
"approved",
{"type": "AND", "methods": ("validated", "processed")},
),
}
assert is_flow_condition_dict(condition)
assert not is_flow_condition_dict({"type": "OR", "conditions": "approved"})
assert not is_flow_condition_dict({"type": "OR", "methods": b"approved"})
def test_private_flow_helpers_do_not_have_docstrings():
import crewai.flow.flow_wrappers as flow_wrappers
import crewai.flow.human_feedback as human_feedback