mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-01 05:08:12 +00:00
Add optional if expression to each.do steps (#6214)
* Use explicit name/action shape for each.do steps * Add optional `if` expression to `each.do` steps Lets a step inside an `each` action run conditionally based on a CEL expression evaluated against `item` and prior step `outputs`.
This commit is contained in:
@@ -18,7 +18,6 @@ from pydantic import (
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
RootModel,
|
||||
field_serializer,
|
||||
model_validator,
|
||||
)
|
||||
@@ -38,6 +37,7 @@ _STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||
|
||||
__all__ = [
|
||||
"FlowActionDefinition",
|
||||
"FlowAtomicActionDefinition",
|
||||
"FlowCodeActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
@@ -47,7 +47,7 @@ __all__ = [
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowEachInnerActionDefinition",
|
||||
"FlowEachStepDefinition",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
@@ -466,38 +466,48 @@ class FlowScriptActionDefinition(BaseModel):
|
||||
)
|
||||
|
||||
|
||||
FlowInnerActionDefinition = (
|
||||
FlowAtomicActionDefinition: TypeAlias = Annotated[
|
||||
FlowCodeActionDefinition
|
||||
| FlowToolActionDefinition
|
||||
| FlowCrewActionDefinition
|
||||
| FlowExpressionActionDefinition
|
||||
| FlowScriptActionDefinition
|
||||
)
|
||||
| FlowScriptActionDefinition,
|
||||
Field(discriminator="call"),
|
||||
]
|
||||
|
||||
|
||||
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
|
||||
"""One named action inside an ``each`` composite action."""
|
||||
class FlowEachStepDefinition(BaseModel):
|
||||
"""One named step inside an ``each`` composite action."""
|
||||
|
||||
root: dict[str, FlowInnerActionDefinition] = Field(
|
||||
description="Single-entry mapping from an inner action name to its action.",
|
||||
examples=[{"clean": {"call": "script", "code": "return item.strip()"}}],
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
name: str = Field(
|
||||
description="Step name used to reference this step's output.",
|
||||
examples=["clean"],
|
||||
)
|
||||
if_: str | None = Field(
|
||||
default=None,
|
||||
alias="if",
|
||||
description=(
|
||||
"Optional CEL expression evaluated against state, outputs, and local "
|
||||
"context. When present, the step runs only if the expression evaluates "
|
||||
"to true."
|
||||
),
|
||||
examples=["item.kind == 'invoice'"],
|
||||
)
|
||||
action: FlowAtomicActionDefinition = Field(
|
||||
description="Atomic action to run for this step.",
|
||||
examples=[{"call": "script", "code": "return item.strip()"}],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
|
||||
if len(self.root) != 1:
|
||||
raise ValueError("each.do entries must be one-key mappings")
|
||||
_validate_step_name(self.name, field="each.do action names")
|
||||
def _validate_step_name(self) -> FlowEachStepDefinition:
|
||||
_validate_step_name(self.name, field="each.do step names")
|
||||
return self
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return next(iter(self.root))
|
||||
|
||||
@property
|
||||
def action(self) -> FlowInnerActionDefinition:
|
||||
return next(iter(self.root.values()))
|
||||
|
||||
|
||||
class FlowEachActionDefinition(BaseModel):
|
||||
"""A composite action that runs a sequential mini-pipeline for each item."""
|
||||
@@ -519,35 +529,36 @@ class FlowEachActionDefinition(BaseModel):
|
||||
description="CEL expression that must evaluate to the list to iterate.",
|
||||
examples=["state.rows"],
|
||||
)
|
||||
do: list[FlowEachInnerActionDefinition] = Field(
|
||||
do: list[FlowEachStepDefinition] = Field(
|
||||
description=(
|
||||
"Ordered inner actions to run for each item. Each entry must be a "
|
||||
"single-key mapping naming that inner action."
|
||||
"Ordered steps to run for each item. Each step has a name, optional "
|
||||
"if expression, and atomic action."
|
||||
),
|
||||
examples=[
|
||||
[
|
||||
{"clean": {"call": "script", "code": "return item.strip()"}},
|
||||
{"tag": {"call": "expression", "expr": "outputs.clean"}},
|
||||
{
|
||||
"name": "clean",
|
||||
"action": {"call": "script", "code": "return item.strip()"},
|
||||
},
|
||||
{
|
||||
"name": "tag",
|
||||
"if": "outputs.clean != ''",
|
||||
"action": {"call": "expression", "expr": "outputs.clean"},
|
||||
},
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
|
||||
def _validate_step_list(self) -> FlowEachActionDefinition:
|
||||
if not self.do:
|
||||
raise ValueError("each.do must contain at least one action")
|
||||
|
||||
seen: set[str] = set()
|
||||
for inner_action in self.do:
|
||||
name = inner_action.name
|
||||
if name in seen:
|
||||
raise ValueError(f"each.do action names must be unique: {name!r}")
|
||||
seen.add(name)
|
||||
raise ValueError("each.do must contain at least one step")
|
||||
|
||||
_validate_step_list(self.do, field="each.do")
|
||||
return self
|
||||
|
||||
|
||||
FlowActionDefinition = (
|
||||
FlowActionDefinition: TypeAlias = (
|
||||
FlowCodeActionDefinition
|
||||
| FlowToolActionDefinition
|
||||
| FlowCrewActionDefinition
|
||||
@@ -733,6 +744,15 @@ def _validate_step_name(name: str, *, field: str) -> None:
|
||||
raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
|
||||
|
||||
|
||||
def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> None:
|
||||
seen: set[str] = set()
|
||||
for step in steps:
|
||||
name = step.name
|
||||
if name in seen:
|
||||
raise ValueError(f"{field} step names must be unique: {name!r}")
|
||||
seen.add(name)
|
||||
|
||||
|
||||
def log_flow_definition_issues(definition: FlowDefinition) -> None:
|
||||
for method_name, method in definition.methods.items():
|
||||
path = f"methods.{method_name}"
|
||||
|
||||
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Awaitable, Callable
|
||||
import contextvars
|
||||
import inspect
|
||||
import os
|
||||
@@ -15,7 +15,7 @@ from crewai.flow.flow_definition import (
|
||||
FlowCodeActionDefinition,
|
||||
FlowCrewActionDefinition,
|
||||
FlowEachActionDefinition,
|
||||
FlowEachInnerActionDefinition,
|
||||
FlowEachStepDefinition,
|
||||
FlowExpressionActionDefinition,
|
||||
FlowScriptActionDefinition,
|
||||
FlowToolActionDefinition,
|
||||
@@ -32,6 +32,8 @@ if TYPE_CHECKING:
|
||||
__all__ = ["FlowScriptExecutionDisabledError", "build_action"]
|
||||
|
||||
LocalContext = dict[str, Any]
|
||||
NestedStepRunner = Callable[[LocalContext], Awaitable[Any]]
|
||||
NestedStep = tuple[str, str | None, NestedStepRunner]
|
||||
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
|
||||
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
|
||||
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
|
||||
@@ -217,9 +219,9 @@ class EachAction:
|
||||
def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None:
|
||||
self.flow = flow
|
||||
self.definition = definition
|
||||
self.inner_actions = [
|
||||
(inner_action.name, self._build_inner_action(inner_action))
|
||||
for inner_action in definition.do
|
||||
self.steps: list[NestedStep] = [
|
||||
(step.name, step.if_, self._build_step_action(step))
|
||||
for step in definition.do
|
||||
]
|
||||
|
||||
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
|
||||
@@ -231,22 +233,30 @@ class EachAction:
|
||||
|
||||
for item in items:
|
||||
local_outputs: dict[str, Any] = {}
|
||||
local_context = {"item": item, "outputs": local_outputs}
|
||||
last_output: Any = None
|
||||
for name, run_inner_action in self.inner_actions:
|
||||
last_output = await run_inner_action(
|
||||
{"item": item, "outputs": local_outputs}
|
||||
)
|
||||
for name, condition, run_step_action in self.steps:
|
||||
if condition is not None and not self._condition_matches(
|
||||
condition, local_context
|
||||
):
|
||||
continue
|
||||
|
||||
last_output = await run_step_action(local_context)
|
||||
local_outputs[name] = last_output
|
||||
results.append(last_output)
|
||||
|
||||
return results
|
||||
|
||||
def _build_inner_action(
|
||||
self, inner_action: FlowEachInnerActionDefinition
|
||||
) -> Callable[[LocalContext], Any]:
|
||||
run_action = build_action(self.flow, inner_action.action)
|
||||
def _condition_matches(self, condition: str, local_context: LocalContext) -> bool:
|
||||
result = evaluate_expression(self.flow, condition, local_context=local_context)
|
||||
if not isinstance(result, bool):
|
||||
raise ValueError("if expression must evaluate to a boolean")
|
||||
return result
|
||||
|
||||
async def run_inner_action(local_context: LocalContext) -> Any:
|
||||
def _build_step_action(self, step: FlowEachStepDefinition) -> NestedStepRunner:
|
||||
run_action = build_action(self.flow, step.action)
|
||||
|
||||
async def run_step_action(local_context: LocalContext) -> Any:
|
||||
kwargs = {_LOCAL_CONTEXT_KWARG: local_context}
|
||||
if inspect.iscoroutinefunction(run_action):
|
||||
result = run_action(**kwargs)
|
||||
@@ -261,7 +271,7 @@ class EachAction:
|
||||
result = await result
|
||||
return result
|
||||
|
||||
return run_inner_action
|
||||
return run_step_action
|
||||
|
||||
|
||||
_ACTION_TYPES: tuple[_ActionType, ...] = (
|
||||
|
||||
@@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit():
|
||||
}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowActionDefinition",
|
||||
"FlowAtomicActionDefinition",
|
||||
"FlowCodeActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
@@ -46,7 +47,7 @@ def test_flow_public_exports_are_explicit():
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowEachInnerActionDefinition",
|
||||
"FlowEachStepDefinition",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
@@ -107,7 +108,10 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
|
||||
|
||||
each_properties = defs["FlowEachActionDefinition"]["properties"]
|
||||
assert "list to iterate" in each_properties["in"]["description"]
|
||||
assert "Ordered inner actions" in each_properties["do"]["description"]
|
||||
assert "Ordered steps" in each_properties["do"]["description"]
|
||||
|
||||
step_properties = defs["FlowEachStepDefinition"]["properties"]
|
||||
assert "runs only if" in step_properties["if"]["description"]
|
||||
|
||||
|
||||
def test_flow_definition_json_schema_carries_field_examples_only():
|
||||
@@ -122,6 +126,7 @@ def test_flow_definition_json_schema_carries_field_examples_only():
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowEachStepDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
@@ -154,7 +159,12 @@ def test_flow_definition_json_schema_carries_field_examples_only():
|
||||
|
||||
each_properties = defs["FlowEachActionDefinition"]["properties"]
|
||||
assert each_properties["in"]["examples"] == ["state.rows"]
|
||||
assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script"
|
||||
assert each_properties["do"]["examples"][0][0]["name"] == "clean"
|
||||
assert each_properties["do"]["examples"][0][0]["action"]["call"] == "script"
|
||||
assert each_properties["do"]["examples"][0][1]["if"] == "outputs.clean != ''"
|
||||
|
||||
step_properties = defs["FlowEachStepDefinition"]["properties"]
|
||||
assert step_properties["if"]["examples"] == ["item.kind == 'invoice'"]
|
||||
|
||||
method_properties = defs["FlowMethodDefinition"]["properties"]
|
||||
assert method_properties["listen"]["examples"] == [
|
||||
@@ -584,14 +594,16 @@ def test_each_action_round_trips_json_and_yaml():
|
||||
"in": "state.rows",
|
||||
"do": [
|
||||
{
|
||||
"normalize": {
|
||||
"name": "normalize",
|
||||
"action": {
|
||||
"call": "tool",
|
||||
"ref": "my_tools:NormalizeRowTool",
|
||||
"with": {"row": "${ item }"},
|
||||
}
|
||||
},
|
||||
{
|
||||
"save": {
|
||||
"name": "save",
|
||||
"action": {
|
||||
"call": "code",
|
||||
"ref": "my_flow:save_row",
|
||||
"with": {
|
||||
|
||||
@@ -114,7 +114,7 @@ class EachActionFlow(Flow):
|
||||
except RuntimeError:
|
||||
pass
|
||||
else:
|
||||
raise RuntimeError("inner action ran on the event loop")
|
||||
raise RuntimeError("each step ran on the event loop")
|
||||
|
||||
from crewai.flow.flow_context import current_flow_method_name
|
||||
|
||||
@@ -1081,7 +1081,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- normalize:
|
||||
- name: normalize
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.normalize_row
|
||||
with:
|
||||
@@ -1097,7 +1098,7 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_runs_sync_inner_actions_off_event_loop_with_context():
|
||||
def test_each_action_runs_sync_steps_off_event_loop_with_context():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1107,7 +1108,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- threaded:
|
||||
- name: threaded
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.require_threaded_context
|
||||
with:
|
||||
@@ -1123,7 +1125,7 @@ methods:
|
||||
assert flow.inner_thread_id != caller_thread_id
|
||||
|
||||
|
||||
def test_each_action_runs_async_tool_results_from_sync_inner_actions():
|
||||
def test_each_action_runs_async_tool_results_from_sync_steps():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1133,7 +1135,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- async_tool:
|
||||
- name: async_tool
|
||||
action:
|
||||
call: tool
|
||||
ref: {__name__}:AsyncResultTool
|
||||
with:
|
||||
@@ -1222,7 +1225,7 @@ methods:
|
||||
assert flow.state["input_matches_output"] is True
|
||||
|
||||
|
||||
def test_script_each_action_reads_item_and_inner_outputs(
|
||||
def test_script_each_action_reads_item_and_step_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
@@ -1241,11 +1244,13 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- clean:
|
||||
- name: clean
|
||||
action:
|
||||
call: script
|
||||
code: |
|
||||
return item.strip()
|
||||
- tag:
|
||||
- name: tag
|
||||
action:
|
||||
call: script
|
||||
code: |
|
||||
return f"{outputs['seed']}:{outputs['clean']}"
|
||||
@@ -1257,7 +1262,7 @@ methods:
|
||||
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
|
||||
|
||||
|
||||
def test_each_action_uses_iteration_outputs_between_nested_actions():
|
||||
def test_each_action_uses_iteration_outputs_between_steps():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1267,13 +1272,15 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- normalize:
|
||||
- name: normalize
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.normalize_row
|
||||
with:
|
||||
row: "${{item}}"
|
||||
prefix: saved
|
||||
- save:
|
||||
- name: save
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.save_row
|
||||
with:
|
||||
@@ -1290,7 +1297,7 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_resets_inner_outputs_between_iterations():
|
||||
def test_each_action_resets_step_outputs_between_iterations():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1300,10 +1307,12 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- leak_check:
|
||||
- name: leak_check
|
||||
action:
|
||||
call: expression
|
||||
expr: "has(outputs.previous) ? outputs.previous : 'empty'"
|
||||
- previous:
|
||||
- name: previous
|
||||
action:
|
||||
call: expression
|
||||
expr: item
|
||||
start: true
|
||||
@@ -1317,7 +1326,7 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs():
|
||||
def test_each_action_preserves_flow_outputs_and_prefers_step_outputs():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1332,13 +1341,16 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- before_shadow:
|
||||
- name: before_shadow
|
||||
action:
|
||||
call: expression
|
||||
expr: "outputs.seed + ':' + item"
|
||||
- seed:
|
||||
- name: seed
|
||||
action:
|
||||
call: expression
|
||||
expr: "'local:' + item"
|
||||
- after_shadow:
|
||||
- name: after_shadow
|
||||
action:
|
||||
call: expression
|
||||
expr: "outputs.seed"
|
||||
listen: seed
|
||||
@@ -1356,6 +1368,103 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_runs_simple_if_clauses():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachIfFlow
|
||||
methods:
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: kind
|
||||
action:
|
||||
call: expression
|
||||
expr: item.kind
|
||||
- name: kept
|
||||
if: "outputs.kind == 'keep'"
|
||||
action:
|
||||
call: expression
|
||||
expr: "'kept:' + item.value"
|
||||
- name: skipped
|
||||
if: "outputs.kind != 'keep'"
|
||||
action:
|
||||
call: expression
|
||||
expr: "'skipped:' + item.value"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(
|
||||
inputs={
|
||||
"rows": [
|
||||
{"kind": "keep", "value": "a"},
|
||||
{"kind": "drop", "value": "b"},
|
||||
]
|
||||
}
|
||||
) == ["kept:a", "skipped:b"]
|
||||
|
||||
|
||||
def test_each_action_skipped_if_keeps_previous_output():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachIfFlow
|
||||
methods:
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: original
|
||||
action:
|
||||
call: expression
|
||||
expr: item.value
|
||||
- name: maybe_included
|
||||
if: item.include
|
||||
action:
|
||||
call: expression
|
||||
expr: "'included:' + item.value"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(
|
||||
inputs={
|
||||
"rows": [
|
||||
{"include": True, "value": "a"},
|
||||
{"include": False, "value": "b"},
|
||||
]
|
||||
}
|
||||
) == ["included:a", "b"]
|
||||
|
||||
|
||||
def test_each_action_if_condition_must_be_boolean():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachIfFlow
|
||||
methods:
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: value
|
||||
if: item.value
|
||||
action:
|
||||
call: expression
|
||||
expr: item.value
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
with pytest.raises(ValueError, match="if expression must evaluate to a boolean"):
|
||||
flow.kickoff(inputs={"rows": [{"value": "truthy"}]})
|
||||
|
||||
|
||||
def test_each_action_empty_list_returns_empty_and_listener_runs_once():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
@@ -1366,7 +1475,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- normalize:
|
||||
- name: normalize
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.normalize_row
|
||||
with:
|
||||
@@ -1415,7 +1525,12 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
|
||||
"do": {
|
||||
"call": "each",
|
||||
"in": expr,
|
||||
"do": [{"value": {"call": "expression", "expr": "item"}}],
|
||||
"do": [
|
||||
{
|
||||
"name": "value",
|
||||
"action": {"call": "expression", "expr": "item"},
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
},
|
||||
@@ -1431,15 +1546,25 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
|
||||
"action_do",
|
||||
[
|
||||
[],
|
||||
[{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}],
|
||||
[{"1bad": {"call": "expression", "expr": "item"}}],
|
||||
[{"value": {"call": "expression", "expr": "item"}}],
|
||||
[{"name": "1bad", "action": {"call": "expression", "expr": "item"}}],
|
||||
[{"name": "missing_action"}],
|
||||
[{"action": {"call": "expression", "expr": "item"}}],
|
||||
[
|
||||
{"same": {"call": "expression", "expr": "item"}},
|
||||
{"same": {"call": "expression", "expr": "item"}},
|
||||
{
|
||||
"name": "value",
|
||||
"if": "true",
|
||||
"then": [],
|
||||
"action": {"call": "expression", "expr": "item"},
|
||||
}
|
||||
],
|
||||
[
|
||||
{"name": "same", "action": {"call": "expression", "expr": "item"}},
|
||||
{"name": "same", "action": {"call": "expression", "expr": "item"}},
|
||||
],
|
||||
],
|
||||
)
|
||||
def test_each_action_validates_inner_action_shape(action_do):
|
||||
def test_each_action_validates_step_shape(action_do):
|
||||
with pytest.raises(ValidationError):
|
||||
FlowDefinition.from_dict(
|
||||
{
|
||||
@@ -1459,6 +1584,26 @@ def test_each_action_validates_inner_action_shape(action_do):
|
||||
)
|
||||
|
||||
|
||||
def test_if_clauses_are_rejected_at_method_level():
|
||||
with pytest.raises(ValidationError):
|
||||
FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "TopLevelIfFlow",
|
||||
"methods": {
|
||||
"process": {
|
||||
"start": True,
|
||||
"do": {
|
||||
"call": "expression",
|
||||
"if": "true",
|
||||
"expr": "'ok'",
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_each_action_rejects_nested_each_actions():
|
||||
with pytest.raises(ValidationError):
|
||||
FlowDefinition.from_dict(
|
||||
@@ -1473,12 +1618,14 @@ def test_each_action_rejects_nested_each_actions():
|
||||
"in": "state.rows",
|
||||
"do": [
|
||||
{
|
||||
"nested": {
|
||||
"name": "nested",
|
||||
"action": {
|
||||
"call": "each",
|
||||
"in": "state.children",
|
||||
"do": [
|
||||
{
|
||||
"child": {
|
||||
"name": "child",
|
||||
"action": {
|
||||
"call": "expression",
|
||||
"expr": "item",
|
||||
}
|
||||
@@ -1504,7 +1651,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- validate:
|
||||
- name: validate
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.fail_on_bad_row
|
||||
with:
|
||||
|
||||
Reference in New Issue
Block a user