From b0816e00b6ae40e8be01dcae1822fcd4d5d7cd15 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Thu, 18 Jun 2026 12:18:22 -0700 Subject: [PATCH] Validate flow CEL expressions at definition load time (#6224) * Validate flow CEL expressions at definition load time Promote CEL expression handling to a public Expression API and validate expressions when a FlowDefinition is built instead of when it executes. Invalid CEL syntax or unknown roots now raise ValidationError from FlowDefinition.from_yaml() and FlowDefinition.from_dict(). Expressions may reference state and outputs, plus item inside each.do; bare identifiers are rejected as unknown roots. For with values, the CEL contract is intentionally simple: after trimming whitespace, a string is evaluated as CEL only if it starts with ${ and ends with }. Anything else is treated as a literal value, so partial interpolation is not supported. If the content inside the wrapper is not valid CEL, validation fails. Examples: ```text "${state.topic}" -> evaluated, returns state.topic "topic is ${state.topic}" -> literal string "${state.topic} suffix" -> literal string "${'a'}${'b'}" -> invalid CEL ``` * Honor explicit empty-context overrides in evaluate() / render_template() --- lib/crewai/src/crewai/flow/__init__.py | 2 + lib/crewai/src/crewai/flow/expressions.py | 329 ++++++++++++++++++ lib/crewai/src/crewai/flow/flow_definition.py | 79 ++++- .../src/crewai/flow/runtime/_actions.py | 32 +- .../src/crewai/flow/runtime/_expressions.py | 146 -------- lib/crewai/tests/test_checkpoint.py | 4 +- lib/crewai/tests/test_flow_from_definition.py | 166 +++++++-- 7 files changed, 572 insertions(+), 186 deletions(-) create mode 100644 lib/crewai/src/crewai/flow/expressions.py delete mode 100644 lib/crewai/src/crewai/flow/runtime/_expressions.py diff --git a/lib/crewai/src/crewai/flow/__init__.py b/lib/crewai/src/crewai/flow/__init__.py index 364d2ab49..d37234edd 100644 --- a/lib/crewai/src/crewai/flow/__init__.py +++ b/lib/crewai/src/crewai/flow/__init__.py @@ -10,6 +10,7 @@ from crewai.flow.conversation import ( ConversationalInputs, ) from crewai.flow.dsl import HumanFeedbackResult, human_feedback +from crewai.flow.expressions import Expression from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.flow.flow_config import flow_config from crewai.flow.input_provider import InputProvider, InputResponse @@ -26,6 +27,7 @@ __all__ = [ "ConsoleProvider", "ConversationalConfig", "ConversationalInputs", + "Expression", "Flow", "FlowStructure", "HumanFeedbackPending", diff --git a/lib/crewai/src/crewai/flow/expressions.py b/lib/crewai/src/crewai/flow/expressions.py new file mode 100644 index 000000000..8faeb828c --- /dev/null +++ b/lib/crewai/src/crewai/flow/expressions.py @@ -0,0 +1,329 @@ +"""Runtime expression support for FlowDefinition CEL expressions.""" + +from __future__ import annotations + +from collections.abc import Iterable +import json +from typing import TYPE_CHECKING, Any, TypeAlias, cast + +from crewai.utilities.serialization import to_serializable + + +if TYPE_CHECKING: + from crewai.flow.runtime import Flow +else: + from typing_extensions import TypeAliasType + + +_CEL_MACROS_WITH_LOCAL_BINDINGS = frozenset( + {"all", "exists", "exists_one", "filter", "map"} +) +if TYPE_CHECKING: + ExpressionData: TypeAlias = ( + str + | int + | float + | bool + | None + | list["ExpressionData"] + | dict[str, "ExpressionData"] + ) +else: + ExpressionData = TypeAliasType( + "ExpressionData", + str + | int + | float + | bool + | None + | list["ExpressionData"] + | dict[str, "ExpressionData"], + ) + +__all__ = [ + "Expression", + "ExpressionData", + "ExpressionError", +] + + +class ExpressionError(ValueError): + """An expression failed to parse, validate, render, or evaluate.""" + + +class Expression: + """CEL expression helper used for definition-time checks and runtime rendering.""" + + def __init__( + self, value: ExpressionData, *, context: dict[str, Any] | None = None + ) -> None: + self.value = value + self.context = context + + @classmethod + def from_flow( + cls, + value: ExpressionData, + flow: Flow[Any], + *, + local_context: dict[str, Any] | None = None, + ) -> Expression: + """Build an expression with the standard Flow runtime context.""" + return cls(value, context=cls._flow_context(flow, local_context=local_context)) + + def validate_expression( + self, + *, + allowed_roots: Iterable[str], + source: str = "CEL expression", + ) -> None: + """Validate a full CEL expression without evaluating it.""" + allowed = frozenset(allowed_roots) + expression = self._require_cel_source(cast(str, self.value), source=source) + roots = self._collect_root_identifiers( + self._compile_cel(expression, source=source) + ) + unknown = sorted(root for root in roots if root not in allowed) + if unknown: + allowed_list = ", ".join(sorted(allowed)) + unknown_list = ", ".join(repr(root) for root in unknown) + raise ExpressionError( + f"unknown CEL root at {source}: {unknown_list}; " + f"allowed roots: {allowed_list}. Reference flow data through one " + "of those roots, for example state.field or outputs.step_name." + ) + + def validate_template( + self, + *, + allowed_roots: Iterable[str], + source: str = "with block", + ) -> None: + """Validate nested strings fully wrapped in ``${...}`` as CEL.""" + self._validate_template_value( + self.value, allowed_roots=allowed_roots, source=source + ) + + def evaluate(self, context: dict[str, Any] | None = None) -> Any: + """Evaluate this value as a full CEL expression.""" + resolved_context = self.context if context is None else context + return self._evaluate_cel( + self._require_cel_source(cast(str, self.value)), + resolved_context or {}, + ) + + def render_template(self, context: dict[str, Any] | None = None) -> Any: + """Evaluate nested strings fully wrapped in ``${...}`` as CEL.""" + resolved_context = self.context if context is None else context + return self._render_template_value(self.value, resolved_context or {}) + + @staticmethod + def _validate_template_value( + value: ExpressionData, + *, + allowed_roots: Iterable[str], + source: str, + ) -> None: + if isinstance(value, str): + expression = Expression._expression_marker_source(value, source=source) + if expression is not None: + Expression(expression).validate_expression( + allowed_roots=allowed_roots, source=source + ) + return + if isinstance(value, dict): + for key, item in value.items(): + item_source = f"{source}.{key}" if isinstance(key, str) else source + Expression._validate_template_value( + item, allowed_roots=allowed_roots, source=item_source + ) + return + if isinstance(value, list): + for index, item in enumerate(value): + Expression._validate_template_value( + item, + allowed_roots=allowed_roots, + source=f"{source}[{index}]", + ) + + @staticmethod + def _flow_context( + flow: Flow[Any], local_context: dict[str, Any] | None = None + ) -> dict[str, Any]: + from crewai.flow.runtime._outputs import outputs_by_name + + local_outputs = local_context.get("outputs") if local_context else None + outputs = outputs_by_name( + flow._method_outputs, + local_outputs=local_outputs, + serialize=True, + ) + context: dict[str, Any] = { + "state": flow._copy_and_serialize_state(), + "outputs": outputs, + } + if local_context: + context.update( + { + key: to_serializable(value, max_depth=0) + for key, value in local_context.items() + if key not in {"outputs", "state"} + } + ) + return context + + @staticmethod + def _render_template_value(value: ExpressionData, context: dict[str, Any]) -> Any: + if isinstance(value, str): + return Expression._render_template_string(value, context) + if isinstance(value, dict): + return { + key: Expression._render_template_value(item, context) + for key, item in value.items() + } + if isinstance(value, list): + return [Expression._render_template_value(item, context) for item in value] + return value + + @staticmethod + def _render_template_string(value: str, context: dict[str, Any]) -> Any: + expression = Expression._expression_marker_source(value) + if expression is None: + return value + return Expression._evaluate_cel(expression, context) + + @staticmethod + def _expression_marker_source( + value: str, *, source: str | None = None + ) -> str | None: + """Return CEL source when the trimmed string starts with ``${`` and ends with ``}``.""" + stripped = value.strip() + if not stripped.startswith("${"): + return None + if not stripped.endswith("}"): + return None + + expression = stripped[2:-1].strip() + if not expression: + if source is None: + raise ExpressionError("empty CEL expression in with block") + raise ExpressionError(f"empty CEL expression at {source}") + return expression + + @staticmethod + def _evaluate_cel(expression: str, context: dict[str, Any]) -> Any: + try: + from celpy import Environment + from celpy.adapter import CELJSONEncoder, json_to_cel + from celpy.evaluation import Context + + environment = Environment() + program = environment.program( + Expression._compile_cel(expression, environment=environment) + ) + result = program.evaluate(cast(Context, json_to_cel(context))) + return json.loads(json.dumps(result, cls=CELJSONEncoder)) + except Exception as e: + raise ExpressionError( + f"failed to evaluate CEL expression {expression!r}: {e}" + ) from e + + @staticmethod + def _compile_cel( + expression: str, + *, + source: str | None = None, + environment: Any | None = None, + ) -> Any: + if environment is None: + from celpy import Environment + + environment = Environment() + try: + return environment.compile(expression) + except Exception as e: + if source is None: + raise + raise ExpressionError( + f"invalid CEL expression at {source}: {expression!r}. " + f"Check the CEL syntax. Parser details: {e}" + ) from e + + @staticmethod + def _require_cel_source(value: str, *, source: str | None = None) -> str: + expression = value.strip() + if expression.startswith("${") and expression.endswith("}"): + expression = expression[2:-1].strip() + if expression: + return expression + if source is None: + raise ExpressionError("empty CEL expression") + raise ExpressionError( + f"empty CEL expression at {source}. Provide a CEL expression such as " + "state.topic or outputs.step_name." + ) + + @staticmethod + def _collect_root_identifiers( + tree: Any, local_roots: frozenset[str] = frozenset() + ) -> set[str]: + """Collect CEL root identifiers, excluding receiver macro local variables.""" + data = getattr(tree, "data", None) + children = list(getattr(tree, "children", []) or []) + + if data == "ident" and children: + name = str(children[0]) + return set() if name in local_roots else {name} + + if data == "ident_arg": + return Expression._collect_root_identifiers_from( + children[1:], local_roots=local_roots + ) + + if data == "member_dot_arg": + roots = ( + Expression._collect_root_identifiers(children[0], local_roots) + if children + else set() + ) + nested_locals = frozenset( + {*local_roots, *Expression._receiver_macro_local_roots(children)} + ) + roots.update( + Expression._collect_root_identifiers_from( + children[2:], local_roots=nested_locals + ) + ) + return roots + + return Expression._collect_root_identifiers_from( + children, local_roots=local_roots + ) + + @staticmethod + def _collect_root_identifiers_from( + trees: Iterable[Any], *, local_roots: frozenset[str] + ) -> set[str]: + return set().union( + *(Expression._collect_root_identifiers(tree, local_roots) for tree in trees) + ) + + @staticmethod + def _receiver_macro_local_roots(children: list[Any]) -> set[str]: + if len(children) < 3 or str(children[1]) not in _CEL_MACROS_WITH_LOCAL_BINDINGS: + return set() + exprlist = children[2] + exprs = list(getattr(exprlist, "children", []) or []) + if exprs and (name := Expression._single_identifier_name(exprs[0])): + return {name} + return set() + + @staticmethod + def _single_identifier_name(tree: Any) -> str | None: + data = getattr(tree, "data", None) + children = list(getattr(tree, "children", []) or []) + if data == "ident" and children: + return str(children[0]) + if len(children) != 1: + return None + return Expression._single_identifier_name(children[0]) diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 85f20239b..853dd3b77 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -12,7 +12,7 @@ from __future__ import annotations import json import logging import re -from typing import Annotated, Any, Literal, TypeAlias +from typing import Annotated, Any, Literal, TypeAlias, cast from pydantic import ( BaseModel, @@ -27,6 +27,7 @@ from crewai.flow.conversational_definition import ( FlowConversationalDefinition, FlowConversationalRouterDefinition, ) +from crewai.flow.expressions import ExpressionData from crewai.project.crew_definition import CrewDefinition @@ -34,6 +35,8 @@ logger = logging.getLogger(__name__) FlowDefinitionCondition = str | dict[str, Any] _STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") +_BASE_CEL_ROOTS = frozenset({"outputs", "state"}) +_EACH_STEP_CEL_ROOTS = frozenset({"item", "outputs", "state"}) __all__ = [ "FlowActionDefinition", @@ -353,10 +356,14 @@ class FlowCodeActionDefinition(BaseModel): description="Import reference for the callable, formatted as module:qualname.", examples=["my_project.flows:normalize_topic"], ) - with_: dict[str, Any] | None = Field( + with_: dict[str, ExpressionData] | None = Field( default=None, alias="with", - description="Keyword arguments passed to the callable after expression rendering.", + description=( + "Keyword arguments passed to the callable. String values are evaluated " + "as CEL only when the trimmed value starts with ${ and ends with }; " + "all other values are literal." + ), examples=[{"topic": "${state.topic}"}], ) @@ -377,10 +384,14 @@ class FlowToolActionDefinition(BaseModel): description="Import reference for a BaseTool class, formatted as module:qualname.", examples=["my_project.tools:SearchTool"], ) - with_: dict[str, Any] | None = Field( + with_: dict[str, ExpressionData] | None = Field( default=None, alias="with", - description="Tool input arguments after expression rendering.", + description=( + "Tool input arguments. String values are evaluated as CEL only when " + "the trimmed value starts with ${ and ends with }; all other values " + "are literal." + ), examples=[{"query": "${outputs.normalize_topic}", "limit": 5}], ) @@ -696,6 +707,16 @@ class FlowDefinition(BaseModel): _validate_step_name(method_name, field="Flow method names") return self + @model_validator(mode="after") + def _validate_cel_expressions(self) -> FlowDefinition: + for method_name, method in self.methods.items(): + _validate_action_cel( + method.do, + path=f"methods.{method_name}.do", + allowed_roots=_BASE_CEL_ROOTS, + ) + return self + def to_dict(self, *, exclude_none: bool = True) -> dict[str, Any]: """Serialize the definition to a JSON/YAML-ready dictionary.""" return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json") @@ -753,6 +774,54 @@ def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> N seen.add(name) +def _validate_action_cel( + action: FlowActionDefinition, + *, + path: str, + allowed_roots: frozenset[str], +) -> None: + from crewai.flow.expressions import Expression + + if isinstance(action, FlowExpressionActionDefinition): + Expression(action.expr).validate_expression( + allowed_roots=allowed_roots, source=f"{path}.expr" + ) + return + + if isinstance(action, (FlowCodeActionDefinition, FlowToolActionDefinition)): + if action.with_ is not None: + Expression(action.with_).validate_template( + allowed_roots=allowed_roots, source=f"{path}.with" + ) + return + + if isinstance(action, FlowCrewActionDefinition): + Expression(cast(ExpressionData, action.with_.inputs)).validate_template( + allowed_roots=allowed_roots, + source=f"{path}.with.inputs", + ) + return + + if isinstance(action, FlowEachActionDefinition): + Expression(action.in_).validate_expression( + allowed_roots=_BASE_CEL_ROOTS, + source=f"{path}.in", + ) + for index, step in enumerate(action.do): + step_path = f"{path}.do[{index}]" + if step.if_ is not None: + Expression(step.if_).validate_expression( + allowed_roots=_EACH_STEP_CEL_ROOTS, + source=f"{step_path}.if", + ) + _validate_action_cel( + step.action, + path=f"{step_path}.action", + allowed_roots=_EACH_STEP_CEL_ROOTS, + ) + return + + def log_flow_definition_issues(definition: FlowDefinition) -> None: for method_name, method in definition.methods.items(): path = f"methods.{method_name}" diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 65fa50588..79944c43e 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -10,6 +10,7 @@ import inspect import os from typing import TYPE_CHECKING, Any, Protocol, cast +from crewai.flow.expressions import Expression, ExpressionData from crewai.flow.flow_definition import ( FlowActionDefinition, FlowCodeActionDefinition, @@ -20,7 +21,6 @@ from crewai.flow.flow_definition import ( FlowScriptActionDefinition, FlowToolActionDefinition, ) -from crewai.flow.runtime._expressions import evaluate_expression, render_with_block from crewai.flow.runtime._outputs import outputs_by_name from crewai.flow.runtime._refs import InvalidRefError, resolve_ref @@ -67,9 +67,9 @@ class CodeAction: if self.definition.with_ is None: return self.handler(*args, **kwargs) return self.handler( - **render_with_block( - self.flow, self.definition.with_, local_context=local_context - ) + **Expression.from_flow( + self.definition.with_, self.flow, local_context=local_context + ).render_template() ) def _resolve_handler(self) -> Callable[..., Any]: @@ -95,7 +95,9 @@ class ToolAction: def run(self, *_args: Any, **kwargs: Any) -> Any: local_context = _pop_local_context(kwargs) return self.tool.run( - **render_with_block(self.flow, self.kwargs, local_context=local_context) + **Expression.from_flow( + self.kwargs, self.flow, local_context=local_context + ).render_template() ) def _build_tool(self) -> Any: @@ -129,9 +131,11 @@ class CrewAction: local_context = _pop_local_context(kwargs) crew_definition = self.definition.with_ - inputs = render_with_block( - self.flow, crew_definition.inputs, local_context=local_context - ) + inputs = Expression.from_flow( + cast(ExpressionData, crew_definition.inputs), + self.flow, + local_context=local_context, + ).render_template() crew, _ = load_crew_from_definition(crew_definition, source="crew action") return await crew.kickoff_async(inputs=inputs) @@ -147,9 +151,9 @@ class ExpressionAction: def run(self, *_args: Any, **kwargs: Any) -> Any: local_context = _pop_local_context(kwargs) - return evaluate_expression( - self.flow, self.definition.expr, local_context=local_context - ) + return Expression.from_flow( + self.definition.expr, self.flow, local_context=local_context + ).evaluate() class ScriptAction: @@ -225,7 +229,7 @@ class EachAction: ] async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]: - items = evaluate_expression(self.flow, self.definition.in_) + items = Expression.from_flow(self.definition.in_, self.flow).evaluate() if not isinstance(items, list): raise ValueError("each.in must evaluate to an array") @@ -248,7 +252,9 @@ class EachAction: return results def _condition_matches(self, condition: str, local_context: LocalContext) -> bool: - result = evaluate_expression(self.flow, condition, local_context=local_context) + result = Expression.from_flow( + condition, self.flow, local_context=local_context + ).evaluate() if not isinstance(result, bool): raise ValueError("if expression must evaluate to a boolean") return result diff --git a/lib/crewai/src/crewai/flow/runtime/_expressions.py b/lib/crewai/src/crewai/flow/runtime/_expressions.py deleted file mode 100644 index dae39e650..000000000 --- a/lib/crewai/src/crewai/flow/runtime/_expressions.py +++ /dev/null @@ -1,146 +0,0 @@ -"""Runtime expression support for FlowDefinition CEL expressions.""" - -from __future__ import annotations - -from itertools import pairwise -import json -import re -from typing import TYPE_CHECKING, Any, cast - -from crewai.flow.runtime._outputs import outputs_by_name -from crewai.utilities.serialization import to_serializable - - -if TYPE_CHECKING: - from crewai.flow.runtime import Flow - - -_EXPRESSION_PATTERN = re.compile(r"\$\{([^{}]*)\}") - -__all__ = ["FlowExpressionError", "evaluate_expression", "render_with_block"] - - -class FlowExpressionError(ValueError): - """A FlowDefinition expression failed to parse or evaluate.""" - - -def render_with_block( - flow: Flow[Any], value: Any, local_context: dict[str, Any] | None = None -) -> Any: - """Render CEL expressions inside a FlowDefinition ``with:`` payload.""" - context = _expression_context(flow, local_context=local_context) - return _render_value(value, context) - - -def evaluate_expression( - flow: Flow[Any], expression: str, local_context: dict[str, Any] | None = None -) -> Any: - """Evaluate a FlowDefinition CEL expression against runtime context.""" - expression = expression.strip() - if not expression: - raise FlowExpressionError("empty CEL expression") - return _eval_cel(expression, _expression_context(flow, local_context=local_context)) - - -def _expression_context( - flow: Flow[Any], local_context: dict[str, Any] | None = None -) -> dict[str, Any]: - local_outputs = local_context.get("outputs") if local_context else None - outputs = outputs_by_name( - flow._method_outputs, - local_outputs=local_outputs, - serialize=True, - ) - context: dict[str, Any] = { - "state": flow._copy_and_serialize_state(), - "outputs": outputs, - } - if local_context: - local_values = { - key: to_serializable(value, max_depth=0) - for key, value in local_context.items() - if key not in {"outputs", "state"} - } - context.update(local_values) - return context - - -def _render_value(value: Any, context: dict[str, Any]) -> Any: - if isinstance(value, str): - return _render_string(value, context) - if isinstance(value, dict): - return {key: _render_value(item, context) for key, item in value.items()} - if isinstance(value, list): - return [_render_value(item, context) for item in value] - return value - - -def _render_string(value: str, context: dict[str, Any]) -> Any: - matches = list(_EXPRESSION_PATTERN.finditer(value)) - if not matches: - _raise_for_invalid_interpolation(value) - return value - - _raise_for_literal_braces(value[: matches[0].start()]) - for previous, current in pairwise(matches): - _raise_for_literal_braces(value[previous.end() : current.start()]) - _raise_for_literal_braces(value[matches[-1].end() :]) - - if len(matches) == 1 and matches[0].span() == (0, len(value)): - expression = matches[0].group(1).strip() - if not expression: - raise FlowExpressionError("empty CEL expression in with block") - return _eval_cel(expression, context) - - rendered: list[str] = [] - position = 0 - for match in matches: - start, end = match.span() - literal = value[position:start] - rendered.append(literal) - - expression = match.group(1).strip() - if not expression: - raise FlowExpressionError("empty CEL expression in with block") - result = _eval_cel(expression, context) - rendered.append(result if isinstance(result, str) else json.dumps(result)) - position = end - - literal = value[position:] - rendered.append(literal) - - return "".join(rendered) - - -def _raise_for_invalid_interpolation(value: str) -> None: - if "${" not in value: - return - raise FlowExpressionError( - "invalid CEL interpolation in with block: expressions must be enclosed " - "as ${...} and cannot contain braces" - ) - - -def _raise_for_literal_braces(value: str) -> None: - if "{" not in value and "}" not in value: - return - raise FlowExpressionError( - "invalid CEL interpolation in with block: expressions must be enclosed " - "as ${...} and cannot contain braces" - ) - - -def _eval_cel(expression: str, context: dict[str, Any]) -> Any: - try: - from celpy import Environment - from celpy.adapter import CELJSONEncoder, json_to_cel - from celpy.evaluation import Context - - environment = Environment() - program = environment.program(environment.compile(expression)) - result = program.evaluate(cast(Context, json_to_cel(context))) - return json.loads(json.dumps(result, cls=CELJSONEncoder)) - except Exception as e: - raise FlowExpressionError( - f"failed to evaluate CEL expression {expression!r}: {e}" - ) from e diff --git a/lib/crewai/tests/test_checkpoint.py b/lib/crewai/tests/test_checkpoint.py index 4138f3819..4d316afe8 100644 --- a/lib/crewai/tests/test_checkpoint.py +++ b/lib/crewai/tests/test_checkpoint.py @@ -631,7 +631,7 @@ class TestLegacyMethodOutputsRestore: assert restored.method_outputs == ["first", "second"] def test_restore_legacy_outputs_evaluates_expressions(self) -> None: - from crewai.flow.runtime._expressions import _expression_context + from crewai.flow.expressions import Expression flow = Flow() flow._method_outputs = ["legacy"] @@ -642,7 +642,7 @@ class TestLegacyMethodOutputsRestore: cfg = CheckpointConfig(restore_from=loc) restored = Flow.from_checkpoint(cfg) - context = _expression_context(restored) + context = Expression._flow_context(restored) assert context["outputs"] == {"": "legacy"} def test_raw_legacy_outputs_property_remains_readable(self) -> None: diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index da1fd27e9..67fa7b843 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -644,7 +644,7 @@ methods: assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents" -def test_tool_action_rejects_braces_in_embedded_cel_input(): +def test_tool_action_treats_embedded_cel_marker_as_literal(): definition = FlowDefinition.from_dict( { "schema": "crewai.flow/v1", @@ -660,16 +660,62 @@ def test_tool_action_rejects_braces_in_embedded_cel_input(): "prefix": "${'p}x'}", }, }, - } + }, }, } ) - with pytest.raises(ValueError, match="cannot contain braces"): - Flow.from_definition(definition).kickoff() + assert Flow.from_definition(definition).kickoff() == "p}x:wrapped ${'a}b'} value" -def test_tool_action_rejects_braces_in_full_cel_input(): +def test_tool_action_treats_marker_with_trailing_text_as_literal(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "ToolFlow", + "methods": { + "search": { + "start": True, + "do": { + "call": "tool", + "ref": f"{__name__}:StaticSearchTool", + "with": { + "search_query": "${state.topic} extra", + "prefix": "p", + }, + }, + }, + }, + } + ) + + assert Flow.from_definition(definition).kickoff() == "p:${state.topic} extra" + + +def test_tool_action_rejects_adjacent_markers_as_invalid_cel(): + with pytest.raises(ValidationError, match="invalid CEL expression"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "ToolFlow", + "methods": { + "search": { + "start": True, + "do": { + "call": "tool", + "ref": f"{__name__}:StaticSearchTool", + "with": { + "search_query": "${'a'}${'b'}", + "prefix": "p", + }, + }, + }, + }, + } + ) + + +def test_tool_action_accepts_braces_in_full_cel_marker(): definition = FlowDefinition.from_dict( { "schema": "crewai.flow/v1", @@ -682,16 +728,15 @@ def test_tool_action_rejects_braces_in_full_cel_input(): "ref": f"{__name__}:StaticSearchTool", "with": { "search_query": "${{'query': 'ai agents'}.query}", - "prefix": "found", + "prefix": "${'p}x'}", }, }, - } + }, }, } ) - with pytest.raises(ValueError, match="cannot contain braces"): - Flow.from_definition(definition).kickoff() + assert Flow.from_definition(definition).kickoff() == "p}x:ai agents" def test_tool_action_renders_latest_output_by_method_name(): @@ -1026,10 +1071,8 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) - - with pytest.raises(ValueError, match="failed to evaluate CEL expression"): - flow.kickoff() + with pytest.raises(ValidationError, match="invalid CEL expression"): + FlowDefinition.from_yaml(yaml_str) def test_code_action_renders_keyword_inputs(): @@ -1407,6 +1450,33 @@ methods: ) == ["kept:a", "skipped:b"] +def test_each_action_accepts_expression_markers_in_explicit_cel_fields(): + yaml_str = """ +schema: crewai.flow/v1 +name: EachIfFlow +methods: + process_rows: + do: + call: each + in: "${state.rows}" + do: + - name: kind + action: + call: expression + expr: "${item.kind}" + - name: kept + if: "${outputs.kind == 'keep'}" + action: + call: expression + expr: "${item.value}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"rows": [{"kind": "keep", "value": "a"}]}) == ["a"] + + def test_each_action_skipped_if_keeps_previous_output(): yaml_str = """ schema: crewai.flow/v1 @@ -1690,8 +1760,28 @@ def test_expression_action_round_trips(): assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified" +def test_explicit_cel_fields_accept_expression_markers(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "ExpressionFlow", + "methods": { + "classify": { + "start": True, + "do": { + "call": "expression", + "expr": "${state.score >= 80 ? 'qualified' : 'nurture'}", + }, + } + }, + } + ) + + assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified" + + def test_expression_local_context_recurses_into_dataclass_values(): - from crewai.flow.runtime._expressions import evaluate_expression + from crewai.flow.expressions import Expression class Payload(BaseModel): name: str @@ -1701,15 +1791,37 @@ def test_expression_local_context_recurses_into_dataclass_values(): payload: Payload assert ( - evaluate_expression( - Flow(), + Expression.from_flow( "item.payload.name", + Flow(), local_context={"item": Row(payload=Payload(name="qualified"))}, - ) + ).evaluate() == "qualified" ) +def test_expression_empty_context_overrides_stored_context(): + from crewai.flow.expressions import Expression, ExpressionError + + expression = Expression("state.score", context={"state": {"score": 90}}) + + assert expression.evaluate() == 90 + with pytest.raises(ExpressionError): + expression.evaluate({}) + + +def test_expression_template_empty_context_overrides_stored_context(): + from crewai.flow.expressions import Expression, ExpressionError + + expression = Expression( + {"score": "${state.score}"}, context={"state": {"score": 90}} + ) + + assert expression.render_template() == {"score": 90} + with pytest.raises(ExpressionError): + expression.render_template({}) + + def test_expression_action_can_route_like_if_else(): yaml_str = f""" schema: crewai.flow/v1 @@ -1761,10 +1873,24 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + with pytest.raises(ValidationError, match="invalid CEL expression"): + FlowDefinition.from_yaml(yaml_str) - with pytest.raises(ValueError, match="failed to evaluate CEL expression"): - flow.kickoff() + +def test_expression_action_rejects_unknown_cel_root(): + yaml_str = """ +schema: crewai.flow/v1 +name: ExpressionFlow +methods: + classify: + do: + call: expression + expr: "score >= 80" + start: true +""" + + with pytest.raises(ValidationError, match="unknown CEL root"): + FlowDefinition.from_yaml(yaml_str) def test_tool_action_requires_module_qualname_ref():