Replace handler string with structured FlowActionDefinition

`handler: str | None` was optional and opaque — missing handlers only
surfaced at kickoff time. `do: FlowActionDefinition` is required, so
Pydantic rejects invalid definitions at parse time.

The `call: "code"` discriminator prepares the schema for future
non-Python action types (e.g. MCP tool, crew) without touching
`FlowMethodDefinition`. Resolution logic is extracted to
`runtime/_action_resolvers.py` to keep the dispatch point isolated.
This commit is contained in:
Vinicius Brasil
2026-06-11 10:17:49 -07:00
parent d029a5cd92
commit 837ae2bf7f
10 changed files with 234 additions and 107 deletions

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
),
)
return wrapper

View File

@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -148,6 +149,7 @@ def router(
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
router=True,
emit=router_events or None,

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -53,13 +54,17 @@ def start(
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(start=_to_definition_condition(condition)),
)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
start=(
_to_definition_condition(condition)
if condition is not None
else True
),
),
)
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
@@ -84,6 +85,10 @@ def _stamp_inherited_conversational_metadata(
return method
def _method_action(method: Any) -> FlowActionDefinition:
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -373,14 +378,11 @@ def _build_method_definition(
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
if fragment is None:
method_definition = FlowMethodDefinition()
method_definition = FlowMethodDefinition(do=_method_action(method))
else:
method_definition = fragment.model_copy(deep=True)
# Skip <locals>/<lambda> qualnames: they can never be re-imported, so a
# missing handler is more honest than a dead reference.
if "<" not in method.__qualname__:
method_definition.handler = f"{method.__module__}:{method.__qualname__}"
method_definition = fragment.model_copy(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"

View File

@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -91,10 +92,17 @@ class FlowHumanFeedbackDefinition(BaseModel):
learn_strict: bool = False
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
call: TypingLiteral["code"] = "code"
ref: str
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
handler: str | None = None
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False

View File

@@ -107,6 +107,7 @@ from crewai.flow.flow_wrappers import (
from crewai.flow.human_feedback import HumanFeedbackResult
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._action_resolvers import resolve_action
from crewai.flow.types import (
FlowExecutionData,
FlowMethodName,
@@ -171,24 +172,6 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -
return combine(_condition_satisfied(branch, events) for branch in branches)
def _resolve_handler(ref: str) -> Callable[..., Any]:
module_name, separator, qualname = ref.partition(":")
if not separator or not module_name or not qualname:
raise ValueError(
f"invalid handler reference {ref!r}; expected 'module:qualname'"
)
module = importlib.import_module(module_name)
target: Any = module
for part in qualname.split("."):
target = getattr(target, part)
if not callable(target):
raise TypeError(
f"handler reference {ref!r} resolved to a non-callable "
f"{type(target).__name__}"
)
return cast(Callable[..., Any], target)
def _build_definition_state_model(
state_definition: FlowStateDefinition,
) -> BaseModel | None:
@@ -201,7 +184,10 @@ def _build_definition_state_model(
model_class: type[BaseModel] | None = None
if state_definition.ref:
try:
resolved = _resolve_handler(state_definition.ref)
module_name, _, qualname = state_definition.ref.partition(":")
resolved: Any = importlib.import_module(module_name)
for part in qualname.split("."):
resolved = getattr(resolved, part)
except Exception:
logger.warning(
"Could not import state ref %r", state_definition.ref, exc_info=True
@@ -1007,7 +993,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if self.name and self.name != self._definition.name:
self._definition = self._definition.model_copy(update={"name": self.name})
methods = (
self._handler_bound_methods()
self._action_bound_methods()
if definition is not None
else self._class_bound_methods()
)
@@ -1041,26 +1027,24 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._methods.update(methods)
def _handler_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
try:
return resolve_action(self, definition.do)
except Exception as e:
unresolved.append(f"{name}: {e}")
return lambda *args, **kwargs: None
methods: dict[FlowMethodName, Callable[..., Any]] = {}
unresolved: list[str] = []
for method_name, method_definition in self._definition.methods.items():
if method_definition.handler is None:
unresolved.append(f"{method_name}: no handler")
continue
try:
handler = _resolve_handler(method_definition.handler)
except Exception as e:
unresolved.append(f"{method_name}: {e}")
continue
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(self, type(self))
methods[FlowMethodName(method_name)] = handler
methods[FlowMethodName(method_name)] = resolve(
method_name, method_definition
)
if unresolved:
raise ValueError(
f"Cannot build flow {self._definition.name!r} from its definition; "
"methods with missing or unresolvable handlers: "
+ "; ".join(unresolved)
"methods with unresolvable actions: " + "; ".join(unresolved)
)
return methods

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from collections.abc import Callable
import importlib
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidActionRefError(ValueError):
def __init__(self, ref: str) -> None:
super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'")
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidActionRefError(ref)
try:
target = attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidActionRefError(ref) from e
if not callable(target):
raise InvalidActionRefError(ref)
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit():
"start",
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
}
@@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"start": False,
"emit": ["continue"],
@@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
"schema": "crewai.flow/v1",
"name": "TypoFlow",
"methods": {
"begin": {"start": True},
"handle": {"listen": "begni"},
"begin": {
"do": {"ref": "loaded_flows:TypoFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:TypoFlow.handle"},
"listen": "begni",
},
},
}
)
@@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
"methods": {
"begin": {"start": True},
"handle": {"start": False, "listen": "begin"},
"begin": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
"start": False,
"listen": "begin",
},
},
}
)
@@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"emit": ["continue"],
}

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import pytest
from pydantic import ValidationError
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
@@ -33,13 +34,17 @@ schema: crewai.flow/v1
name: ChainFlow
methods:
begin:
handler: {__name__}:ChainFlow.begin
do:
call: code
ref: {__name__}:ChainFlow.begin
start: true
shout:
handler: {__name__}:ChainFlow.shout
do:
ref: {__name__}:ChainFlow.shout
listen: begin
confirm:
handler: {__name__}:ChainFlow.confirm
do:
ref: {__name__}:ChainFlow.confirm
listen: shout
"""
@@ -73,20 +78,25 @@ schema: crewai.flow/v1
name: MergeFlow
methods:
begin:
handler: {__name__}:MergeFlow.begin
do:
ref: {__name__}:MergeFlow.begin
start: true
left:
handler: {__name__}:MergeFlow.left
do:
ref: {__name__}:MergeFlow.left
listen: begin
right:
handler: {__name__}:MergeFlow.right
do:
ref: {__name__}:MergeFlow.right
listen: begin
either:
handler: {__name__}:MergeFlow.either
do:
ref: {__name__}:MergeFlow.either
listen:
or: [left, right]
join:
handler: {__name__}:MergeFlow.join
do:
ref: {__name__}:MergeFlow.join
listen:
and: [left, right, either]
"""
@@ -115,17 +125,21 @@ schema: crewai.flow/v1
name: RouteFlow
methods:
begin:
handler: {__name__}:RouteFlow.begin
do:
ref: {__name__}:RouteFlow.begin
start: true
decide:
handler: {__name__}:RouteFlow.decide
do:
ref: {__name__}:RouteFlow.decide
listen: begin
router: true
take_left:
handler: {__name__}:RouteFlow.take_left
do:
ref: {__name__}:RouteFlow.take_left
listen: left
take_right:
handler: {__name__}:RouteFlow.take_right
do:
ref: {__name__}:RouteFlow.take_right
listen: right
"""
@@ -152,14 +166,17 @@ schema: crewai.flow/v1
name: LoopFlow
methods:
step:
handler: {__name__}:LoopFlow.step
do:
ref: {__name__}:LoopFlow.step
start: retry
decide:
handler: {__name__}:LoopFlow.decide
do:
ref: {__name__}:LoopFlow.decide
listen: step
router: true
finish:
handler: {__name__}:LoopFlow.finish
do:
ref: {__name__}:LoopFlow.finish
listen: done
"""
@@ -189,10 +206,12 @@ state:
ref: {__name__}:CounterState
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
@@ -206,10 +225,12 @@ state:
count: 5
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
@@ -230,10 +251,12 @@ state:
default: none
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
@@ -255,10 +278,12 @@ state:
default: none
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
@@ -270,7 +295,8 @@ state:
ref: definitely_not_a_module_xyz:MissingState
methods:
begin:
handler: {__name__}:ChainFlow.begin
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
@@ -283,7 +309,8 @@ state:
count: 5
methods:
begin:
handler: {__name__}:ChainFlow.begin
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
@@ -295,7 +322,8 @@ state:
ref: somewhere:Something
methods:
begin:
handler: {__name__}:ChainFlow.begin
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
@@ -381,43 +409,41 @@ def test_definition_flow_events_use_definition_name():
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
def test_from_definition_missing_handler_raises():
def test_definition_method_without_action_is_invalid():
with pytest.raises(ValidationError, match="do"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "NoActions",
"methods": {"begin": {"start": True}},
}
)
def test_from_definition_unresolvable_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "NoHandlers",
"methods": {"begin": {"start": True}},
}
)
with pytest.raises(ValueError, match="begin: no handler"):
Flow.from_definition(definition)
def test_from_definition_unresolvable_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "BadHandlers",
"name": "BadRefs",
"methods": {
"begin": {
"start": True,
"handler": "definitely_not_a_module_xyz:nope",
"do": {"ref": "definitely_not_a_module_xyz:nope"},
}
},
}
)
with pytest.raises(ValueError, match="missing or unresolvable handlers.*begin"):
with pytest.raises(ValueError, match="unresolvable actions.*begin"):
Flow.from_definition(definition)
def test_from_definition_malformed_handler_raises():
def test_from_definition_malformed_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "MalformedHandlers",
"methods": {"begin": {"start": True, "handler": "no-colon-here"}},
"name": "MalformedRefs",
"methods": {"begin": {"start": True, "do": {"ref": "no-colon-here"}}},
}
)
@@ -425,11 +451,29 @@ def test_from_definition_malformed_handler_raises():
Flow.from_definition(definition)
def test_flow_definition_stamps_handler_refs():
def test_from_definition_local_scope_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LocalRefs",
"methods": {
"begin": {
"start": True,
"do": {"ref": f"{__name__}:make.<locals>.LocalFlow.begin"},
}
},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_flow_definition_stamps_refs():
definition = ChainFlow.flow_definition()
assert definition.methods["begin"].handler == f"{__name__}:ChainFlow.begin"
assert definition.methods["shout"].handler == f"{__name__}:ChainFlow.shout"
assert definition.methods["begin"].do.ref == f"{__name__}:ChainFlow.begin"
assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout"
def test_pydantic_state_from_ref_parity():

View File

@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
return "complete"
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
def _attach_flow_definition(
flow_class: type[Flow], methods: dict[str, dict[str, object]]
) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
"methods": methods,
"methods": {
name: {
"do": {
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
},
**spec,
}
for name, spec in methods.items()
},
}
)
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
"methods": {
"begin": {"start": True},
"begin": {
"do": {"ref": "defined_flows:DefinedFlow.begin"},
"start": True,
},
"decide": {
"do": {"ref": "defined_flows:DefinedFlow.decide"},
"listen": "begin",
"router": True,
"emit": ["done"],
},
"finish": {"listen": "done"},
"finish": {
"do": {"ref": "defined_flows:DefinedFlow.finish"},
"listen": "done",
},
},
}
)