diff --git a/lib/crewai/src/crewai/flow/dsl/_listen.py b/lib/crewai/src/crewai/flow/dsl/_listen.py index ba7794e3a..37c9a9d25 100644 --- a/lib/crewai/src/crewai/flow/dsl/_listen.py +++ b/lib/crewai/src/crewai/flow/dsl/_listen.py @@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, + _method_action, _set_flow_method_definition, ) from crewai.flow.flow_definition import FlowMethodDefinition @@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator: wrapper = ListenMethod(func) _set_flow_method_definition( - wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition)) + wrapper, + FlowMethodDefinition( + do=_method_action(func), + listen=_to_definition_condition(condition), + ), ) return wrapper diff --git a/lib/crewai/src/crewai/flow/dsl/_router.py b/lib/crewai/src/crewai/flow/dsl/_router.py index 327f3a943..3edbf33ba 100644 --- a/lib/crewai/src/crewai/flow/dsl/_router.py +++ b/lib/crewai/src/crewai/flow/dsl/_router.py @@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, + _method_action, _set_flow_method_definition, ) from crewai.flow.flow_definition import FlowMethodDefinition @@ -148,6 +149,7 @@ def router( _set_flow_method_definition( wrapper, FlowMethodDefinition( + do=_method_action(func), listen=_to_definition_condition(condition), router=True, emit=router_events or None, diff --git a/lib/crewai/src/crewai/flow/dsl/_start.py b/lib/crewai/src/crewai/flow/dsl/_start.py index 882128d6c..fe9f82974 100644 --- a/lib/crewai/src/crewai/flow/dsl/_start.py +++ b/lib/crewai/src/crewai/flow/dsl/_start.py @@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger from crewai.flow.dsl._utils import ( P, R, + _method_action, _set_flow_method_definition, ) from crewai.flow.flow_definition import FlowMethodDefinition @@ -53,13 +54,17 @@ def start( def decorator(func: Callable[P, R]) -> StartMethod[P, R]: wrapper = StartMethod(func) - if condition is not None: - _set_flow_method_definition( - wrapper, - FlowMethodDefinition(start=_to_definition_condition(condition)), - ) - else: - _set_flow_method_definition(wrapper, FlowMethodDefinition(start=True)) + _set_flow_method_definition( + wrapper, + FlowMethodDefinition( + do=_method_action(func), + start=( + _to_definition_condition(condition) + if condition is not None + else True + ), + ), + ) return wrapper return cast(FlowMethodDecorator, decorator) diff --git a/lib/crewai/src/crewai/flow/dsl/_utils.py b/lib/crewai/src/crewai/flow/dsl/_utils.py index d25cb3b54..c9ceebdc0 100644 --- a/lib/crewai/src/crewai/flow/dsl/_utils.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -8,6 +8,7 @@ from pydantic import BaseModel from typing_extensions import TypeIs from crewai.flow.flow_definition import ( + FlowActionDefinition, FlowConfigDefinition, FlowConversationalDefinition, FlowConversationalRouterDefinition, @@ -84,6 +85,10 @@ def _stamp_inherited_conversational_metadata( return method +def _method_action(method: Any) -> FlowActionDefinition: + return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}") + + def _set_flow_method_definition( wrapper: FlowMethod[P, R], definition: FlowMethodDefinition, @@ -373,14 +378,11 @@ def _build_method_definition( ) -> FlowMethodDefinition: fragment = _get_flow_method_definition(method) if fragment is None: - method_definition = FlowMethodDefinition() + method_definition = FlowMethodDefinition(do=_method_action(method)) else: - method_definition = fragment.model_copy(deep=True) - - # Skip / qualnames: they can never be re-imported, so a - # missing handler is more honest than a dead reference. - if "<" not in method.__qualname__: - method_definition.handler = f"{method.__module__}:{method.__qualname__}" + method_definition = fragment.model_copy( + deep=True, update={"do": _method_action(method)} + ) human_feedback = _build_human_feedback_definition( method, diagnostics, f"{path}.human_feedback" diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 157cb56f7..365bfd7a7 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -27,6 +27,7 @@ logger = logging.getLogger(__name__) FlowDefinitionCondition = str | dict[str, Any] __all__ = [ + "FlowActionDefinition", "FlowConfigDefinition", "FlowConversationalDefinition", "FlowConversationalRouterDefinition", @@ -91,10 +92,17 @@ class FlowHumanFeedbackDefinition(BaseModel): learn_strict: bool = False +class FlowActionDefinition(BaseModel): + """What a Flow method node executes, independent of when it fires.""" + + call: TypingLiteral["code"] = "code" + ref: str + + class FlowMethodDefinition(BaseModel): """Static definition of one Flow method and its execution roles.""" - handler: str | None = None + do: FlowActionDefinition start: bool | FlowDefinitionCondition | None = None listen: FlowDefinitionCondition | None = None router: bool = False diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime/__init__.py similarity index 98% rename from lib/crewai/src/crewai/flow/runtime.py rename to lib/crewai/src/crewai/flow/runtime/__init__.py index 011074992..33d399da5 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -107,6 +107,7 @@ from crewai.flow.flow_wrappers import ( from crewai.flow.human_feedback import HumanFeedbackResult from crewai.flow.input_provider import InputProvider from crewai.flow.persistence.base import FlowPersistence +from crewai.flow.runtime._action_resolvers import resolve_action from crewai.flow.types import ( FlowExecutionData, FlowMethodName, @@ -171,24 +172,6 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) - return combine(_condition_satisfied(branch, events) for branch in branches) -def _resolve_handler(ref: str) -> Callable[..., Any]: - module_name, separator, qualname = ref.partition(":") - if not separator or not module_name or not qualname: - raise ValueError( - f"invalid handler reference {ref!r}; expected 'module:qualname'" - ) - module = importlib.import_module(module_name) - target: Any = module - for part in qualname.split("."): - target = getattr(target, part) - if not callable(target): - raise TypeError( - f"handler reference {ref!r} resolved to a non-callable " - f"{type(target).__name__}" - ) - return cast(Callable[..., Any], target) - - def _build_definition_state_model( state_definition: FlowStateDefinition, ) -> BaseModel | None: @@ -201,7 +184,10 @@ def _build_definition_state_model( model_class: type[BaseModel] | None = None if state_definition.ref: try: - resolved = _resolve_handler(state_definition.ref) + module_name, _, qualname = state_definition.ref.partition(":") + resolved: Any = importlib.import_module(module_name) + for part in qualname.split("."): + resolved = getattr(resolved, part) except Exception: logger.warning( "Could not import state ref %r", state_definition.ref, exc_info=True @@ -1007,7 +993,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if self.name and self.name != self._definition.name: self._definition = self._definition.model_copy(update={"name": self.name}) methods = ( - self._handler_bound_methods() + self._action_bound_methods() if definition is not None else self._class_bound_methods() ) @@ -1041,26 +1027,24 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self._methods.update(methods) - def _handler_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]: + def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]: + def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]: + try: + return resolve_action(self, definition.do) + except Exception as e: + unresolved.append(f"{name}: {e}") + return lambda *args, **kwargs: None + methods: dict[FlowMethodName, Callable[..., Any]] = {} unresolved: list[str] = [] for method_name, method_definition in self._definition.methods.items(): - if method_definition.handler is None: - unresolved.append(f"{method_name}: no handler") - continue - try: - handler = _resolve_handler(method_definition.handler) - except Exception as e: - unresolved.append(f"{method_name}: {e}") - continue - if getattr(handler, "__self__", None) is None: - handler = handler.__get__(self, type(self)) - methods[FlowMethodName(method_name)] = handler + methods[FlowMethodName(method_name)] = resolve( + method_name, method_definition + ) if unresolved: raise ValueError( f"Cannot build flow {self._definition.name!r} from its definition; " - "methods with missing or unresolvable handlers: " - + "; ".join(unresolved) + "methods with unresolvable actions: " + "; ".join(unresolved) ) return methods diff --git a/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py b/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py new file mode 100644 index 000000000..80512b11d --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from collections.abc import Callable +import importlib +from operator import attrgetter +from typing import TYPE_CHECKING, Any, cast + +from crewai.flow.flow_definition import FlowActionDefinition + + +if TYPE_CHECKING: + from crewai.flow.runtime import Flow + + +class InvalidActionRefError(ValueError): + def __init__(self, ref: str) -> None: + super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'") + + +def _resolve_code_action( + flow: Flow[Any], action: FlowActionDefinition +) -> Callable[..., Any]: + ref = action.ref + module_name, _, qualname = ref.partition(":") + if "<" in ref or not module_name or not qualname: + raise InvalidActionRefError(ref) + try: + target = attrgetter(qualname)(importlib.import_module(module_name)) + except (ImportError, AttributeError) as e: + raise InvalidActionRefError(ref) from e + if not callable(target): + raise InvalidActionRefError(ref) + handler = cast(Callable[..., Any], target) + if getattr(handler, "__self__", None) is None: + handler = handler.__get__(flow, type(flow)) + return handler + + +def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]: + """Turn one `do:` action into the callable the flow runs for that node.""" + if action.call == "code": + return _resolve_code_action(flow, action) + raise ValueError(f"unknown call type {action.call!r}") diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index f79917369..ebbb2e60d 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit(): "start", } assert set(flow_definition.__all__) == { + "FlowActionDefinition", "FlowConfigDefinition", "FlowConversationalDefinition", "FlowConversationalRouterDefinition", @@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract(): "name": "LoadedDiagnosticsFlow", "methods": { "decision": { + "do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"}, "router": True, "emit": ["continue"], } @@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger(): "name": "LoadedFlow", "methods": { "decision": { + "do": {"ref": "loaded_flows:LoadedFlow.decision"}, "router": True, "start": False, "emit": ["continue"], @@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract(): "schema": "crewai.flow/v1", "name": "TypoFlow", "methods": { - "begin": {"start": True}, - "handle": {"listen": "begni"}, + "begin": { + "do": {"ref": "loaded_flows:TypoFlow.begin"}, + "start": True, + }, + "handle": { + "do": {"ref": "loaded_flows:TypoFlow.handle"}, + "listen": "begni", + }, }, } ) @@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method(): "schema": "crewai.flow/v1", "name": "ExplicitNonStartFlow", "methods": { - "begin": {"start": True}, - "handle": {"start": False, "listen": "begin"}, + "begin": { + "do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"}, + "start": True, + }, + "handle": { + "do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"}, + "start": False, + "listen": "begin", + }, }, } ) @@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog): "name": "LoadedFlow", "methods": { "decision": { + "do": {"ref": "loaded_flows:LoadedFlow.decision"}, "router": True, "emit": ["continue"], } diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index f93dab69e..a273efba1 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -1,6 +1,7 @@ from __future__ import annotations import pytest +from pydantic import ValidationError from crewai.events.event_bus import crewai_event_bus from crewai.events.types.flow_events import ( @@ -33,13 +34,17 @@ schema: crewai.flow/v1 name: ChainFlow methods: begin: - handler: {__name__}:ChainFlow.begin + do: + call: code + ref: {__name__}:ChainFlow.begin start: true shout: - handler: {__name__}:ChainFlow.shout + do: + ref: {__name__}:ChainFlow.shout listen: begin confirm: - handler: {__name__}:ChainFlow.confirm + do: + ref: {__name__}:ChainFlow.confirm listen: shout """ @@ -73,20 +78,25 @@ schema: crewai.flow/v1 name: MergeFlow methods: begin: - handler: {__name__}:MergeFlow.begin + do: + ref: {__name__}:MergeFlow.begin start: true left: - handler: {__name__}:MergeFlow.left + do: + ref: {__name__}:MergeFlow.left listen: begin right: - handler: {__name__}:MergeFlow.right + do: + ref: {__name__}:MergeFlow.right listen: begin either: - handler: {__name__}:MergeFlow.either + do: + ref: {__name__}:MergeFlow.either listen: or: [left, right] join: - handler: {__name__}:MergeFlow.join + do: + ref: {__name__}:MergeFlow.join listen: and: [left, right, either] """ @@ -115,17 +125,21 @@ schema: crewai.flow/v1 name: RouteFlow methods: begin: - handler: {__name__}:RouteFlow.begin + do: + ref: {__name__}:RouteFlow.begin start: true decide: - handler: {__name__}:RouteFlow.decide + do: + ref: {__name__}:RouteFlow.decide listen: begin router: true take_left: - handler: {__name__}:RouteFlow.take_left + do: + ref: {__name__}:RouteFlow.take_left listen: left take_right: - handler: {__name__}:RouteFlow.take_right + do: + ref: {__name__}:RouteFlow.take_right listen: right """ @@ -152,14 +166,17 @@ schema: crewai.flow/v1 name: LoopFlow methods: step: - handler: {__name__}:LoopFlow.step + do: + ref: {__name__}:LoopFlow.step start: retry decide: - handler: {__name__}:LoopFlow.decide + do: + ref: {__name__}:LoopFlow.decide listen: step router: true finish: - handler: {__name__}:LoopFlow.finish + do: + ref: {__name__}:LoopFlow.finish listen: done """ @@ -189,10 +206,12 @@ state: ref: {__name__}:CounterState methods: begin: - handler: {__name__}:PydanticStateFlow.begin + do: + ref: {__name__}:PydanticStateFlow.begin start: true finish: - handler: {__name__}:PydanticStateFlow.finish + do: + ref: {__name__}:PydanticStateFlow.finish listen: begin """ @@ -206,10 +225,12 @@ state: count: 5 methods: begin: - handler: {__name__}:PydanticStateFlow.begin + do: + ref: {__name__}:PydanticStateFlow.begin start: true finish: - handler: {__name__}:PydanticStateFlow.finish + do: + ref: {__name__}:PydanticStateFlow.finish listen: begin """ @@ -230,10 +251,12 @@ state: default: none methods: begin: - handler: {__name__}:PydanticStateFlow.begin + do: + ref: {__name__}:PydanticStateFlow.begin start: true finish: - handler: {__name__}:PydanticStateFlow.finish + do: + ref: {__name__}:PydanticStateFlow.finish listen: begin """ @@ -255,10 +278,12 @@ state: default: none methods: begin: - handler: {__name__}:PydanticStateFlow.begin + do: + ref: {__name__}:PydanticStateFlow.begin start: true finish: - handler: {__name__}:PydanticStateFlow.finish + do: + ref: {__name__}:PydanticStateFlow.finish listen: begin """ @@ -270,7 +295,8 @@ state: ref: definitely_not_a_module_xyz:MissingState methods: begin: - handler: {__name__}:ChainFlow.begin + do: + ref: {__name__}:ChainFlow.begin start: true """ @@ -283,7 +309,8 @@ state: count: 5 methods: begin: - handler: {__name__}:ChainFlow.begin + do: + ref: {__name__}:ChainFlow.begin start: true """ @@ -295,7 +322,8 @@ state: ref: somewhere:Something methods: begin: - handler: {__name__}:ChainFlow.begin + do: + ref: {__name__}:ChainFlow.begin start: true """ @@ -381,43 +409,41 @@ def test_definition_flow_events_use_definition_name(): assert all(flow_name == "ChainFlow" for _, _, flow_name in events) -def test_from_definition_missing_handler_raises(): +def test_definition_method_without_action_is_invalid(): + with pytest.raises(ValidationError, match="do"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "NoActions", + "methods": {"begin": {"start": True}}, + } + ) + + +def test_from_definition_unresolvable_ref_raises(): definition = FlowDefinition.from_dict( { "schema": "crewai.flow/v1", - "name": "NoHandlers", - "methods": {"begin": {"start": True}}, - } - ) - - with pytest.raises(ValueError, match="begin: no handler"): - Flow.from_definition(definition) - - -def test_from_definition_unresolvable_handler_raises(): - definition = FlowDefinition.from_dict( - { - "schema": "crewai.flow/v1", - "name": "BadHandlers", + "name": "BadRefs", "methods": { "begin": { "start": True, - "handler": "definitely_not_a_module_xyz:nope", + "do": {"ref": "definitely_not_a_module_xyz:nope"}, } }, } ) - with pytest.raises(ValueError, match="missing or unresolvable handlers.*begin"): + with pytest.raises(ValueError, match="unresolvable actions.*begin"): Flow.from_definition(definition) -def test_from_definition_malformed_handler_raises(): +def test_from_definition_malformed_ref_raises(): definition = FlowDefinition.from_dict( { "schema": "crewai.flow/v1", - "name": "MalformedHandlers", - "methods": {"begin": {"start": True, "handler": "no-colon-here"}}, + "name": "MalformedRefs", + "methods": {"begin": {"start": True, "do": {"ref": "no-colon-here"}}}, } ) @@ -425,11 +451,29 @@ def test_from_definition_malformed_handler_raises(): Flow.from_definition(definition) -def test_flow_definition_stamps_handler_refs(): +def test_from_definition_local_scope_ref_raises(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "LocalRefs", + "methods": { + "begin": { + "start": True, + "do": {"ref": f"{__name__}:make..LocalFlow.begin"}, + } + }, + } + ) + + with pytest.raises(ValueError, match="expected 'module:qualname'"): + Flow.from_definition(definition) + + +def test_flow_definition_stamps_refs(): definition = ChainFlow.flow_definition() - assert definition.methods["begin"].handler == f"{__name__}:ChainFlow.begin" - assert definition.methods["shout"].handler == f"{__name__}:ChainFlow.shout" + assert definition.methods["begin"].do.ref == f"{__name__}:ChainFlow.begin" + assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout" def test_pydantic_state_from_ref_parity(): diff --git a/lib/crewai/tests/test_flow_visualization.py b/lib/crewai/tests/test_flow_visualization.py index 167703a14..3dbe4e140 100644 --- a/lib/crewai/tests/test_flow_visualization.py +++ b/lib/crewai/tests/test_flow_visualization.py @@ -77,12 +77,22 @@ class ComplexFlow(Flow): return "complete" -def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None: +def _attach_flow_definition( + flow_class: type[Flow], methods: dict[str, dict[str, object]] +) -> None: flow_class._flow_definition = FlowDefinition.from_dict( { "schema": "crewai.flow/v1", "name": flow_class.__name__, - "methods": methods, + "methods": { + name: { + "do": { + "ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}" + }, + **spec, + } + for name, spec in methods.items() + }, } ) @@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition(): "schema": "crewai.flow/v1", "name": "DefinedFlow", "methods": { - "begin": {"start": True}, + "begin": { + "do": {"ref": "defined_flows:DefinedFlow.begin"}, + "start": True, + }, "decide": { + "do": {"ref": "defined_flows:DefinedFlow.decide"}, "listen": "begin", "router": True, "emit": ["done"], }, - "finish": {"listen": "done"}, + "finish": { + "do": {"ref": "defined_flows:DefinedFlow.finish"}, + "listen": "done", + }, }, } )