From b5e23a87f24798c1d60e14c78ea5ce95e9fbdd7e Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Thu, 18 Jun 2026 10:33:13 -0700 Subject: [PATCH] Add optional if expression to each.do steps (#6214) * Use explicit name/action shape for each.do steps * Add optional `if` expression to `each.do` steps Lets a step inside an `each` action run conditionally based on a CEL expression evaluated against `item` and prior step `outputs`. --- lib/crewai/src/crewai/flow/flow_definition.py | 94 ++++---- .../src/crewai/flow/runtime/_actions.py | 40 ++-- lib/crewai/tests/test_flow_definition.py | 22 +- lib/crewai/tests/test_flow_from_definition.py | 206 +++++++++++++++--- 4 files changed, 276 insertions(+), 86 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 230dfe058..85f20239b 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -18,7 +18,6 @@ from pydantic import ( BaseModel, ConfigDict, Field, - RootModel, field_serializer, model_validator, ) @@ -38,6 +37,7 @@ _STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") __all__ = [ "FlowActionDefinition", + "FlowAtomicActionDefinition", "FlowCodeActionDefinition", "FlowConfigDefinition", "FlowConversationalDefinition", @@ -47,7 +47,7 @@ __all__ = [ "FlowDefinitionCondition", "FlowDictStateDefinition", "FlowEachActionDefinition", - "FlowEachInnerActionDefinition", + "FlowEachStepDefinition", "FlowExpressionActionDefinition", "FlowHumanFeedbackDefinition", "FlowJsonSchemaStateDefinition", @@ -466,38 +466,48 @@ class FlowScriptActionDefinition(BaseModel): ) -FlowInnerActionDefinition = ( +FlowAtomicActionDefinition: TypeAlias = Annotated[ FlowCodeActionDefinition | FlowToolActionDefinition | FlowCrewActionDefinition | FlowExpressionActionDefinition - | FlowScriptActionDefinition -) + | FlowScriptActionDefinition, + Field(discriminator="call"), +] -class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]): - """One named action inside an ``each`` composite action.""" +class FlowEachStepDefinition(BaseModel): + """One named step inside an ``each`` composite action.""" - root: dict[str, FlowInnerActionDefinition] = Field( - description="Single-entry mapping from an inner action name to its action.", - examples=[{"clean": {"call": "script", "code": "return item.strip()"}}], + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) + + name: str = Field( + description="Step name used to reference this step's output.", + examples=["clean"], + ) + if_: str | None = Field( + default=None, + alias="if", + description=( + "Optional CEL expression evaluated against state, outputs, and local " + "context. When present, the step runs only if the expression evaluates " + "to true." + ), + examples=["item.kind == 'invoice'"], + ) + action: FlowAtomicActionDefinition = Field( + description="Atomic action to run for this step.", + examples=[{"call": "script", "code": "return item.strip()"}], ) @model_validator(mode="after") - def _validate_action_mapping(self) -> FlowEachInnerActionDefinition: - if len(self.root) != 1: - raise ValueError("each.do entries must be one-key mappings") - _validate_step_name(self.name, field="each.do action names") + def _validate_step_name(self) -> FlowEachStepDefinition: + _validate_step_name(self.name, field="each.do step names") return self - @property - def name(self) -> str: - return next(iter(self.root)) - - @property - def action(self) -> FlowInnerActionDefinition: - return next(iter(self.root.values())) - class FlowEachActionDefinition(BaseModel): """A composite action that runs a sequential mini-pipeline for each item.""" @@ -519,35 +529,36 @@ class FlowEachActionDefinition(BaseModel): description="CEL expression that must evaluate to the list to iterate.", examples=["state.rows"], ) - do: list[FlowEachInnerActionDefinition] = Field( + do: list[FlowEachStepDefinition] = Field( description=( - "Ordered inner actions to run for each item. Each entry must be a " - "single-key mapping naming that inner action." + "Ordered steps to run for each item. Each step has a name, optional " + "if expression, and atomic action." ), examples=[ [ - {"clean": {"call": "script", "code": "return item.strip()"}}, - {"tag": {"call": "expression", "expr": "outputs.clean"}}, + { + "name": "clean", + "action": {"call": "script", "code": "return item.strip()"}, + }, + { + "name": "tag", + "if": "outputs.clean != ''", + "action": {"call": "expression", "expr": "outputs.clean"}, + }, ] ], ) @model_validator(mode="after") - def _validate_inner_action_list(self) -> FlowEachActionDefinition: + def _validate_step_list(self) -> FlowEachActionDefinition: if not self.do: - raise ValueError("each.do must contain at least one action") - - seen: set[str] = set() - for inner_action in self.do: - name = inner_action.name - if name in seen: - raise ValueError(f"each.do action names must be unique: {name!r}") - seen.add(name) + raise ValueError("each.do must contain at least one step") + _validate_step_list(self.do, field="each.do") return self -FlowActionDefinition = ( +FlowActionDefinition: TypeAlias = ( FlowCodeActionDefinition | FlowToolActionDefinition | FlowCrewActionDefinition @@ -733,6 +744,15 @@ def _validate_step_name(name: str, *, field: str) -> None: raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}") +def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> None: + seen: set[str] = set() + for step in steps: + name = step.name + if name in seen: + raise ValueError(f"{field} step names must be unique: {name!r}") + seen.add(name) + + def log_flow_definition_issues(definition: FlowDefinition) -> None: for method_name, method in definition.methods.items(): path = f"methods.{method_name}" diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 180bc3c4b..65fa50588 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -4,7 +4,7 @@ from __future__ import annotations import ast import asyncio -from collections.abc import Callable +from collections.abc import Awaitable, Callable import contextvars import inspect import os @@ -15,7 +15,7 @@ from crewai.flow.flow_definition import ( FlowCodeActionDefinition, FlowCrewActionDefinition, FlowEachActionDefinition, - FlowEachInnerActionDefinition, + FlowEachStepDefinition, FlowExpressionActionDefinition, FlowScriptActionDefinition, FlowToolActionDefinition, @@ -32,6 +32,8 @@ if TYPE_CHECKING: __all__ = ["FlowScriptExecutionDisabledError", "build_action"] LocalContext = dict[str, Any] +NestedStepRunner = Callable[[LocalContext], Awaitable[Any]] +NestedStep = tuple[str, str | None, NestedStepRunner] _LOCAL_CONTEXT_KWARG = "__flow_definition_local_context" _ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION" _TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"}) @@ -217,9 +219,9 @@ class EachAction: def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None: self.flow = flow self.definition = definition - self.inner_actions = [ - (inner_action.name, self._build_inner_action(inner_action)) - for inner_action in definition.do + self.steps: list[NestedStep] = [ + (step.name, step.if_, self._build_step_action(step)) + for step in definition.do ] async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]: @@ -231,22 +233,30 @@ class EachAction: for item in items: local_outputs: dict[str, Any] = {} + local_context = {"item": item, "outputs": local_outputs} last_output: Any = None - for name, run_inner_action in self.inner_actions: - last_output = await run_inner_action( - {"item": item, "outputs": local_outputs} - ) + for name, condition, run_step_action in self.steps: + if condition is not None and not self._condition_matches( + condition, local_context + ): + continue + + last_output = await run_step_action(local_context) local_outputs[name] = last_output results.append(last_output) return results - def _build_inner_action( - self, inner_action: FlowEachInnerActionDefinition - ) -> Callable[[LocalContext], Any]: - run_action = build_action(self.flow, inner_action.action) + def _condition_matches(self, condition: str, local_context: LocalContext) -> bool: + result = evaluate_expression(self.flow, condition, local_context=local_context) + if not isinstance(result, bool): + raise ValueError("if expression must evaluate to a boolean") + return result - async def run_inner_action(local_context: LocalContext) -> Any: + def _build_step_action(self, step: FlowEachStepDefinition) -> NestedStepRunner: + run_action = build_action(self.flow, step.action) + + async def run_step_action(local_context: LocalContext) -> Any: kwargs = {_LOCAL_CONTEXT_KWARG: local_context} if inspect.iscoroutinefunction(run_action): result = run_action(**kwargs) @@ -261,7 +271,7 @@ class EachAction: result = await result return result - return run_inner_action + return run_step_action _ACTION_TYPES: tuple[_ActionType, ...] = ( diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index f947291b8..73120e7db 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit(): } assert set(flow_definition.__all__) == { "FlowActionDefinition", + "FlowAtomicActionDefinition", "FlowCodeActionDefinition", "FlowConfigDefinition", "FlowConversationalDefinition", @@ -46,7 +47,7 @@ def test_flow_public_exports_are_explicit(): "FlowDefinitionCondition", "FlowDictStateDefinition", "FlowEachActionDefinition", - "FlowEachInnerActionDefinition", + "FlowEachStepDefinition", "FlowExpressionActionDefinition", "FlowHumanFeedbackDefinition", "FlowJsonSchemaStateDefinition", @@ -107,7 +108,10 @@ def test_flow_definition_json_schema_carries_reference_descriptions(): each_properties = defs["FlowEachActionDefinition"]["properties"] assert "list to iterate" in each_properties["in"]["description"] - assert "Ordered inner actions" in each_properties["do"]["description"] + assert "Ordered steps" in each_properties["do"]["description"] + + step_properties = defs["FlowEachStepDefinition"]["properties"] + assert "runs only if" in step_properties["if"]["description"] def test_flow_definition_json_schema_carries_field_examples_only(): @@ -122,6 +126,7 @@ def test_flow_definition_json_schema_carries_field_examples_only(): "FlowExpressionActionDefinition", "FlowScriptActionDefinition", "FlowEachActionDefinition", + "FlowEachStepDefinition", "FlowMethodDefinition", "FlowDictStateDefinition", "FlowJsonSchemaStateDefinition", @@ -154,7 +159,12 @@ def test_flow_definition_json_schema_carries_field_examples_only(): each_properties = defs["FlowEachActionDefinition"]["properties"] assert each_properties["in"]["examples"] == ["state.rows"] - assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script" + assert each_properties["do"]["examples"][0][0]["name"] == "clean" + assert each_properties["do"]["examples"][0][0]["action"]["call"] == "script" + assert each_properties["do"]["examples"][0][1]["if"] == "outputs.clean != ''" + + step_properties = defs["FlowEachStepDefinition"]["properties"] + assert step_properties["if"]["examples"] == ["item.kind == 'invoice'"] method_properties = defs["FlowMethodDefinition"]["properties"] assert method_properties["listen"]["examples"] == [ @@ -584,14 +594,16 @@ def test_each_action_round_trips_json_and_yaml(): "in": "state.rows", "do": [ { - "normalize": { + "name": "normalize", + "action": { "call": "tool", "ref": "my_tools:NormalizeRowTool", "with": {"row": "${ item }"}, } }, { - "save": { + "name": "save", + "action": { "call": "code", "ref": "my_flow:save_row", "with": { diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 989eb6396..da1fd27e9 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -114,7 +114,7 @@ class EachActionFlow(Flow): except RuntimeError: pass else: - raise RuntimeError("inner action ran on the event loop") + raise RuntimeError("each step ran on the event loop") from crewai.flow.flow_context import current_flow_method_name @@ -1081,7 +1081,8 @@ methods: call: each in: state.rows do: - - normalize: + - name: normalize + action: call: code ref: {__name__}:EachActionFlow.normalize_row with: @@ -1097,7 +1098,7 @@ methods: ] -def test_each_action_runs_sync_inner_actions_off_event_loop_with_context(): +def test_each_action_runs_sync_steps_off_event_loop_with_context(): yaml_str = f""" schema: crewai.flow/v1 name: EachFlow @@ -1107,7 +1108,8 @@ methods: call: each in: state.rows do: - - threaded: + - name: threaded + action: call: code ref: {__name__}:EachActionFlow.require_threaded_context with: @@ -1123,7 +1125,7 @@ methods: assert flow.inner_thread_id != caller_thread_id -def test_each_action_runs_async_tool_results_from_sync_inner_actions(): +def test_each_action_runs_async_tool_results_from_sync_steps(): yaml_str = f""" schema: crewai.flow/v1 name: EachFlow @@ -1133,7 +1135,8 @@ methods: call: each in: state.rows do: - - async_tool: + - name: async_tool + action: call: tool ref: {__name__}:AsyncResultTool with: @@ -1222,7 +1225,7 @@ methods: assert flow.state["input_matches_output"] is True -def test_script_each_action_reads_item_and_inner_outputs( +def test_script_each_action_reads_item_and_step_outputs( monkeypatch: pytest.MonkeyPatch, ): monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1") @@ -1241,11 +1244,13 @@ methods: call: each in: state.rows do: - - clean: + - name: clean + action: call: script code: | return item.strip() - - tag: + - name: tag + action: call: script code: | return f"{outputs['seed']}:{outputs['clean']}" @@ -1257,7 +1262,7 @@ methods: assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"] -def test_each_action_uses_iteration_outputs_between_nested_actions(): +def test_each_action_uses_iteration_outputs_between_steps(): yaml_str = f""" schema: crewai.flow/v1 name: EachFlow @@ -1267,13 +1272,15 @@ methods: call: each in: state.rows do: - - normalize: + - name: normalize + action: call: code ref: {__name__}:EachActionFlow.normalize_row with: row: "${{item}}" prefix: saved - - save: + - name: save + action: call: code ref: {__name__}:EachActionFlow.save_row with: @@ -1290,7 +1297,7 @@ methods: ] -def test_each_action_resets_inner_outputs_between_iterations(): +def test_each_action_resets_step_outputs_between_iterations(): yaml_str = """ schema: crewai.flow/v1 name: EachFlow @@ -1300,10 +1307,12 @@ methods: call: each in: state.rows do: - - leak_check: + - name: leak_check + action: call: expression expr: "has(outputs.previous) ? outputs.previous : 'empty'" - - previous: + - name: previous + action: call: expression expr: item start: true @@ -1317,7 +1326,7 @@ methods: ] -def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs(): +def test_each_action_preserves_flow_outputs_and_prefers_step_outputs(): yaml_str = """ schema: crewai.flow/v1 name: EachFlow @@ -1332,13 +1341,16 @@ methods: call: each in: state.rows do: - - before_shadow: + - name: before_shadow + action: call: expression expr: "outputs.seed + ':' + item" - - seed: + - name: seed + action: call: expression expr: "'local:' + item" - - after_shadow: + - name: after_shadow + action: call: expression expr: "outputs.seed" listen: seed @@ -1356,6 +1368,103 @@ methods: ] +def test_each_action_runs_simple_if_clauses(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - name: kind + action: + call: expression + expr: item.kind + - name: kept + if: "outputs.kind == 'keep'" + action: + call: expression + expr: "'kept:' + item.value" + - name: skipped + if: "outputs.kind != 'keep'" + action: + call: expression + expr: "'skipped:' + item.value" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff( + inputs={ + "rows": [ + {"kind": "keep", "value": "a"}, + {"kind": "drop", "value": "b"}, + ] + } + ) == ["kept:a", "skipped:b"] + + +def test_each_action_skipped_if_keeps_previous_output(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - name: original + action: + call: expression + expr: item.value + - name: maybe_included + if: item.include + action: + call: expression + expr: "'included:' + item.value" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff( + inputs={ + "rows": [ + {"include": True, "value": "a"}, + {"include": False, "value": "b"}, + ] + } + ) == ["included:a", "b"] + + +def test_each_action_if_condition_must_be_boolean(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - name: value + if: item.value + action: + call: expression + expr: item.value + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + with pytest.raises(ValueError, match="if expression must evaluate to a boolean"): + flow.kickoff(inputs={"rows": [{"value": "truthy"}]}) + + def test_each_action_empty_list_returns_empty_and_listener_runs_once(): yaml_str = f""" schema: crewai.flow/v1 @@ -1366,7 +1475,8 @@ methods: call: each in: state.rows do: - - normalize: + - name: normalize + action: call: code ref: {__name__}:EachActionFlow.normalize_row with: @@ -1415,7 +1525,12 @@ def test_each_action_rejects_non_list_inputs(expr, inputs): "do": { "call": "each", "in": expr, - "do": [{"value": {"call": "expression", "expr": "item"}}], + "do": [ + { + "name": "value", + "action": {"call": "expression", "expr": "item"}, + } + ], }, } }, @@ -1431,15 +1546,25 @@ def test_each_action_rejects_non_list_inputs(expr, inputs): "action_do", [ [], - [{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}], - [{"1bad": {"call": "expression", "expr": "item"}}], + [{"value": {"call": "expression", "expr": "item"}}], + [{"name": "1bad", "action": {"call": "expression", "expr": "item"}}], + [{"name": "missing_action"}], + [{"action": {"call": "expression", "expr": "item"}}], [ - {"same": {"call": "expression", "expr": "item"}}, - {"same": {"call": "expression", "expr": "item"}}, + { + "name": "value", + "if": "true", + "then": [], + "action": {"call": "expression", "expr": "item"}, + } + ], + [ + {"name": "same", "action": {"call": "expression", "expr": "item"}}, + {"name": "same", "action": {"call": "expression", "expr": "item"}}, ], ], ) -def test_each_action_validates_inner_action_shape(action_do): +def test_each_action_validates_step_shape(action_do): with pytest.raises(ValidationError): FlowDefinition.from_dict( { @@ -1459,6 +1584,26 @@ def test_each_action_validates_inner_action_shape(action_do): ) +def test_if_clauses_are_rejected_at_method_level(): + with pytest.raises(ValidationError): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "TopLevelIfFlow", + "methods": { + "process": { + "start": True, + "do": { + "call": "expression", + "if": "true", + "expr": "'ok'", + }, + } + }, + } + ) + + def test_each_action_rejects_nested_each_actions(): with pytest.raises(ValidationError): FlowDefinition.from_dict( @@ -1473,12 +1618,14 @@ def test_each_action_rejects_nested_each_actions(): "in": "state.rows", "do": [ { - "nested": { + "name": "nested", + "action": { "call": "each", "in": "state.children", "do": [ { - "child": { + "name": "child", + "action": { "call": "expression", "expr": "item", } @@ -1504,7 +1651,8 @@ methods: call: each in: state.rows do: - - validate: + - name: validate + action: call: code ref: {__name__}:EachActionFlow.fail_on_bad_row with: