Add optional if expression to each.do steps

Lets a step inside an `each` action run conditionally based on a CEL
expression evaluated against `item` and prior step `outputs`.
This commit is contained in:
Vinicius Brasil
2026-06-17 22:10:01 -07:00
parent 4f8d5cf7cb
commit 2105a0ebf8
4 changed files with 180 additions and 20 deletions

View File

@@ -479,12 +479,25 @@ FlowAtomicActionDefinition: TypeAlias = Annotated[
class FlowEachStepDefinition(BaseModel):
"""One named step inside an ``each`` composite action."""
model_config = ConfigDict(extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
name: str = Field(
description="Step name used to reference this step's output.",
examples=["clean"],
)
if_: str | None = Field(
default=None,
alias="if",
description=(
"Optional CEL expression evaluated against state, outputs, and local "
"context. When present, the step runs only if the expression evaluates "
"to true."
),
examples=["item.kind == 'invoice'"],
)
action: FlowAtomicActionDefinition = Field(
description="Atomic action to run for this step.",
examples=[{"call": "script", "code": "return item.strip()"}],
@@ -518,8 +531,8 @@ class FlowEachActionDefinition(BaseModel):
)
do: list[FlowEachStepDefinition] = Field(
description=(
"Ordered steps to run for each item. Each step has a name and an "
"atomic action."
"Ordered steps to run for each item. Each step has a name, optional "
"if expression, and atomic action."
),
examples=[
[
@@ -529,6 +542,7 @@ class FlowEachActionDefinition(BaseModel):
},
{
"name": "tag",
"if": "outputs.clean != ''",
"action": {"call": "expression", "expr": "outputs.clean"},
},
]
@@ -540,13 +554,7 @@ class FlowEachActionDefinition(BaseModel):
if not self.do:
raise ValueError("each.do must contain at least one step")
seen: set[str] = set()
for step in self.do:
name = step.name
if name in seen:
raise ValueError(f"each.do step names must be unique: {name!r}")
seen.add(name)
_validate_step_list(self.do, field="each.do")
return self
@@ -736,6 +744,15 @@ def _validate_step_name(name: str, *, field: str) -> None:
raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> None:
seen: set[str] = set()
for step in steps:
name = step.name
if name in seen:
raise ValueError(f"{field} step names must be unique: {name!r}")
seen.add(name)
def log_flow_definition_issues(definition: FlowDefinition) -> None:
for method_name, method in definition.methods.items():
path = f"methods.{method_name}"

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import ast
import asyncio
from collections.abc import Callable
from collections.abc import Awaitable, Callable
import contextvars
import inspect
import os
@@ -32,6 +32,8 @@ if TYPE_CHECKING:
__all__ = ["FlowScriptExecutionDisabledError", "build_action"]
LocalContext = dict[str, Any]
NestedStepRunner = Callable[[LocalContext], Awaitable[Any]]
NestedStep = tuple[str, str | None, NestedStepRunner]
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
@@ -217,8 +219,9 @@ class EachAction:
def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.steps = [
(step.name, self._build_step_action(step)) for step in definition.do
self.steps: list[NestedStep] = [
(step.name, step.if_, self._build_step_action(step))
for step in definition.do
]
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
@@ -230,19 +233,27 @@ class EachAction:
for item in items:
local_outputs: dict[str, Any] = {}
local_context = {"item": item, "outputs": local_outputs}
last_output: Any = None
for name, run_step_action in self.steps:
last_output = await run_step_action(
{"item": item, "outputs": local_outputs}
)
for name, condition, run_step_action in self.steps:
if condition is not None and not self._condition_matches(
condition, local_context
):
continue
last_output = await run_step_action(local_context)
local_outputs[name] = last_output
results.append(last_output)
return results
def _build_step_action(
self, step: FlowEachStepDefinition
) -> Callable[[LocalContext], Any]:
def _condition_matches(self, condition: str, local_context: LocalContext) -> bool:
result = evaluate_expression(self.flow, condition, local_context=local_context)
if not isinstance(result, bool):
raise ValueError("if expression must evaluate to a boolean")
return result
def _build_step_action(self, step: FlowEachStepDefinition) -> NestedStepRunner:
run_action = build_action(self.flow, step.action)
async def run_step_action(local_context: LocalContext) -> Any:

View File

@@ -110,6 +110,9 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
assert "list to iterate" in each_properties["in"]["description"]
assert "Ordered steps" in each_properties["do"]["description"]
step_properties = defs["FlowEachStepDefinition"]["properties"]
assert "runs only if" in step_properties["if"]["description"]
def test_flow_definition_json_schema_carries_field_examples_only():
schema = flow_definition.FlowDefinition.json_schema()
@@ -158,6 +161,10 @@ def test_flow_definition_json_schema_carries_field_examples_only():
assert each_properties["in"]["examples"] == ["state.rows"]
assert each_properties["do"]["examples"][0][0]["name"] == "clean"
assert each_properties["do"]["examples"][0][0]["action"]["call"] == "script"
assert each_properties["do"]["examples"][0][1]["if"] == "outputs.clean != ''"
step_properties = defs["FlowEachStepDefinition"]["properties"]
assert step_properties["if"]["examples"] == ["item.kind == 'invoice'"]
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["listen"]["examples"] == [

View File

@@ -1368,6 +1368,103 @@ methods:
]
def test_each_action_runs_simple_if_clauses():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: kind
action:
call: expression
expr: item.kind
- name: kept
if: "outputs.kind == 'keep'"
action:
call: expression
expr: "'kept:' + item.value"
- name: skipped
if: "outputs.kind != 'keep'"
action:
call: expression
expr: "'skipped:' + item.value"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(
inputs={
"rows": [
{"kind": "keep", "value": "a"},
{"kind": "drop", "value": "b"},
]
}
) == ["kept:a", "skipped:b"]
def test_each_action_skipped_if_keeps_previous_output():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: original
action:
call: expression
expr: item.value
- name: maybe_included
if: item.include
action:
call: expression
expr: "'included:' + item.value"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(
inputs={
"rows": [
{"include": True, "value": "a"},
{"include": False, "value": "b"},
]
}
) == ["included:a", "b"]
def test_each_action_if_condition_must_be_boolean():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: value
if: item.value
action:
call: expression
expr: item.value
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(ValueError, match="if expression must evaluate to a boolean"):
flow.kickoff(inputs={"rows": [{"value": "truthy"}]})
def test_each_action_empty_list_returns_empty_and_listener_runs_once():
yaml_str = f"""
schema: crewai.flow/v1
@@ -1453,6 +1550,14 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
[{"name": "1bad", "action": {"call": "expression", "expr": "item"}}],
[{"name": "missing_action"}],
[{"action": {"call": "expression", "expr": "item"}}],
[
{
"name": "value",
"if": "true",
"then": [],
"action": {"call": "expression", "expr": "item"},
}
],
[
{"name": "same", "action": {"call": "expression", "expr": "item"}},
{"name": "same", "action": {"call": "expression", "expr": "item"}},
@@ -1479,6 +1584,26 @@ def test_each_action_validates_step_shape(action_do):
)
def test_if_clauses_are_rejected_at_method_level():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "TopLevelIfFlow",
"methods": {
"process": {
"start": True,
"do": {
"call": "expression",
"if": "true",
"expr": "'ok'",
},
}
},
}
)
def test_each_action_rejects_nested_each_actions():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(