From fe2c236601bda2f53913d64390c3327d5a72dd5b Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Mon, 15 Jun 2026 21:44:33 -0700 Subject: [PATCH] Add `each` composite action to FlowDefinition (#6164) Lets a definition loop over an array without writing Python. Each iteration exposes `item` and prior steps `outputs`. ```yaml do: call: each in: state.rows do: - normalize: call: tool ref: my_tools:NormalizeRowTool with: { row: "${ item }" } - lead_scoring: call: agent # ... ``` --- lib/crewai/src/crewai/flow/flow_definition.py | 81 ++- .../src/crewai/flow/runtime/__init__.py | 15 +- .../src/crewai/flow/runtime/_actions.py | 221 ++++++++ .../src/crewai/flow/runtime/_expressions.py | 45 +- lib/crewai/src/crewai/flow/runtime/_refs.py | 38 ++ .../src/crewai/flow/runtime/_resolvers.py | 116 ----- .../src/crewai/utilities/serialization.py | 25 +- lib/crewai/tests/test_flow_definition.py | 69 +++ lib/crewai/tests/test_flow_from_definition.py | 475 +++++++++++++++++- .../tests/utilities/test_serialization.py | 49 +- 10 files changed, 962 insertions(+), 172 deletions(-) create mode 100644 lib/crewai/src/crewai/flow/runtime/_actions.py create mode 100644 lib/crewai/src/crewai/flow/runtime/_refs.py delete mode 100644 lib/crewai/src/crewai/flow/runtime/_resolvers.py diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index e70813e7a..ae8be4ec5 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -11,9 +11,17 @@ from __future__ import annotations import json import logging +import re from typing import Any, Literal as TypingLiteral -from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + RootModel, + field_serializer, + model_validator, +) import yaml from crewai.flow.conversational_definition import ( @@ -25,6 +33,7 @@ from crewai.flow.conversational_definition import ( logger = logging.getLogger(__name__) FlowDefinitionCondition = str | dict[str, Any] +_STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") __all__ = [ "FlowActionDefinition", @@ -35,6 +44,8 @@ __all__ = [ "FlowDefinition", "FlowDefinitionCondition", "FlowDefinitionDiagnostic", + "FlowEachActionDefinition", + "FlowEachInnerActionDefinition", "FlowExpressionActionDefinition", "FlowHumanFeedbackDefinition", "FlowMethodDefinition", @@ -148,10 +159,11 @@ class FlowHumanFeedbackDefinition(BaseModel): class FlowCodeActionDefinition(BaseModel): """A Flow method action that executes importable Python code.""" - model_config = ConfigDict(extra="forbid") + model_config = ConfigDict(populate_by_name=True, extra="forbid") call: TypingLiteral["code"] = "code" ref: str + with_: dict[str, Any] | None = Field(default=None, alias="with") class FlowToolActionDefinition(BaseModel): @@ -173,14 +185,66 @@ class FlowExpressionActionDefinition(BaseModel): expr: str -FlowActionDefinition = ( +FlowInnerActionDefinition = ( FlowCodeActionDefinition | FlowToolActionDefinition | FlowExpressionActionDefinition ) +class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]): + """One named action inside an ``each`` composite action.""" + + @model_validator(mode="after") + def _validate_action_mapping(self) -> FlowEachInnerActionDefinition: + if len(self.root) != 1: + raise ValueError("each.do entries must be one-key mappings") + _validate_step_name(self.name, field="each.do action names") + return self + + @property + def name(self) -> str: + return next(iter(self.root)) + + @property + def action(self) -> FlowInnerActionDefinition: + return next(iter(self.root.values())) + + +class FlowEachActionDefinition(BaseModel): + """A composite action that runs a sequential mini-pipeline for each item.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + call: TypingLiteral["each"] + in_: str = Field(alias="in") + do: list[FlowEachInnerActionDefinition] + + @model_validator(mode="after") + def _validate_inner_action_list(self) -> FlowEachActionDefinition: + if not self.do: + raise ValueError("each.do must contain at least one action") + + seen: set[str] = set() + for inner_action in self.do: + name = inner_action.name + if name in seen: + raise ValueError(f"each.do action names must be unique: {name!r}") + seen.add(name) + + return self + + +FlowActionDefinition = ( + FlowCodeActionDefinition + | FlowToolActionDefinition + | FlowExpressionActionDefinition + | FlowEachActionDefinition +) + + class FlowMethodDefinition(BaseModel): """Static definition of one Flow method and its execution roles.""" + description: str | None = None do: FlowActionDefinition start: bool | FlowDefinitionCondition | None = None listen: FlowDefinitionCondition | None = None @@ -227,6 +291,12 @@ class FlowDefinition(BaseModel): methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict) diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list) + @model_validator(mode="after") + def _validate_method_names(self) -> FlowDefinition: + for method_name in self.methods: + _validate_step_name(method_name, field="Flow method names") + return self + def to_dict(self, *, exclude_none: bool = True) -> dict[str, Any]: """Serialize the definition to a JSON/YAML-ready dictionary.""" return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json") @@ -369,6 +439,11 @@ def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]: return [FlowDefinitionDiagnostic.model_validate(item) for item in value or []] +def _validate_step_name(name: str, *, field: str) -> None: + if not isinstance(name, str) or not _STEP_NAME_PATTERN.fullmatch(name): + raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}") + + def _merge_diagnostics( *diagnostic_groups: list[FlowDefinitionDiagnostic], ) -> list[FlowDefinitionDiagnostic]: diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index 902387133..e91de05d3 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -121,11 +121,8 @@ from crewai.flow.human_feedback import ( ) from crewai.flow.input_provider import InputProvider from crewai.flow.persistence.base import FlowPersistence -from crewai.flow.runtime._resolvers import ( - resolve_action, - resolve_instance_ref, - resolve_ref, -) +from crewai.flow.runtime._actions import build_action +from crewai.flow.runtime._refs import resolve_instance_ref, resolve_ref from crewai.flow.types import ( FlowExecutionData, FlowMethodName, @@ -1092,9 +1089,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self._methods.update(methods) def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]: - def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]: + def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]: try: - return resolve_action(self, definition.do) + return build_action(self, definition.do) except Exception as e: unresolved.append(f"{name}: {e}") return lambda *args, **kwargs: None @@ -1102,9 +1099,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): methods: dict[FlowMethodName, Callable[..., Any]] = {} unresolved: list[str] = [] for method_name, method_definition in self._definition.methods.items(): - methods[FlowMethodName(method_name)] = resolve( - method_name, method_definition - ) + methods[FlowMethodName(method_name)] = build(method_name, method_definition) if unresolved: raise ValueError( f"Cannot build flow {self._definition.name!r} from its definition; " diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py new file mode 100644 index 000000000..480fbb982 --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -0,0 +1,221 @@ +"""Build FlowDefinition actions into live runtime callables.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +import contextvars +import inspect +from typing import TYPE_CHECKING, Any, Protocol, cast + +from crewai.flow.flow_definition import ( + FlowActionDefinition, + FlowCodeActionDefinition, + FlowEachActionDefinition, + FlowEachInnerActionDefinition, + FlowExpressionActionDefinition, + FlowToolActionDefinition, +) +from crewai.flow.runtime._expressions import evaluate_expression, render_with_block +from crewai.flow.runtime._refs import InvalidRefError, resolve_ref + + +if TYPE_CHECKING: + from crewai.flow.runtime import Flow + + +__all__ = ["build_action"] + +LocalContext = dict[str, Any] +_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context" + + +class _BuiltAction(Protocol): + def run(self, *args: Any, **kwargs: Any) -> Any: ... + + +class _ActionType(Protocol): + definition_type: type[Any] + + def __call__(self, flow: Flow[Any], definition: Any) -> _BuiltAction: ... + + +class CodeAction: + definition_type = FlowCodeActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowCodeActionDefinition) -> None: + self.flow = flow + self.definition = definition + self.handler = self._resolve_handler() + self.signature = inspect.signature(self.handler) + + def run(self, *args: Any, **kwargs: Any) -> Any: + local_context = _pop_local_context(kwargs) + if self.definition.with_ is None: + return self.handler(*args, **kwargs) + return self.handler( + **render_with_block( + self.flow, self.definition.with_, local_context=local_context + ) + ) + + def _resolve_handler(self) -> Callable[..., Any]: + ref = self.definition.ref + target = resolve_ref(ref, field="do") + if not callable(target): + raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable") + handler = cast(Callable[..., Any], target) + if getattr(handler, "__self__", None) is None and hasattr(handler, "__get__"): + handler = handler.__get__(self.flow, type(self.flow)) + return handler + + +class ToolAction: + definition_type = FlowToolActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowToolActionDefinition) -> None: + self.flow = flow + self.definition = definition + self.tool = self._build_tool() + self.kwargs = definition.with_ or {} + + def run(self, *_args: Any, **kwargs: Any) -> Any: + local_context = _pop_local_context(kwargs) + return self.tool.run( + **render_with_block(self.flow, self.kwargs, local_context=local_context) + ) + + def _build_tool(self) -> Any: + target = resolve_ref(self.definition.ref, field="do") + from crewai.tools import BaseTool + + if not (inspect.isclass(target) and issubclass(target, BaseTool)): + raise InvalidRefError( + f"invalid tool ref {self.definition.ref!r}; expected a BaseTool class" + ) + + try: + tool_cls = cast(Callable[[], BaseTool], target) + return tool_cls() + except Exception as e: + raise InvalidRefError( + f"cannot instantiate tool ref {self.definition.ref!r} " + f"without arguments: {e}" + ) from e + + +class ExpressionAction: + definition_type = FlowExpressionActionDefinition + + def __init__( + self, flow: Flow[Any], definition: FlowExpressionActionDefinition + ) -> None: + self.flow = flow + self.definition = definition + + def run(self, *_args: Any, **kwargs: Any) -> Any: + local_context = _pop_local_context(kwargs) + return evaluate_expression( + self.flow, self.definition.expr, local_context=local_context + ) + + +class EachAction: + definition_type = FlowEachActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None: + self.flow = flow + self.definition = definition + self.inner_actions = [ + (inner_action.name, self._build_inner_action(inner_action)) + for inner_action in definition.do + ] + + async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]: + items = evaluate_expression(self.flow, self.definition.in_) + if not isinstance(items, list): + raise ValueError("each.in must evaluate to an array") + + results: list[Any] = [] + + for item in items: + local_outputs: dict[str, Any] = {} + last_output: Any = None + for name, run_inner_action in self.inner_actions: + last_output = await run_inner_action( + {"item": item, "outputs": local_outputs} + ) + local_outputs[name] = last_output + results.append(last_output) + + return results + + def _build_inner_action( + self, inner_action: FlowEachInnerActionDefinition + ) -> Callable[[LocalContext], Any]: + run_action = build_action(self.flow, inner_action.action) + + async def run_inner_action(local_context: LocalContext) -> Any: + kwargs = {_LOCAL_CONTEXT_KWARG: local_context} + if inspect.iscoroutinefunction(run_action): + result = run_action(**kwargs) + else: + ctx = contextvars.copy_context() + + def run_with_context() -> Any: + return run_action(**kwargs) + + result = await asyncio.to_thread(ctx.run, run_with_context) + if inspect.isawaitable(result): + result = await result + return result + + return run_inner_action + + +_ACTION_TYPES: tuple[_ActionType, ...] = ( + EachAction, + CodeAction, + ToolAction, + ExpressionAction, +) + + +def build_action( + flow: Flow[Any], definition: FlowActionDefinition +) -> Callable[..., Any]: + """Turn one `do:` action into the callable the flow runs for that node.""" + for action_type in _ACTION_TYPES: + if isinstance(definition, action_type.definition_type): + return _as_flow_method(action_type(flow, definition)) + raise ValueError(f"unknown call type {getattr(definition, 'call', None)!r}") + + +def _as_flow_method(action: _BuiltAction) -> Callable[..., Any]: + run: Callable[..., Any] + if inspect.iscoroutinefunction(action.run): + + async def run_async(*args: Any, **kwargs: Any) -> Any: + return await action.run(*args, **kwargs) + + run = run_async + else: + + def run_sync(*args: Any, **kwargs: Any) -> Any: + return action.run(*args, **kwargs) + + run = run_sync + + signature = getattr(action, "signature", None) + if signature is not None: + object.__setattr__(run, "__signature__", signature) + return run + + +def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None: + local_context = kwargs.pop(_LOCAL_CONTEXT_KWARG, None) + if local_context is None: + return None + if not isinstance(local_context, dict): + raise TypeError("flow definition local context must be a mapping") + return cast(LocalContext, local_context) diff --git a/lib/crewai/src/crewai/flow/runtime/_expressions.py b/lib/crewai/src/crewai/flow/runtime/_expressions.py index 33c852f60..b015a9608 100644 --- a/lib/crewai/src/crewai/flow/runtime/_expressions.py +++ b/lib/crewai/src/crewai/flow/runtime/_expressions.py @@ -2,14 +2,12 @@ from __future__ import annotations -import copy -import dataclasses from itertools import pairwise import json import re from typing import TYPE_CHECKING, Any, cast -from pydantic import BaseModel +from crewai.utilities.serialization import to_serializable if TYPE_CHECKING: @@ -25,25 +23,45 @@ class FlowExpressionError(ValueError): """A FlowDefinition expression failed to parse or evaluate.""" -def render_with_block(flow: Flow[Any], value: Any) -> Any: +def render_with_block( + flow: Flow[Any], value: Any, local_context: dict[str, Any] | None = None +) -> Any: """Render CEL expressions inside a FlowDefinition ``with:`` payload.""" - context = _expression_context(flow) + context = _expression_context(flow, local_context=local_context) return _render_value(value, context) -def evaluate_expression(flow: Flow[Any], expression: str) -> Any: +def evaluate_expression( + flow: Flow[Any], expression: str, local_context: dict[str, Any] | None = None +) -> Any: """Evaluate a FlowDefinition CEL expression against runtime context.""" expression = expression.strip() if not expression: raise FlowExpressionError("empty CEL expression") - return _eval_cel(expression, _expression_context(flow)) + return _eval_cel(expression, _expression_context(flow, local_context=local_context)) -def _expression_context(flow: Flow[Any]) -> dict[str, Any]: - return { +def _expression_context( + flow: Flow[Any], local_context: dict[str, Any] | None = None +) -> dict[str, Any]: + outputs = _outputs_by_name(flow._method_outputs) + context: dict[str, Any] = { "state": flow._copy_and_serialize_state(), - "outputs": _outputs_by_name(flow._method_outputs), + "outputs": outputs, } + if local_context: + local_values = { + key: to_serializable(value, max_depth=0) + for key, value in local_context.items() + } + local_outputs = local_values.pop("outputs", None) + local_values.pop("state", None) + context.update(local_values) + if local_outputs is not None: + if not isinstance(local_outputs, dict): + raise TypeError("flow definition local outputs must be a mapping") + context["outputs"] = {**outputs, **local_outputs} + return context def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]: @@ -54,12 +72,7 @@ def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]: if isinstance(entry, dict) and "output" in entry: method = str(entry.get("method", "")) output = entry["output"] - output = copy.deepcopy(output) - if isinstance(output, BaseModel): - output = output.model_dump(mode="json") - elif dataclasses.is_dataclass(output) and not isinstance(output, type): - output = dataclasses.asdict(output) - outputs[method] = output + outputs[method] = to_serializable(output, max_depth=0) return outputs diff --git a/lib/crewai/src/crewai/flow/runtime/_refs.py b/lib/crewai/src/crewai/flow/runtime/_refs.py new file mode 100644 index 000000000..23ddafadb --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_refs.py @@ -0,0 +1,38 @@ +"""Resolution of ``module:qualname`` refs into live Python objects.""" + +from __future__ import annotations + +import importlib +import inspect +from operator import attrgetter +from typing import Any + + +class InvalidRefError(ValueError): + """A definition ref that cannot be resolved to a live object.""" + + +def resolve_ref(ref: str, *, field: str) -> Any: + """Import the object a definition's `module:qualname` ref points to.""" + module_name, _, qualname = ref.partition(":") + if "<" in ref or not module_name or not qualname: + raise InvalidRefError( + f"invalid {field} ref {ref!r}; expected 'module:qualname'" + ) + try: + return attrgetter(qualname)(importlib.import_module(module_name)) + except (ImportError, AttributeError) as e: + raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e + + +def resolve_instance_ref(ref: str, *, field: str) -> Any: + """Resolve a ref, auto-instantiating a no-arg class into an instance.""" + target = resolve_ref(ref, field=field) + if not inspect.isclass(target): + return target + try: + return target() + except Exception as e: + raise InvalidRefError( + f"cannot instantiate {field} ref {ref!r} without arguments: {e}" + ) from e diff --git a/lib/crewai/src/crewai/flow/runtime/_resolvers.py b/lib/crewai/src/crewai/flow/runtime/_resolvers.py deleted file mode 100644 index dea531dfb..000000000 --- a/lib/crewai/src/crewai/flow/runtime/_resolvers.py +++ /dev/null @@ -1,116 +0,0 @@ -"""Resolution of FlowDefinition refs (``module:qualname``) into live objects. - -Every ref-shaped value in a definition — ``do`` actions, ``state.ref``, -``config.input_provider``, ``human_feedback.provider`` — resolves through -:func:`resolve_ref`. Failures are loud and name the field and the ref. -""" - -from __future__ import annotations - -from collections.abc import Callable -import importlib -import inspect -from operator import attrgetter -from typing import TYPE_CHECKING, Any, cast - -from crewai.flow.flow_definition import ( - FlowActionDefinition, - FlowCodeActionDefinition, - FlowExpressionActionDefinition, - FlowToolActionDefinition, -) -from crewai.flow.runtime._expressions import evaluate_expression, render_with_block - - -if TYPE_CHECKING: - from crewai.flow.runtime import Flow - - -class InvalidRefError(ValueError): - """A definition ref that cannot be resolved to a live object.""" - - -def resolve_ref(ref: str, *, field: str) -> Any: - """Import the object a definition's `module:qualname` ref points to.""" - module_name, _, qualname = ref.partition(":") - if "<" in ref or not module_name or not qualname: - raise InvalidRefError( - f"invalid {field} ref {ref!r}; expected 'module:qualname'" - ) - try: - return attrgetter(qualname)(importlib.import_module(module_name)) - except (ImportError, AttributeError) as e: - raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e - - -def resolve_instance_ref(ref: str, *, field: str) -> Any: - """Resolve a ref, auto-instantiating a no-arg class into an instance.""" - target = resolve_ref(ref, field=field) - if not inspect.isclass(target): - return target - try: - return target() - except Exception as e: - raise InvalidRefError( - f"cannot instantiate {field} ref {ref!r} without arguments: {e}" - ) from e - - -def _resolve_code_action( - flow: Flow[Any], action: FlowCodeActionDefinition -) -> Callable[..., Any]: - ref = action.ref - target = resolve_ref(ref, field="do") - if not callable(target): - raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable") - handler = cast(Callable[..., Any], target) - if getattr(handler, "__self__", None) is None: - handler = handler.__get__(flow, type(flow)) - return handler - - -def _resolve_tool_action( - flow: Flow[Any], action: FlowToolActionDefinition -) -> Callable[..., Any]: - target = resolve_ref(action.ref, field="do") - from crewai.tools import BaseTool - - if not (inspect.isclass(target) and issubclass(target, BaseTool)): - raise InvalidRefError( - f"invalid tool ref {action.ref!r}; expected a BaseTool class" - ) - - try: - tool_cls = cast(Callable[[], BaseTool], target) - tool = tool_cls() - except Exception as e: - raise InvalidRefError( - f"cannot instantiate tool ref {action.ref!r} without arguments: {e}" - ) from e - - tool_kwargs = action.with_ or {} - - def run_tool(*_args: Any, **_kwargs: Any) -> Any: - return tool.run(**render_with_block(flow, tool_kwargs)) - - return run_tool - - -def _resolve_expression_action( - flow: Flow[Any], action: FlowExpressionActionDefinition -) -> Callable[..., Any]: - def run_expression(*_args: Any, **_kwargs: Any) -> Any: - return evaluate_expression(flow, action.expr) - - return run_expression - - -def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]: - """Turn one `do:` action into the callable the flow runs for that node.""" - if action.call == "code": - return _resolve_code_action(flow, action) - if action.call == "tool": - return _resolve_tool_action(flow, action) - if action.call == "expression": - return _resolve_expression_action(flow, action) - raise ValueError(f"unknown call type {action.call!r}") diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index 0207e80ab..1b9e76588 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -1,5 +1,6 @@ from __future__ import annotations +import dataclasses from datetime import date, datetime import json from typing import Any, TypeAlias @@ -23,21 +24,23 @@ def to_serializable( ) -> Serializable: """Converts a Python object into a JSON-compatible representation. - Supports primitives, datetime objects, collections, dictionaries, and - Pydantic models. Recursion depth is limited to prevent infinite nesting. + Supports primitives, datetime objects, collections, dictionaries, + dataclasses, and Pydantic models. Recursion depth is limited to prevent + infinite nesting. Non-convertible objects default to their string representations. Args: obj: Object to transform. exclude: Set of keys to exclude from the result. - max_depth: Maximum recursion depth. Defaults to 5. + max_depth: Maximum recursion depth. Defaults to 5. Values less than or + equal to 0 disable the depth limit. _current_depth: Current recursion depth (for internal use). _ancestors: Set of ancestor object ids for cycle detection (for internal use). Returns: Serializable: A JSON-compatible structure. """ - if _current_depth >= max_depth: + if max_depth > 0 and _current_depth >= max_depth: return repr(obj) if exclude is None: @@ -58,6 +61,18 @@ def to_serializable( return f"" new_ancestors = _ancestors | {object_id} + if dataclasses.is_dataclass(obj) and not isinstance(obj, type): + return { + field.name: to_serializable( + obj=getattr(obj, field.name), + exclude=exclude, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, + ) + for field in dataclasses.fields(obj) + if field.name not in exclude + } if isinstance(obj, (list, tuple, set)): return [ to_serializable( @@ -84,7 +99,7 @@ def to_serializable( if isinstance(obj, BaseModel): try: return to_serializable( - obj=obj.model_dump(exclude=exclude), + obj=obj.model_dump(mode="json", exclude=exclude), max_depth=max_depth, _current_depth=_current_depth + 1, _ancestors=new_ancestors, diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index f0a2e62c3..e2b3a7ad4 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -44,6 +44,8 @@ def test_flow_public_exports_are_explicit(): "FlowDefinition", "FlowDefinitionCondition", "FlowDefinitionDiagnostic", + "FlowEachActionDefinition", + "FlowEachInnerActionDefinition", "FlowExpressionActionDefinition", "FlowHumanFeedbackDefinition", "FlowMethodDefinition", @@ -432,6 +434,73 @@ def test_flow_definition_round_trips_json_and_yaml(): assert yaml_round_trip.methods["decide"].listen == "begin" +def test_each_action_round_trips_json_and_yaml(): + definition = flow_definition.FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "EachFlow", + "methods": { + "process_rows": { + "description": "Process every loaded row.", + "start": True, + "do": { + "call": "each", + "in": "state.rows", + "do": [ + { + "normalize": { + "call": "tool", + "ref": "my_tools:NormalizeRowTool", + "with": {"row": "${ item }"}, + } + }, + { + "save": { + "call": "code", + "ref": "my_flow:save_row", + "with": { + "row": "${ item }", + "normalized": "${ outputs.normalize }", + }, + } + }, + ], + }, + } + }, + } + ) + + json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json()) + yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml()) + + assert json_round_trip.to_dict() == definition.to_dict() + assert yaml_round_trip.to_dict() == definition.to_dict() + assert yaml_round_trip.methods["process_rows"].description == ( + "Process every loaded row." + ) + assert yaml_round_trip.methods["process_rows"].do.call == "each" + + +def test_flow_definition_rejects_invalid_method_names(): + with pytest.raises(ValueError, match="Flow method names must match"): + flow_definition.FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "InvalidMethodNameFlow", + "methods": { + "process-rows": { + "start": True, + "do": { + "call": "expression", + "expr": "'done'", + }, + } + }, + } + ) + + def test_flow_definition_detects_persist_metadata(): @persist(verbose=True) class PersistedFlow(Flow[dict]): diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 6d241c919..aac114c4d 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -1,12 +1,15 @@ from __future__ import annotations +import asyncio from collections import defaultdict +from dataclasses import dataclass from pathlib import Path +import threading from typing import Any, ClassVar from unittest.mock import patch import pytest -from pydantic import ValidationError +from pydantic import BaseModel, ValidationError from crewai.events.event_bus import crewai_event_bus from crewai.events.types.flow_events import ( @@ -44,6 +47,26 @@ class TypedInputsTool(BaseTool): return f"{count}:{','.join(include_domains)}" +class AsyncResultTool(BaseTool): + name: str = "AsyncResultTool" + description: str = "Returns an async result from its sync entrypoint." + + def _run(self, value: str) -> Any: + async def build_result() -> str: + await asyncio.sleep(0) + return f"async:{value}" + + return build_result() + + +class CallableCodeAction: + def __call__(self, value: str) -> str: + return f"callable:{value}" + + +CALLABLE_CODE_ACTION = CallableCodeAction() + + class ChainFlow(Flow): @start() def begin(self): @@ -67,6 +90,41 @@ class ToolInputFlow(Flow): return {"query": "ai agents", "suffix": " news"} +class EachActionFlow(Flow): + inner_thread_id: int | None = None + + def normalize_row(self, row: str, prefix: str = "normalized") -> str: + return f"{prefix}:{row}" + + def save_row(self, row: str, normalized: str) -> dict[str, str]: + return {"row": row, "normalized": normalized} + + def keyword_code(self, name: str, punctuation: str) -> str: + return f"{name}{punctuation}" + + def fail_on_bad_row(self, row: str) -> str: + if row == "bad": + raise RuntimeError("bad row") + return row + + def require_threaded_context(self, row: str) -> str: + try: + asyncio.get_running_loop() + except RuntimeError: + pass + else: + raise RuntimeError("inner action ran on the event loop") + + from crewai.flow.flow_context import current_flow_method_name + + self.inner_thread_id = threading.get_ident() + return f"{current_flow_method_name.get()}:{row}" + + def after_each(self) -> str: + self.state["after_count"] = self.state.get("after_count", 0) + 1 + return f"after:{self.state['after_count']}" + + CHAIN_YAML = f""" schema: crewai.flow/v1 name: ChainFlow @@ -727,6 +785,381 @@ methods: flow.kickoff() +def test_code_action_renders_keyword_inputs(): + yaml_str = f""" +schema: crewai.flow/v1 +name: CodeWithFlow +methods: + greet: + do: + call: code + ref: {__name__}:EachActionFlow.keyword_code + with: + name: "${{state.name}}" + punctuation: "!" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"name": "hello"}) == "hello!" + + +def test_code_action_supports_callable_instance_refs(): + yaml_str = f""" +schema: crewai.flow/v1 +name: CallableInstanceFlow +methods: + call_instance: + do: + call: code + ref: {__name__}:CALLABLE_CODE_ACTION + with: + value: "${{state.value}}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"value": "ok"}) == "callable:ok" + + +def test_each_action_executes_one_nested_code_action(): + yaml_str = f""" +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - normalize: + call: code + ref: {__name__}:EachActionFlow.normalize_row + with: + row: "${{item}}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [ + "normalized:a", + "normalized:b", + ] + + +def test_each_action_runs_sync_inner_actions_off_event_loop_with_context(): + yaml_str = f""" +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - threaded: + call: code + ref: {__name__}:EachActionFlow.require_threaded_context + with: + row: "${{item}}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + caller_thread_id = threading.get_ident() + + assert flow.kickoff(inputs={"rows": ["a"]}) == ["process_rows:a"] + assert flow.inner_thread_id is not None + assert flow.inner_thread_id != caller_thread_id + + +def test_each_action_runs_async_tool_results_from_sync_inner_actions(): + yaml_str = f""" +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - async_tool: + call: tool + ref: {__name__}:AsyncResultTool + with: + value: "${{item}}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"] + + +def test_each_action_uses_iteration_outputs_between_nested_actions(): + yaml_str = f""" +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - normalize: + call: code + ref: {__name__}:EachActionFlow.normalize_row + with: + row: "${{item}}" + prefix: saved + - save: + call: code + ref: {__name__}:EachActionFlow.save_row + with: + row: "${{item}}" + normalized: "${{outputs.normalize}}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [ + {"row": "a", "normalized": "saved:a"}, + {"row": "b", "normalized": "saved:b"}, + ] + + +def test_each_action_resets_inner_outputs_between_iterations(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - leak_check: + call: expression + expr: "has(outputs.previous) ? outputs.previous : 'empty'" + - previous: + call: expression + expr: item + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["a", "b"] + assert flow._method_outputs == [ + {"method": "process_rows", "output": ["a", "b"]} + ] + + +def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachFlow +methods: + seed: + do: + call: expression + expr: "'global'" + start: true + process_rows: + do: + call: each + in: state.rows + do: + - before_shadow: + call: expression + expr: "outputs.seed + ':' + item" + - seed: + call: expression + expr: "'local:' + item" + - after_shadow: + call: expression + expr: "outputs.seed" + listen: seed +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [ + "local:a", + "local:b", + ] + assert flow._method_outputs == [ + {"method": "seed", "output": "global"}, + {"method": "process_rows", "output": ["local:a", "local:b"]}, + ] + + +def test_each_action_empty_list_returns_empty_and_listener_runs_once(): + yaml_str = f""" +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - normalize: + call: code + ref: {__name__}:EachActionFlow.normalize_row + with: + row: "${{item}}" + start: true + after_each: + do: + call: code + ref: {__name__}:EachActionFlow.after_each + listen: process_rows +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + events = [] + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def on_finished(source, event): + events.append(event.method_name) + + result = flow.kickoff(inputs={"rows": []}) + + assert result == "after:1" + assert flow.method_outputs == [[], "after:1"] + assert flow.state["after_count"] == 1 + assert events.count("process_rows") == 1 + assert events.count("after_each") == 1 + + +@pytest.mark.parametrize( + ("expr", "inputs"), + [ + ("1", {}), + ('"rows"', {}), + ("state.rows", {"rows": {"a": 1}}), + ], +) +def test_each_action_rejects_non_list_inputs(expr, inputs): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "EachFlow", + "methods": { + "process_rows": { + "start": True, + "do": { + "call": "each", + "in": expr, + "do": [{"value": {"call": "expression", "expr": "item"}}], + }, + } + }, + } + ) + flow = Flow.from_definition(definition) + + with pytest.raises(ValueError, match="each.in must evaluate to an array"): + flow.kickoff(inputs=inputs) + + +@pytest.mark.parametrize( + "action_do", + [ + [], + [{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}], + [{"1bad": {"call": "expression", "expr": "item"}}], + [ + {"same": {"call": "expression", "expr": "item"}}, + {"same": {"call": "expression", "expr": "item"}}, + ], + ], +) +def test_each_action_validates_inner_action_shape(action_do): + with pytest.raises(ValidationError): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "EachFlow", + "methods": { + "process_rows": { + "start": True, + "do": { + "call": "each", + "in": "state.rows", + "do": action_do, + }, + } + }, + } + ) + + +def test_each_action_rejects_nested_each_actions(): + with pytest.raises(ValidationError): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "EachFlow", + "methods": { + "process_rows": { + "start": True, + "do": { + "call": "each", + "in": "state.rows", + "do": [ + { + "nested": { + "call": "each", + "in": "state.children", + "do": [ + { + "child": { + "call": "expression", + "expr": "item", + } + } + ], + } + } + ], + }, + } + }, + } + ) + + +def test_each_action_failure_fails_outer_method(): + yaml_str = f""" +schema: crewai.flow/v1 +name: EachFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - validate: + call: code + ref: {__name__}:EachActionFlow.fail_on_bad_row + with: + row: "${{item}}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + with pytest.raises(RuntimeError, match="bad row"): + flow.kickoff(inputs={"rows": ["ok", "bad"]}) + + def test_expression_action_round_trips(): definition = FlowDefinition.from_dict( { @@ -751,6 +1184,26 @@ def test_expression_action_round_trips(): assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified" +def test_expression_local_context_recurses_into_dataclass_values(): + from crewai.flow.runtime._expressions import evaluate_expression + + class Payload(BaseModel): + name: str + + @dataclass + class Row: + payload: Payload + + assert ( + evaluate_expression( + Flow(), + "item.payload.name", + local_context={"item": Row(payload=Payload(name="qualified"))}, + ) + == "qualified" + ) + + def test_expression_action_can_route_like_if_else(): yaml_str = f""" schema: crewai.flow/v1 @@ -830,26 +1283,6 @@ def test_tool_action_requires_module_qualname_ref(): Flow.from_definition(definition) -def test_code_action_rejects_tool_inputs(): - with pytest.raises(ValidationError): - FlowDefinition.from_dict( - { - "schema": "crewai.flow/v1", - "name": "InvalidCodeActionFlow", - "methods": { - "begin": { - "start": True, - "do": { - "call": "code", - "ref": f"{__name__}:ChainFlow.begin", - "with": {"search_query": "ai agents"}, - }, - } - }, - } - ) - - def test_pydantic_state_from_ref_parity(): flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML) assert result == "count=1" diff --git a/lib/crewai/tests/utilities/test_serialization.py b/lib/crewai/tests/utilities/test_serialization.py index e786554cb..8ec68ead8 100644 --- a/lib/crewai/tests/utilities/test_serialization.py +++ b/lib/crewai/tests/utilities/test_serialization.py @@ -1,5 +1,6 @@ +from dataclasses import dataclass from datetime import date, datetime -from typing import List +from typing import Any, List import pytest from crewai.utilities.serialization import to_serializable, to_string @@ -20,6 +21,13 @@ class Person(BaseModel): skills: List[str] +@dataclass +class DataclassPerson: + name: str + address: Address + skills: tuple[str, ...] + + @pytest.mark.parametrize( "test_input,expected", [ @@ -106,6 +114,24 @@ def test_pydantic_model_serialization(): ) +def test_dataclass_serialization_recurses_into_nested_values(): + person = DataclassPerson( + name="Ada", + address=Address(street="1 Loop", city="Compute", country="Pythonia"), + skills=("Python", "Math"), + ) + + assert to_serializable(person) == { + "name": "Ada", + "address": { + "street": "1 Loop", + "city": "Compute", + "country": "Pythonia", + }, + "skills": ["Python", "Math"], + } + + def test_depth_limit(): """Test max depth handling with a deeply nested structure""" @@ -130,6 +156,27 @@ def test_depth_limit(): } +@pytest.mark.parametrize("max_depth", [0, -1]) +def test_non_positive_max_depth_disables_depth_limit(max_depth): + def create_nested(depth): + if depth == 0: + return "value" + return {"next": create_nested(depth - 1)} + + assert to_serializable(create_nested(10), max_depth=max_depth) == create_nested(10) + + +def test_unlimited_depth_still_detects_dataclass_cycles(): + @dataclass + class Node: + child: Any = None + + node = Node() + node.child = node + + assert to_serializable(node, max_depth=0) == {"child": ""} + + def test_exclude_keys(): result = to_serializable({"key1": "value1", "key2": "value2"}, exclude={"key1"}) assert result == {"key2": "value2"}