Validate flow CEL expressions at definition load time (#6224)

* Validate flow CEL expressions at definition load time

Promote CEL expression handling to a public Expression API and validate expressions when a FlowDefinition is built instead of when it executes.

Invalid CEL syntax or unknown roots now raise ValidationError from FlowDefinition.from_yaml() and FlowDefinition.from_dict(). Expressions may reference state and outputs, plus item inside each.do; bare identifiers are rejected as unknown roots.

For with values, the CEL contract is intentionally simple: after trimming whitespace, a string is evaluated as CEL only if it starts with ${ and ends with }. Anything else is treated as a literal value, so partial interpolation is not supported. If the content inside the wrapper is not valid CEL, validation fails.

Examples:

```text
"${state.topic}"          -> evaluated, returns state.topic
"topic is ${state.topic}" -> literal string
"${state.topic} suffix"   -> literal string
"${'a'}${'b'}"              -> invalid CEL
```

* Honor explicit empty-context overrides in evaluate() / render_template()
This commit is contained in:
Vinicius Brasil
2026-06-18 12:18:22 -07:00
committed by GitHub
parent 8153b67f5d
commit b0816e00b6
7 changed files with 572 additions and 186 deletions

View File

@@ -10,6 +10,7 @@ from crewai.flow.conversation import (
ConversationalInputs,
)
from crewai.flow.dsl import HumanFeedbackResult, human_feedback
from crewai.flow.expressions import Expression
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.input_provider import InputProvider, InputResponse
@@ -26,6 +27,7 @@ __all__ = [
"ConsoleProvider",
"ConversationalConfig",
"ConversationalInputs",
"Expression",
"Flow",
"FlowStructure",
"HumanFeedbackPending",

View File

@@ -0,0 +1,329 @@
"""Runtime expression support for FlowDefinition CEL expressions."""
from __future__ import annotations
from collections.abc import Iterable
import json
from typing import TYPE_CHECKING, Any, TypeAlias, cast
from crewai.utilities.serialization import to_serializable
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
else:
from typing_extensions import TypeAliasType
_CEL_MACROS_WITH_LOCAL_BINDINGS = frozenset(
{"all", "exists", "exists_one", "filter", "map"}
)
if TYPE_CHECKING:
ExpressionData: TypeAlias = (
str
| int
| float
| bool
| None
| list["ExpressionData"]
| dict[str, "ExpressionData"]
)
else:
ExpressionData = TypeAliasType(
"ExpressionData",
str
| int
| float
| bool
| None
| list["ExpressionData"]
| dict[str, "ExpressionData"],
)
__all__ = [
"Expression",
"ExpressionData",
"ExpressionError",
]
class ExpressionError(ValueError):
"""An expression failed to parse, validate, render, or evaluate."""
class Expression:
"""CEL expression helper used for definition-time checks and runtime rendering."""
def __init__(
self, value: ExpressionData, *, context: dict[str, Any] | None = None
) -> None:
self.value = value
self.context = context
@classmethod
def from_flow(
cls,
value: ExpressionData,
flow: Flow[Any],
*,
local_context: dict[str, Any] | None = None,
) -> Expression:
"""Build an expression with the standard Flow runtime context."""
return cls(value, context=cls._flow_context(flow, local_context=local_context))
def validate_expression(
self,
*,
allowed_roots: Iterable[str],
source: str = "CEL expression",
) -> None:
"""Validate a full CEL expression without evaluating it."""
allowed = frozenset(allowed_roots)
expression = self._require_cel_source(cast(str, self.value), source=source)
roots = self._collect_root_identifiers(
self._compile_cel(expression, source=source)
)
unknown = sorted(root for root in roots if root not in allowed)
if unknown:
allowed_list = ", ".join(sorted(allowed))
unknown_list = ", ".join(repr(root) for root in unknown)
raise ExpressionError(
f"unknown CEL root at {source}: {unknown_list}; "
f"allowed roots: {allowed_list}. Reference flow data through one "
"of those roots, for example state.field or outputs.step_name."
)
def validate_template(
self,
*,
allowed_roots: Iterable[str],
source: str = "with block",
) -> None:
"""Validate nested strings fully wrapped in ``${...}`` as CEL."""
self._validate_template_value(
self.value, allowed_roots=allowed_roots, source=source
)
def evaluate(self, context: dict[str, Any] | None = None) -> Any:
"""Evaluate this value as a full CEL expression."""
resolved_context = self.context if context is None else context
return self._evaluate_cel(
self._require_cel_source(cast(str, self.value)),
resolved_context or {},
)
def render_template(self, context: dict[str, Any] | None = None) -> Any:
"""Evaluate nested strings fully wrapped in ``${...}`` as CEL."""
resolved_context = self.context if context is None else context
return self._render_template_value(self.value, resolved_context or {})
@staticmethod
def _validate_template_value(
value: ExpressionData,
*,
allowed_roots: Iterable[str],
source: str,
) -> None:
if isinstance(value, str):
expression = Expression._expression_marker_source(value, source=source)
if expression is not None:
Expression(expression).validate_expression(
allowed_roots=allowed_roots, source=source
)
return
if isinstance(value, dict):
for key, item in value.items():
item_source = f"{source}.{key}" if isinstance(key, str) else source
Expression._validate_template_value(
item, allowed_roots=allowed_roots, source=item_source
)
return
if isinstance(value, list):
for index, item in enumerate(value):
Expression._validate_template_value(
item,
allowed_roots=allowed_roots,
source=f"{source}[{index}]",
)
@staticmethod
def _flow_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
from crewai.flow.runtime._outputs import outputs_by_name
local_outputs = local_context.get("outputs") if local_context else None
outputs = outputs_by_name(
flow._method_outputs,
local_outputs=local_outputs,
serialize=True,
)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": outputs,
}
if local_context:
context.update(
{
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
if key not in {"outputs", "state"}
}
)
return context
@staticmethod
def _render_template_value(value: ExpressionData, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return Expression._render_template_string(value, context)
if isinstance(value, dict):
return {
key: Expression._render_template_value(item, context)
for key, item in value.items()
}
if isinstance(value, list):
return [Expression._render_template_value(item, context) for item in value]
return value
@staticmethod
def _render_template_string(value: str, context: dict[str, Any]) -> Any:
expression = Expression._expression_marker_source(value)
if expression is None:
return value
return Expression._evaluate_cel(expression, context)
@staticmethod
def _expression_marker_source(
value: str, *, source: str | None = None
) -> str | None:
"""Return CEL source when the trimmed string starts with ``${`` and ends with ``}``."""
stripped = value.strip()
if not stripped.startswith("${"):
return None
if not stripped.endswith("}"):
return None
expression = stripped[2:-1].strip()
if not expression:
if source is None:
raise ExpressionError("empty CEL expression in with block")
raise ExpressionError(f"empty CEL expression at {source}")
return expression
@staticmethod
def _evaluate_cel(expression: str, context: dict[str, Any]) -> Any:
try:
from celpy import Environment
from celpy.adapter import CELJSONEncoder, json_to_cel
from celpy.evaluation import Context
environment = Environment()
program = environment.program(
Expression._compile_cel(expression, environment=environment)
)
result = program.evaluate(cast(Context, json_to_cel(context)))
return json.loads(json.dumps(result, cls=CELJSONEncoder))
except Exception as e:
raise ExpressionError(
f"failed to evaluate CEL expression {expression!r}: {e}"
) from e
@staticmethod
def _compile_cel(
expression: str,
*,
source: str | None = None,
environment: Any | None = None,
) -> Any:
if environment is None:
from celpy import Environment
environment = Environment()
try:
return environment.compile(expression)
except Exception as e:
if source is None:
raise
raise ExpressionError(
f"invalid CEL expression at {source}: {expression!r}. "
f"Check the CEL syntax. Parser details: {e}"
) from e
@staticmethod
def _require_cel_source(value: str, *, source: str | None = None) -> str:
expression = value.strip()
if expression.startswith("${") and expression.endswith("}"):
expression = expression[2:-1].strip()
if expression:
return expression
if source is None:
raise ExpressionError("empty CEL expression")
raise ExpressionError(
f"empty CEL expression at {source}. Provide a CEL expression such as "
"state.topic or outputs.step_name."
)
@staticmethod
def _collect_root_identifiers(
tree: Any, local_roots: frozenset[str] = frozenset()
) -> set[str]:
"""Collect CEL root identifiers, excluding receiver macro local variables."""
data = getattr(tree, "data", None)
children = list(getattr(tree, "children", []) or [])
if data == "ident" and children:
name = str(children[0])
return set() if name in local_roots else {name}
if data == "ident_arg":
return Expression._collect_root_identifiers_from(
children[1:], local_roots=local_roots
)
if data == "member_dot_arg":
roots = (
Expression._collect_root_identifiers(children[0], local_roots)
if children
else set()
)
nested_locals = frozenset(
{*local_roots, *Expression._receiver_macro_local_roots(children)}
)
roots.update(
Expression._collect_root_identifiers_from(
children[2:], local_roots=nested_locals
)
)
return roots
return Expression._collect_root_identifiers_from(
children, local_roots=local_roots
)
@staticmethod
def _collect_root_identifiers_from(
trees: Iterable[Any], *, local_roots: frozenset[str]
) -> set[str]:
return set().union(
*(Expression._collect_root_identifiers(tree, local_roots) for tree in trees)
)
@staticmethod
def _receiver_macro_local_roots(children: list[Any]) -> set[str]:
if len(children) < 3 or str(children[1]) not in _CEL_MACROS_WITH_LOCAL_BINDINGS:
return set()
exprlist = children[2]
exprs = list(getattr(exprlist, "children", []) or [])
if exprs and (name := Expression._single_identifier_name(exprs[0])):
return {name}
return set()
@staticmethod
def _single_identifier_name(tree: Any) -> str | None:
data = getattr(tree, "data", None)
children = list(getattr(tree, "children", []) or [])
if data == "ident" and children:
return str(children[0])
if len(children) != 1:
return None
return Expression._single_identifier_name(children[0])

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import json
import logging
import re
from typing import Annotated, Any, Literal, TypeAlias
from typing import Annotated, Any, Literal, TypeAlias, cast
from pydantic import (
BaseModel,
@@ -27,6 +27,7 @@ from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
from crewai.flow.expressions import ExpressionData
from crewai.project.crew_definition import CrewDefinition
@@ -34,6 +35,8 @@ logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
_STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_BASE_CEL_ROOTS = frozenset({"outputs", "state"})
_EACH_STEP_CEL_ROOTS = frozenset({"item", "outputs", "state"})
__all__ = [
"FlowActionDefinition",
@@ -353,10 +356,14 @@ class FlowCodeActionDefinition(BaseModel):
description="Import reference for the callable, formatted as module:qualname.",
examples=["my_project.flows:normalize_topic"],
)
with_: dict[str, Any] | None = Field(
with_: dict[str, ExpressionData] | None = Field(
default=None,
alias="with",
description="Keyword arguments passed to the callable after expression rendering.",
description=(
"Keyword arguments passed to the callable. String values are evaluated "
"as CEL only when the trimmed value starts with ${ and ends with }; "
"all other values are literal."
),
examples=[{"topic": "${state.topic}"}],
)
@@ -377,10 +384,14 @@ class FlowToolActionDefinition(BaseModel):
description="Import reference for a BaseTool class, formatted as module:qualname.",
examples=["my_project.tools:SearchTool"],
)
with_: dict[str, Any] | None = Field(
with_: dict[str, ExpressionData] | None = Field(
default=None,
alias="with",
description="Tool input arguments after expression rendering.",
description=(
"Tool input arguments. String values are evaluated as CEL only when "
"the trimmed value starts with ${ and ends with }; all other values "
"are literal."
),
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
)
@@ -696,6 +707,16 @@ class FlowDefinition(BaseModel):
_validate_step_name(method_name, field="Flow method names")
return self
@model_validator(mode="after")
def _validate_cel_expressions(self) -> FlowDefinition:
for method_name, method in self.methods.items():
_validate_action_cel(
method.do,
path=f"methods.{method_name}.do",
allowed_roots=_BASE_CEL_ROOTS,
)
return self
def to_dict(self, *, exclude_none: bool = True) -> dict[str, Any]:
"""Serialize the definition to a JSON/YAML-ready dictionary."""
return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json")
@@ -753,6 +774,54 @@ def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> N
seen.add(name)
def _validate_action_cel(
action: FlowActionDefinition,
*,
path: str,
allowed_roots: frozenset[str],
) -> None:
from crewai.flow.expressions import Expression
if isinstance(action, FlowExpressionActionDefinition):
Expression(action.expr).validate_expression(
allowed_roots=allowed_roots, source=f"{path}.expr"
)
return
if isinstance(action, (FlowCodeActionDefinition, FlowToolActionDefinition)):
if action.with_ is not None:
Expression(action.with_).validate_template(
allowed_roots=allowed_roots, source=f"{path}.with"
)
return
if isinstance(action, FlowCrewActionDefinition):
Expression(cast(ExpressionData, action.with_.inputs)).validate_template(
allowed_roots=allowed_roots,
source=f"{path}.with.inputs",
)
return
if isinstance(action, FlowEachActionDefinition):
Expression(action.in_).validate_expression(
allowed_roots=_BASE_CEL_ROOTS,
source=f"{path}.in",
)
for index, step in enumerate(action.do):
step_path = f"{path}.do[{index}]"
if step.if_ is not None:
Expression(step.if_).validate_expression(
allowed_roots=_EACH_STEP_CEL_ROOTS,
source=f"{step_path}.if",
)
_validate_action_cel(
step.action,
path=f"{step_path}.action",
allowed_roots=_EACH_STEP_CEL_ROOTS,
)
return
def log_flow_definition_issues(definition: FlowDefinition) -> None:
for method_name, method in definition.methods.items():
path = f"methods.{method_name}"

View File

@@ -10,6 +10,7 @@ import inspect
import os
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.expressions import Expression, ExpressionData
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
@@ -20,7 +21,6 @@ from crewai.flow.flow_definition import (
FlowScriptActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
@@ -67,9 +67,9 @@ class CodeAction:
if self.definition.with_ is None:
return self.handler(*args, **kwargs)
return self.handler(
**render_with_block(
self.flow, self.definition.with_, local_context=local_context
)
**Expression.from_flow(
self.definition.with_, self.flow, local_context=local_context
).render_template()
)
def _resolve_handler(self) -> Callable[..., Any]:
@@ -95,7 +95,9 @@ class ToolAction:
def run(self, *_args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return self.tool.run(
**render_with_block(self.flow, self.kwargs, local_context=local_context)
**Expression.from_flow(
self.kwargs, self.flow, local_context=local_context
).render_template()
)
def _build_tool(self) -> Any:
@@ -129,9 +131,11 @@ class CrewAction:
local_context = _pop_local_context(kwargs)
crew_definition = self.definition.with_
inputs = render_with_block(
self.flow, crew_definition.inputs, local_context=local_context
)
inputs = Expression.from_flow(
cast(ExpressionData, crew_definition.inputs),
self.flow,
local_context=local_context,
).render_template()
crew, _ = load_crew_from_definition(crew_definition, source="crew action")
return await crew.kickoff_async(inputs=inputs)
@@ -147,9 +151,9 @@ class ExpressionAction:
def run(self, *_args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return evaluate_expression(
self.flow, self.definition.expr, local_context=local_context
)
return Expression.from_flow(
self.definition.expr, self.flow, local_context=local_context
).evaluate()
class ScriptAction:
@@ -225,7 +229,7 @@ class EachAction:
]
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
items = evaluate_expression(self.flow, self.definition.in_)
items = Expression.from_flow(self.definition.in_, self.flow).evaluate()
if not isinstance(items, list):
raise ValueError("each.in must evaluate to an array")
@@ -248,7 +252,9 @@ class EachAction:
return results
def _condition_matches(self, condition: str, local_context: LocalContext) -> bool:
result = evaluate_expression(self.flow, condition, local_context=local_context)
result = Expression.from_flow(
condition, self.flow, local_context=local_context
).evaluate()
if not isinstance(result, bool):
raise ValueError("if expression must evaluate to a boolean")
return result

View File

@@ -1,146 +0,0 @@
"""Runtime expression support for FlowDefinition CEL expressions."""
from __future__ import annotations
from itertools import pairwise
import json
import re
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.utilities.serialization import to_serializable
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
_EXPRESSION_PATTERN = re.compile(r"\$\{([^{}]*)\}")
__all__ = ["FlowExpressionError", "evaluate_expression", "render_with_block"]
class FlowExpressionError(ValueError):
"""A FlowDefinition expression failed to parse or evaluate."""
def render_with_block(
flow: Flow[Any], value: Any, local_context: dict[str, Any] | None = None
) -> Any:
"""Render CEL expressions inside a FlowDefinition ``with:`` payload."""
context = _expression_context(flow, local_context=local_context)
return _render_value(value, context)
def evaluate_expression(
flow: Flow[Any], expression: str, local_context: dict[str, Any] | None = None
) -> Any:
"""Evaluate a FlowDefinition CEL expression against runtime context."""
expression = expression.strip()
if not expression:
raise FlowExpressionError("empty CEL expression")
return _eval_cel(expression, _expression_context(flow, local_context=local_context))
def _expression_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
local_outputs = local_context.get("outputs") if local_context else None
outputs = outputs_by_name(
flow._method_outputs,
local_outputs=local_outputs,
serialize=True,
)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": outputs,
}
if local_context:
local_values = {
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
if key not in {"outputs", "state"}
}
context.update(local_values)
return context
def _render_value(value: Any, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return _render_string(value, context)
if isinstance(value, dict):
return {key: _render_value(item, context) for key, item in value.items()}
if isinstance(value, list):
return [_render_value(item, context) for item in value]
return value
def _render_string(value: str, context: dict[str, Any]) -> Any:
matches = list(_EXPRESSION_PATTERN.finditer(value))
if not matches:
_raise_for_invalid_interpolation(value)
return value
_raise_for_literal_braces(value[: matches[0].start()])
for previous, current in pairwise(matches):
_raise_for_literal_braces(value[previous.end() : current.start()])
_raise_for_literal_braces(value[matches[-1].end() :])
if len(matches) == 1 and matches[0].span() == (0, len(value)):
expression = matches[0].group(1).strip()
if not expression:
raise FlowExpressionError("empty CEL expression in with block")
return _eval_cel(expression, context)
rendered: list[str] = []
position = 0
for match in matches:
start, end = match.span()
literal = value[position:start]
rendered.append(literal)
expression = match.group(1).strip()
if not expression:
raise FlowExpressionError("empty CEL expression in with block")
result = _eval_cel(expression, context)
rendered.append(result if isinstance(result, str) else json.dumps(result))
position = end
literal = value[position:]
rendered.append(literal)
return "".join(rendered)
def _raise_for_invalid_interpolation(value: str) -> None:
if "${" not in value:
return
raise FlowExpressionError(
"invalid CEL interpolation in with block: expressions must be enclosed "
"as ${...} and cannot contain braces"
)
def _raise_for_literal_braces(value: str) -> None:
if "{" not in value and "}" not in value:
return
raise FlowExpressionError(
"invalid CEL interpolation in with block: expressions must be enclosed "
"as ${...} and cannot contain braces"
)
def _eval_cel(expression: str, context: dict[str, Any]) -> Any:
try:
from celpy import Environment
from celpy.adapter import CELJSONEncoder, json_to_cel
from celpy.evaluation import Context
environment = Environment()
program = environment.program(environment.compile(expression))
result = program.evaluate(cast(Context, json_to_cel(context)))
return json.loads(json.dumps(result, cls=CELJSONEncoder))
except Exception as e:
raise FlowExpressionError(
f"failed to evaluate CEL expression {expression!r}: {e}"
) from e

View File

@@ -631,7 +631,7 @@ class TestLegacyMethodOutputsRestore:
assert restored.method_outputs == ["first", "second"]
def test_restore_legacy_outputs_evaluates_expressions(self) -> None:
from crewai.flow.runtime._expressions import _expression_context
from crewai.flow.expressions import Expression
flow = Flow()
flow._method_outputs = ["legacy"]
@@ -642,7 +642,7 @@ class TestLegacyMethodOutputsRestore:
cfg = CheckpointConfig(restore_from=loc)
restored = Flow.from_checkpoint(cfg)
context = _expression_context(restored)
context = Expression._flow_context(restored)
assert context["outputs"] == {"": "legacy"}
def test_raw_legacy_outputs_property_remains_readable(self) -> None:

View File

@@ -644,7 +644,7 @@ methods:
assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents"
def test_tool_action_rejects_braces_in_embedded_cel_input():
def test_tool_action_treats_embedded_cel_marker_as_literal():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
@@ -660,16 +660,62 @@ def test_tool_action_rejects_braces_in_embedded_cel_input():
"prefix": "${'p}x'}",
},
},
}
},
},
}
)
with pytest.raises(ValueError, match="cannot contain braces"):
Flow.from_definition(definition).kickoff()
assert Flow.from_definition(definition).kickoff() == "p}x:wrapped ${'a}b'} value"
def test_tool_action_rejects_braces_in_full_cel_input():
def test_tool_action_treats_marker_with_trailing_text_as_literal():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "${state.topic} extra",
"prefix": "p",
},
},
},
},
}
)
assert Flow.from_definition(definition).kickoff() == "p:${state.topic} extra"
def test_tool_action_rejects_adjacent_markers_as_invalid_cel():
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "${'a'}${'b'}",
"prefix": "p",
},
},
},
},
}
)
def test_tool_action_accepts_braces_in_full_cel_marker():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
@@ -682,16 +728,15 @@ def test_tool_action_rejects_braces_in_full_cel_input():
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "${{'query': 'ai agents'}.query}",
"prefix": "found",
"prefix": "${'p}x'}",
},
},
}
},
},
}
)
with pytest.raises(ValueError, match="cannot contain braces"):
Flow.from_definition(definition).kickoff()
assert Flow.from_definition(definition).kickoff() == "p}x:ai agents"
def test_tool_action_renders_latest_output_by_method_name():
@@ -1026,10 +1071,8 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(ValueError, match="failed to evaluate CEL expression"):
flow.kickoff()
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_yaml(yaml_str)
def test_code_action_renders_keyword_inputs():
@@ -1407,6 +1450,33 @@ methods:
) == ["kept:a", "skipped:b"]
def test_each_action_accepts_expression_markers_in_explicit_cel_fields():
yaml_str = """
schema: crewai.flow/v1
name: EachIfFlow
methods:
process_rows:
do:
call: each
in: "${state.rows}"
do:
- name: kind
action:
call: expression
expr: "${item.kind}"
- name: kept
if: "${outputs.kind == 'keep'}"
action:
call: expression
expr: "${item.value}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": [{"kind": "keep", "value": "a"}]}) == ["a"]
def test_each_action_skipped_if_keeps_previous_output():
yaml_str = """
schema: crewai.flow/v1
@@ -1690,8 +1760,28 @@ def test_expression_action_round_trips():
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
def test_explicit_cel_fields_accept_expression_markers():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ExpressionFlow",
"methods": {
"classify": {
"start": True,
"do": {
"call": "expression",
"expr": "${state.score >= 80 ? 'qualified' : 'nurture'}",
},
}
},
}
)
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
def test_expression_local_context_recurses_into_dataclass_values():
from crewai.flow.runtime._expressions import evaluate_expression
from crewai.flow.expressions import Expression
class Payload(BaseModel):
name: str
@@ -1701,15 +1791,37 @@ def test_expression_local_context_recurses_into_dataclass_values():
payload: Payload
assert (
evaluate_expression(
Flow(),
Expression.from_flow(
"item.payload.name",
Flow(),
local_context={"item": Row(payload=Payload(name="qualified"))},
)
).evaluate()
== "qualified"
)
def test_expression_empty_context_overrides_stored_context():
from crewai.flow.expressions import Expression, ExpressionError
expression = Expression("state.score", context={"state": {"score": 90}})
assert expression.evaluate() == 90
with pytest.raises(ExpressionError):
expression.evaluate({})
def test_expression_template_empty_context_overrides_stored_context():
from crewai.flow.expressions import Expression, ExpressionError
expression = Expression(
{"score": "${state.score}"}, context={"state": {"score": 90}}
)
assert expression.render_template() == {"score": 90}
with pytest.raises(ExpressionError):
expression.render_template({})
def test_expression_action_can_route_like_if_else():
yaml_str = f"""
schema: crewai.flow/v1
@@ -1761,10 +1873,24 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_yaml(yaml_str)
with pytest.raises(ValueError, match="failed to evaluate CEL expression"):
flow.kickoff()
def test_expression_action_rejects_unknown_cel_root():
yaml_str = """
schema: crewai.flow/v1
name: ExpressionFlow
methods:
classify:
do:
call: expression
expr: "score >= 80"
start: true
"""
with pytest.raises(ValidationError, match="unknown CEL root"):
FlowDefinition.from_yaml(yaml_str)
def test_tool_action_requires_module_qualname_ref():