diff --git a/lib/crewai/src/crewai/flow/__init__.py b/lib/crewai/src/crewai/flow/__init__.py index 2475dd226..364d2ab49 100644 --- a/lib/crewai/src/crewai/flow/__init__.py +++ b/lib/crewai/src/crewai/flow/__init__.py @@ -9,9 +9,9 @@ from crewai.flow.conversation import ( ConversationalConfig, ConversationalInputs, ) +from crewai.flow.dsl import HumanFeedbackResult, human_feedback from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.flow.flow_config import flow_config -from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback from crewai.flow.input_provider import InputProvider, InputResponse from crewai.flow.persistence import persist from crewai.flow.visualization import ( diff --git a/lib/crewai/src/crewai/flow/dsl/__init__.py b/lib/crewai/src/crewai/flow/dsl/__init__.py new file mode 100644 index 000000000..1dfb14ddb --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/__init__.py @@ -0,0 +1,32 @@ +"""Flow DSL: the Python authoring layer for Flows. + +Provides the ``@start`` / ``@listen`` / ``@router`` decorators and the +``or_`` / ``and_`` condition combinators used to write Flow classes in +Python. The DSL is one way to produce a Flow Structure: this package +extracts a :class:`~crewai.flow.flow_definition.FlowDefinition` from a +Python Flow class. Execution is handled by ``runtime``. +""" + +from crewai.flow.dsl._conditions import and_, or_ +from crewai.flow.dsl._human_feedback import ( + HumanFeedbackResult, + human_feedback, +) +from crewai.flow.dsl._listen import listen +from crewai.flow.dsl._router import router +from crewai.flow.dsl._start import start +from crewai.flow.dsl._utils import ( + build_flow_definition as build_flow_definition, + extract_flow_definition as extract_flow_definition, +) + + +__all__ = [ + "HumanFeedbackResult", + "and_", + "human_feedback", + "listen", + "or_", + "router", + "start", +] diff --git a/lib/crewai/src/crewai/flow/dsl/_conditions.py b/lib/crewai/src/crewai/flow/dsl/_conditions.py new file mode 100644 index 000000000..f2051a63b --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/_conditions.py @@ -0,0 +1,276 @@ +"""Flow DSL condition primitives. + +Type guards, the public ``or_`` / ``and_`` combinators, and the conversions +between runtime conditions, normalized conditions, and the +``FlowDefinitionCondition`` shape stored on a :class:`FlowDefinition`. These are +the lower layer of the DSL: the decorators and the definition builder +(``_utils``) build on top of them, so this module imports nothing from its +siblings. +""" + +from __future__ import annotations + +from collections.abc import Callable, Sequence +from typing import Any + +from typing_extensions import TypeIs + +from crewai.flow.constants import AND_CONDITION, OR_CONDITION +from crewai.flow.flow_definition import FlowDefinitionCondition +from crewai.flow.flow_wrappers import ( + FlowCondition, + FlowConditions, + SimpleFlowCondition, +) +from crewai.flow.types import FlowMethodName + + +def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]: + """Check if the object is a ``(condition_type, methods)`` tuple.""" + return ( + isinstance(obj, tuple) + and len(obj) == 2 + and isinstance(obj[0], str) + and isinstance(obj[1], list) + ) + + +def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]: + """Check if the object matches the FlowCondition structure.""" + if not isinstance(obj, dict): + return False + + type_value = obj.get("type") + if type_value not in ("AND", "OR"): + return False + + if "conditions" in obj: + conditions = obj["conditions"] + if not isinstance(conditions, list): + return False + for cond in conditions: + if not ( + isinstance(cond, str) + or (isinstance(cond, dict) and is_flow_condition_dict(cond)) + ): + return False + + if "methods" in obj: + methods = obj["methods"] + if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)): + return False + + allowed_keys = {"type", "conditions", "methods"} + if not set(obj).issubset(allowed_keys): + return False + + return True + + +def _method_reference_name(value: Any) -> FlowMethodName | None: + name = getattr(value, "__name__", None) + if callable(value) and isinstance(name, str): + return FlowMethodName(name) + return None + + +def _normalize_condition( + condition: FlowConditions | FlowCondition | str, +) -> FlowCondition: + if isinstance(condition, str): + return {"type": OR_CONDITION, "conditions": [FlowMethodName(condition)]} + if is_flow_condition_dict(condition): + if "conditions" in condition: + return condition + if "methods" in condition: + return {"type": condition["type"], "conditions": condition["methods"]} + return condition + if isinstance(condition, list) and all( + isinstance(item, str) or is_flow_condition_dict(item) for item in condition + ): + return {"type": OR_CONDITION, "conditions": condition} + + raise ValueError(f"Cannot normalize condition: {condition}") + + +def _extract_all_methods_recursive( + condition: str | FlowCondition | dict[str, Any] | list[Any], + flow: Any | None = None, +) -> list[FlowMethodName]: + if isinstance(condition, str): + if flow is not None: + if condition in flow._methods: + return [FlowMethodName(condition)] + return [] + return [FlowMethodName(condition)] + if is_flow_condition_dict(condition): + normalized = _normalize_condition(condition) + methods = [] + for sub_cond in normalized.get("conditions", []): + methods.extend(_extract_all_methods_recursive(sub_cond, flow)) + return methods + if isinstance(condition, list): + methods = [] + for item in condition: + methods.extend(_extract_all_methods_recursive(item, flow)) + return methods + return [] + + +def _extract_all_methods( + condition: str | FlowCondition | dict[str, Any] | list[Any], +) -> list[FlowMethodName]: + if isinstance(condition, str): + return [FlowMethodName(condition)] + if is_flow_condition_dict(condition): + normalized = _normalize_condition(condition) + cond_type = normalized.get("type", OR_CONDITION) + + if cond_type == AND_CONDITION: + return [ + FlowMethodName(sub_cond) + for sub_cond in normalized.get("conditions", []) + if isinstance(sub_cond, str) + ] + return [] + if isinstance(condition, list): + methods = [] + for item in condition: + methods.extend(_extract_all_methods(item)) + return methods + return [] + + +def _condition_trigger( + condition: str | FlowCondition | Callable[..., Any], +) -> FlowMethodName | FlowCondition: + if isinstance(condition, str): + return FlowMethodName(condition) + if is_flow_condition_dict(condition): + return condition + method_name = _method_reference_name(condition) + if method_name is not None: + return method_name + raise ValueError("Invalid condition") + + +def _condition_triggers( + conditions: Sequence[str | FlowCondition | Callable[..., Any]], + error_message: str, +) -> FlowConditions: + try: + return [_condition_trigger(condition) for condition in conditions] + except ValueError as exc: + raise ValueError(error_message) from exc + + +def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition: + if isinstance(condition, str): + return str(condition) + method_name = _method_reference_name(condition) + if method_name is not None: + return str(method_name) + if is_flow_condition_dict(condition): + normalized = _normalize_condition(condition) + key = "and" if normalized.get("type") == AND_CONDITION else "or" + return { + key: [ + _definition_condition_from_runtime(sub_condition) + for sub_condition in normalized.get("conditions", []) + ] + } + if isinstance(condition, list): + return {"or": [_definition_condition_from_runtime(item) for item in condition]} + return str(condition) + + +def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: + """Combines multiple conditions with OR logic for flow control. + + Creates a condition that is satisfied when any of the specified conditions + are met. This is used with @start, @listen, or @router decorators to create + complex triggering conditions. + + Args: + conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references. + + Returns: + A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict + + Raises: + ValueError: If condition format is invalid. + + Examples: + >>> @listen(or_("success", "timeout")) + >>> def handle_completion(self): + ... pass + + >>> @listen(or_(and_("step1", "step2"), "step3")) + >>> def handle_nested(self): + ... pass + """ + processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()") + return {"type": OR_CONDITION, "conditions": processed_triggers} + + +def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: + """Combines multiple conditions with AND logic for flow control. + + Creates a condition that is satisfied only when all specified conditions + are met. This is used with @start, @listen, or @router decorators to create + complex triggering conditions. + + Args: + *conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references. + + Returns: + A condition dictionary with format {"type": "AND", "conditions": list_of_conditions} + where each condition can be a string (method name) or a nested dict + + Raises: + ValueError: If any condition is invalid. + + Examples: + >>> @listen(and_("validated", "processed")) + >>> def handle_complete_data(self): + ... pass + + >>> @listen(and_(or_("step1", "step2"), "step3")) + >>> def handle_nested(self): + ... pass + """ + processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()") + return {"type": AND_CONDITION, "conditions": processed_triggers} + + +def _runtime_condition_from_definition( + condition: FlowDefinitionCondition, +) -> FlowMethodName | FlowCondition: + if isinstance(condition, str): + return FlowMethodName(condition) + if is_flow_condition_dict(condition): + return condition + + if "and" in condition: + return { + "type": AND_CONDITION, + "conditions": [ + _runtime_condition_from_definition(item) + for item in condition.get("and", []) + ], + } + return { + "type": OR_CONDITION, + "conditions": [ + _runtime_condition_from_definition(item) for item in condition.get("or", []) + ], + } + + +def _runtime_listener_condition_from_definition( + condition: FlowDefinitionCondition, +) -> SimpleFlowCondition | FlowCondition: + runtime_condition = _runtime_condition_from_definition(condition) + if isinstance(runtime_condition, str): + return (OR_CONDITION, [FlowMethodName(str(runtime_condition))]) + return runtime_condition diff --git a/lib/crewai/src/crewai/flow/dsl/_human_feedback.py b/lib/crewai/src/crewai/flow/dsl/_human_feedback.py new file mode 100644 index 000000000..9fa2b7e67 --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/_human_feedback.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from collections.abc import Callable, Sequence +from typing import TYPE_CHECKING, Any, TypeVar + +from crewai.flow.flow_definition import FlowMethodDefinition +from crewai.flow.human_feedback import ( + HumanFeedbackConfig, + HumanFeedbackResult, + _build_human_feedback_runtime_decorator, +) + + +if TYPE_CHECKING: + from crewai.flow.async_feedback.types import HumanFeedbackProvider + from crewai.llms.base_llm import BaseLLM + + +F = TypeVar("F", bound=Callable[..., Any]) + +__all__ = ["HumanFeedbackResult", "human_feedback"] + + +def _stamp_human_feedback_metadata( + wrapper: Any, + func: Callable[..., Any], + config: HumanFeedbackConfig, +) -> None: + for attr in [ + "__is_start_method__", + "__trigger_methods__", + "__condition_type__", + "__trigger_condition__", + "__is_flow_method__", + "__flow_persistence_config__", + "__is_router__", + "__router_emit__", + "__flow_method_definition__", + ]: + if hasattr(func, attr): + setattr(wrapper, attr, getattr(func, attr)) + + wrapper.__human_feedback_config__ = config + wrapper.__is_flow_method__ = True + + if config.emit: + wrapper.__is_router__ = True + wrapper.__router_emit__ = list(config.emit) + fragment = getattr(wrapper, "__flow_method_definition__", None) + if isinstance(fragment, FlowMethodDefinition): + wrapper.__flow_method_definition__ = fragment.model_copy( + update={"router": True, "emit": list(config.emit)} + ) + + wrapper._human_feedback_llm = config.llm + + +def human_feedback( + message: str, + emit: Sequence[str] | None = None, + llm: str | BaseLLM | None = "gpt-4o-mini", + default_outcome: str | None = None, + metadata: dict[str, Any] | None = None, + provider: HumanFeedbackProvider | None = None, + learn: bool = False, + learn_source: str = "hitl", + learn_strict: bool = False, +) -> Callable[[F], F]: + """Decorator for Flow methods that require human feedback.""" + runtime_decorator = _build_human_feedback_runtime_decorator( + message=message, + emit=emit, + llm=llm, + default_outcome=default_outcome, + metadata=metadata, + provider=provider, + learn=learn, + learn_source=learn_source, + learn_strict=learn_strict, + ) + config = HumanFeedbackConfig( + message=message, + emit=emit, + llm=llm, + default_outcome=default_outcome, + metadata=metadata, + provider=provider, + learn=learn, + learn_source=learn_source, + learn_strict=learn_strict, + ) + + def decorator(func: F) -> F: + wrapper = runtime_decorator(func) + _stamp_human_feedback_metadata(wrapper, func, config) + return wrapper + + return decorator diff --git a/lib/crewai/src/crewai/flow/dsl/_listen.py b/lib/crewai/src/crewai/flow/dsl/_listen.py new file mode 100644 index 000000000..16a93a175 --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/_listen.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from crewai.flow.dsl._conditions import _definition_condition_from_runtime +from crewai.flow.dsl._utils import ( + P, + R, + _set_flow_method_definition, + _set_trigger_metadata, +) +from crewai.flow.flow_definition import FlowMethodDefinition +from crewai.flow.flow_wrappers import FlowCondition, ListenMethod + + +def listen( + condition: str | FlowCondition | Callable[..., Any], +) -> Callable[[Callable[P, R]], ListenMethod[P, R]]: + """Creates a listener that executes when specified conditions are met. + + This decorator sets up a method to execute in response to other method + executions in the flow. It supports both simple and complex triggering + conditions. + + Args: + condition: Specifies when the listener should execute. + + Returns: + A decorator function that wraps the method as a flow listener and preserves its signature. + + Raises: + ValueError: If the condition format is invalid. + + Examples: + >>> @listen("process_data") + >>> def handle_processed_data(self): + ... pass + + >>> @listen("method_name") + >>> def handle_completion(self): + ... pass + """ + + def decorator(func: Callable[P, R]) -> ListenMethod[P, R]: + wrapper = ListenMethod(func) + + _set_flow_method_definition( + wrapper, + FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)), + ) + _set_trigger_metadata(wrapper, condition) + return wrapper + + return decorator diff --git a/lib/crewai/src/crewai/flow/dsl/_router.py b/lib/crewai/src/crewai/flow/dsl/_router.py new file mode 100644 index 000000000..11ffc9d0b --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/_router.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +from collections.abc import Callable, Sequence +from enum import Enum +import inspect +from types import UnionType +from typing import ( + Any, + Literal, + Union, + get_args, + get_origin, + get_type_hints, +) + +from crewai.flow.dsl._conditions import _definition_condition_from_runtime +from crewai.flow.dsl._utils import ( + P, + R, + _set_flow_method_definition, + _set_trigger_metadata, +) +from crewai.flow.flow_definition import FlowMethodDefinition +from crewai.flow.flow_wrappers import FlowCondition, RouterMethod + + +def _unwrap_function(function: Any) -> Any: + if hasattr(function, "__func__"): + function = function.__func__ + + if hasattr(function, "__wrapped__"): + wrapped = function.__wrapped__ + if hasattr(wrapped, "unwrap"): + return wrapped.unwrap() + return wrapped + + if hasattr(function, "unwrap"): + return function.unwrap() + + return function + + +def _string_values_from_annotation(annotation: Any) -> list[str]: + if annotation is inspect.Signature.empty or isinstance(annotation, str): + return [] + if isinstance(annotation, type) and issubclass(annotation, Enum): + return [member.value for member in annotation if isinstance(member.value, str)] + + origin = get_origin(annotation) + if origin is None: + return [] + + args = get_args(annotation) + if origin is Literal or getattr(origin, "__name__", "") == "Literal": + return [arg for arg in args if isinstance(arg, str)] + + if not ( + origin is Union + or origin is UnionType + or getattr(origin, "__name__", "") == "Annotated" + ): + return [] + + values: list[str] = [] + for arg in args: + values.extend(_string_values_from_annotation(arg)) + return values + + +def _return_annotation(function: Any) -> Any: + unwrapped = _unwrap_function(function) + + try: + return get_type_hints(unwrapped, include_extras=True).get( + "return", inspect.Signature.empty + ) + except (NameError, TypeError, ValueError): + try: + return inspect.signature(unwrapped).return_annotation + except (TypeError, ValueError): + return inspect.Signature.empty + + +def _get_router_return_events(function: Any) -> list[str] | None: + values = _string_values_from_annotation(_return_annotation(function)) + return list(dict.fromkeys(values)) if values else None + + +def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]: + if isinstance(value, str): + return [str(value)] + return list(dict.fromkeys(str(item) for item in value)) + + +def router( + condition: str | FlowCondition | Callable[..., Any], + *, + emit: Sequence[str] | str | None = None, +) -> Callable[[Callable[P, R]], RouterMethod[P, R]]: + """Creates a routing method that directs flow execution based on conditions. + + This decorator marks a method as a router, which can dynamically determine + the next steps in the flow based on its return value. Routers are triggered + by specified conditions and can return constants that emit downstream events. + + Args: + condition: Specifies when the router should execute. Can be: + - str: Name of a method that triggers this router + - FlowCondition: Result from or_() or and_(), including nested conditions + - Callable[..., Any]: A method reference that triggers this router + emit: Optional explicit router output events for static FlowDefinition + and visualization. If omitted, Literal/Enum return annotations are + used when available. + + Returns: + A decorator function that wraps the method as a router and preserves its signature. + + Raises: + ValueError: If the condition format is invalid. + + Examples: + >>> @router("check_status") + >>> def route_based_on_status(self): + ... if self.state.status == "success": + ... return "SUCCESS" + ... return "FAILURE" + + >>> @router(and_("validate", "process")) + >>> def complex_routing(self): + ... if all([self.state.valid, self.state.processed]): + ... return "CONTINUE" + ... return "STOP" + + >>> @router("check_status", emit=["SUCCESS", "FAILURE"]) + >>> def explicit_routing(self): + ... return "SUCCESS" + """ + + def decorator(func: Callable[P, R]) -> RouterMethod[P, R]: + wrapper = RouterMethod(func) + + if emit is not None: + router_events = _normalize_router_emit(emit) + else: + router_events = _get_router_return_events(func) or [] + + _set_flow_method_definition( + wrapper, + FlowMethodDefinition( + listen=_definition_condition_from_runtime(condition), + router=True, + emit=router_events or None, + ), + ) + + _set_trigger_metadata(wrapper, condition) + + if emit is not None: + wrapper.__router_emit__ = router_events + elif router_events: + wrapper.__router_emit__ = router_events + return wrapper + + return decorator diff --git a/lib/crewai/src/crewai/flow/dsl/_start.py b/lib/crewai/src/crewai/flow/dsl/_start.py new file mode 100644 index 000000000..652a8332f --- /dev/null +++ b/lib/crewai/src/crewai/flow/dsl/_start.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from crewai.flow.dsl._conditions import _definition_condition_from_runtime +from crewai.flow.dsl._utils import ( + P, + R, + _set_flow_method_definition, + _set_trigger_metadata, +) +from crewai.flow.flow_definition import FlowMethodDefinition +from crewai.flow.flow_wrappers import FlowCondition, StartMethod + + +def start( + condition: str | FlowCondition | Callable[..., Any] | None = None, +) -> Callable[[Callable[P, R]], StartMethod[P, R]]: + """Marks a method as a flow's starting point. + + This decorator designates a method as an entry point for the flow execution. + It can optionally specify conditions that trigger the start based on other + method executions. + + Args: + condition: Defines when the start method should execute. Can be: + - str: Name of a method that triggers this start + - FlowCondition: Result from or_() or and_(), including nested conditions + - Callable[..., Any]: A method reference that triggers this start + Default is None, meaning unconditional start. + + Returns: + A decorator function that wraps the method as a flow start point and preserves its signature. + + Raises: + ValueError: If the condition format is invalid. + + Examples: + >>> @start() # Unconditional start + >>> def begin_flow(self): + ... pass + + >>> @start("method_name") # Start after specific method + >>> def conditional_start(self): + ... pass + + >>> @start(and_("method1", "method2")) # Start after multiple methods + >>> def complex_start(self): + ... pass + """ + + def decorator(func: Callable[P, R]) -> StartMethod[P, R]: + wrapper = StartMethod(func) + + if condition is not None: + _set_flow_method_definition( + wrapper, + FlowMethodDefinition( + start=_definition_condition_from_runtime(condition) + ), + ) + _set_trigger_metadata(wrapper, condition) + else: + _set_flow_method_definition(wrapper, FlowMethodDefinition(start=True)) + return wrapper + + return decorator diff --git a/lib/crewai/src/crewai/flow/dsl.py b/lib/crewai/src/crewai/flow/dsl/_utils.py similarity index 51% rename from lib/crewai/src/crewai/flow/dsl.py rename to lib/crewai/src/crewai/flow/dsl/_utils.py index 353d8f187..d23bc3886 100644 --- a/lib/crewai/src/crewai/flow/dsl.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -1,35 +1,21 @@ -"""Flow DSL: the Python authoring layer for Flows. - -Provides the ``@start`` / ``@listen`` / ``@router`` decorators and the -``or_`` / ``and_`` condition combinators used to write Flow classes in -Python. The DSL is one way to produce a Flow Structure: this module -extracts a :class:`~crewai.flow.flow_definition.FlowDefinition` from a -Python Flow class. Execution is handled by ``runtime``. -""" - from __future__ import annotations from collections.abc import Callable, Sequence -from enum import Enum -import inspect import json import logging -from types import UnionType -from typing import ( - Any, - Literal, - ParamSpec, - TypeVar, - Union, - get_args, - get_origin, - get_type_hints, -) +from typing import Any, ParamSpec, TypeVar from pydantic import BaseModel from typing_extensions import TypeIs from crewai.flow.constants import AND_CONDITION, OR_CONDITION +from crewai.flow.dsl._conditions import ( + _definition_condition_from_runtime, + _extract_all_methods, + _method_reference_name, + _runtime_listener_condition_from_definition, + is_flow_condition_dict, +) from crewai.flow.flow_definition import ( FlowConfigDefinition, FlowDefinition, @@ -42,11 +28,9 @@ from crewai.flow.flow_definition import ( ) from crewai.flow.flow_wrappers import ( FlowCondition, - FlowConditions, FlowMethod, ListenMethod, RouterMethod, - SimpleFlowCondition, StartMethod, ) from crewai.flow.types import FlowMethodName @@ -57,21 +41,9 @@ R = TypeVar("R") logger = logging.getLogger(__name__) -__all__ = ["and_", "listen", "or_", "router", "start"] - _FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__" -def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]: - """Check if the object is a ``(condition_type, methods)`` tuple.""" - return ( - isinstance(obj, tuple) - and len(obj) == 2 - and isinstance(obj[0], str) - and isinstance(obj[1], list) - ) - - def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]: """Check if the object carries Flow method wrapper metadata.""" return ( @@ -89,184 +61,10 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool: return True -def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]: - """Check if the object matches the FlowCondition structure.""" - if not isinstance(obj, dict): - return False - - type_value = obj.get("type") - if type_value not in ("AND", "OR"): - return False - - if "conditions" in obj: - conditions = obj["conditions"] - if not isinstance(conditions, list): - return False - for cond in conditions: - if not ( - isinstance(cond, str) - or (isinstance(cond, dict) and is_flow_condition_dict(cond)) - ): - return False - - if "methods" in obj: - methods = obj["methods"] - if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)): - return False - - allowed_keys = {"type", "conditions", "methods"} - if not set(obj).issubset(allowed_keys): - return False - - return True - - -def _method_reference_name(value: Any) -> FlowMethodName | None: - name = getattr(value, "__name__", None) - if callable(value) and isinstance(name, str): - return FlowMethodName(name) - return None - - def _flow_method_names(values: Sequence[Any]) -> list[FlowMethodName]: return [FlowMethodName(str(value)) for value in values] -def _extract_all_methods_recursive( - condition: str | FlowCondition | dict[str, Any] | list[Any], - flow: Any | None = None, -) -> list[FlowMethodName]: - if isinstance(condition, str): - if flow is not None: - if condition in flow._methods: - return [FlowMethodName(condition)] - return [] - return [FlowMethodName(condition)] - if is_flow_condition_dict(condition): - normalized = _normalize_condition(condition) - methods = [] - for sub_cond in normalized.get("conditions", []): - methods.extend(_extract_all_methods_recursive(sub_cond, flow)) - return methods - if isinstance(condition, list): - methods = [] - for item in condition: - methods.extend(_extract_all_methods_recursive(item, flow)) - return methods - return [] - - -def _normalize_condition( - condition: FlowConditions | FlowCondition | str, -) -> FlowCondition: - if isinstance(condition, str): - return {"type": OR_CONDITION, "conditions": [FlowMethodName(condition)]} - if is_flow_condition_dict(condition): - if "conditions" in condition: - return condition - if "methods" in condition: - return {"type": condition["type"], "conditions": condition["methods"]} - return condition - if isinstance(condition, list) and all( - isinstance(item, str) or is_flow_condition_dict(item) for item in condition - ): - return {"type": OR_CONDITION, "conditions": condition} - - raise ValueError(f"Cannot normalize condition: {condition}") - - -def _extract_all_methods( - condition: str | FlowCondition | dict[str, Any] | list[Any], -) -> list[FlowMethodName]: - if isinstance(condition, str): - return [FlowMethodName(condition)] - if is_flow_condition_dict(condition): - normalized = _normalize_condition(condition) - cond_type = normalized.get("type", OR_CONDITION) - - if cond_type == AND_CONDITION: - return [ - FlowMethodName(sub_cond) - for sub_cond in normalized.get("conditions", []) - if isinstance(sub_cond, str) - ] - return [] - if isinstance(condition, list): - methods = [] - for item in condition: - methods.extend(_extract_all_methods(item)) - return methods - return [] - - -def _unwrap_function(function: Any) -> Any: - if hasattr(function, "__func__"): - function = function.__func__ - - if hasattr(function, "__wrapped__"): - wrapped = function.__wrapped__ - if hasattr(wrapped, "unwrap"): - return wrapped.unwrap() - return wrapped - - if hasattr(function, "unwrap"): - return function.unwrap() - - return function - - -def _string_values_from_annotation(annotation: Any) -> list[str]: - if annotation is inspect.Signature.empty or isinstance(annotation, str): - return [] - if isinstance(annotation, type) and issubclass(annotation, Enum): - return [member.value for member in annotation if isinstance(member.value, str)] - - origin = get_origin(annotation) - if origin is None: - return [] - - args = get_args(annotation) - if origin is Literal or getattr(origin, "__name__", "") == "Literal": - return [arg for arg in args if isinstance(arg, str)] - - if not ( - origin is Union - or origin is UnionType - or getattr(origin, "__name__", "") == "Annotated" - ): - return [] - - values: list[str] = [] - for arg in args: - values.extend(_string_values_from_annotation(arg)) - return values - - -def _return_annotation(function: Any) -> Any: - unwrapped = _unwrap_function(function) - - try: - return get_type_hints(unwrapped, include_extras=True).get( - "return", inspect.Signature.empty - ) - except (NameError, TypeError, ValueError): - try: - return inspect.signature(unwrapped).return_annotation - except (TypeError, ValueError): - return inspect.Signature.empty - - -def _get_router_return_events(function: Any) -> list[str] | None: - values = _string_values_from_annotation(_return_annotation(function)) - return list(dict.fromkeys(values)) if values else None - - -def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]: - if isinstance(value, str): - return [str(value)] - return list(dict.fromkeys(str(item) for item in value)) - - def _set_trigger_metadata( wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R], condition: str | FlowCondition | Callable[..., Any], @@ -299,29 +97,6 @@ def _set_trigger_metadata( ) -def _condition_trigger( - condition: str | FlowCondition | Callable[..., Any], -) -> FlowMethodName | FlowCondition: - if isinstance(condition, str): - return FlowMethodName(condition) - if is_flow_condition_dict(condition): - return condition - method_name = _method_reference_name(condition) - if method_name is not None: - return method_name - raise ValueError("Invalid condition") - - -def _condition_triggers( - conditions: Sequence[str | FlowCondition | Callable[..., Any]], - error_message: str, -) -> FlowConditions: - try: - return [_condition_trigger(condition) for condition in conditions] - except ValueError as exc: - raise ValueError(error_message) from exc - - def _set_flow_method_definition( wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R], definition: FlowMethodDefinition, @@ -338,232 +113,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None: return None -def start( - condition: str | FlowCondition | Callable[..., Any] | None = None, -) -> Callable[[Callable[P, R]], StartMethod[P, R]]: - """Marks a method as a flow's starting point. - - This decorator designates a method as an entry point for the flow execution. - It can optionally specify conditions that trigger the start based on other - method executions. - - Args: - condition: Defines when the start method should execute. Can be: - - str: Name of a method that triggers this start - - FlowCondition: Result from or_() or and_(), including nested conditions - - Callable[..., Any]: A method reference that triggers this start - Default is None, meaning unconditional start. - - Returns: - A decorator function that wraps the method as a flow start point and preserves its signature. - - Raises: - ValueError: If the condition format is invalid. - - Examples: - >>> @start() # Unconditional start - >>> def begin_flow(self): - ... pass - - >>> @start("method_name") # Start after specific method - >>> def conditional_start(self): - ... pass - - >>> @start(and_("method1", "method2")) # Start after multiple methods - >>> def complex_start(self): - ... pass - """ - - def decorator(func: Callable[P, R]) -> StartMethod[P, R]: - wrapper = StartMethod(func) - - if condition is not None: - _set_flow_method_definition( - wrapper, - FlowMethodDefinition( - start=_definition_condition_from_runtime(condition) - ), - ) - _set_trigger_metadata(wrapper, condition) - else: - _set_flow_method_definition(wrapper, FlowMethodDefinition(start=True)) - return wrapper - - return decorator - - -def listen( - condition: str | FlowCondition | Callable[..., Any], -) -> Callable[[Callable[P, R]], ListenMethod[P, R]]: - """Creates a listener that executes when specified conditions are met. - - This decorator sets up a method to execute in response to other method - executions in the flow. It supports both simple and complex triggering - conditions. - - Args: - condition: Specifies when the listener should execute. - - Returns: - A decorator function that wraps the method as a flow listener and preserves its signature. - - Raises: - ValueError: If the condition format is invalid. - - Examples: - >>> @listen("process_data") - >>> def handle_processed_data(self): - ... pass - - >>> @listen("method_name") - >>> def handle_completion(self): - ... pass - """ - - def decorator(func: Callable[P, R]) -> ListenMethod[P, R]: - wrapper = ListenMethod(func) - - _set_flow_method_definition( - wrapper, - FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)), - ) - _set_trigger_metadata(wrapper, condition) - return wrapper - - return decorator - - -def router( - condition: str | FlowCondition | Callable[..., Any], - *, - emit: Sequence[str] | str | None = None, -) -> Callable[[Callable[P, R]], RouterMethod[P, R]]: - """Creates a routing method that directs flow execution based on conditions. - - This decorator marks a method as a router, which can dynamically determine - the next steps in the flow based on its return value. Routers are triggered - by specified conditions and can return constants that emit downstream events. - - Args: - condition: Specifies when the router should execute. Can be: - - str: Name of a method that triggers this router - - FlowCondition: Result from or_() or and_(), including nested conditions - - Callable[..., Any]: A method reference that triggers this router - emit: Optional explicit router output events for static FlowDefinition - and visualization. If omitted, Literal/Enum return annotations are - used when available. - - Returns: - A decorator function that wraps the method as a router and preserves its signature. - - Raises: - ValueError: If the condition format is invalid. - - Examples: - >>> @router("check_status") - >>> def route_based_on_status(self): - ... if self.state.status == "success": - ... return "SUCCESS" - ... return "FAILURE" - - >>> @router(and_("validate", "process")) - >>> def complex_routing(self): - ... if all([self.state.valid, self.state.processed]): - ... return "CONTINUE" - ... return "STOP" - - >>> @router("check_status", emit=["SUCCESS", "FAILURE"]) - >>> def explicit_routing(self): - ... return "SUCCESS" - """ - - def decorator(func: Callable[P, R]) -> RouterMethod[P, R]: - wrapper = RouterMethod(func) - - if emit is not None: - router_events = _normalize_router_emit(emit) - else: - router_events = _get_router_return_events(func) or [] - - _set_flow_method_definition( - wrapper, - FlowMethodDefinition( - listen=_definition_condition_from_runtime(condition), - router=True, - emit=router_events or None, - ), - ) - - _set_trigger_metadata(wrapper, condition) - - if emit is not None: - wrapper.__router_emit__ = router_events - elif router_events: - wrapper.__router_emit__ = router_events - return wrapper - - return decorator - - -def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: - """Combines multiple conditions with OR logic for flow control. - - Creates a condition that is satisfied when any of the specified conditions - are met. This is used with @start, @listen, or @router decorators to create - complex triggering conditions. - - Args: - conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references. - - Returns: - A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict - - Raises: - ValueError: If condition format is invalid. - - Examples: - >>> @listen(or_("success", "timeout")) - >>> def handle_completion(self): - ... pass - - >>> @listen(or_(and_("step1", "step2"), "step3")) - >>> def handle_nested(self): - ... pass - """ - processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()") - return {"type": OR_CONDITION, "conditions": processed_triggers} - - -def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition: - """Combines multiple conditions with AND logic for flow control. - - Creates a condition that is satisfied only when all specified conditions - are met. This is used with @start, @listen, or @router decorators to create - complex triggering conditions. - - Args: - *conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references. - - Returns: - A condition dictionary with format {"type": "AND", "conditions": list_of_conditions} - where each condition can be a string (method name) or a nested dict - - Raises: - ValueError: If any condition is invalid. - - Examples: - >>> @listen(and_("validated", "processed")) - >>> def handle_complete_data(self): - ... pass - - >>> @listen(and_(or_("step1", "step2"), "step3")) - >>> def handle_nested(self): - ... pass - """ - processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()") - return {"type": AND_CONDITION, "conditions": processed_triggers} - - def _object_ref(value: Any) -> str: target = value if isinstance(value, type) else type(value) module = getattr(target, "__module__", "") @@ -689,26 +238,6 @@ def _build_config_definition( return FlowConfigDefinition(**values) -def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition: - if isinstance(condition, str): - return str(condition) - method_name = _method_reference_name(condition) - if method_name is not None: - return str(method_name) - if is_flow_condition_dict(condition): - normalized = _normalize_condition(condition) - key = "and" if normalized.get("type") == AND_CONDITION else "or" - return { - key: [ - _definition_condition_from_runtime(sub_condition) - for sub_condition in normalized.get("conditions", []) - ] - } - if isinstance(condition, list): - return {"or": [_definition_condition_from_runtime(item) for item in condition]} - return str(condition) - - def _condition_from_method_metadata(method: Any) -> FlowDefinitionCondition | None: trigger_condition = getattr(method, "__trigger_condition__", None) if trigger_condition is not None: @@ -760,39 +289,6 @@ def _definition_trigger_condition( return None -def _runtime_condition_from_definition( - condition: FlowDefinitionCondition, -) -> FlowMethodName | FlowCondition: - if isinstance(condition, str): - return FlowMethodName(condition) - if is_flow_condition_dict(condition): - return condition - - if "and" in condition: - return { - "type": AND_CONDITION, - "conditions": [ - _runtime_condition_from_definition(item) - for item in condition.get("and", []) - ], - } - return { - "type": OR_CONDITION, - "conditions": [ - _runtime_condition_from_definition(item) for item in condition.get("or", []) - ], - } - - -def _runtime_listener_condition_from_definition( - condition: FlowDefinitionCondition, -) -> SimpleFlowCondition | FlowCondition: - runtime_condition = _runtime_condition_from_definition(condition) - if isinstance(runtime_condition, str): - return (OR_CONDITION, [FlowMethodName(str(runtime_condition))]) - return runtime_condition - - def _build_human_feedback_definition( method: Any, diagnostics: list[FlowDefinitionDiagnostic], diff --git a/lib/crewai/src/crewai/flow/human_feedback.py b/lib/crewai/src/crewai/flow/human_feedback.py index 65a61d5b2..010f9d6c7 100644 --- a/lib/crewai/src/crewai/flow/human_feedback.py +++ b/lib/crewai/src/crewai/flow/human_feedback.py @@ -65,7 +65,6 @@ from typing import TYPE_CHECKING, Any, TypeVar from pydantic import BaseModel, Field -from crewai.flow.flow_definition import FlowMethodDefinition from crewai.flow.flow_wrappers import FlowMethod @@ -222,7 +221,7 @@ class DistilledLessons(BaseModel): ) -def human_feedback( +def _build_human_feedback_runtime_decorator( message: str, emit: Sequence[str] | None = None, llm: str | BaseLLM | None = "gpt-4o-mini", @@ -233,102 +232,6 @@ def human_feedback( learn_source: str = "hitl", learn_strict: bool = False, ) -> Callable[[F], F]: - """Decorator for Flow methods that require human feedback. - - This decorator wraps a Flow method to: - 1. Execute the method and capture its output - 2. Display the output to the human with a feedback request - 3. Collect the human's free-form feedback - 4. Optionally collapse the feedback to a predefined outcome using an LLM - 5. Store the result for access by downstream methods - - When `emit` is specified, the decorator acts as a router, and the - collapsed outcome triggers the appropriate @listen decorated method. - - Supports both synchronous (blocking) and asynchronous (non-blocking) - feedback collection through the `provider` parameter. If no provider - is specified, defaults to synchronous console input. - - Args: - message: The message shown to the human when requesting feedback. - This should clearly explain what kind of feedback is expected. - emit: Optional sequence of outcome strings. When provided, the - human's feedback will be collapsed to one of these outcomes - using the specified LLM. The outcome then triggers @listen - methods that match. - llm: The LLM model to use for collapsing feedback to outcomes. - Required when emit is specified. Can be a model string - like "gpt-4o-mini" or a BaseLLM instance. - default_outcome: The outcome to use when the human provides no - feedback (empty input). Must be one of the emit values - if emit is specified. - metadata: Optional metadata for enterprise integrations. This is - passed through to the HumanFeedbackResult and can be used - by enterprise forks for features like Slack/Teams integration. - provider: Optional HumanFeedbackProvider for custom feedback - collection. Use this for async workflows that integrate with - external systems like Slack, Teams, or webhooks. When the - provider raises HumanFeedbackPending, the flow pauses and - can be resumed later with Flow.resume(). - learn: Enable HITL learning. Recall past lessons to pre-review - output before the human sees it, and distill new lessons - from feedback after. - learn_source: Memory source tag for stored/recalled lessons. - learn_strict: When True, re-raise exceptions from the pre-review - and distillation steps instead of falling back to raw output. - Default False preserves graceful degradation; failures are - always logged via ``logger.warning`` regardless of this flag. - - Returns: - A decorator function that wraps the method with human feedback - collection logic. - - Raises: - ValueError: If emit is specified but llm is not provided. - ValueError: If default_outcome is specified but emit is not. - ValueError: If default_outcome is not in the emit list. - HumanFeedbackPending: When an async provider pauses execution. - - Example: - Basic feedback without routing: - ```python - @start() - @human_feedback(message="Please review this output:") - def generate_content(self): - return "Generated content..." - ``` - - With routing based on feedback: - ```python - @start() - @human_feedback( - message="Review and approve or reject:", - emit=["approved", "rejected", "needs_revision"], - llm="gpt-4o-mini", - default_outcome="needs_revision", - ) - def review_document(self): - return document_content - - - @listen("approved") - def publish(self): - print(f"Publishing: {self.last_human_feedback.output}") - ``` - - Async feedback with custom provider: - ```python - @start() - @human_feedback( - message="Review this content:", - emit=["approved", "rejected"], - llm="gpt-4o-mini", - provider=SlackProvider(channel="#reviews"), - ) - def generate_content(self): - return "Content to review..." - ``` - """ if emit is not None: if not llm: raise ValueError( @@ -631,55 +534,33 @@ def human_feedback( wrapper = sync_wrapper - for attr in [ - "__is_start_method__", - "__trigger_methods__", - "__condition_type__", - "__trigger_condition__", - "__is_flow_method__", - "__flow_persistence_config__", - "__is_router__", - "__router_emit__", - "__flow_method_definition__", - ]: - if hasattr(func, attr): - setattr(wrapper, attr, getattr(func, attr)) - - # Create config inline to avoid race conditions - wrapper.__human_feedback_config__ = HumanFeedbackConfig( - message=message, - emit=emit, - llm=llm, - default_outcome=default_outcome, - metadata=metadata, - provider=provider, - learn=learn, - learn_source=learn_source, - learn_strict=learn_strict, - ) - wrapper.__is_flow_method__ = True - - if emit: - wrapper.__is_router__ = True - wrapper.__router_emit__ = list(emit) - # Keep the definition fragment in sync: emit promotes the method to - # a router and the feedback outcomes replace any emit recorded by an - # inner @router. Copy before updating so the wrapped method's own - # fragment (shared by reference) is left untouched. - fragment = getattr(wrapper, "__flow_method_definition__", None) - if isinstance(fragment, FlowMethodDefinition): - wrapper.__flow_method_definition__ = fragment.model_copy( - update={"router": True, "emit": list(emit)} - ) - - # Stash the live LLM object for HITL resume to retrieve. - # When a flow pauses for human feedback and later resumes (possibly in a - # different process), the serialized context only contains a model string. - # By storing the original LLM on the wrapper, resume_async can retrieve - # the fully-configured LLM (with credentials, project, safety_settings, etc.) - # instead of creating a bare LLM from just the model string. - wrapper._human_feedback_llm = llm - return wrapper # type: ignore[no-any-return] return decorator + + +def human_feedback( + message: str, + emit: Sequence[str] | None = None, + llm: str | BaseLLM | None = "gpt-4o-mini", + default_outcome: str | None = None, + metadata: dict[str, Any] | None = None, + provider: HumanFeedbackProvider | None = None, + learn: bool = False, + learn_source: str = "hitl", + learn_strict: bool = False, +) -> Callable[[F], F]: + """Compatibility import path for the Flow human-feedback DSL decorator.""" + from crewai.flow.dsl._human_feedback import human_feedback as dsl_human_feedback + + return dsl_human_feedback( + message=message, + emit=emit, + llm=llm, + default_outcome=default_outcome, + metadata=metadata, + provider=provider, + learn=learn, + learn_source=learn_source, + learn_strict=learn_strict, + ) diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 33bfbacea..0290b7ff1 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -90,15 +90,17 @@ from crewai.experimental.conversational import ( ) from crewai.experimental.conversational_mixin import _ConversationalMixin from crewai.flow.constants import AND_CONDITION, OR_CONDITION -from crewai.flow.dsl import ( +from crewai.flow.dsl._conditions import ( _extract_all_methods, _extract_all_methods_recursive, _normalize_condition, + is_flow_condition_dict, + is_simple_flow_condition, +) +from crewai.flow.dsl._utils import ( build_flow_definition, extract_flow_definition, - is_flow_condition_dict, is_flow_method, - is_simple_flow_condition, ) from crewai.flow.flow_context import current_flow_id, current_flow_request_id from crewai.flow.flow_definition import FlowDefinition diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index a02036f66..1b8325e68 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -25,7 +25,15 @@ def test_flow_public_exports_are_explicit(): assert "FlowDefinitionDiagnostic" not in flow_package.__all__ assert "build_flow_definition" not in flow_package.__all__ assert "flow_structure" not in flow_package.__all__ - assert set(flow_dsl.__all__) == {"and_", "listen", "or_", "router", "start"} + assert set(flow_dsl.__all__) == { + "HumanFeedbackResult", + "and_", + "human_feedback", + "listen", + "or_", + "router", + "start", + } assert set(flow_definition.__all__) == { "FlowConfigDefinition", "FlowDefinition",