From 906cd9769d7e2125485bbc09e8d8ef5cb1c29805 Mon Sep 17 00:00:00 2001 From: Vini Brasil Date: Thu, 4 Jun 2026 18:07:49 -0300 Subject: [PATCH] feat(flow): type DSL triggers as route-aware decorators (#6042) Centralize FlowTrigger and FlowMethodDecorator so start/listen/router and the boolean trigger helpers share one authoring contract. This preserves decorated method signatures for static checking while allowing route-label strings in nested FlowCondition data. Export the shared typing helpers for static analyzers, use an explicit Protocol body, align condition validation with Sequence-backed condition data, and drop the stale call-arg ignore exposed by the signature-preserving decorators. Update the flow guide to use or_(...) for multi-label listeners. --- .../guides/concepts/evaluating-use-cases.mdx | 4 +- lib/crewai/src/crewai/flow/dsl/_conditions.py | 57 +++++++++++-------- lib/crewai/src/crewai/flow/dsl/_listen.py | 16 +++--- lib/crewai/src/crewai/flow/dsl/_router.py | 16 +++--- lib/crewai/src/crewai/flow/dsl/_start.py | 17 +++--- lib/crewai/src/crewai/flow/dsl/_types.py | 27 +++++++++ lib/crewai/src/crewai/flow/dsl/_utils.py | 6 +- lib/crewai/src/crewai/flow/flow_wrappers.py | 10 ++-- lib/crewai/src/crewai/flow/runtime.py | 2 +- lib/crewai/src/crewai/flow/types.py | 2 +- lib/crewai/src/crewai/memory/recall_flow.py | 2 +- lib/crewai/tests/test_flow_definition.py | 15 +++++ 12 files changed, 115 insertions(+), 59 deletions(-) create mode 100644 lib/crewai/src/crewai/flow/dsl/_types.py diff --git a/docs/en/guides/concepts/evaluating-use-cases.mdx b/docs/en/guides/concepts/evaluating-use-cases.mdx index f7895deec..186afa2c6 100644 --- a/docs/en/guides/concepts/evaluating-use-cases.mdx +++ b/docs/en/guides/concepts/evaluating-use-cases.mdx @@ -172,7 +172,7 @@ Flows are ideal when: ```python # Example: Customer Support Flow with structured processing -from crewai.flow.flow import Flow, listen, router, start +from crewai.flow.flow import Flow, listen, or_, router, start from pydantic import BaseModel from typing import List, Dict @@ -238,7 +238,7 @@ class CustomerSupportFlow(Flow[SupportTicketState]): # Additional category handlers... - @listen("billing", "account_access", "technical_issue", "feature_request", "other") + @listen(or_("billing", "account_access", "technical_issue", "feature_request", "other")) def resolve_ticket(self, resolution_info): # Final resolution step self.state.resolution = f"Issue resolved: {resolution_info}" diff --git a/lib/crewai/src/crewai/flow/dsl/_conditions.py b/lib/crewai/src/crewai/flow/dsl/_conditions.py index f2051a63b..395bf2bc5 100644 --- a/lib/crewai/src/crewai/flow/dsl/_conditions.py +++ b/lib/crewai/src/crewai/flow/dsl/_conditions.py @@ -10,12 +10,13 @@ siblings. from __future__ import annotations -from collections.abc import Callable, Sequence +from collections.abc import Sequence from typing import Any from typing_extensions import TypeIs from crewai.flow.constants import AND_CONDITION, OR_CONDITION +from crewai.flow.dsl._types import FlowTrigger from crewai.flow.flow_definition import FlowDefinitionCondition from crewai.flow.flow_wrappers import ( FlowCondition, @@ -25,6 +26,10 @@ from crewai.flow.flow_wrappers import ( from crewai.flow.types import FlowMethodName +def _is_non_string_sequence(value: Any) -> bool: + return isinstance(value, Sequence) and not isinstance(value, (str, bytes)) + + def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]: """Check if the object is a ``(condition_type, methods)`` tuple.""" return ( @@ -46,7 +51,7 @@ def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]: if "conditions" in obj: conditions = obj["conditions"] - if not isinstance(conditions, list): + if not _is_non_string_sequence(conditions): return False for cond in conditions: if not ( @@ -57,7 +62,10 @@ def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]: if "methods" in obj: methods = obj["methods"] - if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)): + if not ( + _is_non_string_sequence(methods) + and all(isinstance(m, str) for m in methods) + ): return False allowed_keys = {"type", "conditions", "methods"} @@ -83,9 +91,12 @@ def _normalize_condition( if "conditions" in condition: return condition if "methods" in condition: - return {"type": condition["type"], "conditions": condition["methods"]} + normalized_methods: list[str | FlowMethodName | FlowCondition] = list( + condition["methods"] + ) + return {"type": condition["type"], "conditions": normalized_methods} return condition - if isinstance(condition, list) and all( + if _is_non_string_sequence(condition) and all( isinstance(item, str) or is_flow_condition_dict(item) for item in condition ): return {"type": OR_CONDITION, "conditions": condition} @@ -141,9 +152,7 @@ def _extract_all_methods( return [] -def _condition_trigger( - condition: str | FlowCondition | Callable[..., Any], -) -> FlowMethodName | FlowCondition: +def _condition_trigger(condition: FlowTrigger) -> FlowMethodName | FlowCondition: if isinstance(condition, str): return FlowMethodName(condition) if is_flow_condition_dict(condition): @@ -155,7 +164,7 @@ def _condition_trigger( def _condition_triggers( - conditions: Sequence[str | FlowCondition | Callable[..., Any]], + conditions: Sequence[FlowTrigger], error_message: str, ) -> FlowConditions: try: @@ -184,21 +193,22 @@ def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionConditio return str(condition) -def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: - """Combines multiple conditions with OR logic for flow control. +def or_(*triggers: FlowTrigger) -> FlowCondition: + """Combine multiple triggers with OR logic for flow control. - Creates a condition that is satisfied when any of the specified conditions + Creates a condition that is satisfied when any of the specified triggers are met. This is used with @start, @listen, or @router decorators to create complex triggering conditions. Args: - conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references. + triggers: Route labels, method references, or existing conditions + returned by or_() / and_(). Returns: - A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict + A condition dictionary with format {"type": "OR", "conditions": list_of_triggers}. Raises: - ValueError: If condition format is invalid. + ValueError: If a trigger format is invalid. Examples: >>> @listen(or_("success", "timeout")) @@ -209,26 +219,27 @@ def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: >>> def handle_nested(self): ... pass """ - processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()") + processed_triggers = _condition_triggers(triggers, "Invalid trigger in or_()") return {"type": OR_CONDITION, "conditions": processed_triggers} -def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: - """Combines multiple conditions with AND logic for flow control. +def and_(*triggers: FlowTrigger) -> FlowCondition: + """Combine multiple triggers with AND logic for flow control. - Creates a condition that is satisfied only when all specified conditions + Creates a condition that is satisfied only when all specified triggers are met. This is used with @start, @listen, or @router decorators to create complex triggering conditions. Args: - *conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references. + triggers: Route labels, method references, or existing conditions + returned by or_() / and_(). Returns: A condition dictionary with format {"type": "AND", "conditions": list_of_conditions} - where each condition can be a string (method name) or a nested dict + where each condition can be a route label, method name, or nested condition. Raises: - ValueError: If any condition is invalid. + ValueError: If any trigger is invalid. Examples: >>> @listen(and_("validated", "processed")) @@ -239,7 +250,7 @@ def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition >>> def handle_nested(self): ... pass """ - processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()") + processed_triggers = _condition_triggers(triggers, "Invalid trigger in and_()") return {"type": AND_CONDITION, "conditions": processed_triggers} diff --git a/lib/crewai/src/crewai/flow/dsl/_listen.py b/lib/crewai/src/crewai/flow/dsl/_listen.py index 16a93a175..c8ada4c65 100644 --- a/lib/crewai/src/crewai/flow/dsl/_listen.py +++ b/lib/crewai/src/crewai/flow/dsl/_listen.py @@ -1,9 +1,10 @@ from __future__ import annotations from collections.abc import Callable -from typing import Any +from typing import cast from crewai.flow.dsl._conditions import _definition_condition_from_runtime +from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, @@ -11,12 +12,10 @@ from crewai.flow.dsl._utils import ( _set_trigger_metadata, ) from crewai.flow.flow_definition import FlowMethodDefinition -from crewai.flow.flow_wrappers import FlowCondition, ListenMethod +from crewai.flow.flow_wrappers import ListenMethod -def listen( - condition: str | FlowCondition | Callable[..., Any], -) -> Callable[[Callable[P, R]], ListenMethod[P, R]]: +def listen(condition: FlowTrigger) -> FlowMethodDecorator: """Creates a listener that executes when specified conditions are met. This decorator sets up a method to execute in response to other method @@ -24,10 +23,11 @@ def listen( conditions. Args: - condition: Specifies when the listener should execute. + condition: Route label, method reference, or condition returned by + or_() / and_() that triggers the listener. Returns: - A decorator function that wraps the method as a flow listener and preserves its signature. + A flow method decorator that preserves the decorated method's static signature. Raises: ValueError: If the condition format is invalid. @@ -52,4 +52,4 @@ def listen( _set_trigger_metadata(wrapper, condition) return wrapper - return decorator + return cast(FlowMethodDecorator, decorator) diff --git a/lib/crewai/src/crewai/flow/dsl/_router.py b/lib/crewai/src/crewai/flow/dsl/_router.py index 11ffc9d0b..89a666cb5 100644 --- a/lib/crewai/src/crewai/flow/dsl/_router.py +++ b/lib/crewai/src/crewai/flow/dsl/_router.py @@ -8,12 +8,14 @@ from typing import ( Any, Literal, Union, + cast, get_args, get_origin, get_type_hints, ) from crewai.flow.dsl._conditions import _definition_condition_from_runtime +from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, @@ -21,7 +23,7 @@ from crewai.flow.dsl._utils import ( _set_trigger_metadata, ) from crewai.flow.flow_definition import FlowMethodDefinition -from crewai.flow.flow_wrappers import FlowCondition, RouterMethod +from crewai.flow.flow_wrappers import RouterMethod def _unwrap_function(function: Any) -> Any: @@ -93,10 +95,10 @@ def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]: def router( - condition: str | FlowCondition | Callable[..., Any], + condition: FlowTrigger, *, emit: Sequence[str] | str | None = None, -) -> Callable[[Callable[P, R]], RouterMethod[P, R]]: +) -> FlowMethodDecorator: """Creates a routing method that directs flow execution based on conditions. This decorator marks a method as a router, which can dynamically determine @@ -105,15 +107,15 @@ def router( Args: condition: Specifies when the router should execute. Can be: - - str: Name of a method that triggers this router + - str: Route label or method name that triggers this router - FlowCondition: Result from or_() or and_(), including nested conditions - - Callable[..., Any]: A method reference that triggers this router + - Flow method reference: A method whose completion triggers this router emit: Optional explicit router output events for static FlowDefinition and visualization. If omitted, Literal/Enum return annotations are used when available. Returns: - A decorator function that wraps the method as a router and preserves its signature. + A flow method decorator that preserves the decorated method's static signature. Raises: ValueError: If the condition format is invalid. @@ -161,4 +163,4 @@ def router( wrapper.__router_emit__ = router_events return wrapper - return decorator + return cast(FlowMethodDecorator, decorator) diff --git a/lib/crewai/src/crewai/flow/dsl/_start.py b/lib/crewai/src/crewai/flow/dsl/_start.py index 652a8332f..dcfde940d 100644 --- a/lib/crewai/src/crewai/flow/dsl/_start.py +++ b/lib/crewai/src/crewai/flow/dsl/_start.py @@ -1,9 +1,10 @@ from __future__ import annotations from collections.abc import Callable -from typing import Any +from typing import cast from crewai.flow.dsl._conditions import _definition_condition_from_runtime +from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, @@ -11,12 +12,12 @@ from crewai.flow.dsl._utils import ( _set_trigger_metadata, ) from crewai.flow.flow_definition import FlowMethodDefinition -from crewai.flow.flow_wrappers import FlowCondition, StartMethod +from crewai.flow.flow_wrappers import StartMethod def start( - condition: str | FlowCondition | Callable[..., Any] | None = None, -) -> Callable[[Callable[P, R]], StartMethod[P, R]]: + condition: FlowTrigger | None = None, +) -> FlowMethodDecorator: """Marks a method as a flow's starting point. This decorator designates a method as an entry point for the flow execution. @@ -25,13 +26,13 @@ def start( Args: condition: Defines when the start method should execute. Can be: - - str: Name of a method that triggers this start + - str: Route label or method name that triggers this start - FlowCondition: Result from or_() or and_(), including nested conditions - - Callable[..., Any]: A method reference that triggers this start + - Flow method reference: A method whose completion triggers this start Default is None, meaning unconditional start. Returns: - A decorator function that wraps the method as a flow start point and preserves its signature. + A flow method decorator that preserves the decorated method's static signature. Raises: ValueError: If the condition format is invalid. @@ -65,4 +66,4 @@ def start( _set_flow_method_definition(wrapper, FlowMethodDefinition(start=True)) return wrapper - return decorator + return cast(FlowMethodDecorator, decorator) diff --git a/lib/crewai/src/crewai/flow/dsl/_types.py b/lib/crewai/src/crewai/flow/dsl/_types.py new file mode 100644 index 000000000..829227fbc --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/_types.py @@ -0,0 +1,27 @@ +"""Private typing helpers for the Python Flow DSL.""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import Any, Protocol, TypeAlias, TypeVar + +from crewai.flow.flow_wrappers import FlowCondition +from crewai.flow.types import FlowMethodCallable + + +__all__ = ["FlowMethodDecorator", "FlowTrigger"] + +F = TypeVar("F", bound=Callable[..., Any]) + +FlowTrigger: TypeAlias = str | FlowMethodCallable[..., Any] | FlowCondition + + +class FlowMethodDecorator(Protocol): + """Decorator returned by Flow DSL authoring helpers. + + The runtime wraps methods in FlowMethod subclasses, but the authoring + contract preserves the decorated method's static callable type. + """ + + def __call__(self, func: F) -> F: + raise NotImplementedError diff --git a/lib/crewai/src/crewai/flow/dsl/_utils.py b/lib/crewai/src/crewai/flow/dsl/_utils.py index d23bc3886..d31a785f5 100644 --- a/lib/crewai/src/crewai/flow/dsl/_utils.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import Callable, Sequence +from collections.abc import Sequence import json import logging from typing import Any, ParamSpec, TypeVar @@ -16,6 +16,7 @@ from crewai.flow.dsl._conditions import ( _runtime_listener_condition_from_definition, is_flow_condition_dict, ) +from crewai.flow.dsl._types import FlowTrigger from crewai.flow.flow_definition import ( FlowConfigDefinition, FlowDefinition, @@ -27,7 +28,6 @@ from crewai.flow.flow_definition import ( FlowStateDefinition, ) from crewai.flow.flow_wrappers import ( - FlowCondition, FlowMethod, ListenMethod, RouterMethod, @@ -67,7 +67,7 @@ def _flow_method_names(values: Sequence[Any]) -> list[FlowMethodName]: def _set_trigger_metadata( wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R], - condition: str | FlowCondition | Callable[..., Any], + condition: FlowTrigger, ) -> None: if isinstance(condition, str): wrapper.__trigger_methods__ = [FlowMethodName(condition)] diff --git a/lib/crewai/src/crewai/flow/flow_wrappers.py b/lib/crewai/src/crewai/flow/flow_wrappers.py index 7e42859c8..2fdaeb193 100644 --- a/lib/crewai/src/crewai/flow/flow_wrappers.py +++ b/lib/crewai/src/crewai/flow/flow_wrappers.py @@ -37,16 +37,16 @@ class FlowCondition(TypedDict, total=False): Attributes: type: The type of the condition. - conditions: A list of conditions types. - methods: A list of methods. + conditions: A sequence of route labels, method names, or nested conditions. + methods: A legacy sequence of route labels or method names. """ type: Required[FlowConditionType] - conditions: Sequence[FlowMethodName | FlowCondition] - methods: list[FlowMethodName] + conditions: Sequence[str | FlowMethodName | FlowCondition] + methods: Sequence[str | FlowMethodName] -FlowConditions: TypeAlias = list[FlowMethodName | FlowCondition] +FlowConditions: TypeAlias = Sequence[str | FlowMethodName | FlowCondition] class FlowMethod(Generic[P, R]): diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 07e48e0e9..80b7a84da 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -2832,7 +2832,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): def _evaluate_condition( self, - condition: FlowMethodName | FlowCondition, + condition: str | FlowMethodName | FlowCondition, trigger_method: FlowMethodName, listener_name: FlowMethodName, ) -> bool: diff --git a/lib/crewai/src/crewai/flow/types.py b/lib/crewai/src/crewai/flow/types.py index 46a285bbe..31f45f658 100644 --- a/lib/crewai/src/crewai/flow/types.py +++ b/lib/crewai/src/crewai/flow/types.py @@ -31,7 +31,7 @@ PendingListenerKey = NewType( class FlowMethodCallable(Protocol[P, R]): """A callable that can be used as a flow method reference.""" - __name__: FlowMethodName + __name__: str def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: ... diff --git a/lib/crewai/src/crewai/memory/recall_flow.py b/lib/crewai/src/crewai/memory/recall_flow.py index 9da5dca64..e09278983 100644 --- a/lib/crewai/src/crewai/memory/recall_flow.py +++ b/lib/crewai/src/crewai/memory/recall_flow.py @@ -337,7 +337,7 @@ class RecallFlow(Flow[RecallState]): @router(re_search) def re_decide_depth(self) -> str: """Re-evaluate depth after re-search. Same logic as decide_depth.""" - return self.decide_depth() # type: ignore[call-arg] + return self.decide_depth() @listen("synthesize") def synthesize_results(self) -> list[MemoryMatch]: diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 1b8325e68..302997735 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -14,6 +14,7 @@ import crewai.flow.dsl as flow_dsl import crewai.flow.flow_definition as flow_definition import crewai.flow.visualization.builder as visualization_builder from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start +from crewai.flow.dsl._conditions import is_flow_condition_dict def test_flow_public_exports_are_explicit(): @@ -48,6 +49,20 @@ def test_flow_public_exports_are_explicit(): assert "calculate_node_levels" not in flow_visualization.__all__ +def test_flow_condition_dict_accepts_non_string_sequences(): + condition = { + "type": "OR", + "conditions": ( + "approved", + {"type": "AND", "methods": ("validated", "processed")}, + ), + } + + assert is_flow_condition_dict(condition) + assert not is_flow_condition_dict({"type": "OR", "conditions": "approved"}) + assert not is_flow_condition_dict({"type": "OR", "methods": b"approved"}) + + def test_private_flow_helpers_do_not_have_docstrings(): import crewai.flow.flow_wrappers as flow_wrappers import crewai.flow.human_feedback as human_feedback