From 373dca3d047798f6fe7cc47110f12718df5f2930 Mon Sep 17 00:00:00 2001 From: Vini Brasil Date: Thu, 11 Jun 2026 14:18:49 -0700 Subject: [PATCH] Run flows from a definition without a Python subclass (#6104) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Read flow dispatch from FlowDefinition Store the definition in a `_definition` PrivateAttr at post-init and convert the dispatch helpers (`_start_method_names`, `_listener_methods`, `_start_condition`, `_listen_condition`, `_is_router`) from classmethods to instance methods that read it. Event names now fall back to `self._definition.name` instead of `self.__class__.__name__`. Behavior is identical for decorator subclasses, but the engine no longer assumes the definition comes from the class. This is the seam for `Flow.from_definition`, where an instance runs a definition that was loaded rather than built from a Python subclass. * Add Flow.from_definition to run flows without a subclass A FlowDefinition (e.g. loaded from YAML) was only usable for dispatch on decorator-authored subclasses. Now each method definition records an importable `module:qualname` handler ref, and `Flow.from_definition` resolves and binds those handlers to build a runnable flow directly. * Build flow state from FlowDefinition Definition-driven flows previously always started with a bare dict state. * Replace handler string with structured FlowActionDefinition `handler: str | None` was optional and opaque — missing handlers only surfaced at kickoff time. `do: FlowActionDefinition` is required, so Pydantic rejects invalid definitions at parse time. The `call: "code"` discriminator prepares the schema for future non-Python action types (e.g. MCP tool, crew) without touching `FlowMethodDefinition`. Resolution logic is extracted to `runtime/_action_resolvers.py` to keep the dispatch point isolated. * Fix conversational start router missing required do field FlowMethodDefinition.do became required when the handler string was replaced with FlowActionDefinition, but _conversation_start_router still built its fragment without it, breaking crewai import entirely. Co-Authored-By: Claude Fable 5 * Add event scoping to flow test * Change lib/crewai/tests/test_flow_from_definition.py --------- Co-authored-by: Claude Fable 5 --- .../experimental/conversational_mixin.py | 4 +- lib/crewai/src/crewai/flow/dsl/_listen.py | 7 +- lib/crewai/src/crewai/flow/dsl/_router.py | 2 + lib/crewai/src/crewai/flow/dsl/_start.py | 19 +- lib/crewai/src/crewai/flow/dsl/_utils.py | 11 +- lib/crewai/src/crewai/flow/flow_definition.py | 12 +- .../flow/{runtime.py => runtime/__init__.py} | 238 ++++++-- .../crewai/flow/runtime/_action_resolvers.py | 43 ++ .../crewai/utilities/pydantic_schema_utils.py | 6 +- lib/crewai/tests/test_flow.py | 20 + lib/crewai/tests/test_flow_definition.py | 25 +- lib/crewai/tests/test_flow_from_definition.py | 552 ++++++++++++++++++ lib/crewai/tests/test_flow_visualization.py | 25 +- 13 files changed, 881 insertions(+), 83 deletions(-) rename lib/crewai/src/crewai/flow/{runtime.py => runtime/__init__.py} (94%) create mode 100644 lib/crewai/src/crewai/flow/runtime/_action_resolvers.py create mode 100644 lib/crewai/tests/test_flow_from_definition.py diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 8ad4bb6cb..b5e182ae7 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -47,7 +47,7 @@ from crewai.flow.conversation import ( receive_user_message as _receive_user_message, ) from crewai.flow.dsl import listen, start -from crewai.flow.dsl._utils import _set_flow_method_definition +from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition from crewai.flow.flow_definition import FlowMethodDefinition from crewai.utilities.types import LLMMessage @@ -78,7 +78,7 @@ def _conversation_start_router(func: Callable[..., Any]) -> Any: wrapper = start()(func) _set_flow_method_definition( cast(Any, wrapper), - FlowMethodDefinition(start=True, router=True), + FlowMethodDefinition(do=_method_action(func), start=True, router=True), ) return wrapper diff --git a/lib/crewai/src/crewai/flow/dsl/_listen.py b/lib/crewai/src/crewai/flow/dsl/_listen.py index ba7794e3a..37c9a9d25 100644 --- a/lib/crewai/src/crewai/flow/dsl/_listen.py +++ b/lib/crewai/src/crewai/flow/dsl/_listen.py @@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, + _method_action, _set_flow_method_definition, ) from crewai.flow.flow_definition import FlowMethodDefinition @@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator: wrapper = ListenMethod(func) _set_flow_method_definition( - wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition)) + wrapper, + FlowMethodDefinition( + do=_method_action(func), + listen=_to_definition_condition(condition), + ), ) return wrapper diff --git a/lib/crewai/src/crewai/flow/dsl/_router.py b/lib/crewai/src/crewai/flow/dsl/_router.py index 327f3a943..3edbf33ba 100644 --- a/lib/crewai/src/crewai/flow/dsl/_router.py +++ b/lib/crewai/src/crewai/flow/dsl/_router.py @@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, + _method_action, _set_flow_method_definition, ) from crewai.flow.flow_definition import FlowMethodDefinition @@ -148,6 +149,7 @@ def router( _set_flow_method_definition( wrapper, FlowMethodDefinition( + do=_method_action(func), listen=_to_definition_condition(condition), router=True, emit=router_events or None, diff --git a/lib/crewai/src/crewai/flow/dsl/_start.py b/lib/crewai/src/crewai/flow/dsl/_start.py index 882128d6c..fe9f82974 100644 --- a/lib/crewai/src/crewai/flow/dsl/_start.py +++ b/lib/crewai/src/crewai/flow/dsl/_start.py @@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, + _method_action, _set_flow_method_definition, ) from crewai.flow.flow_definition import FlowMethodDefinition @@ -53,13 +54,17 @@ def start( def decorator(func: Callable[P, R]) -> StartMethod[P, R]: wrapper = StartMethod(func) - if condition is not None: - _set_flow_method_definition( - wrapper, - FlowMethodDefinition(start=_to_definition_condition(condition)), - ) - else: - _set_flow_method_definition(wrapper, FlowMethodDefinition(start=True)) + _set_flow_method_definition( + wrapper, + FlowMethodDefinition( + do=_method_action(func), + start=( + _to_definition_condition(condition) + if condition is not None + else True + ), + ), + ) return wrapper return cast(FlowMethodDecorator, decorator) diff --git a/lib/crewai/src/crewai/flow/dsl/_utils.py b/lib/crewai/src/crewai/flow/dsl/_utils.py index 119173500..c9ceebdc0 100644 --- a/lib/crewai/src/crewai/flow/dsl/_utils.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -8,6 +8,7 @@ from pydantic import BaseModel from typing_extensions import TypeIs from crewai.flow.flow_definition import ( + FlowActionDefinition, FlowConfigDefinition, FlowConversationalDefinition, FlowConversationalRouterDefinition, @@ -84,6 +85,10 @@ def _stamp_inherited_conversational_metadata( return method +def _method_action(method: Any) -> FlowActionDefinition: + return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}") + + def _set_flow_method_definition( wrapper: FlowMethod[P, R], definition: FlowMethodDefinition, @@ -373,9 +378,11 @@ def _build_method_definition( ) -> FlowMethodDefinition: fragment = _get_flow_method_definition(method) if fragment is None: - method_definition = FlowMethodDefinition() + method_definition = FlowMethodDefinition(do=_method_action(method)) else: - method_definition = fragment.model_copy(deep=True) + method_definition = fragment.model_copy( + deep=True, update={"do": _method_action(method)} + ) human_feedback = _build_human_feedback_definition( method, diagnostics, f"{path}.human_feedback" diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 0830f7a65..365bfd7a7 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -27,6 +27,7 @@ logger = logging.getLogger(__name__) FlowDefinitionCondition = str | dict[str, Any] __all__ = [ + "FlowActionDefinition", "FlowConfigDefinition", "FlowConversationalDefinition", "FlowConversationalRouterDefinition", @@ -52,8 +53,9 @@ class FlowDefinitionDiagnostic(BaseModel): class FlowStateDefinition(BaseModel): """Static description of a Flow state contract.""" - type: TypingLiteral["dict", "pydantic", "unknown"] = "dict" + type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict" ref: str | None = None + json_schema: dict[str, Any] | None = None default: Any = None @@ -90,9 +92,17 @@ class FlowHumanFeedbackDefinition(BaseModel): learn_strict: bool = False +class FlowActionDefinition(BaseModel): + """What a Flow method node executes, independent of when it fires.""" + + call: TypingLiteral["code"] = "code" + ref: str + + class FlowMethodDefinition(BaseModel): """Static definition of one Flow method and its execution roles.""" + do: FlowActionDefinition start: bool | FlowDefinitionCondition | None = None listen: FlowDefinitionCondition | None = None router: bool = False diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime/__init__.py similarity index 94% rename from lib/crewai/src/crewai/flow/runtime.py rename to lib/crewai/src/crewai/flow/runtime/__init__.py index 6a9dfeda7..33d399da5 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -22,6 +22,7 @@ from concurrent.futures import Future, ThreadPoolExecutor import contextvars import copy import enum +import importlib import inspect import logging import threading @@ -95,6 +96,7 @@ from crewai.flow.flow_definition import ( FlowDefinition, FlowDefinitionCondition, FlowMethodDefinition, + FlowStateDefinition, ) from crewai.flow.flow_wrappers import ( FlowMethod, @@ -105,6 +107,7 @@ from crewai.flow.flow_wrappers import ( from crewai.flow.human_feedback import HumanFeedbackResult from crewai.flow.input_provider import InputProvider from crewai.flow.persistence.base import FlowPersistence +from crewai.flow.runtime._action_resolvers import resolve_action from crewai.flow.types import ( FlowExecutionData, FlowMethodName, @@ -169,6 +172,57 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) - return combine(_condition_satisfied(branch, events) for branch in branches) +def _build_definition_state_model( + state_definition: FlowStateDefinition, +) -> BaseModel | None: + kwargs = ( + dict(state_definition.default) + if isinstance(state_definition.default, dict) + else {} + ) + + model_class: type[BaseModel] | None = None + if state_definition.ref: + try: + module_name, _, qualname = state_definition.ref.partition(":") + resolved: Any = importlib.import_module(module_name) + for part in qualname.split("."): + resolved = getattr(resolved, part) + except Exception: + logger.warning( + "Could not import state ref %r", state_definition.ref, exc_info=True + ) + else: + if isinstance(resolved, type) and issubclass(resolved, BaseModel): + model_class = resolved + else: + logger.warning( + "State ref %r is not a pydantic model", state_definition.ref + ) + + if model_class is None and state_definition.json_schema: + from crewai.utilities.pydantic_schema_utils import create_model_from_schema + + try: + model_class = create_model_from_schema(state_definition.json_schema) + except Exception: + logger.warning( + "Could not build a state model from the declared json_schema", + exc_info=True, + ) + + if model_class is None: + return None + + if not issubclass(model_class, FlowState): + + class StateWithId(FlowState, model_class): # type: ignore[misc, valid-type] + pass + + model_class = StateWithId + return model_class(**kwargs) + + def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]: if isinstance(condition, str): yield condition @@ -695,21 +749,24 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return flow_definition @classmethod - def _start_method_names(cls) -> list[FlowMethodName]: + def from_definition(cls, definition: FlowDefinition) -> Flow[Any]: + """Build a runnable Flow directly from a definition; no subclass required.""" + return cls.model_validate({}, context={"flow_definition": definition}) + + def _start_method_names(self) -> list[FlowMethodName]: return [ FlowMethodName(method_name) - for method_name, method_definition in cls.flow_definition().methods.items() + for method_name, method_definition in self._definition.methods.items() if method_definition.is_start ] - @classmethod def _listener_methods( - cls, + self, ) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]: # (name, definition, condition) for every non-start method that listens. # Routers are included (they listen too); callers wanting only plain # listeners filter on definition.router. - for method_name, method_definition in cls.flow_definition().methods.items(): + for method_name, method_definition in self._definition.methods.items(): if method_definition.listen is not None and not method_definition.is_start: yield ( FlowMethodName(method_name), @@ -717,25 +774,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): method_definition.listen, ) - @classmethod def _start_condition( - cls, method_name: FlowMethodName + self, method_name: FlowMethodName ) -> FlowDefinitionCondition | None: - method_definition = cls.flow_definition().methods[str(method_name)] + method_definition = self._definition.methods[str(method_name)] start = method_definition.start if isinstance(start, (str, dict)): return start return None - @classmethod def _listen_condition( - cls, method_name: FlowMethodName + self, method_name: FlowMethodName ) -> FlowDefinitionCondition | None: - return cls.flow_definition().methods[str(method_name)].listen + return self._definition.methods[str(method_name)].listen - @classmethod - def _is_router(cls, method_name: FlowMethodName) -> bool: - return cls.flow_definition().methods[str(method_name)].router + def _is_router(self, method_name: FlowMethodName) -> bool: + return self._definition.methods[str(method_name)].router initial_state: Annotated[ # type: ignore[type-arg] type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None, @@ -879,7 +933,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): restore_event_scope(()) reset_last_event_id() - _methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr( + _methods: dict[FlowMethodName, Callable[..., Any]] = PrivateAttr( default_factory=dict ) _method_execution_counts: dict[FlowMethodName, int] = PrivateAttr( @@ -893,6 +947,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): PrivateAttr(default=None) ) _method_outputs: list[Any] = PrivateAttr(default_factory=list) + _definition: FlowDefinition = PrivateAttr() _state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) _or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) _completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set) @@ -922,15 +977,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): object.__setattr__(self, name, value) def model_post_init(self, __context: Any) -> None: - self._flow_post_init() + definition = ( + __context.get("flow_definition") if isinstance(__context, dict) else None + ) + self._flow_post_init(definition) - def _flow_post_init(self) -> None: + def _flow_post_init(self, definition: FlowDefinition | None = None) -> None: """Heavy initialization: state creation, events, memory, method registration.""" if getattr(self, "_flow_post_init_done", False): return object.__setattr__(self, "_flow_post_init_done", True) self._initialize_runtime_extension_attrs() + self._definition = definition or type(self).flow_definition() + if self.name and self.name != self._definition.name: + self._definition = self._definition.model_copy(update={"name": self.name}) + methods = ( + self._action_bound_methods() + if definition is not None + else self._class_bound_methods() + ) + if self._state is None: self._state = self._create_initial_state() @@ -945,7 +1012,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowCreatedEvent( type="flow_created", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, ), ) @@ -955,17 +1022,42 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if self.memory is None and not getattr(self, "_skip_auto_memory", False): from crewai.memory.utils import sanitize_scope_name - flow_name = sanitize_scope_name(self.name or self.__class__.__name__) + flow_name = sanitize_scope_name(self._definition.name) self.memory = Memory(root_scope=f"/flow/{flow_name}") - # Build the runtime method lookup from the static FlowDefinition. - for method_name in type(self).flow_definition().methods: + self._methods.update(methods) + + def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]: + def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]: + try: + return resolve_action(self, definition.do) + except Exception as e: + unresolved.append(f"{name}: {e}") + return lambda *args, **kwargs: None + + methods: dict[FlowMethodName, Callable[..., Any]] = {} + unresolved: list[str] = [] + for method_name, method_definition in self._definition.methods.items(): + methods[FlowMethodName(method_name)] = resolve( + method_name, method_definition + ) + if unresolved: + raise ValueError( + f"Cannot build flow {self._definition.name!r} from its definition; " + "methods with unresolvable actions: " + "; ".join(unresolved) + ) + return methods + + def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]: + methods: dict[FlowMethodName, Callable[..., Any]] = {} + for method_name in self._definition.methods: method = getattr(self, method_name, None) if method is None: continue if not hasattr(method, "__self__"): - method = method.__get__(self, self.__class__) - self._methods[FlowMethodName(method_name)] = method + method = method.__get__(self, type(self)) + methods[FlowMethodName(method_name)] = method + return methods def recall(self, query: str, **kwargs: Any) -> Any: """Recall relevant memories. Delegates to this flow's memory. @@ -1043,7 +1135,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def _start_condition_triggered_by( self, method_name: FlowMethodName, trigger: FlowMethodName ) -> bool: - condition = type(self)._start_condition(method_name) + condition = self._start_condition(method_name) if condition is None: return False return self._condition_met( @@ -1071,7 +1163,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): trigger_str = str(trigger) to_discard: list[FlowMethodName] = [] for listener_name in candidates: - condition = type(self)._listen_condition(listener_name) + condition = self._listen_condition(listener_name) if condition is None: continue if trigger_str in _iter_condition_events(condition): @@ -1093,9 +1185,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {} listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = { listener_name: condition - for listener_name, method_definition, condition in type( - self - )._listener_methods() + for listener_name, method_definition, condition in self._listener_methods() if not method_definition.router } @@ -1368,7 +1458,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowStartedEvent( type="flow_started", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, inputs=None, ), ) @@ -1444,7 +1534,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, MethodExecutionFinishedEvent( type="method_execution_finished", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, method_name=context.method_name, result=collapsed_outcome if emit else result, state=self._state, @@ -1498,7 +1588,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowPausedEvent( type="flow_paused", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, flow_id=e.context.flow_id, method_name=e.context.method_name, state=self._copy_and_serialize_state(), @@ -1529,7 +1619,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowFinishedEvent( type="flow_finished", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, result=final_result, state=self._copy_and_serialize_state(), ), @@ -1595,7 +1685,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return cast(T, {"id": str(uuid4())}) if init_state is None: - return cast(T, {"id": str(uuid4())}) + return cast(T, self._create_definition_state()) if isinstance(init_state, type): state_class = init_state @@ -1637,6 +1727,34 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): f"Initial state must be dict or BaseModel, got {type(self.initial_state)}" ) + def _create_definition_state(self) -> dict[str, Any] | BaseModel: + state_definition = self._definition.state + if state_definition is None: + return {"id": str(uuid4())} + if state_definition.type in ("pydantic", "json_schema"): + state = _build_definition_state_model(state_definition) + if state is not None: + return state + logger.error( + "Flow %r declares %s state but neither ref nor json_schema " + "produced a model; falling back to dict state", + self._definition.name, + state_definition.type, + ) + elif state_definition.type == "unknown": + logger.warning( + "Flow %r declares state of unknown type; falling back to dict state", + self._definition.name, + ) + dict_state: dict[str, Any] = ( + dict(state_definition.default) + if isinstance(state_definition.default, dict) + else {} + ) + if "id" not in dict_state: + dict_state["id"] = str(uuid4()) + return dict_state + def _copy_state(self) -> T: """Create a copy of the current state. @@ -2172,7 +2290,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # explicit finalization call closes the batch. started_event = FlowStartedEvent( type="flow_started", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, inputs=inputs, ) future = crewai_event_bus.emit(self, started_event) @@ -2212,11 +2330,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # Determine which start methods to execute at kickoff # Conditional start methods are only triggered by their conditions # UNLESS there are no unconditional starts (then all starts run as entry points) - start_methods = type(self)._start_method_names() + start_methods = self._start_method_names() unconditional_starts = [ start_method for start_method in start_methods - if type(self)._start_condition(start_method) is None + if self._start_condition(start_method) is None ] # If there are unconditional starts, only run those at kickoff # If there are NO unconditional starts, run all starts (including conditional ones) @@ -2264,7 +2382,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowPausedEvent( type="flow_paused", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, flow_id=e.context.flow_id, method_name=e.context.method_name, state=self._copy_and_serialize_state(), @@ -2314,7 +2432,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowFinishedEvent( type="flow_finished", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, result=final_output, state=self._copy_and_serialize_state(), ), @@ -2400,7 +2518,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionFinishedEvent, MethodExecutionFailedEvent, ) - flow_name = self.name or self.__class__.__name__ + flow_name = self._definition.name nodes = sorted( ( n @@ -2459,7 +2577,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ) # If start method is a router, use its result as an additional trigger - if type(self)._is_router(start_method_name) and result is not None: + if self._is_router(start_method_name) and result is not None: # Execute listeners for the start method name first await self._execute_listeners(start_method_name, result, finished_event_id) # Then execute listeners for the router result (e.g., "approved") @@ -2479,15 +2597,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def _inject_trigger_payload_for_start_method( self, original_method: Callable[..., Any] ) -> Callable[..., Any]: + accepts_trigger_payload = ( + "crewai_trigger_payload" in inspect.signature(original_method).parameters + ) + def prepare_kwargs( *args: Any, **kwargs: Any ) -> tuple[tuple[Any, ...], dict[str, Any]]: inputs = cast(dict[str, Any], baggage.get_baggage("flow_inputs") or {}) trigger_payload = inputs.get("crewai_trigger_payload") - sig = inspect.signature(original_method) - accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters - if trigger_payload is not None and accepts_trigger_payload: kwargs["crewai_trigger_payload"] = trigger_payload elif trigger_payload is not None: @@ -2537,7 +2656,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionStartedEvent( type="method_execution_started", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, params=dumped_params, state=self._copy_and_serialize_state(), ), @@ -2589,7 +2708,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): finished_event = MethodExecutionFinishedEvent( type="method_execution_finished", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, state=self._copy_and_serialize_state(), result=result, ) @@ -2618,7 +2737,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionPausedEvent( type="method_execution_paused", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, state=self._copy_and_serialize_state(), flow_id=e.context.flow_id, message=e.context.message, @@ -2634,7 +2753,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionFailedEvent( type="method_execution_failed", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, error=e, ), ) @@ -2766,7 +2885,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): await asyncio.gather(*tasks) if current_trigger in router_results: - for method_name in type(self)._start_method_names(): + for method_name in self._start_method_names(): if self._start_condition_triggered_by( method_name, current_trigger ): @@ -2797,9 +2916,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ) -> list[FlowMethodName]: triggered: list[FlowMethodName] = [] - for listener_name, method_definition, condition in type( - self - )._listener_methods(): + for listener_name, method_definition, condition in self._listener_methods(): is_router = method_definition.router if router_only != is_router: continue @@ -2865,10 +2982,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # For routers, also check if any conditional starts they triggered are completed # If so, continue their chains - if type(self)._is_router(listener_name): - for start_method_name in type(self)._start_method_names(): + if self._is_router(listener_name): + for start_method_name in self._start_method_names(): if ( - type(self)._start_condition(start_method_name) is not None + self._start_condition(start_method_name) is not None and start_method_name in self._completed_methods ): # This conditional start was executed, continue its chain @@ -2887,8 +3004,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): method = self._methods[listener_name] sig = inspect.signature(method) - params = list(sig.parameters.values()) - method_params = [p for p in params if p.name != "self"] + method_params = [p for p in sig.parameters.values() if p.name != "self"] if triggering_event_id: with triggered_by_scope(triggering_event_id): @@ -3044,7 +3160,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowInputRequestedEvent( type="flow_input_requested", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, method_name=method_name, message=message, metadata=metadata, @@ -3111,7 +3227,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowInputReceivedEvent( type="flow_input_received", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, method_name=method_name, message=message, response=response, @@ -3149,7 +3265,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, HumanFeedbackRequestedEvent( type="human_feedback_requested", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, method_name="", # Will be set by decorator if needed output=output, message=message, @@ -3178,7 +3294,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, HumanFeedbackReceivedEvent( type="human_feedback_received", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, method_name="", # Will be set by decorator if needed feedback=feedback, outcome=None, # Will be determined after collapsing @@ -3353,7 +3469,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowPlotEvent( type="flow_plot", - flow_name=self.name or self.__class__.__name__, + flow_name=self._definition.name, ), ) structure = build_flow_structure(cast(Any, self)) diff --git a/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py b/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py new file mode 100644 index 000000000..80512b11d --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from collections.abc import Callable +import importlib +from operator import attrgetter +from typing import TYPE_CHECKING, Any, cast + +from crewai.flow.flow_definition import FlowActionDefinition + + +if TYPE_CHECKING: + from crewai.flow.runtime import Flow + + +class InvalidActionRefError(ValueError): + def __init__(self, ref: str) -> None: + super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'") + + +def _resolve_code_action( + flow: Flow[Any], action: FlowActionDefinition +) -> Callable[..., Any]: + ref = action.ref + module_name, _, qualname = ref.partition(":") + if "<" in ref or not module_name or not qualname: + raise InvalidActionRefError(ref) + try: + target = attrgetter(qualname)(importlib.import_module(module_name)) + except (ImportError, AttributeError) as e: + raise InvalidActionRefError(ref) from e + if not callable(target): + raise InvalidActionRefError(ref) + handler = cast(Callable[..., Any], target) + if getattr(handler, "__self__", None) is None: + handler = handler.__get__(flow, type(flow)) + return handler + + +def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]: + """Turn one `do:` action into the callable the flow runs for that node.""" + if action.call == "code": + return _resolve_code_action(flow, action) + raise ValueError(f"unknown call type {action.call!r}") diff --git a/lib/crewai/src/crewai/utilities/pydantic_schema_utils.py b/lib/crewai/src/crewai/utilities/pydantic_schema_utils.py index ff1d5529b..85a53d9bc 100644 --- a/lib/crewai/src/crewai/utilities/pydantic_schema_utils.py +++ b/lib/crewai/src/crewai/utilities/pydantic_schema_utils.py @@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field( if examples: schema_extra["examples"] = examples - default = ... if is_required else None + default = ( + json_schema["default"] + if "default" in json_schema + else (... if is_required else None) + ) if isinstance(type_, type) and issubclass(type_, (int, float)): if "minimum" in json_schema: diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index ab50af05e..e7bae8023 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1157,6 +1157,26 @@ def test_flow_name(): assert flow.name == "MyFlow" +def test_flow_custom_name_overrides_class_name_in_events(): + class InternalFlowClass(Flow): + name = "PublicName" + + @start() + def begin(self): + return "done" + + received = [] + + with crewai_event_bus.scoped_handlers(): + @crewai_event_bus.on(FlowStartedEvent) + def handle(source, event): + received.append(event) + + InternalFlowClass().kickoff() + + assert received[0].flow_name == "PublicName" + + def test_nested_and_or_conditions(): """Test nested conditions like or_(and_(A, B), and_(C, D)). diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index f79917369..ebbb2e60d 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit(): "start", } assert set(flow_definition.__all__) == { + "FlowActionDefinition", "FlowConfigDefinition", "FlowConversationalDefinition", "FlowConversationalRouterDefinition", @@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract(): "name": "LoadedDiagnosticsFlow", "methods": { "decision": { + "do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"}, "router": True, "emit": ["continue"], } @@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger(): "name": "LoadedFlow", "methods": { "decision": { + "do": {"ref": "loaded_flows:LoadedFlow.decision"}, "router": True, "start": False, "emit": ["continue"], @@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract(): "schema": "crewai.flow/v1", "name": "TypoFlow", "methods": { - "begin": {"start": True}, - "handle": {"listen": "begni"}, + "begin": { + "do": {"ref": "loaded_flows:TypoFlow.begin"}, + "start": True, + }, + "handle": { + "do": {"ref": "loaded_flows:TypoFlow.handle"}, + "listen": "begni", + }, }, } ) @@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method(): "schema": "crewai.flow/v1", "name": "ExplicitNonStartFlow", "methods": { - "begin": {"start": True}, - "handle": {"start": False, "listen": "begin"}, + "begin": { + "do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"}, + "start": True, + }, + "handle": { + "do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"}, + "start": False, + "listen": "begin", + }, }, } ) @@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog): "name": "LoadedFlow", "methods": { "decision": { + "do": {"ref": "loaded_flows:LoadedFlow.decision"}, "router": True, "emit": ["continue"], } diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py new file mode 100644 index 000000000..14591ca69 --- /dev/null +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -0,0 +1,552 @@ +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.flow_events import ( + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) +from crewai.flow import Flow, and_, listen, or_, router, start +from crewai.flow.flow import FlowState +from crewai.flow.flow_definition import FlowDefinition + + +class ChainFlow(Flow): + @start() + def begin(self): + self.state["begin_ran"] = True + return "hello" + + @listen(begin) + def shout(self, result): + return result.upper() + + @listen(shout) + def confirm(self): + self.state["confirmed"] = True + return f"confirmed:{self.state['confirmed']}" + + +CHAIN_YAML = f""" +schema: crewai.flow/v1 +name: ChainFlow +methods: + begin: + do: + call: code + ref: {__name__}:ChainFlow.begin + start: true + shout: + do: + ref: {__name__}:ChainFlow.shout + listen: begin + confirm: + do: + ref: {__name__}:ChainFlow.confirm + listen: shout +""" + + +class MergeFlow(Flow): + @start() + def begin(self): + return "go" + + @listen(begin) + def left(self): + return "left" + + @listen(begin) + def right(self): + return "right" + + @listen(or_(left, right)) + def either(self): + self.state["either_ran"] = True + return "either" + + @listen(and_(left, right, either)) + def join(self): + self.state["joined"] = True + return "joined" + + +MERGE_YAML = f""" +schema: crewai.flow/v1 +name: MergeFlow +methods: + begin: + do: + ref: {__name__}:MergeFlow.begin + start: true + left: + do: + ref: {__name__}:MergeFlow.left + listen: begin + right: + do: + ref: {__name__}:MergeFlow.right + listen: begin + either: + do: + ref: {__name__}:MergeFlow.either + listen: + or: [left, right] + join: + do: + ref: {__name__}:MergeFlow.join + listen: + and: [left, right, either] +""" + + +class RouteFlow(Flow): + @start() + def begin(self): + return "go" + + @router(begin) + def decide(self): + return "left" if self.state.get("direction") == "left" else "right" + + @listen("left") + def take_left(self): + return "took-left" + + @listen("right") + def take_right(self): + return "took-right" + + +ROUTE_YAML = f""" +schema: crewai.flow/v1 +name: RouteFlow +methods: + begin: + do: + ref: {__name__}:RouteFlow.begin + start: true + decide: + do: + ref: {__name__}:RouteFlow.decide + listen: begin + router: true + take_left: + do: + ref: {__name__}:RouteFlow.take_left + listen: left + take_right: + do: + ref: {__name__}:RouteFlow.take_right + listen: right +""" + + +class LoopFlow(Flow): + @start("retry") + def step(self): + self.state["count"] = self.state.get("count", 0) + 1 + return self.state["count"] + + @router(step) + def decide(self): + if self.state["count"] < 3: + return "retry" + return "done" + + @listen("done") + def finish(self): + return f"finished:{self.state['count']}" + + +LOOP_YAML = f""" +schema: crewai.flow/v1 +name: LoopFlow +methods: + step: + do: + ref: {__name__}:LoopFlow.step + start: retry + decide: + do: + ref: {__name__}:LoopFlow.decide + listen: step + router: true + finish: + do: + ref: {__name__}:LoopFlow.finish + listen: done +""" + + +class CounterState(FlowState): + count: int = 0 + label: str = "none" + + +class PydanticStateFlow(Flow[CounterState]): + @start() + def begin(self): + self.state.count += 1 + return self.state.count + + @listen(begin) + def finish(self): + self.state.label = f"count={self.state.count}" + return self.state.label + + +PYDANTIC_STATE_YAML = f""" +schema: crewai.flow/v1 +name: PydanticStateFlow +state: + type: pydantic + ref: {__name__}:CounterState +methods: + begin: + do: + ref: {__name__}:PydanticStateFlow.begin + start: true + finish: + do: + ref: {__name__}:PydanticStateFlow.finish + listen: begin +""" + +PYDANTIC_STATE_OVERLAY_YAML = f""" +schema: crewai.flow/v1 +name: PydanticStateFlow +state: + type: pydantic + ref: {__name__}:CounterState + default: + count: 5 +methods: + begin: + do: + ref: {__name__}:PydanticStateFlow.begin + start: true + finish: + do: + ref: {__name__}:PydanticStateFlow.finish + listen: begin +""" + +JSON_SCHEMA_STATE_YAML = f""" +schema: crewai.flow/v1 +name: JsonSchemaStateFlow +state: + type: json_schema + json_schema: + title: CounterState + type: object + properties: + count: + type: integer + default: 0 + label: + type: string + default: none +methods: + begin: + do: + ref: {__name__}:PydanticStateFlow.begin + start: true + finish: + do: + ref: {__name__}:PydanticStateFlow.finish + listen: begin +""" + +PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f""" +schema: crewai.flow/v1 +name: SchemaFallbackFlow +state: + type: pydantic + ref: definitely_not_a_module_xyz:MissingState + json_schema: + title: CounterState + type: object + properties: + count: + type: integer + default: 0 + label: + type: string + default: none +methods: + begin: + do: + ref: {__name__}:PydanticStateFlow.begin + start: true + finish: + do: + ref: {__name__}:PydanticStateFlow.finish + listen: begin +""" + +UNRESOLVABLE_STATE_YAML = f""" +schema: crewai.flow/v1 +name: UnresolvableStateFlow +state: + type: pydantic + ref: definitely_not_a_module_xyz:MissingState +methods: + begin: + do: + ref: {__name__}:ChainFlow.begin + start: true +""" + +DICT_STATE_YAML = f""" +schema: crewai.flow/v1 +name: DictStateFlow +state: + type: dict + default: + count: 5 +methods: + begin: + do: + ref: {__name__}:ChainFlow.begin + start: true +""" + +UNKNOWN_STATE_YAML = f""" +schema: crewai.flow/v1 +name: UnknownStateFlow +state: + type: unknown + ref: somewhere:Something +methods: + begin: + do: + ref: {__name__}:ChainFlow.begin + start: true +""" + + +def _run_with_events(flow, inputs=None): + events = [] + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def on_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def on_finished(source, event): + events.append(event) + + result = flow.kickoff(inputs=inputs) + events.sort(key=lambda e: e.timestamp) + return result, [ + (type(e).__name__, str(e.method_name), e.flow_name) for e in events + ] + + +def _state_without_id(flow): + snapshot = dict(flow.state.model_dump()) + snapshot.pop("id", None) + return snapshot + + +def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True): + class_flow = flow_cls() + class_result, class_events = _run_with_events(class_flow, inputs) + + definition = FlowDefinition.from_yaml(yaml_str) + definition_flow = Flow.from_definition(definition) + definition_result, definition_events = _run_with_events(definition_flow, inputs) + + assert definition_result == class_result + assert _state_without_id(definition_flow) == _state_without_id(class_flow) + if ordered: + assert definition_flow.method_outputs == class_flow.method_outputs + assert definition_events == class_events + else: + assert sorted(map(repr, definition_flow.method_outputs)) == sorted( + map(repr, class_flow.method_outputs) + ) + assert sorted(definition_events) == sorted(class_events) + return definition_flow, definition_result + + +def test_simple_chain_parity(): + flow, result = assert_parity(ChainFlow, CHAIN_YAML) + assert result == "confirmed:True" + assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"] + + +def test_and_or_merge_parity(): + flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False) + assert flow.state["joined"] is True + assert flow.state["either_ran"] is True + + +def test_router_label_parity_for_each_branch(): + left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"}) + assert "took-left" in left_flow.method_outputs + assert "took-right" not in left_flow.method_outputs + + right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"}) + assert "took-right" in right_flow.method_outputs + + +def test_cyclic_flow_parity(): + flow, result = assert_parity(LoopFlow, LOOP_YAML) + assert result == "finished:3" + assert flow.state["count"] == 3 + + +def test_definition_flow_events_use_definition_name(): + definition = FlowDefinition.from_yaml(CHAIN_YAML) + flow = Flow.from_definition(definition) + _, events = _run_with_events(flow) + assert events + assert all(flow_name == "ChainFlow" for _, _, flow_name in events) + + +def test_definition_method_without_action_is_invalid(): + with pytest.raises(ValidationError, match="do"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "NoActions", + "methods": {"begin": {"start": True}}, + } + ) + + +def test_from_definition_unresolvable_ref_raises(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "BadRefs", + "methods": { + "begin": { + "start": True, + "do": {"ref": "definitely_not_a_module_xyz:nope"}, + } + }, + } + ) + + with pytest.raises(ValueError, match="unresolvable actions.*begin"): + Flow.from_definition(definition) + + +def test_from_definition_malformed_ref_raises(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "MalformedRefs", + "methods": {"begin": {"start": True, "do": {"ref": "no-colon-here"}}}, + } + ) + + with pytest.raises(ValueError, match="expected 'module:qualname'"): + Flow.from_definition(definition) + + +def test_from_definition_local_scope_ref_raises(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "LocalRefs", + "methods": { + "begin": { + "start": True, + "do": {"ref": f"{__name__}:make..LocalFlow.begin"}, + } + }, + } + ) + + with pytest.raises(ValueError, match="expected 'module:qualname'"): + Flow.from_definition(definition) + + +def test_flow_definition_stamps_refs(): + definition = ChainFlow.flow_definition() + + assert definition.methods["begin"].do.ref == f"{__name__}:ChainFlow.begin" + assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout" + + +def test_pydantic_state_from_ref_parity(): + flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML) + assert result == "count=1" + assert flow.state.count == 1 + assert flow.state.label == "count=1" + assert flow.state.id + + +def test_pydantic_state_default_overlay(): + flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML)) + result = flow.kickoff() + assert result == "count=6" + assert flow.state.count == 6 + + +def test_json_schema_state(): + flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML)) + result = flow.kickoff() + assert result == "count=1" + assert flow.state.count == 1 + assert flow.state.label == "count=1" + assert flow.state.id + + +def test_json_schema_state_validates_inputs(): + flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML)) + with pytest.raises(ValueError, match="Invalid inputs"): + flow.kickoff(inputs={"count": "not-a-number"}) + + +def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable(): + flow = Flow.from_definition( + FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML) + ) + result = flow.kickoff() + assert result == "count=1" + assert flow.state.count == 1 + + +def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog): + with caplog.at_level("ERROR"): + flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML)) + assert "falling back to dict state" in caplog.text + + result = flow.kickoff() + assert result == "hello" + assert flow.state["begin_ran"] is True + assert flow.state["id"] + + +def test_dict_state_is_a_copy_of_default_plus_id(): + definition = FlowDefinition.from_yaml(DICT_STATE_YAML) + + flow = Flow.from_definition(definition) + assert flow.state["count"] == 5 + assert flow.state["id"] + flow.kickoff() + assert flow.state["begin_ran"] is True + + second = Flow.from_definition(definition) + assert second.state["count"] == 5 + assert "begin_ran" not in second.state + assert second.state["id"] != flow.state["id"] + assert definition.state.default == {"count": 5} + + +def test_unknown_state_type_falls_back_to_dict(caplog): + with caplog.at_level("WARNING"): + flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML)) + assert "falling back to dict state" in caplog.text + + result = flow.kickoff() + assert result == "hello" + assert flow.state["begin_ran"] is True diff --git a/lib/crewai/tests/test_flow_visualization.py b/lib/crewai/tests/test_flow_visualization.py index 167703a14..3dbe4e140 100644 --- a/lib/crewai/tests/test_flow_visualization.py +++ b/lib/crewai/tests/test_flow_visualization.py @@ -77,12 +77,22 @@ class ComplexFlow(Flow): return "complete" -def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None: +def _attach_flow_definition( + flow_class: type[Flow], methods: dict[str, dict[str, object]] +) -> None: flow_class._flow_definition = FlowDefinition.from_dict( { "schema": "crewai.flow/v1", "name": flow_class.__name__, - "methods": methods, + "methods": { + name: { + "do": { + "ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}" + }, + **spec, + } + for name, spec in methods.items() + }, } ) @@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition(): "schema": "crewai.flow/v1", "name": "DefinedFlow", "methods": { - "begin": {"start": True}, + "begin": { + "do": {"ref": "defined_flows:DefinedFlow.begin"}, + "start": True, + }, "decide": { + "do": {"ref": "defined_flows:DefinedFlow.decide"}, "listen": "begin", "router": True, "emit": ["done"], }, - "finish": {"listen": "done"}, + "finish": { + "do": {"ref": "defined_flows:DefinedFlow.finish"}, + "listen": "done", + }, }, } )