From 5c18f3335b8152ae876ec48e314d32875f4d4d39 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 9 Jun 2026 22:03:05 -0700 Subject: [PATCH] Read flow dispatch from FlowDefinition Store the definition in a `_definition` PrivateAttr at post-init and convert the dispatch helpers (`_start_method_names`, `_listener_methods`, `_start_condition`, `_listen_condition`, `_is_router`) from classmethods to instance methods that read it. Event names now fall back to `self._definition.name` instead of `self.__class__.__name__`. Behavior is identical for decorator subclasses, but the engine no longer assumes the definition comes from the class. This is the seam for `Flow.from_definition`, where an instance runs a definition that was loaded rather than built from a Python subclass. --- lib/crewai/src/crewai/flow/runtime.py | 94 +++++++++++++-------------- 1 file changed, 44 insertions(+), 50 deletions(-) diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 6a9dfeda7..0ceb0815d 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -694,22 +694,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): cls._flow_definition = flow_definition return flow_definition - @classmethod - def _start_method_names(cls) -> list[FlowMethodName]: + def _start_method_names(self) -> list[FlowMethodName]: return [ FlowMethodName(method_name) - for method_name, method_definition in cls.flow_definition().methods.items() + for method_name, method_definition in self._definition.methods.items() if method_definition.is_start ] - @classmethod def _listener_methods( - cls, + self, ) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]: # (name, definition, condition) for every non-start method that listens. # Routers are included (they listen too); callers wanting only plain # listeners filter on definition.router. - for method_name, method_definition in cls.flow_definition().methods.items(): + for method_name, method_definition in self._definition.methods.items(): if method_definition.listen is not None and not method_definition.is_start: yield ( FlowMethodName(method_name), @@ -717,25 +715,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): method_definition.listen, ) - @classmethod def _start_condition( - cls, method_name: FlowMethodName + self, method_name: FlowMethodName ) -> FlowDefinitionCondition | None: - method_definition = cls.flow_definition().methods[str(method_name)] + method_definition = self._definition.methods[str(method_name)] start = method_definition.start if isinstance(start, (str, dict)): return start return None - @classmethod def _listen_condition( - cls, method_name: FlowMethodName + self, method_name: FlowMethodName ) -> FlowDefinitionCondition | None: - return cls.flow_definition().methods[str(method_name)].listen + return self._definition.methods[str(method_name)].listen - @classmethod - def _is_router(cls, method_name: FlowMethodName) -> bool: - return cls.flow_definition().methods[str(method_name)].router + def _is_router(self, method_name: FlowMethodName) -> bool: + return self._definition.methods[str(method_name)].router initial_state: Annotated[ # type: ignore[type-arg] type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None, @@ -893,6 +888,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): PrivateAttr(default=None) ) _method_outputs: list[Any] = PrivateAttr(default_factory=list) + _definition: FlowDefinition = PrivateAttr() _state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) _or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) _completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set) @@ -931,6 +927,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): object.__setattr__(self, "_flow_post_init_done", True) self._initialize_runtime_extension_attrs() + self._definition = type(self).flow_definition() + if self._state is None: self._state = self._create_initial_state() @@ -945,7 +943,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowCreatedEvent( type="flow_created", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, ), ) @@ -955,11 +953,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if self.memory is None and not getattr(self, "_skip_auto_memory", False): from crewai.memory.utils import sanitize_scope_name - flow_name = sanitize_scope_name(self.name or self.__class__.__name__) + flow_name = sanitize_scope_name(self.name or self._definition.name) self.memory = Memory(root_scope=f"/flow/{flow_name}") # Build the runtime method lookup from the static FlowDefinition. - for method_name in type(self).flow_definition().methods: + for method_name in self._definition.methods: method = getattr(self, method_name, None) if method is None: continue @@ -1043,7 +1041,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def _start_condition_triggered_by( self, method_name: FlowMethodName, trigger: FlowMethodName ) -> bool: - condition = type(self)._start_condition(method_name) + condition = self._start_condition(method_name) if condition is None: return False return self._condition_met( @@ -1071,7 +1069,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): trigger_str = str(trigger) to_discard: list[FlowMethodName] = [] for listener_name in candidates: - condition = type(self)._listen_condition(listener_name) + condition = self._listen_condition(listener_name) if condition is None: continue if trigger_str in _iter_condition_events(condition): @@ -1093,9 +1091,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {} listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = { listener_name: condition - for listener_name, method_definition, condition in type( - self - )._listener_methods() + for listener_name, method_definition, condition in self._listener_methods() if not method_definition.router } @@ -1368,7 +1364,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowStartedEvent( type="flow_started", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, inputs=None, ), ) @@ -1444,7 +1440,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, MethodExecutionFinishedEvent( type="method_execution_finished", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, method_name=context.method_name, result=collapsed_outcome if emit else result, state=self._state, @@ -1498,7 +1494,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowPausedEvent( type="flow_paused", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, flow_id=e.context.flow_id, method_name=e.context.method_name, state=self._copy_and_serialize_state(), @@ -1529,7 +1525,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowFinishedEvent( type="flow_finished", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, result=final_result, state=self._copy_and_serialize_state(), ), @@ -2172,7 +2168,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # explicit finalization call closes the batch. started_event = FlowStartedEvent( type="flow_started", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, inputs=inputs, ) future = crewai_event_bus.emit(self, started_event) @@ -2212,11 +2208,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # Determine which start methods to execute at kickoff # Conditional start methods are only triggered by their conditions # UNLESS there are no unconditional starts (then all starts run as entry points) - start_methods = type(self)._start_method_names() + start_methods = self._start_method_names() unconditional_starts = [ start_method for start_method in start_methods - if type(self)._start_condition(start_method) is None + if self._start_condition(start_method) is None ] # If there are unconditional starts, only run those at kickoff # If there are NO unconditional starts, run all starts (including conditional ones) @@ -2264,7 +2260,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowPausedEvent( type="flow_paused", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, flow_id=e.context.flow_id, method_name=e.context.method_name, state=self._copy_and_serialize_state(), @@ -2314,7 +2310,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowFinishedEvent( type="flow_finished", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, result=final_output, state=self._copy_and_serialize_state(), ), @@ -2400,7 +2396,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionFinishedEvent, MethodExecutionFailedEvent, ) - flow_name = self.name or self.__class__.__name__ + flow_name = self.name or self._definition.name nodes = sorted( ( n @@ -2459,7 +2455,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ) # If start method is a router, use its result as an additional trigger - if type(self)._is_router(start_method_name) and result is not None: + if self._is_router(start_method_name) and result is not None: # Execute listeners for the start method name first await self._execute_listeners(start_method_name, result, finished_event_id) # Then execute listeners for the router result (e.g., "approved") @@ -2537,7 +2533,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionStartedEvent( type="method_execution_started", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, params=dumped_params, state=self._copy_and_serialize_state(), ), @@ -2589,7 +2585,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): finished_event = MethodExecutionFinishedEvent( type="method_execution_finished", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, state=self._copy_and_serialize_state(), result=result, ) @@ -2618,7 +2614,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionPausedEvent( type="method_execution_paused", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, state=self._copy_and_serialize_state(), flow_id=e.context.flow_id, message=e.context.message, @@ -2634,7 +2630,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): MethodExecutionFailedEvent( type="method_execution_failed", method_name=method_name, - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, error=e, ), ) @@ -2766,7 +2762,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): await asyncio.gather(*tasks) if current_trigger in router_results: - for method_name in type(self)._start_method_names(): + for method_name in self._start_method_names(): if self._start_condition_triggered_by( method_name, current_trigger ): @@ -2797,9 +2793,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ) -> list[FlowMethodName]: triggered: list[FlowMethodName] = [] - for listener_name, method_definition, condition in type( - self - )._listener_methods(): + for listener_name, method_definition, condition in self._listener_methods(): is_router = method_definition.router if router_only != is_router: continue @@ -2865,10 +2859,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # For routers, also check if any conditional starts they triggered are completed # If so, continue their chains - if type(self)._is_router(listener_name): - for start_method_name in type(self)._start_method_names(): + if self._is_router(listener_name): + for start_method_name in self._start_method_names(): if ( - type(self)._start_condition(start_method_name) is not None + self._start_condition(start_method_name) is not None and start_method_name in self._completed_methods ): # This conditional start was executed, continue its chain @@ -3044,7 +3038,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowInputRequestedEvent( type="flow_input_requested", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, method_name=method_name, message=message, metadata=metadata, @@ -3111,7 +3105,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowInputReceivedEvent( type="flow_input_received", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, method_name=method_name, message=message, response=response, @@ -3149,7 +3143,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, HumanFeedbackRequestedEvent( type="human_feedback_requested", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, method_name="", # Will be set by decorator if needed output=output, message=message, @@ -3178,7 +3172,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, HumanFeedbackReceivedEvent( type="human_feedback_received", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, method_name="", # Will be set by decorator if needed feedback=feedback, outcome=None, # Will be determined after collapsing @@ -3353,7 +3347,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self, FlowPlotEvent( type="flow_plot", - flow_name=self.name or self.__class__.__name__, + flow_name=self.name or self._definition.name, ), ) structure = build_flow_structure(cast(Any, self))