mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-17 06:08:22 +00:00
Compare commits
1 Commits
flow-defin
...
flow-scrip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
386a1650da |
@@ -55,6 +55,7 @@ __all__ = [
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowPydanticStateDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowStateDefinition",
|
||||
"FlowToolActionDefinition",
|
||||
"FlowUnknownStateDefinition",
|
||||
@@ -301,11 +302,39 @@ class FlowExpressionActionDefinition(BaseModel):
|
||||
expr: str
|
||||
|
||||
|
||||
class FlowScriptActionDefinition(BaseModel):
|
||||
"""A Flow method action that executes trusted inline Python."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
call: TypingLiteral["script"] = Field(
|
||||
description="Action discriminator. Use script to execute trusted inline Python.",
|
||||
examples=["script"],
|
||||
)
|
||||
code: str = Field(
|
||||
description=(
|
||||
"Trusted Python source executed as a generated function. Runtime values are "
|
||||
"passed as state, outputs, input, and item; they are not interpolated into "
|
||||
"the source. This is not sandboxed."
|
||||
),
|
||||
examples=[
|
||||
"state['normalized_topic'] = input.strip()\n"
|
||||
"return state['normalized_topic']"
|
||||
],
|
||||
)
|
||||
language: TypingLiteral["python"] = Field(
|
||||
default="python",
|
||||
description="Script language. Only python is currently supported.",
|
||||
examples=["python"],
|
||||
)
|
||||
|
||||
|
||||
FlowInnerActionDefinition = (
|
||||
FlowCodeActionDefinition
|
||||
| FlowToolActionDefinition
|
||||
| FlowCrewActionDefinition
|
||||
| FlowExpressionActionDefinition
|
||||
| FlowScriptActionDefinition
|
||||
)
|
||||
|
||||
|
||||
@@ -357,6 +386,7 @@ FlowActionDefinition = (
|
||||
| FlowToolActionDefinition
|
||||
| FlowCrewActionDefinition
|
||||
| FlowExpressionActionDefinition
|
||||
| FlowScriptActionDefinition
|
||||
| FlowEachActionDefinition
|
||||
)
|
||||
|
||||
|
||||
@@ -1090,6 +1090,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
|
||||
try:
|
||||
return build_action(self, definition.do)
|
||||
except RuntimeError:
|
||||
raise
|
||||
except Exception as e:
|
||||
unresolved.append(f"{name}: {e}")
|
||||
return lambda *args, **kwargs: None
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
import contextvars
|
||||
import inspect
|
||||
import os
|
||||
from typing import TYPE_CHECKING, Any, Protocol, cast
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
@@ -15,9 +17,11 @@ from crewai.flow.flow_definition import (
|
||||
FlowEachActionDefinition,
|
||||
FlowEachInnerActionDefinition,
|
||||
FlowExpressionActionDefinition,
|
||||
FlowScriptActionDefinition,
|
||||
FlowToolActionDefinition,
|
||||
)
|
||||
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
|
||||
from crewai.flow.runtime._outputs import outputs_by_name
|
||||
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
|
||||
|
||||
|
||||
@@ -29,6 +33,8 @@ __all__ = ["build_action"]
|
||||
|
||||
LocalContext = dict[str, Any]
|
||||
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
|
||||
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
|
||||
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
|
||||
|
||||
|
||||
class _BuiltAction(Protocol):
|
||||
@@ -140,6 +146,62 @@ class ExpressionAction:
|
||||
)
|
||||
|
||||
|
||||
class ScriptAction:
|
||||
definition_type = FlowScriptActionDefinition
|
||||
|
||||
def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None:
|
||||
self.flow = flow
|
||||
self.definition = definition
|
||||
self.handler = self._compile_handler()
|
||||
|
||||
def run(self, *args: Any, **kwargs: Any) -> Any:
|
||||
local_context = _pop_local_context(kwargs)
|
||||
return self.handler(
|
||||
state=self.flow.state,
|
||||
outputs=outputs_by_name(
|
||||
self.flow._method_outputs,
|
||||
local_outputs=local_context.get("outputs") if local_context else None,
|
||||
),
|
||||
input=args[0] if args else None,
|
||||
item=local_context.get("item") if local_context else None,
|
||||
)
|
||||
|
||||
def _compile_handler(self) -> Callable[..., Any]:
|
||||
raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "")
|
||||
if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
|
||||
raise RuntimeError(
|
||||
"Flow script execution is disabled by default. "
|
||||
f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
|
||||
"trusted flow definitions."
|
||||
)
|
||||
|
||||
filename = f"crewai.flow.script.{self.flow._definition.name}"
|
||||
module = ast.parse(self.definition.code, filename=filename)
|
||||
function = ast.FunctionDef(
|
||||
name="_flow_script",
|
||||
args=ast.arguments(
|
||||
posonlyargs=[],
|
||||
args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")],
|
||||
vararg=None,
|
||||
kwonlyargs=[],
|
||||
kw_defaults=[],
|
||||
kwarg=None,
|
||||
defaults=[],
|
||||
),
|
||||
body=module.body or [ast.Pass()],
|
||||
decorator_list=[],
|
||||
returns=None,
|
||||
type_comment=None,
|
||||
type_params=[],
|
||||
)
|
||||
module.body = [function]
|
||||
ast.fix_missing_locations(module)
|
||||
|
||||
namespace: dict[str, Any] = {"__name__": filename}
|
||||
exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102
|
||||
return cast(Callable[..., Any], namespace["_flow_script"])
|
||||
|
||||
|
||||
class EachAction:
|
||||
definition_type = FlowEachActionDefinition
|
||||
|
||||
@@ -199,6 +261,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
|
||||
ToolAction,
|
||||
CrewAction,
|
||||
ExpressionAction,
|
||||
ScriptAction,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import json
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from crewai.flow.runtime._outputs import outputs_by_name
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
|
||||
|
||||
@@ -44,7 +45,12 @@ def evaluate_expression(
|
||||
def _expression_context(
|
||||
flow: Flow[Any], local_context: dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
outputs = _outputs_by_name(flow._method_outputs)
|
||||
local_outputs = local_context.get("outputs") if local_context else None
|
||||
outputs = outputs_by_name(
|
||||
flow._method_outputs,
|
||||
local_outputs=local_outputs,
|
||||
serialize=True,
|
||||
)
|
||||
context: dict[str, Any] = {
|
||||
"state": flow._copy_and_serialize_state(),
|
||||
"outputs": outputs,
|
||||
@@ -53,29 +59,12 @@ def _expression_context(
|
||||
local_values = {
|
||||
key: to_serializable(value, max_depth=0)
|
||||
for key, value in local_context.items()
|
||||
if key not in {"outputs", "state"}
|
||||
}
|
||||
local_outputs = local_values.pop("outputs", None)
|
||||
local_values.pop("state", None)
|
||||
context.update(local_values)
|
||||
if local_outputs is not None:
|
||||
if not isinstance(local_outputs, dict):
|
||||
raise TypeError("flow definition local outputs must be a mapping")
|
||||
context["outputs"] = {**outputs, **local_outputs}
|
||||
return context
|
||||
|
||||
|
||||
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
|
||||
outputs: dict[str, Any] = {}
|
||||
for entry in method_outputs:
|
||||
method = ""
|
||||
output = entry
|
||||
if isinstance(entry, dict) and "output" in entry:
|
||||
method = str(entry.get("method", ""))
|
||||
output = entry["output"]
|
||||
outputs[method] = to_serializable(output, max_depth=0)
|
||||
return outputs
|
||||
|
||||
|
||||
def _render_value(value: Any, context: dict[str, Any]) -> Any:
|
||||
if isinstance(value, str):
|
||||
return _render_string(value, context)
|
||||
|
||||
40
lib/crewai/src/crewai/flow/runtime/_outputs.py
Normal file
40
lib/crewai/src/crewai/flow/runtime/_outputs.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Shared FlowDefinition runtime output helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, TypedDict
|
||||
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
|
||||
|
||||
class _MethodOutput(TypedDict):
|
||||
method: str
|
||||
output: Any
|
||||
|
||||
|
||||
def outputs_by_name(
|
||||
method_outputs: list[_MethodOutput],
|
||||
*,
|
||||
local_outputs: Mapping[str, Any] | None = None,
|
||||
serialize: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
outputs: dict[str, Any] = {}
|
||||
for entry in method_outputs:
|
||||
outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize)
|
||||
|
||||
if local_outputs is not None:
|
||||
outputs.update(
|
||||
{
|
||||
key: _output_value(output, serialize=serialize)
|
||||
for key, output in local_outputs.items()
|
||||
}
|
||||
)
|
||||
|
||||
return outputs
|
||||
|
||||
|
||||
def _output_value(value: Any, *, serialize: bool) -> Any:
|
||||
if not serialize:
|
||||
return value
|
||||
return to_serializable(value, max_depth=0)
|
||||
@@ -645,14 +645,11 @@ class TestLegacyMethodOutputsRestore:
|
||||
context = _expression_context(restored)
|
||||
assert context["outputs"] == {"": "legacy"}
|
||||
|
||||
def test_raw_legacy_outputs_remain_readable(self) -> None:
|
||||
from crewai.flow.runtime._expressions import _expression_context
|
||||
|
||||
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
|
||||
flow = Flow()
|
||||
flow._method_outputs = ["legacy"]
|
||||
|
||||
assert flow.method_outputs == ["legacy"]
|
||||
assert _expression_context(flow)["outputs"] == {"": "legacy"}
|
||||
|
||||
|
||||
class TestAgentCheckpoint:
|
||||
|
||||
@@ -54,6 +54,7 @@ def test_flow_public_exports_are_explicit():
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowPydanticStateDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowStateDefinition",
|
||||
"FlowToolActionDefinition",
|
||||
"FlowUnknownStateDefinition",
|
||||
|
||||
@@ -1145,6 +1145,116 @@ methods:
|
||||
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
|
||||
|
||||
|
||||
def test_script_action_requires_explicit_opt_in():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
normalize:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
return "blocked"
|
||||
start: true
|
||||
"""
|
||||
|
||||
with pytest.raises(
|
||||
RuntimeError, match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1"
|
||||
) as exc_info:
|
||||
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
assert "methods with unresolvable actions" not in str(exc_info.value)
|
||||
|
||||
|
||||
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
normalize:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
import math
|
||||
|
||||
state["rounded"] = math.ceil(state["raw_score"])
|
||||
return f"rounded:{state['rounded']}"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
|
||||
assert flow.state["rounded"] == 4
|
||||
|
||||
|
||||
def test_script_listener_reads_trigger_input_and_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
seed:
|
||||
do:
|
||||
call: expression
|
||||
expr: "'alpha'"
|
||||
start: true
|
||||
combine:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
state["input_matches_output"] = input == outputs["seed"]
|
||||
return f"{outputs['seed']}:{input}"
|
||||
listen: seed
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff() == "alpha:alpha"
|
||||
assert flow.state["input_matches_output"] is True
|
||||
|
||||
|
||||
def test_script_each_action_reads_item_and_inner_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptEachFlow
|
||||
methods:
|
||||
seed:
|
||||
do:
|
||||
call: expression
|
||||
expr: "'global'"
|
||||
start: true
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- clean:
|
||||
call: script
|
||||
code: |
|
||||
return item.strip()
|
||||
- tag:
|
||||
call: script
|
||||
code: |
|
||||
return f"{outputs['seed']}:{outputs['clean']}"
|
||||
listen: seed
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
|
||||
|
||||
|
||||
def test_each_action_uses_iteration_outputs_between_nested_actions():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
|
||||
Reference in New Issue
Block a user