Read flow dispatch from FlowDefinition

Store the definition in a `_definition` PrivateAttr at post-init and
convert the dispatch helpers (`_start_method_names`, `_listener_methods`,
`_start_condition`, `_listen_condition`, `_is_router`) from classmethods
to instance methods that read it. Event names now fall back to
`self._definition.name` instead of `self.__class__.__name__`.

Behavior is identical for decorator subclasses, but the engine no longer
assumes the definition comes from the class. This is the seam for
`Flow.from_definition`, where an instance runs a definition that was
loaded rather than built from a Python subclass.
This commit is contained in:
Vinicius Brasil
2026-06-09 22:03:05 -07:00
parent 21fa8e32d9
commit 5c18f3335b

View File

@@ -694,22 +694,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
cls._flow_definition = flow_definition
return flow_definition
@classmethod
def _start_method_names(cls) -> list[FlowMethodName]:
def _start_method_names(self) -> list[FlowMethodName]:
return [
FlowMethodName(method_name)
for method_name, method_definition in cls.flow_definition().methods.items()
for method_name, method_definition in self._definition.methods.items()
if method_definition.is_start
]
@classmethod
def _listener_methods(
cls,
self,
) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]:
# (name, definition, condition) for every non-start method that listens.
# Routers are included (they listen too); callers wanting only plain
# listeners filter on definition.router.
for method_name, method_definition in cls.flow_definition().methods.items():
for method_name, method_definition in self._definition.methods.items():
if method_definition.listen is not None and not method_definition.is_start:
yield (
FlowMethodName(method_name),
@@ -717,25 +715,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method_definition.listen,
)
@classmethod
def _start_condition(
cls, method_name: FlowMethodName
self, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
method_definition = cls.flow_definition().methods[str(method_name)]
method_definition = self._definition.methods[str(method_name)]
start = method_definition.start
if isinstance(start, (str, dict)):
return start
return None
@classmethod
def _listen_condition(
cls, method_name: FlowMethodName
self, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
return cls.flow_definition().methods[str(method_name)].listen
return self._definition.methods[str(method_name)].listen
@classmethod
def _is_router(cls, method_name: FlowMethodName) -> bool:
return cls.flow_definition().methods[str(method_name)].router
def _is_router(self, method_name: FlowMethodName) -> bool:
return self._definition.methods[str(method_name)].router
initial_state: Annotated[ # type: ignore[type-arg]
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
@@ -893,6 +888,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
PrivateAttr(default=None)
)
_method_outputs: list[Any] = PrivateAttr(default_factory=list)
_definition: FlowDefinition = PrivateAttr()
_state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -931,6 +927,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
object.__setattr__(self, "_flow_post_init_done", True)
self._initialize_runtime_extension_attrs()
self._definition = type(self).flow_definition()
if self._state is None:
self._state = self._create_initial_state()
@@ -945,7 +943,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
),
)
@@ -955,11 +953,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
flow_name = sanitize_scope_name(self.name or self._definition.name)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
# Build the runtime method lookup from the static FlowDefinition.
for method_name in type(self).flow_definition().methods:
for method_name in self._definition.methods:
method = getattr(self, method_name, None)
if method is None:
continue
@@ -1043,7 +1041,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _start_condition_triggered_by(
self, method_name: FlowMethodName, trigger: FlowMethodName
) -> bool:
condition = type(self)._start_condition(method_name)
condition = self._start_condition(method_name)
if condition is None:
return False
return self._condition_met(
@@ -1071,7 +1069,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
trigger_str = str(trigger)
to_discard: list[FlowMethodName] = []
for listener_name in candidates:
condition = type(self)._listen_condition(listener_name)
condition = self._listen_condition(listener_name)
if condition is None:
continue
if trigger_str in _iter_condition_events(condition):
@@ -1093,9 +1091,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
listener_name: condition
for listener_name, method_definition, condition in type(
self
)._listener_methods()
for listener_name, method_definition, condition in self._listener_methods()
if not method_definition.router
}
@@ -1368,7 +1364,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
inputs=None,
),
)
@@ -1444,7 +1440,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
@@ -1498,7 +1494,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -1529,7 +1525,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
result=final_result,
state=self._copy_and_serialize_state(),
),
@@ -2172,7 +2168,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# explicit finalization call closes the batch.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
@@ -2212,11 +2208,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Determine which start methods to execute at kickoff
# Conditional start methods are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
start_methods = type(self)._start_method_names()
start_methods = self._start_method_names()
unconditional_starts = [
start_method
for start_method in start_methods
if type(self)._start_condition(start_method) is None
if self._start_condition(start_method) is None
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
@@ -2264,7 +2260,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -2314,7 +2310,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
result=final_output,
state=self._copy_and_serialize_state(),
),
@@ -2400,7 +2396,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
flow_name = self.name or self.__class__.__name__
flow_name = self.name or self._definition.name
nodes = sorted(
(
n
@@ -2459,7 +2455,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
# If start method is a router, use its result as an additional trigger
if type(self)._is_router(start_method_name) and result is not None:
if self._is_router(start_method_name) and result is not None:
# Execute listeners for the start method name first
await self._execute_listeners(start_method_name, result, finished_event_id)
# Then execute listeners for the router result (e.g., "approved")
@@ -2537,7 +2533,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
@@ -2589,7 +2585,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
state=self._copy_and_serialize_state(),
result=result,
)
@@ -2618,7 +2614,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
@@ -2634,7 +2630,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
error=e,
),
)
@@ -2766,7 +2762,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
if current_trigger in router_results:
for method_name in type(self)._start_method_names():
for method_name in self._start_method_names():
if self._start_condition_triggered_by(
method_name, current_trigger
):
@@ -2797,9 +2793,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
) -> list[FlowMethodName]:
triggered: list[FlowMethodName] = []
for listener_name, method_definition, condition in type(
self
)._listener_methods():
for listener_name, method_definition, condition in self._listener_methods():
is_router = method_definition.router
if router_only != is_router:
continue
@@ -2865,10 +2859,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if type(self)._is_router(listener_name):
for start_method_name in type(self)._start_method_names():
if self._is_router(listener_name):
for start_method_name in self._start_method_names():
if (
type(self)._start_condition(start_method_name) is not None
self._start_condition(start_method_name) is not None
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
@@ -3044,7 +3038,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputRequestedEvent(
type="flow_input_requested",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
method_name=method_name,
message=message,
metadata=metadata,
@@ -3111,7 +3105,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputReceivedEvent(
type="flow_input_received",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
method_name=method_name,
message=message,
response=response,
@@ -3149,7 +3143,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackRequestedEvent(
type="human_feedback_requested",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
method_name="", # Will be set by decorator if needed
output=output,
message=message,
@@ -3178,7 +3172,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackReceivedEvent(
type="human_feedback_received",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
method_name="", # Will be set by decorator if needed
feedback=feedback,
outcome=None, # Will be determined after collapsing
@@ -3353,7 +3347,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.name or self.__class__.__name__,
flow_name=self.name or self._definition.name,
),
)
structure = build_flow_structure(cast(Any, self))