Compare commits

..

2 Commits

Author SHA1 Message Date
Vinicius Brasil
51d0ce4cc5 Document FlowDefinition fields in the JSON schema
Add a description and examples to every FlowDefinition field and
standardize on `typing.Literal`, so the generated JSON schema documents
itself — each action discriminator, state branch, and config option
explains what it is and shows a realistic value.

Examples live on individual fields only, never at the model level, which
keeps the schema readable for tooling that renders field-level help.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 21:31:42 -07:00
Vinicius Brasil
b69e5ccc10 Add script action to FlowDefinition
Let a Flow method run trusted inline Python with `call: script`. The code
is compiled once into a generated function and receives the runtime
values as arguments — state, outputs, input, and item — so nothing is
interpolated into the source. This is not sandboxed.

Also extracts the shared `outputs_by_name` helper into `runtime/_outputs`
so the script action and the expression evaluator build the outputs map
the same way, including each-iteration local outputs.

```yaml
methods:
  normalize:
    start: true
    do:
      call: script
      code: |
        import math
        state["rounded"] = math.ceil(state["raw_score"])
        return f"rounded:{state['rounded']}"
```

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 21:31:23 -07:00
7 changed files with 478 additions and 150 deletions

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import json
import logging
import re
from typing import Annotated, Any, Literal as TypingLiteral, TypeAlias
from typing import Annotated, Any, Literal, TypeAlias
from pydantic import (
BaseModel,
@@ -73,10 +73,24 @@ def _object_ref(value: Any) -> str:
class FlowDefinitionDiagnostic(BaseModel):
"""A non-fatal Flow Definition build or validation diagnostic."""
code: str
message: str
severity: TypingLiteral["warning", "error"] = "warning"
path: str | None = None
code: str = Field(
description="Stable diagnostic identifier for tooling and tests.",
examples=["router_without_trigger"],
)
message: str = Field(
description="Human-readable explanation of the diagnostic.",
examples=["router: true requires either start or listen"],
)
severity: Literal["warning", "error"] = Field(
default="warning",
description="Diagnostic severity. Errors indicate an invalid or incomplete contract.",
examples=["error"],
)
path: str | None = Field(
default=None,
description="Dot path to the definition field that produced the diagnostic.",
examples=["methods.decide"],
)
class FlowDictStateDefinition(BaseModel):
@@ -84,7 +98,7 @@ class FlowDictStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["dict"] = Field(
type: Literal["dict"] = Field(
default="dict",
description="Plain dictionary state with optional default values.",
examples=["dict"],
@@ -101,7 +115,7 @@ class FlowPydanticStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["pydantic"] = Field(
type: Literal["pydantic"] = Field(
default="pydantic",
description="Importable Pydantic model used as the Flow state type.",
examples=["pydantic"],
@@ -136,7 +150,7 @@ class FlowJsonSchemaStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["json_schema"] = Field(
type: Literal["json_schema"] = Field(
default="json_schema",
description="Inline JSON Schema used as the Flow state contract.",
examples=["json_schema"],
@@ -163,7 +177,7 @@ class FlowUnknownStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: TypingLiteral["unknown"] = Field(
type: Literal["unknown"] = Field(
default="unknown",
description="Unknown state representation; runtime falls back to dictionary state.",
examples=["unknown"],
@@ -192,14 +206,46 @@ FlowStateDefinition: TypeAlias = Annotated[
class FlowConfigDefinition(BaseModel):
"""Serializable Flow-level configuration."""
tracing: bool | None = None
stream: bool = False
memory: dict[str, Any] | None = None
input_provider: str | None = None
suppress_flow_events: bool = False
max_method_calls: int = 100
defer_trace_finalization: bool = False
checkpoint: bool | dict[str, Any] | None = None
tracing: bool | None = Field(
default=None,
description="Override for flow tracing; when omitted, runtime defaults apply.",
examples=[True],
)
stream: bool = Field(
default=False,
description="Whether the flow should emit streaming events when supported.",
examples=[True],
)
memory: dict[str, Any] | None = Field(
default=None,
description="Serializable memory configuration passed to flow execution.",
examples=[{"enabled": True}],
)
input_provider: str | None = Field(
default=None,
description="Import reference or provider key used to supply flow inputs.",
examples=["my_project.inputs:load_inputs"],
)
suppress_flow_events: bool = Field(
default=False,
description="Disable flow event emission for this definition.",
examples=[False],
)
max_method_calls: int = Field(
default=100,
description="Maximum number of method executions allowed during one kickoff.",
examples=[20],
)
defer_trace_finalization: bool = Field(
default=False,
description="Defer trace finalization so callers can complete tracing later.",
examples=[False],
)
checkpoint: bool | dict[str, Any] | None = Field(
default=None,
description="Checkpointing configuration, or true to use default checkpointing.",
examples=[True, {"enabled": True}],
)
class FlowPersistenceDefinition(BaseModel):
@@ -211,9 +257,21 @@ class FlowPersistenceDefinition(BaseModel):
serialized config.
"""
enabled: bool = False
verbose: bool = False
persistence: Any = None
enabled: bool = Field(
default=False,
description="Whether persistence is enabled for this flow or method.",
examples=[True],
)
verbose: bool = Field(
default=False,
description="Whether persistence should emit verbose diagnostic output.",
examples=[False],
)
persistence: Any = Field(
default=None,
description="Persistence backend configuration or import reference.",
examples=[{"ref": "my_project.persistence:FlowStore"}],
)
@field_serializer("persistence", when_used="json")
def _serialize_persistence(self, value: Any) -> Any:
@@ -239,15 +297,53 @@ class FlowHumanFeedbackDefinition(BaseModel):
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
"""
message: str
emit: list[str] | None = None
llm: Any = "gpt-4o-mini"
default_outcome: str | None = None
metadata: dict[str, Any] | None = None
provider: Any = None
learn: bool = False
learn_source: str = "hitl"
learn_strict: bool = False
message: str = Field(
description="Prompt shown to the human reviewer when feedback is requested.",
examples=["Review the research summary before publishing."],
)
emit: list[str] | None = Field(
default=None,
description=(
"Allowed feedback outcomes. When set, the method routes like a router "
"using the selected outcome."
),
examples=[["approved", "revise"]],
)
llm: Any = Field(
default="gpt-4o-mini",
description="LLM configuration used to assist or process human feedback.",
examples=["gpt-4o-mini"],
)
default_outcome: str | None = Field(
default=None,
description="Outcome to use when feedback cannot be collected.",
examples=["revise"],
)
metadata: dict[str, Any] | None = Field(
default=None,
description="Serializable metadata attached to the feedback request.",
examples=[{"team": "research"}],
)
provider: Any = Field(
default=None,
description="Feedback provider configuration or import reference.",
examples=["my_project.feedback:provider"],
)
learn: bool = Field(
default=False,
description="Whether feedback should be recorded for later learning workflows.",
examples=[True],
)
learn_source: str = Field(
default="hitl",
description="Source label attached to learned feedback records.",
examples=["hitl"],
)
learn_strict: bool = Field(
default=False,
description="Whether learning should enforce strict validation of feedback data.",
examples=[False],
)
@field_serializer("llm", when_used="json")
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
@@ -267,30 +363,89 @@ class FlowHumanFeedbackDefinition(BaseModel):
class FlowCodeActionDefinition(BaseModel):
"""A Flow method action that executes importable Python code."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["code"] = "code"
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
call: Literal["code"] = Field(
default="code",
description="Action discriminator. Use code to call importable Python.",
examples=["code"],
)
ref: str = Field(
description="Import reference for the callable, formatted as module:qualname.",
examples=["my_project.flows:normalize_topic"],
)
with_: dict[str, Any] | None = Field(
default=None,
alias="with",
description="Keyword arguments passed to the callable after expression rendering.",
examples=[{"topic": "${state.topic}"}],
)
class FlowToolActionDefinition(BaseModel):
"""A Flow method action that invokes a CrewAI tool."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["tool"]
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
call: Literal["tool"] = Field(
description="Action discriminator. Use tool to instantiate and run a CrewAI tool.",
examples=["tool"],
)
ref: str = Field(
description="Import reference for a BaseTool class, formatted as module:qualname.",
examples=["my_project.tools:SearchTool"],
)
with_: dict[str, Any] | None = Field(
default=None,
alias="with",
description="Tool input arguments after expression rendering.",
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
)
class FlowCrewActionDefinition(BaseModel):
"""A Flow method action that builds and kicks off a CrewAI crew."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["crew"]
with_: CrewDefinition = Field(alias="with")
call: Literal["crew"] = Field(
description="Action discriminator. Use crew to run an inline Crew definition.",
examples=["crew"],
)
with_: CrewDefinition = Field(
alias="with",
description="Inline Crew definition to load and execute for this action.",
examples=[
{
"name": "inline_research",
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows the domain.",
}
},
"tasks": [
{
"name": "research_task",
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
"inputs": {"topic": "${state.topic}"},
}
],
)
class FlowExpressionActionDefinition(BaseModel):
@@ -298,8 +453,14 @@ class FlowExpressionActionDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["expression"]
expr: str
call: Literal["expression"] = Field(
description="Action discriminator. Use expression to evaluate a CEL expression.",
examples=["expression"],
)
expr: str = Field(
description="CEL expression evaluated against state, outputs, and local context.",
examples=["state.topic", "outputs.normalize_topic"],
)
class FlowScriptActionDefinition(BaseModel):
@@ -307,7 +468,7 @@ class FlowScriptActionDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["script"] = Field(
call: Literal["script"] = Field(
description="Action discriminator. Use script to execute trusted inline Python.",
examples=["script"],
)
@@ -322,7 +483,7 @@ class FlowScriptActionDefinition(BaseModel):
"return state['normalized_topic']"
],
)
language: TypingLiteral["python"] = Field(
language: Literal["python"] = Field(
default="python",
description="Script language. Only python is currently supported.",
examples=["python"],
@@ -341,6 +502,11 @@ FlowInnerActionDefinition = (
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
"""One named action inside an ``each`` composite action."""
root: dict[str, FlowInnerActionDefinition] = Field(
description="Single-entry mapping from an inner action name to its action.",
examples=[{"clean": {"call": "script", "code": "return item.strip()"}}],
)
@model_validator(mode="after")
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
if len(self.root) != 1:
@@ -360,11 +526,35 @@ class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinitio
class FlowEachActionDefinition(BaseModel):
"""A composite action that runs a sequential mini-pipeline for each item."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: TypingLiteral["each"]
in_: str = Field(alias="in")
do: list[FlowEachInnerActionDefinition]
call: Literal["each"] = Field(
description=(
"Action discriminator. Use each to run a sequence of actions for every "
"item in an input list."
),
examples=["each"],
)
in_: str = Field(
alias="in",
description="CEL expression that must evaluate to the list to iterate.",
examples=["state.rows"],
)
do: list[FlowEachInnerActionDefinition] = Field(
description=(
"Ordered inner actions to run for each item. Each entry must be a "
"single-key mapping naming that inner action."
),
examples=[
[
{"clean": {"call": "script", "code": "return item.strip()"}},
{"tag": {"call": "expression", "expr": "outputs.clean"}},
]
],
)
@model_validator(mode="after")
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
@@ -394,14 +584,48 @@ FlowActionDefinition = (
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
description: str | None = None
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
emit: list[str] | None = None
human_feedback: FlowHumanFeedbackDefinition | None = None
persist: FlowPersistenceDefinition | None = None
description: str | None = Field(
default=None,
description="Human-readable summary of what this method does.",
examples=["Normalize the incoming topic."],
)
do: FlowActionDefinition = Field(
description="Action executed when this method runs.",
examples=[{"call": "script", "code": "return input.strip()"}],
)
start: bool | FlowDefinitionCondition | None = Field(
default=None,
description=(
"Marks a start method. True starts unconditionally; a condition starts "
"when the kickoff inputs or events satisfy it."
),
examples=[True],
)
listen: FlowDefinitionCondition | None = Field(
default=None,
description="Trigger condition that runs this method after upstream events.",
examples=["seed", {"or": ["approved", "revise"]}],
)
router: bool = Field(
default=False,
description="Whether the method output should be treated as the next event name.",
examples=[True],
)
emit: list[str] | None = Field(
default=None,
description="Declared router events this method may emit.",
examples=[["approved", "revise"]],
)
human_feedback: FlowHumanFeedbackDefinition | None = Field(
default=None,
description="Optional human feedback step applied after the method action.",
examples=[{"message": "Review the research summary before publishing."}],
)
persist: FlowPersistenceDefinition | None = Field(
default=None,
description="Method-level persistence override.",
examples=[{"enabled": True}],
)
@model_validator(mode="after")
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
@@ -427,19 +651,71 @@ class FlowMethodDefinition(BaseModel):
class FlowDefinition(BaseModel):
"""Static, serializable definition of a Flow."""
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
schema_: TypingLiteral["crewai.flow/v1"] = Field(
default="crewai.flow/v1", alias="schema"
model_config = ConfigDict(
populate_by_name=True,
arbitrary_types_allowed=True,
)
schema_: Literal["crewai.flow/v1"] = Field(
default="crewai.flow/v1",
alias="schema",
description="Flow Definition schema identifier and version.",
examples=["crewai.flow/v1"],
)
name: str = Field(
description="Unique flow name used in logs, events, and traces.",
examples=["ResearchFlow"],
)
description: str | None = Field(
default=None,
description="Human-readable summary of the flow.",
examples=["Normalize a topic and prepare it for research."],
)
state: FlowStateDefinition | None = Field(
default=None,
description="State contract for kickoff inputs and runtime state.",
examples=[{"type": "dict", "default": {"topic": "AI agents"}}],
)
config: FlowConfigDefinition = Field(
default_factory=FlowConfigDefinition,
description="Serializable flow-level runtime configuration.",
examples=[{"stream": True, "max_method_calls": 20}],
)
persist: FlowPersistenceDefinition | None = Field(
default=None,
description="Flow-level persistence configuration.",
examples=[{"enabled": True}],
)
conversational: FlowConversationalDefinition | None = Field(
default=None,
description="Conversational flow configuration, when the flow supports chat.",
)
methods: dict[str, FlowMethodDefinition] = Field(
default_factory=dict,
description="Mapping of method names to method definitions.",
examples=[
{
"seed": {
"start": True,
"do": {"call": "expression", "expr": "state.topic"},
}
}
],
)
diagnostics: list[FlowDefinitionDiagnostic] = Field(
default_factory=list,
description="Validation diagnostics attached to this definition.",
examples=[
[
{
"code": "router_without_trigger",
"message": "router: true requires either start or listen",
"severity": "error",
"path": "methods.decide",
}
]
],
)
name: str
description: str | None = None
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
@model_validator(mode="after")
def _validate_method_names(self) -> FlowDefinition:

View File

@@ -1090,8 +1090,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
try:
return build_action(self, definition.do)
except RuntimeError:
raise
except Exception as e:
unresolved.append(f"{name}: {e}")
return lambda *args, **kwargs: None

View File

@@ -2,12 +2,11 @@
from __future__ import annotations
import ast
import asyncio
from collections.abc import Callable
import contextvars
import inspect
import os
import textwrap
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
@@ -33,8 +32,6 @@ __all__ = ["build_action"]
LocalContext = dict[str, Any]
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
class _BuiltAction(Protocol):
@@ -167,39 +164,14 @@ class ScriptAction:
)
def _compile_handler(self) -> Callable[..., Any]:
raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "")
if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
raise RuntimeError(
"Flow script execution is disabled by default. "
f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
"trusted flow definitions."
)
filename = f"crewai.flow.script.{self.flow._definition.name}"
module = ast.parse(self.definition.code, filename=filename)
function = ast.FunctionDef(
name="_flow_script",
args=ast.arguments(
posonlyargs=[],
args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")],
vararg=None,
kwonlyargs=[],
kw_defaults=[],
kwarg=None,
defaults=[],
),
body=module.body or [ast.Pass()],
decorator_list=[],
returns=None,
type_comment=None,
type_params=[],
namespace: dict[str, Any] = {
"__name__": f"crewai.flow.script.{self.flow._definition.name}",
}
source = _script_function_source(self.definition.code)
exec( # nosec B102 # noqa: S102
compile(source, namespace["__name__"], "exec"), namespace
)
module.body = [function]
ast.fix_missing_locations(module)
namespace: dict[str, Any] = {"__name__": filename}
exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102
return cast(Callable[..., Any], namespace["_flow_script"])
return cast(Callable[..., Any], namespace["__flow_script__"])
class EachAction:
@@ -303,3 +275,14 @@ def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None:
if not isinstance(local_context, dict):
raise TypeError("flow definition local context must be a mapping")
return cast(LocalContext, local_context)
def _script_function_source(code: str) -> str:
body = code if code.strip() else "pass"
source = (
"def __flow_script__(state, outputs, input, item):\n"
f"{textwrap.indent(body, ' ')}"
)
if not source.endswith("\n"):
source += "\n"
return source

View File

@@ -3,27 +3,29 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, TypedDict
from typing import Any
from crewai.utilities.serialization import to_serializable
class _MethodOutput(TypedDict):
method: str
output: Any
def outputs_by_name(
method_outputs: list[_MethodOutput],
method_outputs: list[Any],
*,
local_outputs: Mapping[str, Any] | None = None,
serialize: bool = False,
) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize)
method = ""
output = entry
if isinstance(entry, dict) and "output" in entry:
method = str(entry.get("method", ""))
output = entry["output"]
outputs[method] = _output_value(output, serialize=serialize)
if local_outputs is not None:
if not isinstance(local_outputs, Mapping):
raise TypeError("flow definition local outputs must be a mapping")
outputs.update(
{
key: _output_value(output, serialize=serialize)

View File

@@ -645,11 +645,14 @@ class TestLegacyMethodOutputsRestore:
context = _expression_context(restored)
assert context["outputs"] == {"": "legacy"}
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
def test_raw_legacy_outputs_remain_readable(self) -> None:
from crewai.flow.runtime._expressions import _expression_context
flow = Flow()
flow._method_outputs = ["legacy"]
assert flow.method_outputs == ["legacy"]
assert _expression_context(flow)["outputs"] == {"": "legacy"}
class TestAgentCheckpoint:

View File

@@ -63,6 +63,104 @@ def test_flow_public_exports_are_explicit():
assert "calculate_node_levels" not in flow_visualization.__all__
def test_flow_definition_json_schema_carries_reference_descriptions():
schema = flow_definition.FlowDefinition.json_schema()
defs = schema["$defs"]
assert schema["properties"]["schema"]["description"]
assert schema["properties"]["methods"]["description"]
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["do"]["description"] == "Action executed when this method runs."
assert "Trigger condition" in method_properties["listen"]["description"]
script_properties = defs["FlowScriptActionDefinition"]["properties"]
assert "trusted inline Python" in script_properties["call"]["description"]
assert "not interpolated" in script_properties["code"]["description"]
assert "not sandboxed" in script_properties["code"]["description"]
state_schema = schema["properties"]["state"]["anyOf"][0]
assert state_schema["discriminator"]["propertyName"] == "type"
assert state_schema["discriminator"]["mapping"] == {
"dict": "#/$defs/FlowDictStateDefinition",
"json_schema": "#/$defs/FlowJsonSchemaStateDefinition",
"pydantic": "#/$defs/FlowPydanticStateDefinition",
"unknown": "#/$defs/FlowUnknownStateDefinition",
}
dict_state_properties = defs["FlowDictStateDefinition"]["properties"]
assert dict_state_properties["type"]["description"]
assert "ref" not in dict_state_properties
json_schema_state_properties = defs["FlowJsonSchemaStateDefinition"]["properties"]
assert json_schema_state_properties["json_schema"]["description"]
assert "json_schema" in defs["FlowJsonSchemaStateDefinition"]["required"]
pydantic_state_properties = defs["FlowPydanticStateDefinition"]["properties"]
assert "Fallback JSON Schema" in pydantic_state_properties["json_schema"][
"description"
]
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert "list to iterate" in each_properties["in"]["description"]
assert "Ordered inner actions" in each_properties["do"]["description"]
def test_flow_definition_json_schema_carries_field_examples_only():
schema = flow_definition.FlowDefinition.json_schema()
defs = schema["$defs"]
for model_name in [
"FlowDefinition",
"FlowCodeActionDefinition",
"FlowToolActionDefinition",
"FlowCrewActionDefinition",
"FlowExpressionActionDefinition",
"FlowScriptActionDefinition",
"FlowEachActionDefinition",
"FlowMethodDefinition",
"FlowDictStateDefinition",
"FlowJsonSchemaStateDefinition",
"FlowPydanticStateDefinition",
"FlowUnknownStateDefinition",
"FlowConfigDefinition",
"FlowPersistenceDefinition",
"FlowHumanFeedbackDefinition",
"FlowDefinitionDiagnostic",
]:
model_schema = schema if model_name == "FlowDefinition" else defs[model_name]
assert "examples" not in model_schema
assert schema["properties"]["name"]["examples"] == ["ResearchFlow"]
assert schema["properties"]["schema"]["examples"] == ["crewai.flow/v1"]
assert schema["properties"]["methods"]["examples"][0]["seed"]["do"] == {
"call": "expression",
"expr": "state.topic",
}
script_properties = defs["FlowScriptActionDefinition"]["properties"]
assert script_properties["call"]["examples"] == ["script"]
assert "input.strip()" in script_properties["code"]["examples"][0]
assert script_properties["language"]["examples"] == ["python"]
action_properties = defs["FlowCodeActionDefinition"]["properties"]
assert action_properties["ref"]["examples"] == [
"my_project.flows:normalize_topic"
]
assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}]
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert each_properties["in"]["examples"] == ["state.rows"]
assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script"
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["listen"]["examples"] == [
"seed",
{"or": ["approved", "revise"]},
]
assert method_properties["emit"]["examples"] == [["approved", "revise"]]
def test_flow_state_definition_uses_discriminated_branches():
definition = flow_definition.FlowDefinition.model_validate(
{

View File

@@ -1145,31 +1145,7 @@ methods:
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
def test_script_action_requires_explicit_opt_in():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
return "blocked"
start: true
"""
with pytest.raises(
RuntimeError, match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1"
) as exc_info:
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert "methods with unresolvable actions" not in str(exc_info.value)
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
def test_script_action_runs_python_imports_mutates_state_and_returns_value():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
@@ -1191,11 +1167,7 @@ methods:
assert flow.state["rounded"] == 4
def test_script_listener_reads_trigger_input_and_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
def test_script_listener_reads_trigger_input_and_outputs():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
@@ -1220,11 +1192,7 @@ methods:
assert flow.state["input_matches_output"] is True
def test_script_each_action_reads_item_and_inner_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
def test_script_each_action_reads_item_and_inner_outputs():
yaml_str = """
schema: crewai.flow/v1
name: ScriptEachFlow