From b69e5ccc10f1be6feee3f9b9409744ccce57dc20 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 16 Jun 2026 21:19:48 -0700 Subject: [PATCH] Add script action to FlowDefinition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let a Flow method run trusted inline Python with `call: script`. The code is compiled once into a generated function and receives the runtime values as arguments — state, outputs, input, and item — so nothing is interpolated into the source. This is not sandboxed. Also extracts the shared `outputs_by_name` helper into `runtime/_outputs` so the script action and the expression evaluator build the outputs map the same way, including each-iteration local outputs. ```yaml methods: normalize: start: true do: call: script code: | import math state["rounded"] = math.ceil(state["raw_score"]) return f"rounded:{state['rounded']}" ``` Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/crewai/src/crewai/flow/flow_definition.py | 30 +++++++ .../src/crewai/flow/runtime/_actions.py | 46 +++++++++++ .../src/crewai/flow/runtime/_expressions.py | 27 ++----- .../src/crewai/flow/runtime/_outputs.py | 42 ++++++++++ lib/crewai/tests/test_flow_definition.py | 1 + lib/crewai/tests/test_flow_from_definition.py | 78 +++++++++++++++++++ 6 files changed, 205 insertions(+), 19 deletions(-) create mode 100644 lib/crewai/src/crewai/flow/runtime/_outputs.py diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 29c561486..b1edef2cd 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -55,6 +55,7 @@ __all__ = [ "FlowMethodDefinition", "FlowPersistenceDefinition", "FlowPydanticStateDefinition", + "FlowScriptActionDefinition", "FlowStateDefinition", "FlowToolActionDefinition", "FlowUnknownStateDefinition", @@ -301,11 +302,39 @@ class FlowExpressionActionDefinition(BaseModel): expr: str +class FlowScriptActionDefinition(BaseModel): + """A Flow method action that executes trusted inline Python.""" + + model_config = ConfigDict(extra="forbid") + + call: TypingLiteral["script"] = Field( + description="Action discriminator. Use script to execute trusted inline Python.", + examples=["script"], + ) + code: str = Field( + description=( + "Trusted Python source executed as a generated function. Runtime values are " + "passed as state, outputs, input, and item; they are not interpolated into " + "the source. This is not sandboxed." + ), + examples=[ + "state['normalized_topic'] = input.strip()\n" + "return state['normalized_topic']" + ], + ) + language: TypingLiteral["python"] = Field( + default="python", + description="Script language. Only python is currently supported.", + examples=["python"], + ) + + FlowInnerActionDefinition = ( FlowCodeActionDefinition | FlowToolActionDefinition | FlowCrewActionDefinition | FlowExpressionActionDefinition + | FlowScriptActionDefinition ) @@ -357,6 +386,7 @@ FlowActionDefinition = ( | FlowToolActionDefinition | FlowCrewActionDefinition | FlowExpressionActionDefinition + | FlowScriptActionDefinition | FlowEachActionDefinition ) diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 97333e209..547303bdd 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -6,6 +6,7 @@ import asyncio from collections.abc import Callable import contextvars import inspect +import textwrap from typing import TYPE_CHECKING, Any, Protocol, cast from crewai.flow.flow_definition import ( @@ -15,9 +16,11 @@ from crewai.flow.flow_definition import ( FlowEachActionDefinition, FlowEachInnerActionDefinition, FlowExpressionActionDefinition, + FlowScriptActionDefinition, FlowToolActionDefinition, ) from crewai.flow.runtime._expressions import evaluate_expression, render_with_block +from crewai.flow.runtime._outputs import outputs_by_name from crewai.flow.runtime._refs import InvalidRefError, resolve_ref @@ -140,6 +143,37 @@ class ExpressionAction: ) +class ScriptAction: + definition_type = FlowScriptActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None: + self.flow = flow + self.definition = definition + self.handler = self._compile_handler() + + def run(self, *args: Any, **kwargs: Any) -> Any: + local_context = _pop_local_context(kwargs) + return self.handler( + state=self.flow.state, + outputs=outputs_by_name( + self.flow._method_outputs, + local_outputs=local_context.get("outputs") if local_context else None, + ), + input=args[0] if args else None, + item=local_context.get("item") if local_context else None, + ) + + def _compile_handler(self) -> Callable[..., Any]: + namespace: dict[str, Any] = { + "__name__": f"crewai.flow.script.{self.flow._definition.name}", + } + source = _script_function_source(self.definition.code) + exec( # nosec B102 # noqa: S102 + compile(source, namespace["__name__"], "exec"), namespace + ) + return cast(Callable[..., Any], namespace["__flow_script__"]) + + class EachAction: definition_type = FlowEachActionDefinition @@ -199,6 +233,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = ( ToolAction, CrewAction, ExpressionAction, + ScriptAction, ) @@ -240,3 +275,14 @@ def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None: if not isinstance(local_context, dict): raise TypeError("flow definition local context must be a mapping") return cast(LocalContext, local_context) + + +def _script_function_source(code: str) -> str: + body = code if code.strip() else "pass" + source = ( + "def __flow_script__(state, outputs, input, item):\n" + f"{textwrap.indent(body, ' ')}" + ) + if not source.endswith("\n"): + source += "\n" + return source diff --git a/lib/crewai/src/crewai/flow/runtime/_expressions.py b/lib/crewai/src/crewai/flow/runtime/_expressions.py index b015a9608..dae39e650 100644 --- a/lib/crewai/src/crewai/flow/runtime/_expressions.py +++ b/lib/crewai/src/crewai/flow/runtime/_expressions.py @@ -7,6 +7,7 @@ import json import re from typing import TYPE_CHECKING, Any, cast +from crewai.flow.runtime._outputs import outputs_by_name from crewai.utilities.serialization import to_serializable @@ -44,7 +45,12 @@ def evaluate_expression( def _expression_context( flow: Flow[Any], local_context: dict[str, Any] | None = None ) -> dict[str, Any]: - outputs = _outputs_by_name(flow._method_outputs) + local_outputs = local_context.get("outputs") if local_context else None + outputs = outputs_by_name( + flow._method_outputs, + local_outputs=local_outputs, + serialize=True, + ) context: dict[str, Any] = { "state": flow._copy_and_serialize_state(), "outputs": outputs, @@ -53,29 +59,12 @@ def _expression_context( local_values = { key: to_serializable(value, max_depth=0) for key, value in local_context.items() + if key not in {"outputs", "state"} } - local_outputs = local_values.pop("outputs", None) - local_values.pop("state", None) context.update(local_values) - if local_outputs is not None: - if not isinstance(local_outputs, dict): - raise TypeError("flow definition local outputs must be a mapping") - context["outputs"] = {**outputs, **local_outputs} return context -def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]: - outputs: dict[str, Any] = {} - for entry in method_outputs: - method = "" - output = entry - if isinstance(entry, dict) and "output" in entry: - method = str(entry.get("method", "")) - output = entry["output"] - outputs[method] = to_serializable(output, max_depth=0) - return outputs - - def _render_value(value: Any, context: dict[str, Any]) -> Any: if isinstance(value, str): return _render_string(value, context) diff --git a/lib/crewai/src/crewai/flow/runtime/_outputs.py b/lib/crewai/src/crewai/flow/runtime/_outputs.py new file mode 100644 index 000000000..59c21c28a --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_outputs.py @@ -0,0 +1,42 @@ +"""Shared FlowDefinition runtime output helpers.""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +from crewai.utilities.serialization import to_serializable + + +def outputs_by_name( + method_outputs: list[Any], + *, + local_outputs: Mapping[str, Any] | None = None, + serialize: bool = False, +) -> dict[str, Any]: + outputs: dict[str, Any] = {} + for entry in method_outputs: + method = "" + output = entry + if isinstance(entry, dict) and "output" in entry: + method = str(entry.get("method", "")) + output = entry["output"] + outputs[method] = _output_value(output, serialize=serialize) + + if local_outputs is not None: + if not isinstance(local_outputs, Mapping): + raise TypeError("flow definition local outputs must be a mapping") + outputs.update( + { + key: _output_value(output, serialize=serialize) + for key, output in local_outputs.items() + } + ) + + return outputs + + +def _output_value(value: Any, *, serialize: bool) -> Any: + if not serialize: + return value + return to_serializable(value, max_depth=0) diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index dc2bd1c37..7be04fc48 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -54,6 +54,7 @@ def test_flow_public_exports_are_explicit(): "FlowMethodDefinition", "FlowPersistenceDefinition", "FlowPydanticStateDefinition", + "FlowScriptActionDefinition", "FlowStateDefinition", "FlowToolActionDefinition", "FlowUnknownStateDefinition", diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 69bb96816..5f17e188e 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -1145,6 +1145,84 @@ methods: assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"] +def test_script_action_runs_python_imports_mutates_state_and_returns_value(): + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptFlow +methods: + normalize: + do: + call: script + code: | + import math + + state["rounded"] = math.ceil(state["raw_score"]) + return f"rounded:{state['rounded']}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4" + assert flow.state["rounded"] == 4 + + +def test_script_listener_reads_trigger_input_and_outputs(): + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptFlow +methods: + seed: + do: + call: expression + expr: "'alpha'" + start: true + combine: + do: + call: script + code: | + state["input_matches_output"] = input == outputs["seed"] + return f"{outputs['seed']}:{input}" + listen: seed +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff() == "alpha:alpha" + assert flow.state["input_matches_output"] is True + + +def test_script_each_action_reads_item_and_inner_outputs(): + yaml_str = """ +schema: crewai.flow/v1 +name: ScriptEachFlow +methods: + seed: + do: + call: expression + expr: "'global'" + start: true + process_rows: + do: + call: each + in: state.rows + do: + - clean: + call: script + code: | + return item.strip() + - tag: + call: script + code: | + return f"{outputs['seed']}:{outputs['clean']}" + listen: seed +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"] + + def test_each_action_uses_iteration_outputs_between_nested_actions(): yaml_str = f""" schema: crewai.flow/v1