Compare commits

...

5 Commits

Author SHA1 Message Date
Vinicius Brasil
2105a0ebf8 Add optional if expression to each.do steps
Lets a step inside an `each` action run conditionally based on a CEL
expression evaluated against `item` and prior step `outputs`.
2026-06-17 22:10:01 -07:00
Vinicius Brasil
4f8d5cf7cb Use explicit name/action shape for each.do steps 2026-06-17 21:36:03 -07:00
Vinicius Brasil
218dc82bf7 Replace flow diagnostics with logging (#6212)
This commit removes flow diagnostics from the definition. These were
used for logging only, and should not be coupled to the definition.
2026-06-17 19:37:52 -07:00
Vinicius Brasil
7374486f00 Document FlowDefinition fields in the JSON schema (#6198)
Add a description and examples to every FlowDefinition field and
standardize on `typing.Literal`, so the generated JSON schema documents
itself — each action discriminator, state branch, and config option
explains what it is and shows a realistic value.

Examples live on individual fields only, never at the model level, which
keeps the schema readable for tooling that renders field-level help.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 18:49:01 -07:00
Vinicius Brasil
5bd10ee2c4 Add script/code block action to FlowDefinition (#6197)
* Add script/code blocks to FlowDefinition

Let a Flow method run trusted inline Python with `call: script`. The code
is compiled once into a generated function and receives the runtime
values as arguments.

```yaml
methods:
  normalize:
    start: true
    do:
      call: script
      code: |
        import math
        state["rounded"] = math.ceil(state["raw_score"])
        return f"rounded:{state['rounded']}"
```

Even though this exposes the same surface as tools (both run custom code), I
decided to make it opt-in for now: script execution is enabled by setting
`CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1`.

* Address code review comments
2026-06-17 18:38:41 -07:00
9 changed files with 1063 additions and 370 deletions

View File

@@ -14,7 +14,6 @@ from crewai.flow.flow_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowDictStateDefinition,
FlowHumanFeedbackDefinition,
FlowMethodDefinition,
@@ -23,6 +22,7 @@ from crewai.flow.flow_definition import (
FlowStateDefinition,
FlowUnknownStateDefinition,
_object_ref,
log_flow_definition_issues,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -116,7 +116,6 @@ def _is_json_serializable(value: Any) -> bool:
def _serialize_static_value(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> Any:
if value is None or _is_json_serializable(value):
@@ -148,12 +147,11 @@ def _serialize_static_value(
)
ref = _object_ref(value)
diagnostics.append(
FlowDefinitionDiagnostic(
code="non_serializable_value",
path=path,
message=f"value is not fully serializable; preserved import reference {ref}",
)
logger.warning(
"Flow definition value at %s is not fully serializable; "
"preserved import reference %s.",
path,
ref,
)
return {"ref": ref}
@@ -169,10 +167,7 @@ def _state_ref(value: Any) -> str | None:
return None
def _build_state_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowStateDefinition | None:
def _build_state_definition(flow_class: type) -> FlowStateDefinition | None:
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
@@ -187,29 +182,23 @@ def _build_state_definition(
if state_value is dict or isinstance(state_value, dict):
default = None
if isinstance(state_value, dict):
default = _serialize_static_value(state_value, diagnostics, "state.default")
default = _serialize_static_value(state_value, "state.default")
return FlowDictStateDefinition(default=default)
if isinstance(state_value, type) and issubclass(state_value, PydanticBaseModel):
return FlowPydanticStateDefinition(ref=_state_ref(state_value))
if isinstance(state_value, PydanticBaseModel):
return FlowPydanticStateDefinition(
ref=_state_ref(state_value),
default=_serialize_static_value(state_value, diagnostics, "state.default"),
)
diagnostics.append(
FlowDefinitionDiagnostic(
code="unknown_state_type",
path="state",
message=f"could not serialize state type {_object_ref(state_value)}",
default=_serialize_static_value(state_value, "state.default"),
)
logger.warning(
"Flow definition state could not serialize state type %s.",
_object_ref(state_value),
)
return FlowUnknownStateDefinition(ref=_state_ref(state_value))
def _build_config_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConfigDefinition:
def _build_config_definition(flow_class: type) -> FlowConfigDefinition:
config_field_names = set(FlowConfigDefinition.model_fields)
field_defaults = {
name: field.get_default(call_default_factory=True)
@@ -225,15 +214,12 @@ def _build_config_definition(
value if value is None or isinstance(value, str) else _object_ref(value)
)
else:
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
values[field_name] = _serialize_static_value(value, f"config.{field_name}")
return FlowConfigDefinition(**values)
def _build_human_feedback_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowHumanFeedbackDefinition | None:
config = getattr(method, "__human_feedback_config__", None)
@@ -248,7 +234,7 @@ def _build_human_feedback_definition(
llm=getattr(config, "llm", None),
default_outcome=getattr(config, "default_outcome", None),
metadata=_serialize_static_value(
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
getattr(config, "metadata", None), f"{path}.metadata"
),
provider=getattr(config, "provider", None),
learn=bool(getattr(config, "learn", False)),
@@ -273,7 +259,6 @@ def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | Non
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
@@ -284,12 +269,9 @@ def _build_conversational_router_definition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
llm=_serialize_static_value(getattr(router_config, "llm", None), f"{path}.llm"),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
@@ -300,7 +282,6 @@ def _build_conversational_router_definition(
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
@@ -324,12 +305,9 @@ def _build_conversational_definition(
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
llm=_serialize_static_value(getattr(config, "llm", None), "conversational.llm"),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
@@ -340,12 +318,10 @@ def _build_conversational_definition(
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
@@ -365,7 +341,6 @@ def _build_conversational_definition(
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
@@ -376,9 +351,7 @@ def _build_method_definition(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
)
human_feedback = _build_human_feedback_definition(method, f"{path}.human_feedback")
if human_feedback is not None:
method_definition.human_feedback = human_feedback
if human_feedback.emit:
@@ -444,7 +417,6 @@ def _build_flow_definition_from_class(
flow_class: type,
namespace: dict[str, Any] | None = None,
) -> FlowDefinition:
diagnostics: list[FlowDefinitionDiagnostic] = []
methods: dict[str, FlowMethodDefinition] = {}
flow_methods = _iter_flow_methods(flow_class)
if namespace is not None:
@@ -456,7 +428,7 @@ def _build_flow_definition_from_class(
for method_name, method in flow_methods.items():
methods[method_name] = _build_method_definition(
method, diagnostics, f"methods.{method_name}"
method, f"methods.{method_name}"
)
description = None
@@ -467,15 +439,13 @@ def _build_flow_definition_from_class(
definition = FlowDefinition(
name=getattr(flow_class, "__name__", "Flow"),
description=description,
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
state=_build_state_definition(flow_class),
config=_build_config_definition(flow_class),
persist=_build_persistence_definition(flow_class),
conversational=_build_conversational_definition(flow_class, diagnostics),
conversational=_build_conversational_definition(flow_class),
methods=methods,
diagnostics=diagnostics,
)
definition.diagnostics.extend(definition.validate_contract())
definition.log_diagnostics()
log_flow_definition_issues(definition)
return definition

View File

@@ -12,13 +12,12 @@ from __future__ import annotations
import json
import logging
import re
from typing import Annotated, Any, Literal as TypingLiteral, TypeAlias
from typing import Annotated, Any, Literal, TypeAlias
from pydantic import (
BaseModel,
ConfigDict,
Field,
RootModel,
field_serializer,
model_validator,
)
@@ -38,6 +37,7 @@ _STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
__all__ = [
"FlowActionDefinition",
"FlowAtomicActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
@@ -45,16 +45,16 @@ __all__ = [
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
"FlowEachStepDefinition",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowJsonSchemaStateDefinition",
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",
@@ -69,21 +69,12 @@ def _object_ref(value: Any) -> str:
return f"{module}:{qualname}" if module and qualname else repr(value)
class FlowDefinitionDiagnostic(BaseModel):
"""A non-fatal Flow Definition build or validation diagnostic."""
code: str
message: str
severity: TypingLiteral["warning", "error"] = "warning"
path: str | None = None
class FlowDictStateDefinition(BaseModel):
"""Static description of a plain dictionary Flow state contract."""
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["dict"] = Field(
type: Literal["dict"] = Field(
default="dict",
description="Plain dictionary state with optional default values.",
examples=["dict"],
@@ -100,7 +91,7 @@ class FlowPydanticStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["pydantic"] = Field(
type: Literal["pydantic"] = Field(
default="pydantic",
description="Importable Pydantic model used as the Flow state type.",
examples=["pydantic"],
@@ -135,7 +126,7 @@ class FlowJsonSchemaStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["json_schema"] = Field(
type: Literal["json_schema"] = Field(
default="json_schema",
description="Inline JSON Schema used as the Flow state contract.",
examples=["json_schema"],
@@ -162,7 +153,7 @@ class FlowUnknownStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["unknown"] = Field(
type: Literal["unknown"] = Field(
default="unknown",
description="Unknown state representation; runtime falls back to dictionary state.",
examples=["unknown"],
@@ -191,14 +182,46 @@ FlowStateDefinition: TypeAlias = Annotated[
class FlowConfigDefinition(BaseModel):
"""Serializable Flow-level configuration."""
tracing: bool | None = None
stream: bool = False
memory: dict[str, Any] | None = None
input_provider: str | None = None
suppress_flow_events: bool = False
max_method_calls: int = 100
defer_trace_finalization: bool = False
checkpoint: bool | dict[str, Any] | None = None
tracing: bool | None = Field(
default=None,
description="Override for flow tracing; when omitted, runtime defaults apply.",
examples=[True],
)
stream: bool = Field(
default=False,
description="Whether the flow should emit streaming events when supported.",
examples=[True],
)
memory: dict[str, Any] | None = Field(
default=None,
description="Serializable memory configuration passed to flow execution.",
examples=[{"enabled": True}],
)
input_provider: str | None = Field(
default=None,
description="Import reference or provider key used to supply flow inputs.",
examples=["my_project.inputs:load_inputs"],
)
suppress_flow_events: bool = Field(
default=False,
description="Disable flow event emission for this definition.",
examples=[False],
)
max_method_calls: int = Field(
default=100,
description="Maximum number of method executions allowed during one kickoff.",
examples=[20],
)
defer_trace_finalization: bool = Field(
default=False,
description="Defer trace finalization so callers can complete tracing later.",
examples=[False],
)
checkpoint: bool | dict[str, Any] | None = Field(
default=None,
description="Checkpointing configuration, or true to use default checkpointing.",
examples=[True, {"enabled": True}],
)
class FlowPersistenceDefinition(BaseModel):
@@ -210,9 +233,21 @@ class FlowPersistenceDefinition(BaseModel):
serialized config.
"""
enabled: bool = False
verbose: bool = False
persistence: Any = None
enabled: bool = Field(
default=False,
description="Whether persistence is enabled for this flow or method.",
examples=[True],
)
verbose: bool = Field(
default=False,
description="Whether persistence should emit verbose diagnostic output.",
examples=[False],
)
persistence: Any = Field(
default=None,
description="Persistence backend configuration or import reference.",
examples=[{"ref": "my_project.persistence:FlowStore"}],
)
@field_serializer("persistence", when_used="json")
def _serialize_persistence(self, value: Any) -> Any:
@@ -238,15 +273,53 @@ class FlowHumanFeedbackDefinition(BaseModel):
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
"""
message: str
emit: list[str] | None = None
llm: Any = "gpt-4o-mini"
default_outcome: str | None = None
metadata: dict[str, Any] | None = None
provider: Any = None
learn: bool = False
learn_source: str = "hitl"
learn_strict: bool = False
message: str = Field(
description="Prompt shown to the human reviewer when feedback is requested.",
examples=["Review the research summary before publishing."],
)
emit: list[str] | None = Field(
default=None,
description=(
"Allowed feedback outcomes. When set, the method routes like a router "
"using the selected outcome."
),
examples=[["approved", "revise"]],
)
llm: Any = Field(
default="gpt-4o-mini",
description="LLM configuration used to assist or process human feedback.",
examples=["gpt-4o-mini"],
)
default_outcome: str | None = Field(
default=None,
description="Outcome to use when feedback cannot be collected.",
examples=["revise"],
)
metadata: dict[str, Any] | None = Field(
default=None,
description="Serializable metadata attached to the feedback request.",
examples=[{"team": "research"}],
)
provider: Any = Field(
default=None,
description="Feedback provider configuration or import reference.",
examples=["my_project.feedback:provider"],
)
learn: bool = Field(
default=False,
description="Whether feedback should be recorded for later learning workflows.",
examples=[True],
)
learn_source: str = Field(
default="hitl",
description="Source label attached to learned feedback records.",
examples=["hitl"],
)
learn_strict: bool = Field(
default=False,
description="Whether learning should enforce strict validation of feedback data.",
examples=[False],
)
@field_serializer("llm", when_used="json")
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
@@ -266,30 +339,89 @@ class FlowHumanFeedbackDefinition(BaseModel):
class FlowCodeActionDefinition(BaseModel):
"""A Flow method action that executes importable Python code."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["code"] = "code"
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
call: Literal["code"] = Field(
default="code",
description="Action discriminator. Use code to call importable Python.",
examples=["code"],
)
ref: str = Field(
description="Import reference for the callable, formatted as module:qualname.",
examples=["my_project.flows:normalize_topic"],
)
with_: dict[str, Any] | None = Field(
default=None,
alias="with",
description="Keyword arguments passed to the callable after expression rendering.",
examples=[{"topic": "${state.topic}"}],
)
class FlowToolActionDefinition(BaseModel):
"""A Flow method action that invokes a CrewAI tool."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["tool"]
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
call: Literal["tool"] = Field(
description="Action discriminator. Use tool to instantiate and run a CrewAI tool.",
examples=["tool"],
)
ref: str = Field(
description="Import reference for a BaseTool class, formatted as module:qualname.",
examples=["my_project.tools:SearchTool"],
)
with_: dict[str, Any] | None = Field(
default=None,
alias="with",
description="Tool input arguments after expression rendering.",
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
)
class FlowCrewActionDefinition(BaseModel):
"""A Flow method action that builds and kicks off a CrewAI crew."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["crew"]
with_: CrewDefinition = Field(alias="with")
call: Literal["crew"] = Field(
description="Action discriminator. Use crew to run an inline Crew definition.",
examples=["crew"],
)
with_: CrewDefinition = Field(
alias="with",
description="Inline Crew definition to load and execute for this action.",
examples=[
{
"name": "inline_research",
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows the domain.",
}
},
"tasks": [
{
"name": "research_task",
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
"inputs": {"topic": "${state.topic}"},
}
],
)
class FlowExpressionActionDefinition(BaseModel):
@@ -297,66 +429,141 @@ class FlowExpressionActionDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["expression"]
expr: str
call: Literal["expression"] = Field(
description="Action discriminator. Use expression to evaluate a CEL expression.",
examples=["expression"],
)
expr: str = Field(
description="CEL expression evaluated against state, outputs, and local context.",
examples=["state.topic", "outputs.normalize_topic"],
)
FlowInnerActionDefinition = (
class FlowScriptActionDefinition(BaseModel):
    """A Flow method action that executes trusted inline Python.

    Selected with ``call: script``. As the ``code`` field description states,
    the source runs as a generated function, receives runtime values as
    arguments rather than having them interpolated into the text, and is not
    sandboxed.
    """

    # Unknown keys are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Required discriminator: there is no default, so "script" must be given.
    call: Literal["script"] = Field(
        description="Action discriminator. Use script to execute trusted inline Python.",
        examples=["script"],
    )
    # Required Python source for the action.
    code: str = Field(
        description=(
            "Trusted Python source executed as a generated function. Runtime values are "
            "passed as state, outputs, input, and item; they are not interpolated into "
            "the source. This is not sandboxed."
        ),
        examples=[
            "state['normalized_topic'] = input.strip()\n"
            "return state['normalized_topic']"
        ],
    )
    # Only one value is accepted, so the field may be omitted.
    language: Literal["python"] = Field(
        default="python",
        description="Script language. Only python is currently supported.",
        examples=["python"],
    )
FlowAtomicActionDefinition: TypeAlias = Annotated[
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
)
| FlowScriptActionDefinition,
Field(discriminator="call"),
]
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
"""One named action inside an ``each`` composite action."""
class FlowEachStepDefinition(BaseModel):
"""One named step inside an ``each`` composite action."""
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
name: str = Field(
description="Step name used to reference this step's output.",
examples=["clean"],
)
if_: str | None = Field(
default=None,
alias="if",
description=(
"Optional CEL expression evaluated against state, outputs, and local "
"context. When present, the step runs only if the expression evaluates "
"to true."
),
examples=["item.kind == 'invoice'"],
)
action: FlowAtomicActionDefinition = Field(
description="Atomic action to run for this step.",
examples=[{"call": "script", "code": "return item.strip()"}],
)
@model_validator(mode="after")
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
if len(self.root) != 1:
raise ValueError("each.do entries must be one-key mappings")
_validate_step_name(self.name, field="each.do action names")
def _validate_step_name(self) -> FlowEachStepDefinition:
_validate_step_name(self.name, field="each.do step names")
return self
@property
def name(self) -> str:
return next(iter(self.root))
@property
def action(self) -> FlowInnerActionDefinition:
return next(iter(self.root.values()))
class FlowEachActionDefinition(BaseModel):
"""A composite action that runs a sequential mini-pipeline for each item."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["each"]
in_: str = Field(alias="in")
do: list[FlowEachInnerActionDefinition]
call: Literal["each"] = Field(
description=(
"Action discriminator. Use each to run a sequence of actions for every "
"item in an input list."
),
examples=["each"],
)
in_: str = Field(
alias="in",
description="CEL expression that must evaluate to the list to iterate.",
examples=["state.rows"],
)
do: list[FlowEachStepDefinition] = Field(
description=(
"Ordered steps to run for each item. Each step has a name, optional "
"if expression, and atomic action."
),
examples=[
[
{
"name": "clean",
"action": {"call": "script", "code": "return item.strip()"},
},
{
"name": "tag",
"if": "outputs.clean != ''",
"action": {"call": "expression", "expr": "outputs.clean"},
},
]
],
)
@model_validator(mode="after")
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
def _validate_step_list(self) -> FlowEachActionDefinition:
if not self.do:
raise ValueError("each.do must contain at least one action")
seen: set[str] = set()
for inner_action in self.do:
name = inner_action.name
if name in seen:
raise ValueError(f"each.do action names must be unique: {name!r}")
seen.add(name)
raise ValueError("each.do must contain at least one step")
_validate_step_list(self.do, field="each.do")
return self
FlowActionDefinition = (
FlowActionDefinition: TypeAlias = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
| FlowEachActionDefinition
)
@@ -364,14 +571,48 @@ FlowActionDefinition = (
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
description: str | None = None
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
emit: list[str] | None = None
human_feedback: FlowHumanFeedbackDefinition | None = None
persist: FlowPersistenceDefinition | None = None
description: str | None = Field(
default=None,
description="Human-readable summary of what this method does.",
examples=["Normalize the incoming topic."],
)
do: FlowActionDefinition = Field(
description="Action executed when this method runs.",
examples=[{"call": "script", "code": "return input.strip()"}],
)
start: bool | FlowDefinitionCondition | None = Field(
default=None,
description=(
"Marks a start method. True starts unconditionally; a condition starts "
"when the kickoff inputs or events satisfy it."
),
examples=[True],
)
listen: FlowDefinitionCondition | None = Field(
default=None,
description="Trigger condition that runs this method after upstream events.",
examples=["seed", {"or": ["approved", "revise"]}],
)
router: bool = Field(
default=False,
description="Whether the method output should be treated as the next event name.",
examples=[True],
)
emit: list[str] | None = Field(
default=None,
description="Declared router events this method may emit.",
examples=[["approved", "revise"]],
)
human_feedback: FlowHumanFeedbackDefinition | None = Field(
default=None,
description="Optional human feedback step applied after the method action.",
examples=[{"message": "Review the research summary before publishing."}],
)
persist: FlowPersistenceDefinition | None = Field(
default=None,
description="Method-level persistence override.",
examples=[{"enabled": True}],
)
@model_validator(mode="after")
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
@@ -397,19 +638,57 @@ class FlowMethodDefinition(BaseModel):
class FlowDefinition(BaseModel):
"""Static, serializable definition of a Flow."""
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
schema_: TypingLiteral["crewai.flow/v1"] = Field(
default="crewai.flow/v1", alias="schema"
model_config = ConfigDict(
populate_by_name=True,
arbitrary_types_allowed=True,
)
schema_: Literal["crewai.flow/v1"] = Field(
default="crewai.flow/v1",
alias="schema",
description="Flow Definition schema identifier and version.",
examples=["crewai.flow/v1"],
)
name: str = Field(
description="Unique flow name used in logs, events, and traces.",
examples=["ResearchFlow"],
)
description: str | None = Field(
default=None,
description="Human-readable summary of the flow.",
examples=["Normalize a topic and prepare it for research."],
)
state: FlowStateDefinition | None = Field(
default=None,
description="State contract for kickoff inputs and runtime state.",
examples=[{"type": "dict", "default": {"topic": "AI agents"}}],
)
config: FlowConfigDefinition = Field(
default_factory=FlowConfigDefinition,
description="Serializable flow-level runtime configuration.",
examples=[{"stream": True, "max_method_calls": 20}],
)
persist: FlowPersistenceDefinition | None = Field(
default=None,
description="Flow-level persistence configuration.",
examples=[{"enabled": True}],
)
conversational: FlowConversationalDefinition | None = Field(
default=None,
description="Conversational flow configuration, when the flow supports chat.",
)
methods: dict[str, FlowMethodDefinition] = Field(
default_factory=dict,
description="Mapping of method names to method definitions.",
examples=[
{
"seed": {
"start": True,
"do": {"call": "expression", "expr": "state.topic"},
}
}
],
)
name: str
description: str | None = None
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
@model_validator(mode="after")
def _validate_method_names(self) -> FlowDefinition:
@@ -436,13 +715,9 @@ class FlowDefinition(BaseModel):
@classmethod
def from_dict(cls, data: dict[str, Any]) -> FlowDefinition:
"""Load a definition from a dictionary and attach diagnostics."""
serialized_diagnostics = _deserialize_diagnostics(data.get("diagnostics", []))
"""Load a definition from a dictionary."""
definition = cls.model_validate(data)
definition.diagnostics = _merge_diagnostics(
serialized_diagnostics, definition.validate_contract()
)
definition.log_diagnostics()
log_flow_definition_issues(definition)
return definition
@classmethod
@@ -463,122 +738,90 @@ class FlowDefinition(BaseModel):
"""Return the JSON Schema for the Flow Definition contract."""
return cls.model_json_schema(by_alias=True)
def validate_contract(self) -> list[FlowDefinitionDiagnostic]:
"""Validate the static contract without rejecting dynamic routing."""
diagnostics: list[FlowDefinitionDiagnostic] = []
for method_name, method in self.methods.items():
path = f"methods.{method_name}"
if method.router and not method.is_start and method.listen is None:
diagnostics.append(
FlowDefinitionDiagnostic(
code="router_without_trigger",
severity="error",
path=path,
message="router: true requires either start or listen",
)
)
if method.emit and not method.router:
diagnostics.append(
FlowDefinitionDiagnostic(
code="emit_without_router",
path=f"{path}.emit",
message="emit is only used by routers to declare downstream events",
)
)
if method.human_feedback:
human_feedback_config = method.human_feedback
if human_feedback_config.emit and not human_feedback_config.llm:
diagnostics.append(
FlowDefinitionDiagnostic(
code="human_feedback_llm_required",
severity="error",
path=f"{path}.human_feedback.llm",
message="llm is required when human_feedback.emit is set",
)
)
if (
human_feedback_config.default_outcome is not None
and not human_feedback_config.emit
):
diagnostics.append(
FlowDefinitionDiagnostic(
code="human_feedback_default_requires_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome requires human_feedback.emit",
)
)
elif (
human_feedback_config.default_outcome is not None
and human_feedback_config.emit
):
if (
human_feedback_config.default_outcome
not in human_feedback_config.emit
):
diagnostics.append(
FlowDefinitionDiagnostic(
code="human_feedback_default_not_in_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome must be one of human_feedback.emit",
)
)
return diagnostics
def with_diagnostics(self) -> FlowDefinition:
"""Attach fresh diagnostics and return this definition."""
self.diagnostics = self.validate_contract()
self.log_diagnostics()
return self
def log_diagnostics(self) -> None:
"""Emit all attached diagnostics through the flow definition logger."""
_log_flow_definition_diagnostics(self.name, self.diagnostics)
def _log_flow_definition_diagnostics(
definition_name: str,
diagnostics: list[FlowDefinitionDiagnostic],
) -> None:
for diagnostic in diagnostics:
level = logging.ERROR if diagnostic.severity == "error" else logging.WARNING
path = f" at {diagnostic.path}" if diagnostic.path else ""
logger.log(
level,
"Flow definition diagnostic for %s%s [%s]: %s",
definition_name,
path,
diagnostic.code,
diagnostic.message,
)
def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]:
return [FlowDefinitionDiagnostic.model_validate(item) for item in value or []]
def _validate_step_name(name: str, *, field: str) -> None:
    """Reject step names that do not fully match ``_STEP_NAME_PATTERN``.

    Args:
        name: Candidate step name. Non-string values are rejected as well.
        field: Label for the offending field; used as the error message prefix.

    Raises:
        ValueError: If ``name`` is not a string or does not match the pattern.
    """
    # The isinstance guard runs first so a non-string never reaches fullmatch,
    # which would raise TypeError instead of the intended ValueError.
    if not isinstance(name, str) or not _STEP_NAME_PATTERN.fullmatch(name):
        raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
def _merge_diagnostics(
*diagnostic_groups: list[FlowDefinitionDiagnostic],
) -> list[FlowDefinitionDiagnostic]:
diagnostics: list[FlowDefinitionDiagnostic] = []
seen: set[tuple[str, str, str | None, str]] = set()
for group in diagnostic_groups:
for diagnostic in group:
key = (
diagnostic.code,
diagnostic.severity,
diagnostic.path,
diagnostic.message,
def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> None:
    """Reject a step list in which two steps share the same ``name``.

    Args:
        steps: Steps to check, in order.
        field: Label used as the prefix of the error message.

    Raises:
        ValueError: At the first step whose name was already used by an
            earlier step in the list.
    """
    used_names: set[str] = set()
    for step_name in (step.name for step in steps):
        if step_name in used_names:
            raise ValueError(f"{field} step names must be unique: {step_name!r}")
        used_names.add(step_name)
def log_flow_definition_issues(definition: FlowDefinition) -> None:
for method_name, method in definition.methods.items():
path = f"methods.{method_name}"
if method.router and not method.is_start and method.listen is None:
_log_flow_definition_issue(
definition.name,
code="router_without_trigger",
severity="error",
path=path,
message="router: true requires either start or listen",
)
if key in seen:
continue
seen.add(key)
diagnostics.append(diagnostic)
return diagnostics
if method.emit and not method.router:
_log_flow_definition_issue(
definition.name,
code="emit_without_router",
path=f"{path}.emit",
message="emit is only used by routers to declare downstream events",
)
if method.human_feedback:
human_feedback_config = method.human_feedback
if human_feedback_config.emit and not human_feedback_config.llm:
_log_flow_definition_issue(
definition.name,
code="human_feedback_llm_required",
severity="error",
path=f"{path}.human_feedback.llm",
message="llm is required when human_feedback.emit is set",
)
if (
human_feedback_config.default_outcome is not None
and not human_feedback_config.emit
):
_log_flow_definition_issue(
definition.name,
code="human_feedback_default_requires_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome requires human_feedback.emit",
)
elif (
human_feedback_config.default_outcome is not None
and human_feedback_config.emit
and human_feedback_config.default_outcome
not in human_feedback_config.emit
):
_log_flow_definition_issue(
definition.name,
code="human_feedback_default_not_in_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome must be one of human_feedback.emit",
)
def _log_flow_definition_issue(
    definition_name: str,
    *,
    code: str,
    message: str,
    severity: Literal["warning", "error"] = "warning",
    path: str | None = None,
) -> None:
    """Log a single flow definition issue through the module logger.

    Args:
        definition_name: Name of the flow definition the issue belongs to.
        code: Short identifier of the issue, rendered in square brackets.
        message: Human-readable explanation.
        severity: ``"error"`` logs at ``logging.ERROR``; anything else logs at
            ``logging.WARNING``.
        path: Optional location inside the definition; when truthy it is
            rendered as `` at <path>`` right after the definition name.
    """
    if severity == "error":
        level = logging.ERROR
    else:
        level = logging.WARNING

    location = ""
    if path:
        location = f" at {path}"

    logger.log(
        level,
        "Flow definition issue for %s%s [%s]: %s",
        definition_name,
        location,
        code,
        message,
    )

View File

@@ -121,7 +121,7 @@ from crewai.flow.human_feedback import (
)
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._actions import build_action
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError, build_action
from crewai.flow.runtime._refs import resolve_instance_ref, resolve_ref
from crewai.flow.types import (
FlowExecutionData,
@@ -1090,6 +1090,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
    # Build the callable for one method's `do` action.
    try:
        return build_action(self, definition.do)
    except FlowScriptExecutionDisabledError:
        # The script opt-in error is re-raised unchanged rather than being
        # recorded in `unresolved` like other build failures.
        raise
    except Exception as e:
        # Any other failure is recorded under the method name and a no-op
        # callable is returned in place of the action.
        unresolved.append(f"{name}: {e}")
        return lambda *args, **kwargs: None

View File

@@ -2,10 +2,12 @@
from __future__ import annotations
import ast
import asyncio
from collections.abc import Callable
from collections.abc import Awaitable, Callable
import contextvars
import inspect
import os
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
@@ -13,11 +15,13 @@ from crewai.flow.flow_definition import (
FlowCodeActionDefinition,
FlowCrewActionDefinition,
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowEachStepDefinition,
FlowExpressionActionDefinition,
FlowScriptActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
@@ -25,10 +29,18 @@ if TYPE_CHECKING:
from crewai.flow.runtime import Flow
__all__ = ["build_action"]
__all__ = ["FlowScriptExecutionDisabledError", "build_action"]
LocalContext = dict[str, Any]
NestedStepRunner = Callable[[LocalContext], Awaitable[Any]]
NestedStep = tuple[str, str | None, NestedStepRunner]
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
class FlowScriptExecutionDisabledError(RuntimeError):
    """Raised when a flow definition tries to execute inline script code.

    ``ScriptAction`` raises this while compiling a script if the
    ``CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION`` environment variable is not set to
    one of the trusted opt-in values.
    """
class _BuiltAction(Protocol):
@@ -140,15 +152,76 @@ class ExpressionAction:
)
class ScriptAction:
    """Action that runs the inline Python of a script action definition.

    The script source is compiled once, at construction time, into the body of
    a generated ``_flow_script(state, outputs, input, item)`` function. Each
    ``run`` call invokes that function with the current runtime values.
    Construction raises ``FlowScriptExecutionDisabledError`` unless the
    ``CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION`` environment variable holds a
    trusted opt-in value.
    """

    definition_type = FlowScriptActionDefinition

    def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None:
        self.flow = flow
        self.definition = definition
        self.handler = self._compile_handler()

    def run(self, *args: Any, **kwargs: Any) -> Any:
        """Call the compiled script and return whatever it returns.

        ``input`` is the first positional argument (``None`` when there is
        none). ``item`` and the step-local outputs come from the local context
        popped from ``kwargs``; both are ``None`` when no local context is
        present.
        """
        local_context = _pop_local_context(kwargs)
        if local_context:
            step_outputs = local_context.get("outputs")
            current_item = local_context.get("item")
        else:
            step_outputs = None
            current_item = None
        trigger_input = args[0] if args else None

        return self.handler(
            state=self.flow.state,
            outputs=outputs_by_name(
                self.flow._method_outputs,
                local_outputs=step_outputs,
            ),
            input=trigger_input,
            item=current_item,
        )

    def _compile_handler(self) -> Callable[..., Any]:
        """Compile the definition's code into a callable ``_flow_script``.

        Raises:
            FlowScriptExecutionDisabledError: If the opt-in environment
                variable, stripped and lower-cased, is not a trusted value.
        """
        opt_in = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "").strip().lower()
        if opt_in not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
            raise FlowScriptExecutionDisabledError(
                "Flow script execution is disabled by default. "
                f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
                "trusted flow definitions."
            )

        filename = f"crewai.flow.script.{self.flow._definition.name}"
        module = ast.parse(self.definition.code, filename=filename)

        # Wrap the parsed statements in a function so the script can `return`
        # a value and receives the runtime values as ordinary parameters.
        parameters = ast.arguments(
            posonlyargs=[],
            args=[ast.arg(name) for name in ("state", "outputs", "input", "item")],
            vararg=None,
            kwonlyargs=[],
            kw_defaults=[],
            kwarg=None,
            defaults=[],
        )
        wrapper = ast.FunctionDef(
            name="_flow_script",
            args=parameters,
            # An empty script still needs one statement to form a valid body.
            body=module.body or [ast.Pass()],
            decorator_list=[],
            returns=None,
            type_comment=None,
            type_params=[],
        )
        module.body = [wrapper]
        ast.fix_missing_locations(module)

        # The YAML is trusted project source written by the code owner, which
        # is the same trust boundary as custom tools. User input is never
        # interpolated into the code; runtime values arrive only as function
        # arguments. It is still arbitrary trusted Python execution, hence the
        # default-off `CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION` gate above.
        namespace: dict[str, Any] = {"__name__": filename}
        code_object = compile(module, filename, "exec")
        exec(code_object, namespace)  # nosec B102  # noqa: S102
        return cast(Callable[..., Any], namespace["_flow_script"])
class EachAction:
definition_type = FlowEachActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.inner_actions = [
(inner_action.name, self._build_inner_action(inner_action))
for inner_action in definition.do
self.steps: list[NestedStep] = [
(step.name, step.if_, self._build_step_action(step))
for step in definition.do
]
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
@@ -160,22 +233,30 @@ class EachAction:
for item in items:
local_outputs: dict[str, Any] = {}
local_context = {"item": item, "outputs": local_outputs}
last_output: Any = None
for name, run_inner_action in self.inner_actions:
last_output = await run_inner_action(
{"item": item, "outputs": local_outputs}
)
for name, condition, run_step_action in self.steps:
if condition is not None and not self._condition_matches(
condition, local_context
):
continue
last_output = await run_step_action(local_context)
local_outputs[name] = last_output
results.append(last_output)
return results
def _build_inner_action(
self, inner_action: FlowEachInnerActionDefinition
) -> Callable[[LocalContext], Any]:
run_action = build_action(self.flow, inner_action.action)
def _condition_matches(self, condition: str, local_context: LocalContext) -> bool:
    """Evaluate a step's ``if`` expression and return its boolean result.

    Raises:
        ValueError: If the expression evaluates to anything other than a
            ``bool``; truthy or falsy non-boolean values are not accepted.
    """
    outcome = evaluate_expression(self.flow, condition, local_context=local_context)
    if isinstance(outcome, bool):
        return outcome
    raise ValueError("if expression must evaluate to a boolean")
async def run_inner_action(local_context: LocalContext) -> Any:
def _build_step_action(self, step: FlowEachStepDefinition) -> NestedStepRunner:
run_action = build_action(self.flow, step.action)
async def run_step_action(local_context: LocalContext) -> Any:
kwargs = {_LOCAL_CONTEXT_KWARG: local_context}
if inspect.iscoroutinefunction(run_action):
result = run_action(**kwargs)
@@ -190,7 +271,7 @@ class EachAction:
result = await result
return result
return run_inner_action
return run_step_action
_ACTION_TYPES: tuple[_ActionType, ...] = (
@@ -199,6 +280,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
ToolAction,
CrewAction,
ExpressionAction,
ScriptAction,
)

View File

@@ -7,6 +7,7 @@ import json
import re
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.utilities.serialization import to_serializable
@@ -44,7 +45,12 @@ def evaluate_expression(
def _expression_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
outputs = _outputs_by_name(flow._method_outputs)
local_outputs = local_context.get("outputs") if local_context else None
outputs = outputs_by_name(
flow._method_outputs,
local_outputs=local_outputs,
serialize=True,
)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": outputs,
@@ -53,29 +59,12 @@ def _expression_context(
local_values = {
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
if key not in {"outputs", "state"}
}
local_outputs = local_values.pop("outputs", None)
local_values.pop("state", None)
context.update(local_values)
if local_outputs is not None:
if not isinstance(local_outputs, dict):
raise TypeError("flow definition local outputs must be a mapping")
context["outputs"] = {**outputs, **local_outputs}
return context
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
    """Map recorded method outputs to serialized values keyed by method name.

    A dict entry containing an ``"output"`` key contributes that value under
    ``str(entry.get("method", ""))``. Any other entry is stored whole under
    the empty-string key. Later entries overwrite earlier ones with the same
    key.
    """
    by_name: dict[str, Any] = {}
    for entry in method_outputs:
        if isinstance(entry, dict) and "output" in entry:
            key = str(entry.get("method", ""))
            raw_output = entry["output"]
        else:
            key, raw_output = "", entry
        by_name[key] = to_serializable(raw_output, max_depth=0)
    return by_name
def _render_value(value: Any, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return _render_string(value, context)

View File

@@ -0,0 +1,40 @@
"""Shared FlowDefinition runtime output helpers."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, TypedDict
from crewai.utilities.serialization import to_serializable
class _MethodOutput(TypedDict):
    """Shape of one recorded entry read by ``outputs_by_name``."""

    # Method name; ``outputs_by_name`` uses it as the key of the result mapping.
    method: str
    # Value recorded for that method; stored under the key above.
    output: Any
def outputs_by_name(
    method_outputs: list[_MethodOutput | Any],
    *,
    local_outputs: Mapping[str, Any] | None = None,
    serialize: bool = False,
) -> dict[str, Any]:
    """Build a name -> output mapping from recorded method outputs.

    Args:
        method_outputs: Recorded entries, normally ``{"method", "output"}``
            dicts. A legacy raw entry (anything that is not a dict with an
            ``"output"`` key) is kept whole under the empty-string key
            instead of raising.
        local_outputs: Optional extra outputs merged last, so they override
            recorded outputs that share a name.
        serialize: When true, every value is passed through
            ``to_serializable``; otherwise values are returned untouched.

    Returns:
        A new dict; later entries overwrite earlier ones with the same key.
    """
    outputs: dict[str, Any] = {}
    for entry in method_outputs:
        if isinstance(entry, dict) and "output" in entry:
            method, output = entry.get("method", ""), entry["output"]
        else:
            # Legacy history stored the bare output without a method name.
            method, output = "", entry
        outputs[method] = _output_value(output, serialize=serialize)

    if local_outputs is not None:
        for key, output in local_outputs.items():
            outputs[key] = _output_value(output, serialize=serialize)
    return outputs


def _output_value(value: Any, *, serialize: bool) -> Any:
    """Return ``value`` as-is, or its serializable form when ``serialize`` is set."""
    if serialize:
        return to_serializable(value, max_depth=0)
    return value

View File

@@ -645,14 +645,11 @@ class TestLegacyMethodOutputsRestore:
context = _expression_context(restored)
assert context["outputs"] == {"": "legacy"}
def test_raw_legacy_outputs_remain_readable(self) -> None:
from crewai.flow.runtime._expressions import _expression_context
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
flow = Flow()
flow._method_outputs = ["legacy"]
assert flow.method_outputs == ["legacy"]
assert _expression_context(flow)["outputs"] == {"": "legacy"}
class TestAgentCheckpoint:

View File

@@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowAtomicActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
@@ -44,16 +45,16 @@ def test_flow_public_exports_are_explicit():
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
"FlowEachStepDefinition",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowJsonSchemaStateDefinition",
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",
@@ -62,6 +63,117 @@ def test_flow_public_exports_are_explicit():
assert "calculate_node_levels" not in flow_visualization.__all__
def test_flow_definition_json_schema_carries_reference_descriptions():
    """The generated JSON schema documents its fields and state discriminator."""
    schema = flow_definition.FlowDefinition.json_schema()
    root_properties = schema["properties"]
    definitions = schema["$defs"]

    def properties_of(model_name):
        return definitions[model_name]["properties"]

    # Top-level fields are described and diagnostics are not part of the schema.
    assert root_properties["schema"]["description"]
    assert root_properties["methods"]["description"]
    assert "diagnostics" not in root_properties

    method_fields = properties_of("FlowMethodDefinition")
    assert method_fields["do"]["description"] == "Action executed when this method runs."
    assert "Trigger condition" in method_fields["listen"]["description"]

    script_fields = properties_of("FlowScriptActionDefinition")
    assert "trusted inline Python" in script_fields["call"]["description"]
    code_description = script_fields["code"]["description"]
    assert "not interpolated" in code_description
    assert "not sandboxed" in code_description

    # The state union exposes exactly one discriminated branch keyed on `type`.
    discriminated_state = next(
        candidate
        for candidate in root_properties["state"]["anyOf"]
        if "discriminator" in candidate
    )
    discriminator = discriminated_state["discriminator"]
    assert discriminator["propertyName"] == "type"
    assert discriminator["mapping"] == {
        "dict": "#/$defs/FlowDictStateDefinition",
        "json_schema": "#/$defs/FlowJsonSchemaStateDefinition",
        "pydantic": "#/$defs/FlowPydanticStateDefinition",
        "unknown": "#/$defs/FlowUnknownStateDefinition",
    }

    dict_state_fields = properties_of("FlowDictStateDefinition")
    assert dict_state_fields["type"]["description"]
    assert "ref" not in dict_state_fields

    json_schema_state = definitions["FlowJsonSchemaStateDefinition"]
    assert json_schema_state["properties"]["json_schema"]["description"]
    assert "json_schema" in json_schema_state["required"]

    pydantic_state_fields = properties_of("FlowPydanticStateDefinition")
    assert "Fallback JSON Schema" in pydantic_state_fields["json_schema"]["description"]

    each_fields = properties_of("FlowEachActionDefinition")
    assert "list to iterate" in each_fields["in"]["description"]
    assert "Ordered steps" in each_fields["do"]["description"]

    step_fields = properties_of("FlowEachStepDefinition")
    assert "runs only if" in step_fields["if"]["description"]
def test_flow_definition_json_schema_carries_field_examples_only():
    """Examples appear on individual fields and never at the model level."""
    schema = flow_definition.FlowDefinition.json_schema()
    root_properties = schema["properties"]
    definitions = schema["$defs"]

    def properties_of(model_name):
        return definitions[model_name]["properties"]

    # No model-level examples: first the root model, then every nested model.
    assert "examples" not in schema
    nested_models = (
        "FlowCodeActionDefinition",
        "FlowToolActionDefinition",
        "FlowCrewActionDefinition",
        "FlowExpressionActionDefinition",
        "FlowScriptActionDefinition",
        "FlowEachActionDefinition",
        "FlowEachStepDefinition",
        "FlowMethodDefinition",
        "FlowDictStateDefinition",
        "FlowJsonSchemaStateDefinition",
        "FlowPydanticStateDefinition",
        "FlowUnknownStateDefinition",
        "FlowConfigDefinition",
        "FlowPersistenceDefinition",
        "FlowHumanFeedbackDefinition",
    )
    for model_name in nested_models:
        assert "examples" not in definitions[model_name]

    assert root_properties["name"]["examples"] == ["ResearchFlow"]
    assert root_properties["schema"]["examples"] == ["crewai.flow/v1"]
    seed_method = root_properties["methods"]["examples"][0]["seed"]
    assert seed_method["do"] == {
        "call": "expression",
        "expr": "state.topic",
    }

    script_fields = properties_of("FlowScriptActionDefinition")
    assert script_fields["call"]["examples"] == ["script"]
    assert "input.strip()" in script_fields["code"]["examples"][0]
    assert script_fields["language"]["examples"] == ["python"]

    code_action_fields = properties_of("FlowCodeActionDefinition")
    assert code_action_fields["ref"]["examples"] == [
        "my_project.flows:normalize_topic"
    ]
    assert code_action_fields["with"]["examples"] == [{"topic": "${state.topic}"}]

    each_fields = properties_of("FlowEachActionDefinition")
    assert each_fields["in"]["examples"] == ["state.rows"]
    example_steps = each_fields["do"]["examples"][0]
    assert example_steps[0]["name"] == "clean"
    assert example_steps[0]["action"]["call"] == "script"
    assert example_steps[1]["if"] == "outputs.clean != ''"

    step_fields = properties_of("FlowEachStepDefinition")
    assert step_fields["if"]["examples"] == ["item.kind == 'invoice'"]

    method_fields = properties_of("FlowMethodDefinition")
    assert method_fields["listen"]["examples"] == [
        "seed",
        {"or": ["approved", "revise"]},
    ]
    assert method_fields["emit"]["examples"] == [["approved", "revise"]]
def test_flow_state_definition_uses_discriminated_branches():
definition = flow_definition.FlowDefinition.model_validate(
{
@@ -233,7 +345,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert review.human_feedback.learn_strict is True
assert definition.methods["audit"].listen == {"and": ["begin", "process"]}
assert definition.diagnostics == []
assert "diagnostics" not in definition.to_dict()
def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
@@ -315,7 +427,8 @@ def test_flow_definition_uses_collapsed_conversational_router_start():
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata():
def test_flow_definition_serializes_human_feedback_metadata(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.dsl._utils")
marker = object()
class MetadataFlow(Flow):
@@ -334,9 +447,9 @@ def test_flow_definition_serializes_human_feedback_metadata():
assert review.human_feedback is not None
assert review.human_feedback.metadata == {"ref": "builtins:dict"}
assert any(
diagnostic.code == "non_serializable_value"
and diagnostic.path == "methods.review.human_feedback.metadata"
for diagnostic in definition.diagnostics
"methods.review.human_feedback.metadata" in record.message
and "not fully serializable" in record.message
for record in caplog.records
)
definition.to_json()
@@ -481,14 +594,16 @@ def test_each_action_round_trips_json_and_yaml():
"in": "state.rows",
"do": [
{
"normalize": {
"name": "normalize",
"action": {
"call": "tool",
"ref": "my_tools:NormalizeRowTool",
"with": {"row": "${ item }"},
}
},
{
"save": {
"name": "save",
"action": {
"call": "code",
"ref": "my_flow:save_row",
"with": {
@@ -575,7 +690,6 @@ def test_flow_definition_allows_dynamic_router_emit():
definition = DynamicRouterFlow.flow_definition()
assert definition.methods["decide"].emit is None
assert definition.diagnostics == []
def test_flow_definition_infers_literal_router_emit():
@@ -728,16 +842,15 @@ def test_flow_definition_accepts_explicit_router_events():
assert definition.methods["decide"].emit == ["left", "right"]
def test_flow_definition_preserves_diagnostics_loaded_from_contract():
def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
"begin": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.begin"},
"start": True,
}
},
"diagnostics": [
@@ -757,13 +870,13 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
}
)
codes = [diagnostic.code for diagnostic in definition.diagnostics]
assert "serialized_warning" in codes
assert codes.count("router_without_trigger") == 1
assert "diagnostics" not in definition.to_dict()
def test_router_start_false_without_listen_reports_missing_trigger():
definition = flow_definition.FlowDefinition.from_dict(
def test_router_start_false_without_listen_logs_missing_trigger(caplog):
caplog.set_level(logging.ERROR, logger="crewai.flow.flow_definition")
flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -779,9 +892,10 @@ def test_router_start_false_without_listen_reports_missing_trigger():
)
assert any(
diagnostic.code == "router_without_trigger"
and diagnostic.path == "methods.decision"
for diagnostic in definition.diagnostics
record.levelno == logging.ERROR
and "router_without_trigger" in record.message
and "methods.decision" in record.message
for record in caplog.records
)
@@ -809,7 +923,7 @@ def test_router_human_feedback_preserves_existing_router_metadata():
assert method.human_feedback is not None
def test_dynamic_router_flow_definition_has_no_diagnostics():
def test_dynamic_router_flow_definition_allows_dynamic_emit():
class LazyDynamicRouterFlow(Flow):
@start()
def begin(self):
@@ -820,7 +934,7 @@ def test_dynamic_router_flow_definition_has_no_diagnostics():
return self.state["dynamic_event"]
definition = LazyDynamicRouterFlow.flow_definition()
assert definition.diagnostics == []
assert definition.methods["decide"].emit is None
def test_dynamic_router_string_listener_is_valid_contract():
@@ -839,7 +953,7 @@ def test_dynamic_router_string_listener_is_valid_contract():
definition = DynamicRouterListenerFlow.flow_definition()
assert definition.diagnostics == []
assert definition.methods["handle"].listen == "dynamic_event"
def test_static_string_listener_is_allowed_by_contract():
@@ -859,7 +973,7 @@ def test_static_string_listener_is_allowed_by_contract():
},
}
)
assert definition.diagnostics == []
assert definition.methods["handle"].listen == "begni"
def test_start_false_not_classified_as_start_method():
@@ -924,10 +1038,10 @@ def test_flow_definition_cache_is_not_reused_by_subclasses():
assert set(child_definition.methods) == {"child_step"}
def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
def test_flow_definition_logs_validation_issues_when_loaded_from_contract(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
definition = flow_definition.FlowDefinition.from_dict(
flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -941,10 +1055,6 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
}
)
assert any(
diagnostic.code == "router_without_trigger"
for diagnostic in definition.diagnostics
)
assert any(
record.levelno == logging.ERROR
and "LoadedFlow" in record.message

View File

@@ -26,6 +26,7 @@ from crewai.flow.flow_config import flow_config
from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition
from crewai.flow.persistence import persist
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.tools import BaseTool
from crewai.types.streaming import FlowStreamingOutput
@@ -113,7 +114,7 @@ class EachActionFlow(Flow):
except RuntimeError:
pass
else:
raise RuntimeError("inner action ran on the event loop")
raise RuntimeError("each step ran on the event loop")
from crewai.flow.flow_context import current_flow_method_name
@@ -1080,7 +1081,8 @@ methods:
call: each
in: state.rows
do:
- normalize:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
@@ -1096,7 +1098,7 @@ methods:
]
def test_each_action_runs_sync_inner_actions_off_event_loop_with_context():
def test_each_action_runs_sync_steps_off_event_loop_with_context():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
@@ -1106,7 +1108,8 @@ methods:
call: each
in: state.rows
do:
- threaded:
- name: threaded
action:
call: code
ref: {__name__}:EachActionFlow.require_threaded_context
with:
@@ -1122,7 +1125,7 @@ methods:
assert flow.inner_thread_id != caller_thread_id
def test_each_action_runs_async_tool_results_from_sync_inner_actions():
def test_each_action_runs_async_tool_results_from_sync_steps():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
@@ -1132,7 +1135,8 @@ methods:
call: each
in: state.rows
do:
- async_tool:
- name: async_tool
action:
call: tool
ref: {__name__}:AsyncResultTool
with:
@@ -1145,7 +1149,120 @@ methods:
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
def test_each_action_uses_iteration_outputs_between_nested_actions():
def test_script_action_requires_explicit_opt_in(monkeypatch: pytest.MonkeyPatch):
    """Building a script flow without the opt-in raises the dedicated error.

    The error must surface directly rather than being folded into the
    "methods with unresolvable actions" summary.
    """
    # Clear any ambient opt-in so the test does not depend on the environment
    # of the machine or CI job running it.
    monkeypatch.delenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", raising=False)
    yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
  normalize:
    do:
      call: script
      code: |
        return "blocked"
    start: true
"""

    with pytest.raises(
        FlowScriptExecutionDisabledError,
        match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1",
    ) as exc_info:
        Flow.from_definition(FlowDefinition.from_yaml(yaml_str))

    assert "methods with unresolvable actions" not in str(exc_info.value)
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
import math
state["rounded"] = math.ceil(state["raw_score"])
return f"rounded:{state['rounded']}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
assert flow.state["rounded"] == 4
def test_script_listener_reads_trigger_input_and_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
seed:
do:
call: expression
expr: "'alpha'"
start: true
combine:
do:
call: script
code: |
state["input_matches_output"] = input == outputs["seed"]
return f"{outputs['seed']}:{input}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "alpha:alpha"
assert flow.state["input_matches_output"] is True
def test_script_each_action_reads_item_and_step_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptEachFlow
methods:
seed:
do:
call: expression
expr: "'global'"
start: true
process_rows:
do:
call: each
in: state.rows
do:
- name: clean
action:
call: script
code: |
return item.strip()
- name: tag
action:
call: script
code: |
return f"{outputs['seed']}:{outputs['clean']}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
def test_each_action_uses_iteration_outputs_between_steps():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
@@ -1155,13 +1272,15 @@ methods:
call: each
in: state.rows
do:
- normalize:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
row: "${{item}}"
prefix: saved
- save:
- name: save
action:
call: code
ref: {__name__}:EachActionFlow.save_row
with:
@@ -1178,7 +1297,7 @@ methods:
]
def test_each_action_resets_inner_outputs_between_iterations():
def test_each_action_resets_step_outputs_between_iterations():
yaml_str = """
schema: crewai.flow/v1
name: EachFlow
@@ -1188,10 +1307,12 @@ methods:
call: each
in: state.rows
do:
- leak_check:
- name: leak_check
action:
call: expression
expr: "has(outputs.previous) ? outputs.previous : 'empty'"
- previous:
- name: previous
action:
call: expression
expr: item
start: true
@@ -1205,7 +1326,7 @@ methods:
]
def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs():
def test_each_action_preserves_flow_outputs_and_prefers_step_outputs():
yaml_str = """
schema: crewai.flow/v1
name: EachFlow
@@ -1220,13 +1341,16 @@ methods:
call: each
in: state.rows
do:
- before_shadow:
- name: before_shadow
action:
call: expression
expr: "outputs.seed + ':' + item"
- seed:
- name: seed
action:
call: expression
expr: "'local:' + item"
- after_shadow:
- name: after_shadow
action:
call: expression
expr: "outputs.seed"
listen: seed
@@ -1244,6 +1368,103 @@ methods:
]
def test_each_action_runs_simple_if_clauses():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: kind
action:
call: expression
expr: item.kind
- name: kept
if: "outputs.kind == 'keep'"
action:
call: expression
expr: "'kept:' + item.value"
- name: skipped
if: "outputs.kind != 'keep'"
action:
call: expression
expr: "'skipped:' + item.value"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(
inputs={
"rows": [
{"kind": "keep", "value": "a"},
{"kind": "drop", "value": "b"},
]
}
) == ["kept:a", "skipped:b"]
def test_each_action_skipped_if_keeps_previous_output():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: original
action:
call: expression
expr: item.value
- name: maybe_included
if: item.include
action:
call: expression
expr: "'included:' + item.value"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(
inputs={
"rows": [
{"include": True, "value": "a"},
{"include": False, "value": "b"},
]
}
) == ["included:a", "b"]
def test_each_action_if_condition_must_be_boolean():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: value
if: item.value
action:
call: expression
expr: item.value
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(ValueError, match="if expression must evaluate to a boolean"):
flow.kickoff(inputs={"rows": [{"value": "truthy"}]})
def test_each_action_empty_list_returns_empty_and_listener_runs_once():
yaml_str = f"""
schema: crewai.flow/v1
@@ -1254,7 +1475,8 @@ methods:
call: each
in: state.rows
do:
- normalize:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
@@ -1303,7 +1525,12 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
"do": {
"call": "each",
"in": expr,
"do": [{"value": {"call": "expression", "expr": "item"}}],
"do": [
{
"name": "value",
"action": {"call": "expression", "expr": "item"},
}
],
},
}
},
@@ -1319,15 +1546,25 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
"action_do",
[
[],
[{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}],
[{"1bad": {"call": "expression", "expr": "item"}}],
[{"value": {"call": "expression", "expr": "item"}}],
[{"name": "1bad", "action": {"call": "expression", "expr": "item"}}],
[{"name": "missing_action"}],
[{"action": {"call": "expression", "expr": "item"}}],
[
{"same": {"call": "expression", "expr": "item"}},
{"same": {"call": "expression", "expr": "item"}},
{
"name": "value",
"if": "true",
"then": [],
"action": {"call": "expression", "expr": "item"},
}
],
[
{"name": "same", "action": {"call": "expression", "expr": "item"}},
{"name": "same", "action": {"call": "expression", "expr": "item"}},
],
],
)
def test_each_action_validates_inner_action_shape(action_do):
def test_each_action_validates_step_shape(action_do):
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
@@ -1347,6 +1584,26 @@ def test_each_action_validates_inner_action_shape(action_do):
)
def test_if_clauses_are_rejected_at_method_level():
    """An ``if`` key on a method-level action fails definition validation."""
    action_with_if = {
        "call": "expression",
        "if": "true",
        "expr": "'ok'",
    }
    contract = {
        "schema": "crewai.flow/v1",
        "name": "TopLevelIfFlow",
        "methods": {
            "process": {
                "start": True,
                "do": action_with_if,
            }
        },
    }

    with pytest.raises(ValidationError):
        FlowDefinition.from_dict(contract)
def test_each_action_rejects_nested_each_actions():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
@@ -1361,12 +1618,14 @@ def test_each_action_rejects_nested_each_actions():
"in": "state.rows",
"do": [
{
"nested": {
"name": "nested",
"action": {
"call": "each",
"in": "state.children",
"do": [
{
"child": {
"name": "child",
"action": {
"call": "expression",
"expr": "item",
}
@@ -1392,7 +1651,8 @@ methods:
call: each
in: state.rows
do:
- validate:
- name: validate
action:
call: code
ref: {__name__}:EachActionFlow.fail_on_bad_row
with: