Add script action to FlowDefinition

Let a Flow method run trusted inline Python with `call: script`. The code
is compiled once into a generated function and receives the runtime
values as arguments — state, outputs, input, and item — so nothing is
interpolated into the source. This is not sandboxed.

Also extracts the shared `outputs_by_name` helper into `runtime/_outputs`
so the script action and the expression evaluator build the outputs map
the same way, including each-iteration local outputs.

```yaml
methods:
  normalize:
    start: true
    do:
      call: script
      code: |
        import math
        state["rounded"] = math.ceil(state["raw_score"])
        return f"rounded:{state['rounded']}"
```

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Vinicius Brasil
2026-06-16 21:19:48 -07:00
parent 7bb9bc7e1a
commit b69e5ccc10
6 changed files with 205 additions and 19 deletions

View File

@@ -55,6 +55,7 @@ __all__ = [
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",
@@ -301,11 +302,39 @@ class FlowExpressionActionDefinition(BaseModel):
expr: str
class FlowScriptActionDefinition(BaseModel):
"""A Flow method action that executes trusted inline Python."""
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["script"] = Field(
description="Action discriminator. Use script to execute trusted inline Python.",
examples=["script"],
)
code: str = Field(
description=(
"Trusted Python source executed as a generated function. Runtime values are "
"passed as state, outputs, input, and item; they are not interpolated into "
"the source. This is not sandboxed."
),
examples=[
"state['normalized_topic'] = input.strip()\n"
"return state['normalized_topic']"
],
)
language: TypingLiteral["python"] = Field(
default="python",
description="Script language. Only python is currently supported.",
examples=["python"],
)
FlowInnerActionDefinition = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
)
@@ -357,6 +386,7 @@ FlowActionDefinition = (
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
| FlowEachActionDefinition
)

View File

@@ -6,6 +6,7 @@ import asyncio
from collections.abc import Callable
import contextvars
import inspect
import textwrap
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
@@ -15,9 +16,11 @@ from crewai.flow.flow_definition import (
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowExpressionActionDefinition,
FlowScriptActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
@@ -140,6 +143,37 @@ class ExpressionAction:
)
class ScriptAction:
definition_type = FlowScriptActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.handler = self._compile_handler()
def run(self, *args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return self.handler(
state=self.flow.state,
outputs=outputs_by_name(
self.flow._method_outputs,
local_outputs=local_context.get("outputs") if local_context else None,
),
input=args[0] if args else None,
item=local_context.get("item") if local_context else None,
)
def _compile_handler(self) -> Callable[..., Any]:
namespace: dict[str, Any] = {
"__name__": f"crewai.flow.script.{self.flow._definition.name}",
}
source = _script_function_source(self.definition.code)
exec( # nosec B102 # noqa: S102
compile(source, namespace["__name__"], "exec"), namespace
)
return cast(Callable[..., Any], namespace["__flow_script__"])
class EachAction:
definition_type = FlowEachActionDefinition
@@ -199,6 +233,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
ToolAction,
CrewAction,
ExpressionAction,
ScriptAction,
)
@@ -240,3 +275,14 @@ def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None:
if not isinstance(local_context, dict):
raise TypeError("flow definition local context must be a mapping")
return cast(LocalContext, local_context)
def _script_function_source(code: str) -> str:
body = code if code.strip() else "pass"
source = (
"def __flow_script__(state, outputs, input, item):\n"
f"{textwrap.indent(body, ' ')}"
)
if not source.endswith("\n"):
source += "\n"
return source

View File

@@ -7,6 +7,7 @@ import json
import re
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.utilities.serialization import to_serializable
@@ -44,7 +45,12 @@ def evaluate_expression(
def _expression_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
outputs = _outputs_by_name(flow._method_outputs)
local_outputs = local_context.get("outputs") if local_context else None
outputs = outputs_by_name(
flow._method_outputs,
local_outputs=local_outputs,
serialize=True,
)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": outputs,
@@ -53,29 +59,12 @@ def _expression_context(
local_values = {
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
if key not in {"outputs", "state"}
}
local_outputs = local_values.pop("outputs", None)
local_values.pop("state", None)
context.update(local_values)
if local_outputs is not None:
if not isinstance(local_outputs, dict):
raise TypeError("flow definition local outputs must be a mapping")
context["outputs"] = {**outputs, **local_outputs}
return context
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
method = ""
output = entry
if isinstance(entry, dict) and "output" in entry:
method = str(entry.get("method", ""))
output = entry["output"]
outputs[method] = to_serializable(output, max_depth=0)
return outputs
def _render_value(value: Any, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return _render_string(value, context)

View File

@@ -0,0 +1,42 @@
"""Shared FlowDefinition runtime output helpers."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from crewai.utilities.serialization import to_serializable
def outputs_by_name(
method_outputs: list[Any],
*,
local_outputs: Mapping[str, Any] | None = None,
serialize: bool = False,
) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
method = ""
output = entry
if isinstance(entry, dict) and "output" in entry:
method = str(entry.get("method", ""))
output = entry["output"]
outputs[method] = _output_value(output, serialize=serialize)
if local_outputs is not None:
if not isinstance(local_outputs, Mapping):
raise TypeError("flow definition local outputs must be a mapping")
outputs.update(
{
key: _output_value(output, serialize=serialize)
for key, output in local_outputs.items()
}
)
return outputs
def _output_value(value: Any, *, serialize: bool) -> Any:
if not serialize:
return value
return to_serializable(value, max_depth=0)

View File

@@ -54,6 +54,7 @@ def test_flow_public_exports_are_explicit():
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",

View File

@@ -1145,6 +1145,84 @@ methods:
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
def test_script_action_runs_python_imports_mutates_state_and_returns_value():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
import math
state["rounded"] = math.ceil(state["raw_score"])
return f"rounded:{state['rounded']}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
assert flow.state["rounded"] == 4
def test_script_listener_reads_trigger_input_and_outputs():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
seed:
do:
call: expression
expr: "'alpha'"
start: true
combine:
do:
call: script
code: |
state["input_matches_output"] = input == outputs["seed"]
return f"{outputs['seed']}:{input}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "alpha:alpha"
assert flow.state["input_matches_output"] is True
def test_script_each_action_reads_item_and_inner_outputs():
yaml_str = """
schema: crewai.flow/v1
name: ScriptEachFlow
methods:
seed:
do:
call: expression
expr: "'global'"
start: true
process_rows:
do:
call: each
in: state.rows
do:
- clean:
call: script
code: |
return item.strip()
- tag:
call: script
code: |
return f"{outputs['seed']}:{outputs['clean']}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
def test_each_action_uses_iteration_outputs_between_nested_actions():
yaml_str = f"""
schema: crewai.flow/v1