Use explicit name/action shape for each.do steps

This commit is contained in:
Vinicius Brasil
2026-06-17 21:36:03 -07:00
parent 218dc82bf7
commit 4f8d5cf7cb
4 changed files with 108 additions and 78 deletions

View File

@@ -18,7 +18,6 @@ from pydantic import (
BaseModel,
ConfigDict,
Field,
RootModel,
field_serializer,
model_validator,
)
@@ -38,6 +37,7 @@ _STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
__all__ = [
"FlowActionDefinition",
"FlowAtomicActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
@@ -47,7 +47,7 @@ __all__ = [
"FlowDefinitionCondition",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
"FlowEachStepDefinition",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowJsonSchemaStateDefinition",
@@ -466,38 +466,35 @@ class FlowScriptActionDefinition(BaseModel):
)
FlowInnerActionDefinition = (
FlowAtomicActionDefinition: TypeAlias = Annotated[
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
)
| FlowScriptActionDefinition,
Field(discriminator="call"),
]
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
"""One named action inside an ``each`` composite action."""
class FlowEachStepDefinition(BaseModel):
"""One named step inside an ``each`` composite action."""
root: dict[str, FlowInnerActionDefinition] = Field(
description="Single-entry mapping from an inner action name to its action.",
examples=[{"clean": {"call": "script", "code": "return item.strip()"}}],
model_config = ConfigDict(extra="forbid")
name: str = Field(
description="Step name used to reference this step's output.",
examples=["clean"],
)
action: FlowAtomicActionDefinition = Field(
description="Atomic action to run for this step.",
examples=[{"call": "script", "code": "return item.strip()"}],
)
@model_validator(mode="after")
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
if len(self.root) != 1:
raise ValueError("each.do entries must be one-key mappings")
_validate_step_name(self.name, field="each.do action names")
def _validate_step_name(self) -> FlowEachStepDefinition:
_validate_step_name(self.name, field="each.do step names")
return self
@property
def name(self) -> str:
return next(iter(self.root))
@property
def action(self) -> FlowInnerActionDefinition:
return next(iter(self.root.values()))
class FlowEachActionDefinition(BaseModel):
"""A composite action that runs a sequential mini-pipeline for each item."""
@@ -519,35 +516,41 @@ class FlowEachActionDefinition(BaseModel):
description="CEL expression that must evaluate to the list to iterate.",
examples=["state.rows"],
)
do: list[FlowEachInnerActionDefinition] = Field(
do: list[FlowEachStepDefinition] = Field(
description=(
"Ordered inner actions to run for each item. Each entry must be a "
"single-key mapping naming that inner action."
"Ordered steps to run for each item. Each step has a name and an "
"atomic action."
),
examples=[
[
{"clean": {"call": "script", "code": "return item.strip()"}},
{"tag": {"call": "expression", "expr": "outputs.clean"}},
{
"name": "clean",
"action": {"call": "script", "code": "return item.strip()"},
},
{
"name": "tag",
"action": {"call": "expression", "expr": "outputs.clean"},
},
]
],
)
@model_validator(mode="after")
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
def _validate_step_list(self) -> FlowEachActionDefinition:
if not self.do:
raise ValueError("each.do must contain at least one action")
raise ValueError("each.do must contain at least one step")
seen: set[str] = set()
for inner_action in self.do:
name = inner_action.name
for step in self.do:
name = step.name
if name in seen:
raise ValueError(f"each.do action names must be unique: {name!r}")
raise ValueError(f"each.do step names must be unique: {name!r}")
seen.add(name)
return self
FlowActionDefinition = (
FlowActionDefinition: TypeAlias = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition

View File

@@ -15,7 +15,7 @@ from crewai.flow.flow_definition import (
FlowCodeActionDefinition,
FlowCrewActionDefinition,
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowEachStepDefinition,
FlowExpressionActionDefinition,
FlowScriptActionDefinition,
FlowToolActionDefinition,
@@ -217,9 +217,8 @@ class EachAction:
def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.inner_actions = [
(inner_action.name, self._build_inner_action(inner_action))
for inner_action in definition.do
self.steps = [
(step.name, self._build_step_action(step)) for step in definition.do
]
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
@@ -232,8 +231,8 @@ class EachAction:
for item in items:
local_outputs: dict[str, Any] = {}
last_output: Any = None
for name, run_inner_action in self.inner_actions:
last_output = await run_inner_action(
for name, run_step_action in self.steps:
last_output = await run_step_action(
{"item": item, "outputs": local_outputs}
)
local_outputs[name] = last_output
@@ -241,12 +240,12 @@ class EachAction:
return results
def _build_inner_action(
self, inner_action: FlowEachInnerActionDefinition
def _build_step_action(
self, step: FlowEachStepDefinition
) -> Callable[[LocalContext], Any]:
run_action = build_action(self.flow, inner_action.action)
run_action = build_action(self.flow, step.action)
async def run_inner_action(local_context: LocalContext) -> Any:
async def run_step_action(local_context: LocalContext) -> Any:
kwargs = {_LOCAL_CONTEXT_KWARG: local_context}
if inspect.iscoroutinefunction(run_action):
result = run_action(**kwargs)
@@ -261,7 +260,7 @@ class EachAction:
result = await result
return result
return run_inner_action
return run_step_action
_ACTION_TYPES: tuple[_ActionType, ...] = (

View File

@@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowAtomicActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
@@ -46,7 +47,7 @@ def test_flow_public_exports_are_explicit():
"FlowDefinitionCondition",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
"FlowEachStepDefinition",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowJsonSchemaStateDefinition",
@@ -107,7 +108,7 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert "list to iterate" in each_properties["in"]["description"]
assert "Ordered inner actions" in each_properties["do"]["description"]
assert "Ordered steps" in each_properties["do"]["description"]
def test_flow_definition_json_schema_carries_field_examples_only():
@@ -122,6 +123,7 @@ def test_flow_definition_json_schema_carries_field_examples_only():
"FlowExpressionActionDefinition",
"FlowScriptActionDefinition",
"FlowEachActionDefinition",
"FlowEachStepDefinition",
"FlowMethodDefinition",
"FlowDictStateDefinition",
"FlowJsonSchemaStateDefinition",
@@ -154,7 +156,8 @@ def test_flow_definition_json_schema_carries_field_examples_only():
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert each_properties["in"]["examples"] == ["state.rows"]
assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script"
assert each_properties["do"]["examples"][0][0]["name"] == "clean"
assert each_properties["do"]["examples"][0][0]["action"]["call"] == "script"
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["listen"]["examples"] == [
@@ -584,14 +587,16 @@ def test_each_action_round_trips_json_and_yaml():
"in": "state.rows",
"do": [
{
"normalize": {
"name": "normalize",
"action": {
"call": "tool",
"ref": "my_tools:NormalizeRowTool",
"with": {"row": "${ item }"},
}
},
{
"save": {
"name": "save",
"action": {
"call": "code",
"ref": "my_flow:save_row",
"with": {

View File

@@ -114,7 +114,7 @@ class EachActionFlow(Flow):
except RuntimeError:
pass
else:
raise RuntimeError("inner action ran on the event loop")
raise RuntimeError("each step ran on the event loop")
from crewai.flow.flow_context import current_flow_method_name
@@ -1081,7 +1081,8 @@ methods:
call: each
in: state.rows
do:
- normalize:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
@@ -1097,7 +1098,7 @@ methods:
]
def test_each_action_runs_sync_inner_actions_off_event_loop_with_context():
def test_each_action_runs_sync_steps_off_event_loop_with_context():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
@@ -1107,7 +1108,8 @@ methods:
call: each
in: state.rows
do:
- threaded:
- name: threaded
action:
call: code
ref: {__name__}:EachActionFlow.require_threaded_context
with:
@@ -1123,7 +1125,7 @@ methods:
assert flow.inner_thread_id != caller_thread_id
def test_each_action_runs_async_tool_results_from_sync_inner_actions():
def test_each_action_runs_async_tool_results_from_sync_steps():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
@@ -1133,7 +1135,8 @@ methods:
call: each
in: state.rows
do:
- async_tool:
- name: async_tool
action:
call: tool
ref: {__name__}:AsyncResultTool
with:
@@ -1222,7 +1225,7 @@ methods:
assert flow.state["input_matches_output"] is True
def test_script_each_action_reads_item_and_inner_outputs(
def test_script_each_action_reads_item_and_step_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
@@ -1241,11 +1244,13 @@ methods:
call: each
in: state.rows
do:
- clean:
- name: clean
action:
call: script
code: |
return item.strip()
- tag:
- name: tag
action:
call: script
code: |
return f"{outputs['seed']}:{outputs['clean']}"
@@ -1257,7 +1262,7 @@ methods:
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
def test_each_action_uses_iteration_outputs_between_nested_actions():
def test_each_action_uses_iteration_outputs_between_steps():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
@@ -1267,13 +1272,15 @@ methods:
call: each
in: state.rows
do:
- normalize:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
row: "${{item}}"
prefix: saved
- save:
- name: save
action:
call: code
ref: {__name__}:EachActionFlow.save_row
with:
@@ -1290,7 +1297,7 @@ methods:
]
def test_each_action_resets_inner_outputs_between_iterations():
def test_each_action_resets_step_outputs_between_iterations():
yaml_str = """
schema: crewai.flow/v1
name: EachFlow
@@ -1300,10 +1307,12 @@ methods:
call: each
in: state.rows
do:
- leak_check:
- name: leak_check
action:
call: expression
expr: "has(outputs.previous) ? outputs.previous : 'empty'"
- previous:
- name: previous
action:
call: expression
expr: item
start: true
@@ -1317,7 +1326,7 @@ methods:
]
def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs():
def test_each_action_preserves_flow_outputs_and_prefers_step_outputs():
yaml_str = """
schema: crewai.flow/v1
name: EachFlow
@@ -1332,13 +1341,16 @@ methods:
call: each
in: state.rows
do:
- before_shadow:
- name: before_shadow
action:
call: expression
expr: "outputs.seed + ':' + item"
- seed:
- name: seed
action:
call: expression
expr: "'local:' + item"
- after_shadow:
- name: after_shadow
action:
call: expression
expr: "outputs.seed"
listen: seed
@@ -1366,7 +1378,8 @@ methods:
call: each
in: state.rows
do:
- normalize:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
@@ -1415,7 +1428,12 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
"do": {
"call": "each",
"in": expr,
"do": [{"value": {"call": "expression", "expr": "item"}}],
"do": [
{
"name": "value",
"action": {"call": "expression", "expr": "item"},
}
],
},
}
},
@@ -1431,15 +1449,17 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
"action_do",
[
[],
[{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}],
[{"1bad": {"call": "expression", "expr": "item"}}],
[{"value": {"call": "expression", "expr": "item"}}],
[{"name": "1bad", "action": {"call": "expression", "expr": "item"}}],
[{"name": "missing_action"}],
[{"action": {"call": "expression", "expr": "item"}}],
[
{"same": {"call": "expression", "expr": "item"}},
{"same": {"call": "expression", "expr": "item"}},
{"name": "same", "action": {"call": "expression", "expr": "item"}},
{"name": "same", "action": {"call": "expression", "expr": "item"}},
],
],
)
def test_each_action_validates_inner_action_shape(action_do):
def test_each_action_validates_step_shape(action_do):
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
@@ -1473,12 +1493,14 @@ def test_each_action_rejects_nested_each_actions():
"in": "state.rows",
"do": [
{
"nested": {
"name": "nested",
"action": {
"call": "each",
"in": "state.children",
"do": [
{
"child": {
"name": "child",
"action": {
"call": "expression",
"expr": "item",
}
@@ -1504,7 +1526,8 @@ methods:
call: each
in: state.rows
do:
- validate:
- name: validate
action:
call: code
ref: {__name__}:EachActionFlow.fail_on_bad_row
with: