diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 5f813915a..85f20239b 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -479,12 +479,25 @@ FlowAtomicActionDefinition: TypeAlias = Annotated[ class FlowEachStepDefinition(BaseModel): """One named step inside an ``each`` composite action.""" - model_config = ConfigDict(extra="forbid") + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) name: str = Field( description="Step name used to reference this step's output.", examples=["clean"], ) + if_: str | None = Field( + default=None, + alias="if", + description=( + "Optional CEL expression evaluated against state, outputs, and local " + "context. When present, the step runs only if the expression evaluates " + "to true." + ), + examples=["item.kind == 'invoice'"], + ) action: FlowAtomicActionDefinition = Field( description="Atomic action to run for this step.", examples=[{"call": "script", "code": "return item.strip()"}], @@ -518,8 +531,8 @@ class FlowEachActionDefinition(BaseModel): ) do: list[FlowEachStepDefinition] = Field( description=( - "Ordered steps to run for each item. Each step has a name and an " - "atomic action." + "Ordered steps to run for each item. Each step has a name, optional " + "if expression, and atomic action." ), examples=[ [ @@ -529,6 +542,7 @@ class FlowEachActionDefinition(BaseModel): }, { "name": "tag", + "if": "outputs.clean != ''", "action": {"call": "expression", "expr": "outputs.clean"}, }, ] @@ -540,13 +554,7 @@ class FlowEachActionDefinition(BaseModel): if not self.do: raise ValueError("each.do must contain at least one step") - seen: set[str] = set() - for step in self.do: - name = step.name - if name in seen: - raise ValueError(f"each.do step names must be unique: {name!r}") - seen.add(name) - + _validate_step_list(self.do, field="each.do") return self @@ -736,6 +744,15 @@ def _validate_step_name(name: str, *, field: str) -> None: raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}") +def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> None: + seen: set[str] = set() + for step in steps: + name = step.name + if name in seen: + raise ValueError(f"{field} step names must be unique: {name!r}") + seen.add(name) + + def log_flow_definition_issues(definition: FlowDefinition) -> None: for method_name, method in definition.methods.items(): path = f"methods.{method_name}" diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 949ba9106..65fa50588 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -4,7 +4,7 @@ from __future__ import annotations import ast import asyncio -from collections.abc import Callable +from collections.abc import Awaitable, Callable import contextvars import inspect import os @@ -32,6 +32,8 @@ if TYPE_CHECKING: __all__ = ["FlowScriptExecutionDisabledError", "build_action"] LocalContext = dict[str, Any] +NestedStepRunner = Callable[[LocalContext], Awaitable[Any]] +NestedStep = tuple[str, str | None, NestedStepRunner] _LOCAL_CONTEXT_KWARG = "__flow_definition_local_context" _ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION" _TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"}) @@ -217,8 +219,9 @@ class EachAction: def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None: self.flow = flow self.definition = definition - self.steps = [ - (step.name, self._build_step_action(step)) for step in definition.do + self.steps: list[NestedStep] = [ + (step.name, step.if_, self._build_step_action(step)) + for step in definition.do ] async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]: @@ -230,19 +233,27 @@ class EachAction: for item in items: local_outputs: dict[str, Any] = {} + local_context = {"item": item, "outputs": local_outputs} last_output: Any = None - for name, run_step_action in self.steps: - last_output = await run_step_action( - {"item": item, "outputs": local_outputs} - ) + for name, condition, run_step_action in self.steps: + if condition is not None and not self._condition_matches( + condition, local_context + ): + continue + + last_output = await run_step_action(local_context) local_outputs[name] = last_output results.append(last_output) return results - def _build_step_action( - self, step: FlowEachStepDefinition - ) -> Callable[[LocalContext], Any]: + def _condition_matches(self, condition: str, local_context: LocalContext) -> bool: + result = evaluate_expression(self.flow, condition, local_context=local_context) + if not isinstance(result, bool): + raise ValueError("if expression must evaluate to a boolean") + return result + + def _build_step_action(self, step: FlowEachStepDefinition) -> NestedStepRunner: run_action = build_action(self.flow, step.action) async def run_step_action(local_context: LocalContext) -> Any: diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 190459e14..73120e7db 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -110,6 +110,9 @@ def test_flow_definition_json_schema_carries_reference_descriptions(): assert "list to iterate" in each_properties["in"]["description"] assert "Ordered steps" in each_properties["do"]["description"] + step_properties = defs["FlowEachStepDefinition"]["properties"] + assert "runs only if" in step_properties["if"]["description"] + def test_flow_definition_json_schema_carries_field_examples_only(): schema = flow_definition.FlowDefinition.json_schema() @@ -158,6 +161,10 @@ def test_flow_definition_json_schema_carries_field_examples_only(): assert each_properties["in"]["examples"] == ["state.rows"] assert each_properties["do"]["examples"][0][0]["name"] == "clean" assert each_properties["do"]["examples"][0][0]["action"]["call"] == "script" + assert each_properties["do"]["examples"][0][1]["if"] == "outputs.clean != ''" + + step_properties = defs["FlowEachStepDefinition"]["properties"] + assert step_properties["if"]["examples"] == ["item.kind == 'invoice'"] method_properties = defs["FlowMethodDefinition"]["properties"] assert method_properties["listen"]["examples"] == [ diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 656d07e73..da1fd27e9 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -1368,6 +1368,103 @@ methods: ] +def test_each_action_runs_simple_if_clauses(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - name: kind + action: + call: expression + expr: item.kind + - name: kept + if: "outputs.kind == 'keep'" + action: + call: expression + expr: "'kept:' + item.value" + - name: skipped + if: "outputs.kind != 'keep'" + action: + call: expression + expr: "'skipped:' + item.value" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff( + inputs={ + "rows": [ + {"kind": "keep", "value": "a"}, + {"kind": "drop", "value": "b"}, + ] + } + ) == ["kept:a", "skipped:b"] + + +def test_each_action_skipped_if_keeps_previous_output(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - name: original + action: + call: expression + expr: item.value + - name: maybe_included + if: item.include + action: + call: expression + expr: "'included:' + item.value" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff( + inputs={ + "rows": [ + {"include": True, "value": "a"}, + {"include": False, "value": "b"}, + ] + } + ) == ["included:a", "b"] + + +def test_each_action_if_condition_must_be_boolean(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: state.rows + do: + - name: value + if: item.value + action: + call: expression + expr: item.value + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + with pytest.raises(ValueError, match="if expression must evaluate to a boolean"): + flow.kickoff(inputs={"rows": [{"value": "truthy"}]}) + + def test_each_action_empty_list_returns_empty_and_listener_runs_once(): yaml_str = f""" schema: crewai.flow/v1 @@ -1453,6 +1550,14 @@ def test_each_action_rejects_non_list_inputs(expr, inputs): [{"name": "1bad", "action": {"call": "expression", "expr": "item"}}], [{"name": "missing_action"}], [{"action": {"call": "expression", "expr": "item"}}], + [ + { + "name": "value", + "if": "true", + "then": [], + "action": {"call": "expression", "expr": "item"}, + } + ], [ {"name": "same", "action": {"call": "expression", "expr": "item"}}, {"name": "same", "action": {"call": "expression", "expr": "item"}}, @@ -1479,6 +1584,26 @@ def test_each_action_validates_step_shape(action_do): ) +def test_if_clauses_are_rejected_at_method_level(): + with pytest.raises(ValidationError): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "TopLevelIfFlow", + "methods": { + "process": { + "start": True, + "do": { + "call": "expression", + "if": "true", + "expr": "'ok'", + }, + } + }, + } + ) + + def test_each_action_rejects_nested_each_actions(): with pytest.raises(ValidationError): FlowDefinition.from_dict(