Compare commits

..

1 Commits

Author SHA1 Message Date
Vinicius Brasil
386a1650da Add script/code blocks to FlowDefinition
Let a Flow method run trusted inline Python with `call: script`. The code
is compiled once into a generated function and receives the runtime
values as arguments.

```yaml
methods:
  normalize:
    start: true
    do:
      call: script
      code: |
        import math
        state["rounded"] = math.ceil(state["raw_score"])
        return f"rounded:{state['rounded']}"
```

Even though this shares the same surface of tools (custom code), I
decided to make it opt-in for now, using
`CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1`.
2026-06-16 22:12:44 -07:00
8 changed files with 255 additions and 23 deletions

View File

@@ -55,6 +55,7 @@ __all__ = [
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",
@@ -301,11 +302,39 @@ class FlowExpressionActionDefinition(BaseModel):
expr: str
class FlowScriptActionDefinition(BaseModel):
"""A Flow method action that executes trusted inline Python."""
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["script"] = Field(
description="Action discriminator. Use script to execute trusted inline Python.",
examples=["script"],
)
code: str = Field(
description=(
"Trusted Python source executed as a generated function. Runtime values are "
"passed as state, outputs, input, and item; they are not interpolated into "
"the source. This is not sandboxed."
),
examples=[
"state['normalized_topic'] = input.strip()\n"
"return state['normalized_topic']"
],
)
language: TypingLiteral["python"] = Field(
default="python",
description="Script language. Only python is currently supported.",
examples=["python"],
)
FlowInnerActionDefinition = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
)
@@ -357,6 +386,7 @@ FlowActionDefinition = (
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
| FlowEachActionDefinition
)

View File

@@ -1090,6 +1090,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
try:
return build_action(self, definition.do)
except RuntimeError:
raise
except Exception as e:
unresolved.append(f"{name}: {e}")
return lambda *args, **kwargs: None

View File

@@ -2,10 +2,12 @@
from __future__ import annotations
import ast
import asyncio
from collections.abc import Callable
import contextvars
import inspect
import os
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
@@ -15,9 +17,11 @@ from crewai.flow.flow_definition import (
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowExpressionActionDefinition,
FlowScriptActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
@@ -29,6 +33,8 @@ __all__ = ["build_action"]
LocalContext = dict[str, Any]
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
class _BuiltAction(Protocol):
@@ -140,6 +146,62 @@ class ExpressionAction:
)
class ScriptAction:
definition_type = FlowScriptActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.handler = self._compile_handler()
def run(self, *args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return self.handler(
state=self.flow.state,
outputs=outputs_by_name(
self.flow._method_outputs,
local_outputs=local_context.get("outputs") if local_context else None,
),
input=args[0] if args else None,
item=local_context.get("item") if local_context else None,
)
def _compile_handler(self) -> Callable[..., Any]:
raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "")
if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
raise RuntimeError(
"Flow script execution is disabled by default. "
f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
"trusted flow definitions."
)
filename = f"crewai.flow.script.{self.flow._definition.name}"
module = ast.parse(self.definition.code, filename=filename)
function = ast.FunctionDef(
name="_flow_script",
args=ast.arguments(
posonlyargs=[],
args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")],
vararg=None,
kwonlyargs=[],
kw_defaults=[],
kwarg=None,
defaults=[],
),
body=module.body or [ast.Pass()],
decorator_list=[],
returns=None,
type_comment=None,
type_params=[],
)
module.body = [function]
ast.fix_missing_locations(module)
namespace: dict[str, Any] = {"__name__": filename}
exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102
return cast(Callable[..., Any], namespace["_flow_script"])
class EachAction:
definition_type = FlowEachActionDefinition
@@ -199,6 +261,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
ToolAction,
CrewAction,
ExpressionAction,
ScriptAction,
)

View File

@@ -7,6 +7,7 @@ import json
import re
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.utilities.serialization import to_serializable
@@ -44,7 +45,12 @@ def evaluate_expression(
def _expression_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
outputs = _outputs_by_name(flow._method_outputs)
local_outputs = local_context.get("outputs") if local_context else None
outputs = outputs_by_name(
flow._method_outputs,
local_outputs=local_outputs,
serialize=True,
)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": outputs,
@@ -53,29 +59,12 @@ def _expression_context(
local_values = {
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
if key not in {"outputs", "state"}
}
local_outputs = local_values.pop("outputs", None)
local_values.pop("state", None)
context.update(local_values)
if local_outputs is not None:
if not isinstance(local_outputs, dict):
raise TypeError("flow definition local outputs must be a mapping")
context["outputs"] = {**outputs, **local_outputs}
return context
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
method = ""
output = entry
if isinstance(entry, dict) and "output" in entry:
method = str(entry.get("method", ""))
output = entry["output"]
outputs[method] = to_serializable(output, max_depth=0)
return outputs
def _render_value(value: Any, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return _render_string(value, context)

View File

@@ -0,0 +1,40 @@
"""Shared FlowDefinition runtime output helpers."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, TypedDict
from crewai.utilities.serialization import to_serializable
class _MethodOutput(TypedDict):
method: str
output: Any
def outputs_by_name(
method_outputs: list[_MethodOutput],
*,
local_outputs: Mapping[str, Any] | None = None,
serialize: bool = False,
) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize)
if local_outputs is not None:
outputs.update(
{
key: _output_value(output, serialize=serialize)
for key, output in local_outputs.items()
}
)
return outputs
def _output_value(value: Any, *, serialize: bool) -> Any:
if not serialize:
return value
return to_serializable(value, max_depth=0)

View File

@@ -645,14 +645,11 @@ class TestLegacyMethodOutputsRestore:
context = _expression_context(restored)
assert context["outputs"] == {"": "legacy"}
def test_raw_legacy_outputs_remain_readable(self) -> None:
from crewai.flow.runtime._expressions import _expression_context
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
flow = Flow()
flow._method_outputs = ["legacy"]
assert flow.method_outputs == ["legacy"]
assert _expression_context(flow)["outputs"] == {"": "legacy"}
class TestAgentCheckpoint:

View File

@@ -54,6 +54,7 @@ def test_flow_public_exports_are_explicit():
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",

View File

@@ -1145,6 +1145,116 @@ methods:
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
def test_script_action_requires_explicit_opt_in():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
return "blocked"
start: true
"""
with pytest.raises(
RuntimeError, match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1"
) as exc_info:
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert "methods with unresolvable actions" not in str(exc_info.value)
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
import math
state["rounded"] = math.ceil(state["raw_score"])
return f"rounded:{state['rounded']}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
assert flow.state["rounded"] == 4
def test_script_listener_reads_trigger_input_and_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
seed:
do:
call: expression
expr: "'alpha'"
start: true
combine:
do:
call: script
code: |
state["input_matches_output"] = input == outputs["seed"]
return f"{outputs['seed']}:{input}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "alpha:alpha"
assert flow.state["input_matches_output"] is True
def test_script_each_action_reads_item_and_inner_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptEachFlow
methods:
seed:
do:
call: expression
expr: "'global'"
start: true
process_rows:
do:
call: each
in: state.rows
do:
- clean:
call: script
code: |
return item.strip()
- tag:
call: script
code: |
return f"{outputs['seed']}:{outputs['clean']}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
def test_each_action_uses_iteration_outputs_between_nested_actions():
yaml_str = f"""
schema: crewai.flow/v1