Split flow DSL monolith into focused decorator modules (#6040)

The Flow DSL lived in one 1033-line `dsl.py` that mixed every decorator
(`@start`/`@listen`/`@router`), the `human_feedback` decorator,
condition combinators, and FlowDefinition extraction helpers in a single
file.

Split it into a `dsl/` package where each decorator gets its own module
(`start.py` 68 lines, `listen.py` 55, `router.py` 164,
`human_feedback.py` 98) and the shared extraction/condition helpers stay
in `utils.py`. The public API is re-exported from `dsl/__init__.py`, so
import paths are unchanged.

This is simpler because each decorator is now read and changed in
isolation instead of scanning a 1000-line file to find one of them, and
router-specific annotation parsing no longer sits next to unrelated
start/listen logic.
This commit is contained in:
Vini Brasil
2026-06-04 15:02:06 -03:00
committed by GitHub
parent aed69237d4
commit 75dad212a2
11 changed files with 744 additions and 664 deletions

View File

@@ -9,9 +9,9 @@ from crewai.flow.conversation import (
ConversationalConfig,
ConversationalInputs,
)
from crewai.flow.dsl import HumanFeedbackResult, human_feedback
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.input_provider import InputProvider, InputResponse
from crewai.flow.persistence import persist
from crewai.flow.visualization import (

View File

@@ -0,0 +1,32 @@
"""Flow DSL: the Python authoring layer for Flows.
Provides the ``@start`` / ``@listen`` / ``@router`` decorators and the
``or_`` / ``and_`` condition combinators used to write Flow classes in
Python. The DSL is one way to produce a Flow Structure: this package
extracts a :class:`~crewai.flow.flow_definition.FlowDefinition` from a
Python Flow class. Execution is handled by ``runtime``.
"""
from crewai.flow.dsl._conditions import and_, or_
from crewai.flow.dsl._human_feedback import (
HumanFeedbackResult,
human_feedback,
)
from crewai.flow.dsl._listen import listen
from crewai.flow.dsl._router import router
from crewai.flow.dsl._start import start
from crewai.flow.dsl._utils import (
build_flow_definition as build_flow_definition,
extract_flow_definition as extract_flow_definition,
)
__all__ = [
"HumanFeedbackResult",
"and_",
"human_feedback",
"listen",
"or_",
"router",
"start",
]

View File

@@ -0,0 +1,276 @@
"""Flow DSL condition primitives.
Type guards, the public ``or_`` / ``and_`` combinators, and the conversions
between runtime conditions, normalized conditions, and the
``FlowDefinitionCondition`` shape stored on a :class:`FlowDefinition`. These are
the lower layer of the DSL: the decorators and the definition builder
(``_utils``) build on top of them, so this module imports nothing from its
siblings.
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
from typing import Any
from typing_extensions import TypeIs
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_definition import FlowDefinitionCondition
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
SimpleFlowCondition,
)
from crewai.flow.types import FlowMethodName
def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]:
"""Check if the object is a ``(condition_type, methods)`` tuple."""
return (
isinstance(obj, tuple)
and len(obj) == 2
and isinstance(obj[0], str)
and isinstance(obj[1], list)
)
def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
"""Check if the object matches the FlowCondition structure."""
if not isinstance(obj, dict):
return False
type_value = obj.get("type")
if type_value not in ("AND", "OR"):
return False
if "conditions" in obj:
conditions = obj["conditions"]
if not isinstance(conditions, list):
return False
for cond in conditions:
if not (
isinstance(cond, str)
or (isinstance(cond, dict) and is_flow_condition_dict(cond))
):
return False
if "methods" in obj:
methods = obj["methods"]
if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)):
return False
allowed_keys = {"type", "conditions", "methods"}
if not set(obj).issubset(allowed_keys):
return False
return True
def _method_reference_name(value: Any) -> FlowMethodName | None:
name = getattr(value, "__name__", None)
if callable(value) and isinstance(name, str):
return FlowMethodName(name)
return None
def _normalize_condition(
condition: FlowConditions | FlowCondition | str,
) -> FlowCondition:
if isinstance(condition, str):
return {"type": OR_CONDITION, "conditions": [FlowMethodName(condition)]}
if is_flow_condition_dict(condition):
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
return condition
if isinstance(condition, list) and all(
isinstance(item, str) or is_flow_condition_dict(item) for item in condition
):
return {"type": OR_CONDITION, "conditions": condition}
raise ValueError(f"Cannot normalize condition: {condition}")
def _extract_all_methods_recursive(
condition: str | FlowCondition | dict[str, Any] | list[Any],
flow: Any | None = None,
) -> list[FlowMethodName]:
if isinstance(condition, str):
if flow is not None:
if condition in flow._methods:
return [FlowMethodName(condition)]
return []
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods_recursive(sub_cond, flow))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods_recursive(item, flow))
return methods
return []
def _extract_all_methods(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[FlowMethodName]:
if isinstance(condition, str):
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", OR_CONDITION)
if cond_type == AND_CONDITION:
return [
FlowMethodName(sub_cond)
for sub_cond in normalized.get("conditions", [])
if isinstance(sub_cond, str)
]
return []
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
def _condition_trigger(
condition: str | FlowCondition | Callable[..., Any],
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
method_name = _method_reference_name(condition)
if method_name is not None:
return method_name
raise ValueError("Invalid condition")
def _condition_triggers(
conditions: Sequence[str | FlowCondition | Callable[..., Any]],
error_message: str,
) -> FlowConditions:
try:
return [_condition_trigger(condition) for condition in conditions]
except ValueError as exc:
raise ValueError(error_message) from exc
def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition:
if isinstance(condition, str):
return str(condition)
method_name = _method_reference_name(condition)
if method_name is not None:
return str(method_name)
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
key = "and" if normalized.get("type") == AND_CONDITION else "or"
return {
key: [
_definition_condition_from_runtime(sub_condition)
for sub_condition in normalized.get("conditions", [])
]
}
if isinstance(condition, list):
return {"or": [_definition_condition_from_runtime(item) for item in condition]}
return str(condition)
def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with OR logic for flow control.
Creates a condition that is satisfied when any of the specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If condition format is invalid.
Examples:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()")
return {"type": OR_CONDITION, "conditions": processed_triggers}
def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with AND logic for flow control.
Creates a condition that is satisfied only when all specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
*conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If any condition is invalid.
Examples:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()")
return {"type": AND_CONDITION, "conditions": processed_triggers}
def _runtime_condition_from_definition(
condition: FlowDefinitionCondition,
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
if "and" in condition:
return {
"type": AND_CONDITION,
"conditions": [
_runtime_condition_from_definition(item)
for item in condition.get("and", [])
],
}
return {
"type": OR_CONDITION,
"conditions": [
_runtime_condition_from_definition(item) for item in condition.get("or", [])
],
}
def _runtime_listener_condition_from_definition(
condition: FlowDefinitionCondition,
) -> SimpleFlowCondition | FlowCondition:
runtime_condition = _runtime_condition_from_definition(condition)
if isinstance(runtime_condition, str):
return (OR_CONDITION, [FlowMethodName(str(runtime_condition))])
return runtime_condition

View File

@@ -0,0 +1,98 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, TypeVar
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.human_feedback import (
HumanFeedbackConfig,
HumanFeedbackResult,
_build_human_feedback_runtime_decorator,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider
from crewai.llms.base_llm import BaseLLM
F = TypeVar("F", bound=Callable[..., Any])
__all__ = ["HumanFeedbackResult", "human_feedback"]
def _stamp_human_feedback_metadata(
wrapper: Any,
func: Callable[..., Any],
config: HumanFeedbackConfig,
) -> None:
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__trigger_condition__",
"__is_flow_method__",
"__flow_persistence_config__",
"__is_router__",
"__router_emit__",
"__flow_method_definition__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
wrapper.__human_feedback_config__ = config
wrapper.__is_flow_method__ = True
if config.emit:
wrapper.__is_router__ = True
wrapper.__router_emit__ = list(config.emit)
fragment = getattr(wrapper, "__flow_method_definition__", None)
if isinstance(fragment, FlowMethodDefinition):
wrapper.__flow_method_definition__ = fragment.model_copy(
update={"router": True, "emit": list(config.emit)}
)
wrapper._human_feedback_llm = config.llm
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback."""
runtime_decorator = _build_human_feedback_runtime_decorator(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
config = HumanFeedbackConfig(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
def decorator(func: F) -> F:
wrapper = runtime_decorator(func)
_stamp_human_feedback_metadata(wrapper, func, config)
return wrapper
return decorator

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._utils import (
P,
R,
_set_flow_method_definition,
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowCondition, ListenMethod
def listen(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
executions in the flow. It supports both simple and complex triggering
conditions.
Args:
condition: Specifies when the listener should execute.
Returns:
A decorator function that wraps the method as a flow listener and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @listen("process_data")
>>> def handle_processed_data(self):
... pass
>>> @listen("method_name")
>>> def handle_completion(self):
... pass
"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)),
)
_set_trigger_metadata(wrapper, condition)
return wrapper
return decorator

View File

@@ -0,0 +1,164 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
from enum import Enum
import inspect
from types import UnionType
from typing import (
Any,
Literal,
Union,
get_args,
get_origin,
get_type_hints,
)
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._utils import (
P,
R,
_set_flow_method_definition,
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowCondition, RouterMethod
def _unwrap_function(function: Any) -> Any:
if hasattr(function, "__func__"):
function = function.__func__
if hasattr(function, "__wrapped__"):
wrapped = function.__wrapped__
if hasattr(wrapped, "unwrap"):
return wrapped.unwrap()
return wrapped
if hasattr(function, "unwrap"):
return function.unwrap()
return function
def _string_values_from_annotation(annotation: Any) -> list[str]:
if annotation is inspect.Signature.empty or isinstance(annotation, str):
return []
if isinstance(annotation, type) and issubclass(annotation, Enum):
return [member.value for member in annotation if isinstance(member.value, str)]
origin = get_origin(annotation)
if origin is None:
return []
args = get_args(annotation)
if origin is Literal or getattr(origin, "__name__", "") == "Literal":
return [arg for arg in args if isinstance(arg, str)]
if not (
origin is Union
or origin is UnionType
or getattr(origin, "__name__", "") == "Annotated"
):
return []
values: list[str] = []
for arg in args:
values.extend(_string_values_from_annotation(arg))
return values
def _return_annotation(function: Any) -> Any:
unwrapped = _unwrap_function(function)
try:
return get_type_hints(unwrapped, include_extras=True).get(
"return", inspect.Signature.empty
)
except (NameError, TypeError, ValueError):
try:
return inspect.signature(unwrapped).return_annotation
except (TypeError, ValueError):
return inspect.Signature.empty
def _get_router_return_events(function: Any) -> list[str] | None:
values = _string_values_from_annotation(_return_annotation(function))
return list(dict.fromkeys(values)) if values else None
def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]:
if isinstance(value, str):
return [str(value)]
return list(dict.fromkeys(str(item) for item in value))
def router(
condition: str | FlowCondition | Callable[..., Any],
*,
emit: Sequence[str] | str | None = None,
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
"""Creates a routing method that directs flow execution based on conditions.
This decorator marks a method as a router, which can dynamically determine
the next steps in the flow based on its return value. Routers are triggered
by specified conditions and can return constants that emit downstream events.
Args:
condition: Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this router
emit: Optional explicit router output events for static FlowDefinition
and visualization. If omitted, Literal/Enum return annotations are
used when available.
Returns:
A decorator function that wraps the method as a router and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @router("check_status")
>>> def route_based_on_status(self):
... if self.state.status == "success":
... return "SUCCESS"
... return "FAILURE"
>>> @router(and_("validate", "process"))
>>> def complex_routing(self):
... if all([self.state.valid, self.state.processed]):
... return "CONTINUE"
... return "STOP"
>>> @router("check_status", emit=["SUCCESS", "FAILURE"])
>>> def explicit_routing(self):
... return "SUCCESS"
"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
wrapper = RouterMethod(func)
if emit is not None:
router_events = _normalize_router_emit(emit)
else:
router_events = _get_router_return_events(func) or []
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
listen=_definition_condition_from_runtime(condition),
router=True,
emit=router_events or None,
),
)
_set_trigger_metadata(wrapper, condition)
if emit is not None:
wrapper.__router_emit__ = router_events
elif router_events:
wrapper.__router_emit__ = router_events
return wrapper
return decorator

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._utils import (
P,
R,
_set_flow_method_definition,
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowCondition, StartMethod
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
"""Marks a method as a flow's starting point.
This decorator designates a method as an entry point for the flow execution.
It can optionally specify conditions that trigger the start based on other
method executions.
Args:
condition: Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this start
Default is None, meaning unconditional start.
Returns:
A decorator function that wraps the method as a flow start point and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @start() # Unconditional start
>>> def begin_flow(self):
... pass
>>> @start("method_name") # Start after specific method
>>> def conditional_start(self):
... pass
>>> @start(and_("method1", "method2")) # Start after multiple methods
>>> def complex_start(self):
... pass
"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
start=_definition_condition_from_runtime(condition)
),
)
_set_trigger_metadata(wrapper, condition)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
return wrapper
return decorator

View File

@@ -1,35 +1,21 @@
"""Flow DSL: the Python authoring layer for Flows.
Provides the ``@start`` / ``@listen`` / ``@router`` decorators and the
``or_`` / ``and_`` condition combinators used to write Flow classes in
Python. The DSL is one way to produce a Flow Structure: this module
extracts a :class:`~crewai.flow.flow_definition.FlowDefinition` from a
Python Flow class. Execution is handled by ``runtime``.
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
from enum import Enum
import inspect
import json
import logging
from types import UnionType
from typing import (
Any,
Literal,
ParamSpec,
TypeVar,
Union,
get_args,
get_origin,
get_type_hints,
)
from typing import Any, ParamSpec, TypeVar
from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl._conditions import (
_definition_condition_from_runtime,
_extract_all_methods,
_method_reference_name,
_runtime_listener_condition_from_definition,
is_flow_condition_dict,
)
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowDefinition,
@@ -42,11 +28,9 @@ from crewai.flow.flow_definition import (
)
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
FlowMethod,
ListenMethod,
RouterMethod,
SimpleFlowCondition,
StartMethod,
)
from crewai.flow.types import FlowMethodName
@@ -57,21 +41,9 @@ R = TypeVar("R")
logger = logging.getLogger(__name__)
__all__ = ["and_", "listen", "or_", "router", "start"]
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]:
"""Check if the object is a ``(condition_type, methods)`` tuple."""
return (
isinstance(obj, tuple)
and len(obj) == 2
and isinstance(obj[0], str)
and isinstance(obj[1], list)
)
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
"""Check if the object carries Flow method wrapper metadata."""
return (
@@ -89,184 +61,10 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
return True
def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
"""Check if the object matches the FlowCondition structure."""
if not isinstance(obj, dict):
return False
type_value = obj.get("type")
if type_value not in ("AND", "OR"):
return False
if "conditions" in obj:
conditions = obj["conditions"]
if not isinstance(conditions, list):
return False
for cond in conditions:
if not (
isinstance(cond, str)
or (isinstance(cond, dict) and is_flow_condition_dict(cond))
):
return False
if "methods" in obj:
methods = obj["methods"]
if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)):
return False
allowed_keys = {"type", "conditions", "methods"}
if not set(obj).issubset(allowed_keys):
return False
return True
def _method_reference_name(value: Any) -> FlowMethodName | None:
name = getattr(value, "__name__", None)
if callable(value) and isinstance(name, str):
return FlowMethodName(name)
return None
def _flow_method_names(values: Sequence[Any]) -> list[FlowMethodName]:
return [FlowMethodName(str(value)) for value in values]
def _extract_all_methods_recursive(
condition: str | FlowCondition | dict[str, Any] | list[Any],
flow: Any | None = None,
) -> list[FlowMethodName]:
if isinstance(condition, str):
if flow is not None:
if condition in flow._methods:
return [FlowMethodName(condition)]
return []
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods_recursive(sub_cond, flow))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods_recursive(item, flow))
return methods
return []
def _normalize_condition(
condition: FlowConditions | FlowCondition | str,
) -> FlowCondition:
if isinstance(condition, str):
return {"type": OR_CONDITION, "conditions": [FlowMethodName(condition)]}
if is_flow_condition_dict(condition):
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
return condition
if isinstance(condition, list) and all(
isinstance(item, str) or is_flow_condition_dict(item) for item in condition
):
return {"type": OR_CONDITION, "conditions": condition}
raise ValueError(f"Cannot normalize condition: {condition}")
def _extract_all_methods(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[FlowMethodName]:
if isinstance(condition, str):
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", OR_CONDITION)
if cond_type == AND_CONDITION:
return [
FlowMethodName(sub_cond)
for sub_cond in normalized.get("conditions", [])
if isinstance(sub_cond, str)
]
return []
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
def _unwrap_function(function: Any) -> Any:
if hasattr(function, "__func__"):
function = function.__func__
if hasattr(function, "__wrapped__"):
wrapped = function.__wrapped__
if hasattr(wrapped, "unwrap"):
return wrapped.unwrap()
return wrapped
if hasattr(function, "unwrap"):
return function.unwrap()
return function
def _string_values_from_annotation(annotation: Any) -> list[str]:
if annotation is inspect.Signature.empty or isinstance(annotation, str):
return []
if isinstance(annotation, type) and issubclass(annotation, Enum):
return [member.value for member in annotation if isinstance(member.value, str)]
origin = get_origin(annotation)
if origin is None:
return []
args = get_args(annotation)
if origin is Literal or getattr(origin, "__name__", "") == "Literal":
return [arg for arg in args if isinstance(arg, str)]
if not (
origin is Union
or origin is UnionType
or getattr(origin, "__name__", "") == "Annotated"
):
return []
values: list[str] = []
for arg in args:
values.extend(_string_values_from_annotation(arg))
return values
def _return_annotation(function: Any) -> Any:
unwrapped = _unwrap_function(function)
try:
return get_type_hints(unwrapped, include_extras=True).get(
"return", inspect.Signature.empty
)
except (NameError, TypeError, ValueError):
try:
return inspect.signature(unwrapped).return_annotation
except (TypeError, ValueError):
return inspect.Signature.empty
def _get_router_return_events(function: Any) -> list[str] | None:
values = _string_values_from_annotation(_return_annotation(function))
return list(dict.fromkeys(values)) if values else None
def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]:
if isinstance(value, str):
return [str(value)]
return list(dict.fromkeys(str(item) for item in value))
def _set_trigger_metadata(
wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R],
condition: str | FlowCondition | Callable[..., Any],
@@ -299,29 +97,6 @@ def _set_trigger_metadata(
)
def _condition_trigger(
condition: str | FlowCondition | Callable[..., Any],
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
method_name = _method_reference_name(condition)
if method_name is not None:
return method_name
raise ValueError("Invalid condition")
def _condition_triggers(
conditions: Sequence[str | FlowCondition | Callable[..., Any]],
error_message: str,
) -> FlowConditions:
try:
return [_condition_trigger(condition) for condition in conditions]
except ValueError as exc:
raise ValueError(error_message) from exc
def _set_flow_method_definition(
wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R],
definition: FlowMethodDefinition,
@@ -338,232 +113,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None:
return None
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
"""Marks a method as a flow's starting point.
This decorator designates a method as an entry point for the flow execution.
It can optionally specify conditions that trigger the start based on other
method executions.
Args:
condition: Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this start
Default is None, meaning unconditional start.
Returns:
A decorator function that wraps the method as a flow start point and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @start() # Unconditional start
>>> def begin_flow(self):
... pass
>>> @start("method_name") # Start after specific method
>>> def conditional_start(self):
... pass
>>> @start(and_("method1", "method2")) # Start after multiple methods
>>> def complex_start(self):
... pass
"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
start=_definition_condition_from_runtime(condition)
),
)
_set_trigger_metadata(wrapper, condition)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
return wrapper
return decorator
def listen(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
executions in the flow. It supports both simple and complex triggering
conditions.
Args:
condition: Specifies when the listener should execute.
Returns:
A decorator function that wraps the method as a flow listener and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @listen("process_data")
>>> def handle_processed_data(self):
... pass
>>> @listen("method_name")
>>> def handle_completion(self):
... pass
"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)),
)
_set_trigger_metadata(wrapper, condition)
return wrapper
return decorator
def router(
condition: str | FlowCondition | Callable[..., Any],
*,
emit: Sequence[str] | str | None = None,
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
"""Creates a routing method that directs flow execution based on conditions.
This decorator marks a method as a router, which can dynamically determine
the next steps in the flow based on its return value. Routers are triggered
by specified conditions and can return constants that emit downstream events.
Args:
condition: Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this router
emit: Optional explicit router output events for static FlowDefinition
and visualization. If omitted, Literal/Enum return annotations are
used when available.
Returns:
A decorator function that wraps the method as a router and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @router("check_status")
>>> def route_based_on_status(self):
... if self.state.status == "success":
... return "SUCCESS"
... return "FAILURE"
>>> @router(and_("validate", "process"))
>>> def complex_routing(self):
... if all([self.state.valid, self.state.processed]):
... return "CONTINUE"
... return "STOP"
>>> @router("check_status", emit=["SUCCESS", "FAILURE"])
>>> def explicit_routing(self):
... return "SUCCESS"
"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
wrapper = RouterMethod(func)
if emit is not None:
router_events = _normalize_router_emit(emit)
else:
router_events = _get_router_return_events(func) or []
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
listen=_definition_condition_from_runtime(condition),
router=True,
emit=router_events or None,
),
)
_set_trigger_metadata(wrapper, condition)
if emit is not None:
wrapper.__router_emit__ = router_events
elif router_events:
wrapper.__router_emit__ = router_events
return wrapper
return decorator
def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with OR logic for flow control.
Creates a condition that is satisfied when any of the specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If condition format is invalid.
Examples:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()")
return {"type": OR_CONDITION, "conditions": processed_triggers}
def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with AND logic for flow control.
Creates a condition that is satisfied only when all specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
*conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If any condition is invalid.
Examples:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()")
return {"type": AND_CONDITION, "conditions": processed_triggers}
def _object_ref(value: Any) -> str:
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
@@ -689,26 +238,6 @@ def _build_config_definition(
return FlowConfigDefinition(**values)
def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition:
if isinstance(condition, str):
return str(condition)
method_name = _method_reference_name(condition)
if method_name is not None:
return str(method_name)
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
key = "and" if normalized.get("type") == AND_CONDITION else "or"
return {
key: [
_definition_condition_from_runtime(sub_condition)
for sub_condition in normalized.get("conditions", [])
]
}
if isinstance(condition, list):
return {"or": [_definition_condition_from_runtime(item) for item in condition]}
return str(condition)
def _condition_from_method_metadata(method: Any) -> FlowDefinitionCondition | None:
trigger_condition = getattr(method, "__trigger_condition__", None)
if trigger_condition is not None:
@@ -760,39 +289,6 @@ def _definition_trigger_condition(
return None
def _runtime_condition_from_definition(
condition: FlowDefinitionCondition,
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
if "and" in condition:
return {
"type": AND_CONDITION,
"conditions": [
_runtime_condition_from_definition(item)
for item in condition.get("and", [])
],
}
return {
"type": OR_CONDITION,
"conditions": [
_runtime_condition_from_definition(item) for item in condition.get("or", [])
],
}
def _runtime_listener_condition_from_definition(
condition: FlowDefinitionCondition,
) -> SimpleFlowCondition | FlowCondition:
runtime_condition = _runtime_condition_from_definition(condition)
if isinstance(runtime_condition, str):
return (OR_CONDITION, [FlowMethodName(str(runtime_condition))])
return runtime_condition
def _build_human_feedback_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],

View File

@@ -65,7 +65,6 @@ from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import BaseModel, Field
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowMethod
@@ -222,7 +221,7 @@ class DistilledLessons(BaseModel):
)
def human_feedback(
def _build_human_feedback_runtime_decorator(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
@@ -233,102 +232,6 @@ def human_feedback(
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback.
This decorator wraps a Flow method to:
1. Execute the method and capture its output
2. Display the output to the human with a feedback request
3. Collect the human's free-form feedback
4. Optionally collapse the feedback to a predefined outcome using an LLM
5. Store the result for access by downstream methods
When `emit` is specified, the decorator acts as a router, and the
collapsed outcome triggers the appropriate @listen decorated method.
Supports both synchronous (blocking) and asynchronous (non-blocking)
feedback collection through the `provider` parameter. If no provider
is specified, defaults to synchronous console input.
Args:
message: The message shown to the human when requesting feedback.
This should clearly explain what kind of feedback is expected.
emit: Optional sequence of outcome strings. When provided, the
human's feedback will be collapsed to one of these outcomes
using the specified LLM. The outcome then triggers @listen
methods that match.
llm: The LLM model to use for collapsing feedback to outcomes.
Required when emit is specified. Can be a model string
like "gpt-4o-mini" or a BaseLLM instance.
default_outcome: The outcome to use when the human provides no
feedback (empty input). Must be one of the emit values
if emit is specified.
metadata: Optional metadata for enterprise integrations. This is
passed through to the HumanFeedbackResult and can be used
by enterprise forks for features like Slack/Teams integration.
provider: Optional HumanFeedbackProvider for custom feedback
collection. Use this for async workflows that integrate with
external systems like Slack, Teams, or webhooks. When the
provider raises HumanFeedbackPending, the flow pauses and
can be resumed later with Flow.resume().
learn: Enable HITL learning. Recall past lessons to pre-review
output before the human sees it, and distill new lessons
from feedback after.
learn_source: Memory source tag for stored/recalled lessons.
learn_strict: When True, re-raise exceptions from the pre-review
and distillation steps instead of falling back to raw output.
Default False preserves graceful degradation; failures are
always logged via ``logger.warning`` regardless of this flag.
Returns:
A decorator function that wraps the method with human feedback
collection logic.
Raises:
ValueError: If emit is specified but llm is not provided.
ValueError: If default_outcome is specified but emit is not.
ValueError: If default_outcome is not in the emit list.
HumanFeedbackPending: When an async provider pauses execution.
Example:
Basic feedback without routing:
```python
@start()
@human_feedback(message="Please review this output:")
def generate_content(self):
return "Generated content..."
```
With routing based on feedback:
```python
@start()
@human_feedback(
message="Review and approve or reject:",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def review_document(self):
return document_content
@listen("approved")
def publish(self):
print(f"Publishing: {self.last_human_feedback.output}")
```
Async feedback with custom provider:
```python
@start()
@human_feedback(
message="Review this content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
provider=SlackProvider(channel="#reviews"),
)
def generate_content(self):
return "Content to review..."
```
"""
if emit is not None:
if not llm:
raise ValueError(
@@ -631,55 +534,33 @@ def human_feedback(
wrapper = sync_wrapper
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__trigger_condition__",
"__is_flow_method__",
"__flow_persistence_config__",
"__is_router__",
"__router_emit__",
"__flow_method_definition__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
# Create config inline to avoid race conditions
wrapper.__human_feedback_config__ = HumanFeedbackConfig(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
wrapper.__is_flow_method__ = True
if emit:
wrapper.__is_router__ = True
wrapper.__router_emit__ = list(emit)
# Keep the definition fragment in sync: emit promotes the method to
# a router and the feedback outcomes replace any emit recorded by an
# inner @router. Copy before updating so the wrapped method's own
# fragment (shared by reference) is left untouched.
fragment = getattr(wrapper, "__flow_method_definition__", None)
if isinstance(fragment, FlowMethodDefinition):
wrapper.__flow_method_definition__ = fragment.model_copy(
update={"router": True, "emit": list(emit)}
)
# Stash the live LLM object for HITL resume to retrieve.
# When a flow pauses for human feedback and later resumes (possibly in a
# different process), the serialized context only contains a model string.
# By storing the original LLM on the wrapper, resume_async can retrieve
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
# instead of creating a bare LLM from just the model string.
wrapper._human_feedback_llm = llm
return wrapper # type: ignore[no-any-return]
return decorator
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Compatibility import path for the Flow human-feedback DSL decorator."""
from crewai.flow.dsl._human_feedback import human_feedback as dsl_human_feedback
return dsl_human_feedback(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)

View File

@@ -90,15 +90,17 @@ from crewai.experimental.conversational import (
)
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl import (
from crewai.flow.dsl._conditions import (
_extract_all_methods,
_extract_all_methods_recursive,
_normalize_condition,
is_flow_condition_dict,
is_simple_flow_condition,
)
from crewai.flow.dsl._utils import (
build_flow_definition,
extract_flow_definition,
is_flow_condition_dict,
is_flow_method,
is_simple_flow_condition,
)
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import FlowDefinition

View File

@@ -25,7 +25,15 @@ def test_flow_public_exports_are_explicit():
assert "FlowDefinitionDiagnostic" not in flow_package.__all__
assert "build_flow_definition" not in flow_package.__all__
assert "flow_structure" not in flow_package.__all__
assert set(flow_dsl.__all__) == {"and_", "listen", "or_", "router", "start"}
assert set(flow_dsl.__all__) == {
"HumanFeedbackResult",
"and_",
"human_feedback",
"listen",
"or_",
"router",
"start",
}
assert set(flow_definition.__all__) == {
"FlowConfigDefinition",
"FlowDefinition",