From 5bd10ee2c43b56a16da312fe1774716dc47acfcd Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Wed, 17 Jun 2026 18:38:41 -0700 Subject: [PATCH] Add script/code block action to FlowDefinition (#6197) * Add script/code blocks to FlowDefinition Let a Flow method run trusted inline Python with `call: script`. The code is compiled once into a generated function and receives the runtime values as arguments. ```yaml methods: normalize: start: true do: call: script code: | import math state["rounded"] = math.ceil(state["raw_score"]) return f"rounded:{state['rounded']}" ``` Even though this shares the same surface of tools (custom code), I decided to make it opt-in for now, using `CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1`. * Address code review comments --- lib/crewai/src/crewai/flow/flow_definition.py | 30 +++++ .../src/crewai/flow/runtime/__init__.py | 4 +- .../src/crewai/flow/runtime/_actions.py | 74 +++++++++++- .../src/crewai/flow/runtime/_expressions.py | 27 ++--- .../src/crewai/flow/runtime/_outputs.py | 40 +++++++ lib/crewai/tests/test_checkpoint.py | 5 +- lib/crewai/tests/test_flow_definition.py | 1 + lib/crewai/tests/test_flow_from_definition.py | 112 ++++++++++++++++++ 8 files changed, 268 insertions(+), 25 deletions(-) create mode 100644 lib/crewai/src/crewai/flow/runtime/_outputs.py diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 29c561486..b1edef2cd 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -55,6 +55,7 @@ __all__ = [ "FlowMethodDefinition", "FlowPersistenceDefinition", "FlowPydanticStateDefinition", + "FlowScriptActionDefinition", "FlowStateDefinition", "FlowToolActionDefinition", "FlowUnknownStateDefinition", @@ -301,11 +302,39 @@ class FlowExpressionActionDefinition(BaseModel): expr: str +class FlowScriptActionDefinition(BaseModel): + """A Flow method action that executes trusted inline Python.""" + + model_config = ConfigDict(extra="forbid") + + call: TypingLiteral["script"] = Field( + description="Action discriminator. Use script to execute trusted inline Python.", + examples=["script"], + ) + code: str = Field( + description=( + "Trusted Python source executed as a generated function. Runtime values are " + "passed as state, outputs, input, and item; they are not interpolated into " + "the source. This is not sandboxed." + ), + examples=[ + "state['normalized_topic'] = input.strip()\n" + "return state['normalized_topic']" + ], + ) + language: TypingLiteral["python"] = Field( + default="python", + description="Script language. Only python is currently supported.", + examples=["python"], + ) + + FlowInnerActionDefinition = ( FlowCodeActionDefinition | FlowToolActionDefinition | FlowCrewActionDefinition | FlowExpressionActionDefinition + | FlowScriptActionDefinition ) @@ -357,6 +386,7 @@ FlowActionDefinition = ( | FlowToolActionDefinition | FlowCrewActionDefinition | FlowExpressionActionDefinition + | FlowScriptActionDefinition | FlowEachActionDefinition ) diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index 9451ecf49..85f150546 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -121,7 +121,7 @@ from crewai.flow.human_feedback import ( ) from crewai.flow.input_provider import InputProvider from crewai.flow.persistence.base import FlowPersistence -from crewai.flow.runtime._actions import build_action +from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError, build_action from crewai.flow.runtime._refs import resolve_instance_ref, resolve_ref from crewai.flow.types import ( FlowExecutionData, @@ -1090,6 +1090,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]: try: return build_action(self, definition.do) + except FlowScriptExecutionDisabledError: + raise except Exception as e: unresolved.append(f"{name}: {e}") return lambda *args, **kwargs: None diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 97333e209..180bc3c4b 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -2,10 +2,12 @@ from __future__ import annotations +import ast import asyncio from collections.abc import Callable import contextvars import inspect +import os from typing import TYPE_CHECKING, Any, Protocol, cast from crewai.flow.flow_definition import ( @@ -15,9 +17,11 @@ from crewai.flow.flow_definition import ( FlowEachActionDefinition, FlowEachInnerActionDefinition, FlowExpressionActionDefinition, + FlowScriptActionDefinition, FlowToolActionDefinition, ) from crewai.flow.runtime._expressions import evaluate_expression, render_with_block +from crewai.flow.runtime._outputs import outputs_by_name from crewai.flow.runtime._refs import InvalidRefError, resolve_ref @@ -25,10 +29,16 @@ if TYPE_CHECKING: from crewai.flow.runtime import Flow -__all__ = ["build_action"] +__all__ = ["FlowScriptExecutionDisabledError", "build_action"] LocalContext = dict[str, Any] _LOCAL_CONTEXT_KWARG = "__flow_definition_local_context" +_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION" +_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"}) + + +class FlowScriptExecutionDisabledError(RuntimeError): + """Raised when a flow definition tries to execute inline script code.""" class _BuiltAction(Protocol): @@ -140,6 +150,67 @@ class ExpressionAction: ) +class ScriptAction: + definition_type = FlowScriptActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None: + self.flow = flow + self.definition = definition + self.handler = self._compile_handler() + + def run(self, *args: Any, **kwargs: Any) -> Any: + local_context = _pop_local_context(kwargs) + return self.handler( + state=self.flow.state, + outputs=outputs_by_name( + self.flow._method_outputs, + local_outputs=local_context.get("outputs") if local_context else None, + ), + input=args[0] if args else None, + item=local_context.get("item") if local_context else None, + ) + + def _compile_handler(self) -> Callable[..., Any]: + raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "") + if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES: + raise FlowScriptExecutionDisabledError( + "Flow script execution is disabled by default. " + f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for " + "trusted flow definitions." + ) + + filename = f"crewai.flow.script.{self.flow._definition.name}" + module = ast.parse(self.definition.code, filename=filename) + function = ast.FunctionDef( + name="_flow_script", + args=ast.arguments( + posonlyargs=[], + args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")], + vararg=None, + kwonlyargs=[], + kw_defaults=[], + kwarg=None, + defaults=[], + ), + body=module.body or [ast.Pass()], + decorator_list=[], + returns=None, + type_comment=None, + type_params=[], + ) + module.body = [function] + ast.fix_missing_locations(module) + + # The YAML here is trusted project source authored by the code owner, + # so this has the same trust boundary as using custom tools. We + # intentionally do not interpolate user input and runtime values are passed + # as function arguments. This is still arbitrary trusted Python execution, + # so it remains disabled by default behind `CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION` + namespace: dict[str, Any] = {"__name__": filename} + exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102 + return cast(Callable[..., Any], namespace["_flow_script"]) + + class EachAction: definition_type = FlowEachActionDefinition @@ -199,6 +270,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = ( ToolAction, CrewAction, ExpressionAction, + ScriptAction, ) diff --git a/lib/crewai/src/crewai/flow/runtime/_expressions.py b/lib/crewai/src/crewai/flow/runtime/_expressions.py index b015a9608..dae39e650 100644 --- a/lib/crewai/src/crewai/flow/runtime/_expressions.py +++ b/lib/crewai/src/crewai/flow/runtime/_expressions.py @@ -7,6 +7,7 @@ import json import re from typing import TYPE_CHECKING, Any, cast +from crewai.flow.runtime._outputs import outputs_by_name from crewai.utilities.serialization import to_serializable @@ -44,7 +45,12 @@ def evaluate_expression( def _expression_context( flow: Flow[Any], local_context: dict[str, Any] | None = None ) -> dict[str, Any]: - outputs = _outputs_by_name(flow._method_outputs) + local_outputs = local_context.get("outputs") if local_context else None + outputs = outputs_by_name( + flow._method_outputs, + local_outputs=local_outputs, + serialize=True, + ) context: dict[str, Any] = { "state": flow._copy_and_serialize_state(), "outputs": outputs, @@ -53,29 +59,12 @@ def _expression_context( local_values = { key: to_serializable(value, max_depth=0) for key, value in local_context.items() + if key not in {"outputs", "state"} } - local_outputs = local_values.pop("outputs", None) - local_values.pop("state", None) context.update(local_values) - if local_outputs is not None: - if not isinstance(local_outputs, dict): - raise TypeError("flow definition local outputs must be a mapping") - context["outputs"] = {**outputs, **local_outputs} return context -def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]: - outputs: dict[str, Any] = {} - for entry in method_outputs: - method = "" - output = entry - if isinstance(entry, dict) and "output" in entry: - method = str(entry.get("method", "")) - output = entry["output"] - outputs[method] = to_serializable(output, max_depth=0) - return outputs - - def _render_value(value: Any, context: dict[str, Any]) -> Any: if isinstance(value, str): return _render_string(value, context) diff --git a/lib/crewai/src/crewai/flow/runtime/_outputs.py b/lib/crewai/src/crewai/flow/runtime/_outputs.py new file mode 100644 index 000000000..321c5d456 --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_outputs.py @@ -0,0 +1,40 @@ +"""Shared FlowDefinition runtime output helpers.""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any, TypedDict + +from crewai.utilities.serialization import to_serializable + + +class _MethodOutput(TypedDict): + method: str + output: Any + + +def outputs_by_name( + method_outputs: list[_MethodOutput], + *, + local_outputs: Mapping[str, Any] | None = None, + serialize: bool = False, +) -> dict[str, Any]: + outputs: dict[str, Any] = {} + for entry in method_outputs: + outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize) + + if local_outputs is not None: + outputs.update( + { + key: _output_value(output, serialize=serialize) + for key, output in local_outputs.items() + } + ) + + return outputs + + +def _output_value(value: Any, *, serialize: bool) -> Any: + if not serialize: + return value + return to_serializable(value, max_depth=0) diff --git a/lib/crewai/tests/test_checkpoint.py b/lib/crewai/tests/test_checkpoint.py index 317ed0bdd..4138f3819 100644 --- a/lib/crewai/tests/test_checkpoint.py +++ b/lib/crewai/tests/test_checkpoint.py @@ -645,14 +645,11 @@ class TestLegacyMethodOutputsRestore: context = _expression_context(restored) assert context["outputs"] == {"": "legacy"} - def test_raw_legacy_outputs_remain_readable(self) -> None: - from crewai.flow.runtime._expressions import _expression_context - + def test_raw_legacy_outputs_property_remains_readable(self) -> None: flow = Flow() flow._method_outputs = ["legacy"] assert flow.method_outputs == ["legacy"] - assert _expression_context(flow)["outputs"] == {"": "legacy"} class TestAgentCheckpoint: diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index dc2bd1c37..7be04fc48 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -54,6 +54,7 @@ def test_flow_public_exports_are_explicit(): "FlowMethodDefinition", "FlowPersistenceDefinition", "FlowPydanticStateDefinition", + "FlowScriptActionDefinition", "FlowStateDefinition", "FlowToolActionDefinition", "FlowUnknownStateDefinition", diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 69bb96816..989eb6396 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -26,6 +26,7 @@ from crewai.flow.flow_config import flow_config from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition from crewai.flow.persistence import persist from crewai.flow.persistence.base import FlowPersistence +from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError from crewai.state.checkpoint_config import CheckpointConfig from crewai.tools import BaseTool from crewai.types.streaming import FlowStreamingOutput @@ -1145,6 +1146,117 @@ methods: assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"] +def test_script_action_requires_explicit_opt_in(): + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptFlow +methods: + normalize: + do: + call: script + code: | + return "blocked" + start: true +""" + + with pytest.raises( + FlowScriptExecutionDisabledError, + match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1", + ) as exc_info: + Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + assert "methods with unresolvable actions" not in str(exc_info.value) + + +def test_script_action_runs_python_imports_mutates_state_and_returns_value( + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1") + + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptFlow +methods: + normalize: + do: + call: script + code: | + import math + + state["rounded"] = math.ceil(state["raw_score"]) + return f"rounded:{state['rounded']}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4" + assert flow.state["rounded"] == 4 + + +def test_script_listener_reads_trigger_input_and_outputs( + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1") + + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptFlow +methods: + seed: + do: + call: expression + expr: "'alpha'" + start: true + combine: + do: + call: script + code: | + state["input_matches_output"] = input == outputs["seed"] + return f"{outputs['seed']}:{input}" + listen: seed +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff() == "alpha:alpha" + assert flow.state["input_matches_output"] is True + + +def test_script_each_action_reads_item_and_inner_outputs( + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1") + + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptEachFlow +methods: + seed: + do: + call: expression + expr: "'global'" + start: true + process_rows: + do: + call: each + in: state.rows + do: + - clean: + call: script + code: | + return item.strip() + - tag: + call: script + code: | + return f"{outputs['seed']}:{outputs['clean']}" + listen: seed +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"] + + def test_each_action_uses_iteration_outputs_between_nested_actions(): yaml_str = f""" schema: crewai.flow/v1