Add expressions to FlowDefinition actions

Let definitions compute values without Python. A new `call: expression`
action evaluates a Common Expression Language (CEL) expression, and tool
`with:` blocks now render `${...}` CEL templates.

Example 1:

```yaml
decide:
  do:
    call: expression
    expr: "state.score >= 80 ? 'qualified' : 'nurture'"
  router: true
  emit: [qualified, nurture]
```

Example 2:

```yaml
search:
  do:
    call: tool
    ref: my.pkg:SearchTool
    with:
      search_query: "${outputs.build_query.query + ' news'}"
      max_results: "${state.limit}"
```
This commit is contained in:
Vinicius Brasil
2026-06-12 20:38:12 -07:00
parent 2444895ca4
commit c1a70d84d0
10 changed files with 2433 additions and 1857 deletions

View File

@@ -33,6 +33,7 @@ dependencies = [
"appdirs~=1.4.4",
"jsonref~=1.1.0",
"json-repair~=0.25.2",
"cel-python>=0.5.0,<0.6",
"tomli-w~=1.1.0",
"tomli~=2.0.2",
"json5~=0.10.0",

View File

@@ -146,6 +146,10 @@ class _ConversationalMixin:
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
pass
@property
def method_outputs(self) -> list[Any]:
pass
def conversation_start(self) -> str | None:
"""Return the current user message for conversational route selection.
@@ -1033,7 +1037,8 @@ class _ConversationalMixin:
# of warning about an empty scope stack.
started_id = getattr(self, "_deferred_flow_started_event_id", None)
if started_id:
last_output = self._method_outputs[-1] if self._method_outputs else None
method_outputs = self.method_outputs
last_output = method_outputs[-1] if method_outputs else None
restore_event_scope(((started_id, "flow_started"),))
try:
crewai_event_bus.emit(

View File

@@ -35,6 +35,7 @@ __all__ = [
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowMethodDefinition",
"FlowPersistenceDefinition",
@@ -163,7 +164,18 @@ class FlowToolActionDefinition(BaseModel):
with_: dict[str, Any] | None = Field(default=None, alias="with")
FlowActionDefinition = FlowCodeActionDefinition | FlowToolActionDefinition
class FlowExpressionActionDefinition(BaseModel):
"""A Flow method action that evaluates a CEL expression."""
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["expression"]
expr: str
FlowActionDefinition = (
FlowCodeActionDefinition | FlowToolActionDefinition | FlowExpressionActionDefinition
)
class FlowMethodDefinition(BaseModel):

View File

@@ -1679,7 +1679,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
try:
if emit and collapsed_outcome:
self._method_outputs.append(collapsed_outcome)
self._method_outputs.append(
{"method": context.method_name, "output": collapsed_outcome}
)
await self._execute_listeners(
FlowMethodName(collapsed_outcome),
result,
@@ -1725,7 +1727,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return e
raise
final_result = self._method_outputs[-1] if self._method_outputs else result
method_outputs = self.method_outputs
final_result = method_outputs[-1] if method_outputs else result
if self._event_futures:
await asyncio.gather(
@@ -1906,7 +1909,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
@property
def method_outputs(self) -> list[Any]:
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
return [entry["output"] for entry in self._method_outputs]
@property
def flow_id(self) -> str:
@@ -2540,7 +2543,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Clear the resumption flag after initial execution completes
self._is_execution_resuming = False
final_output = self._method_outputs[-1] if self._method_outputs else None
method_outputs = self.method_outputs
final_output = method_outputs[-1] if method_outputs else None
if self._event_futures:
await asyncio.gather(
@@ -2695,7 +2699,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if start_method_name in self._completed_methods:
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
last_output = self._method_outputs[-1] if self._method_outputs else None
method_outputs = self.method_outputs
last_output = method_outputs[-1] if method_outputs else None
await self._execute_listeners(start_method_name, last_output)
return
# For cyclic flows, clear from completed to allow re-execution
@@ -2825,7 +2830,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method_name, method_definition.human_feedback, result
)
self._method_outputs.append(result)
self._method_outputs.append({"method": str(method_name), "output": result})
# For @human_feedback methods with emit, the result is the collapsed outcome
# (e.g., "approved") used for routing. But we want the actual method output
@@ -2833,8 +2838,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# if a stashed output exists. Dict-based stash is concurrency-safe and
# handles None return values (presence in dict = stashed, not value).
if method_name in self._human_feedback_method_outputs:
self._method_outputs[-1] = self._human_feedback_method_outputs.pop(
method_name
self._method_outputs[-1]["output"] = (
self._human_feedback_method_outputs.pop(method_name)
)
self._method_execution_counts[method_name] = (

View File

@@ -0,0 +1,102 @@
"""Runtime expression support for FlowDefinition CEL expressions."""
from __future__ import annotations
import copy
import json
import re
from typing import TYPE_CHECKING, Any, cast
from pydantic import BaseModel
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
_EXPRESSION_PATTERN = re.compile(r"\$\{([^}]*)\}")
__all__ = ["FlowExpressionError", "evaluate_expression", "render_with_block"]
class FlowExpressionError(ValueError):
"""A FlowDefinition expression failed to parse or evaluate."""
def render_with_block(flow: Flow[Any], value: Any) -> Any:
"""Render CEL expressions inside a FlowDefinition ``with:`` payload."""
context = _expression_context(flow)
return _render_value(value, context)
def evaluate_expression(flow: Flow[Any], expression: str) -> Any:
"""Evaluate a FlowDefinition CEL expression against runtime context."""
expression = expression.strip()
if not expression:
raise FlowExpressionError("empty CEL expression")
return _eval_cel(expression, _expression_context(flow))
def _expression_context(flow: Flow[Any]) -> dict[str, Any]:
return {
"state": flow._copy_and_serialize_state(),
"outputs": _outputs_by_name(flow._method_outputs),
}
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
output = copy.deepcopy(entry.get("output"))
if isinstance(output, BaseModel):
output = output.model_dump(mode="json")
outputs[str(entry["method"])] = output
return outputs
def _render_value(value: Any, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return _render_string(value, context)
if isinstance(value, dict):
return {key: _render_value(item, context) for key, item in value.items()}
if isinstance(value, list):
return [_render_value(item, context) for item in value]
return value
def _render_string(value: str, context: dict[str, Any]) -> Any:
if value.startswith("${") and value.endswith("}"):
expression = value[2:-1].strip()
if not expression:
raise FlowExpressionError("empty CEL expression in with block")
return _eval_cel(expression, context)
if _EXPRESSION_PATTERN.search(value) is None:
if "${" in value:
raise FlowExpressionError("unterminated CEL expression in with block")
return value
def replace_expression(match: re.Match[str]) -> str:
expression = match.group(1).strip()
if not expression:
raise FlowExpressionError("empty CEL expression in with block")
result = _eval_cel(expression, context)
return result if isinstance(result, str) else json.dumps(result)
return _EXPRESSION_PATTERN.sub(replace_expression, value)
def _eval_cel(expression: str, context: dict[str, Any]) -> Any:
try:
from celpy import Environment
from celpy.adapter import CELJSONEncoder, json_to_cel
from celpy.evaluation import Context
environment = Environment()
program = environment.program(environment.compile(expression))
result = program.evaluate(cast(Context, json_to_cel(context)))
return json.loads(json.dumps(result, cls=CELJSONEncoder))
except Exception as e:
raise FlowExpressionError(
f"failed to evaluate CEL expression {expression!r}: {e}"
) from e

View File

@@ -16,8 +16,10 @@ from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
FlowExpressionActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
if TYPE_CHECKING:
@@ -68,7 +70,7 @@ def _resolve_code_action(
def _resolve_tool_action(
_flow: Flow[Any], action: FlowToolActionDefinition
flow: Flow[Any], action: FlowToolActionDefinition
) -> Callable[..., Any]:
target = resolve_ref(action.ref, field="do")
from crewai.tools import BaseTool
@@ -89,15 +91,26 @@ def _resolve_tool_action(
tool_kwargs = action.with_ or {}
def run_tool(*_args: Any, **_kwargs: Any) -> Any:
return tool.run(**tool_kwargs)
return tool.run(**render_with_block(flow, tool_kwargs))
return run_tool
def _resolve_expression_action(
flow: Flow[Any], action: FlowExpressionActionDefinition
) -> Callable[..., Any]:
def run_expression(*_args: Any, **_kwargs: Any) -> Any:
return evaluate_expression(flow, action.expr)
return run_expression
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
if action.call == "tool":
return _resolve_tool_action(flow, action)
if action.call == "expression":
return _resolve_expression_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -44,6 +44,7 @@ def test_flow_public_exports_are_explicit():
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowMethodDefinition",
"FlowPersistenceDefinition",

View File

@@ -36,6 +36,14 @@ class StaticSearchTool(BaseTool):
return f"{prefix}:{search_query}"
class TypedInputsTool(BaseTool):
name: str = "TypedInputsTool"
description: str = "Returns typed input details."
def _run(self, count: int, include_domains: list[str]) -> str:
return f"{count}:{','.join(include_domains)}"
class ChainFlow(Flow):
@start()
def begin(self):
@@ -52,6 +60,13 @@ class ChainFlow(Flow):
return f"confirmed:{self.state['confirmed']}"
class ToolInputFlow(Flow):
@start()
def build_query(self):
self.state["prefix"] = "found"
return {"query": "ai agents", "suffix": " news"}
CHAIN_YAML = f"""
schema: crewai.flow/v1
name: ChainFlow
@@ -545,6 +560,204 @@ def test_tool_action_round_trips_with_inputs():
assert Flow.from_definition(definition).kickoff() == "search:ai agents"
def test_tool_action_renders_cel_inputs_at_runtime():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
begin:
do:
call: code
ref: {__name__}:ChainFlow.begin
start: true
search:
do:
call: tool
ref: {__name__}:StaticSearchTool
with:
search_query: "${{state.begin_ran ? state.topic + ' agents' : 'missing'}}"
prefix: found
listen: begin
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents"
def test_tool_action_renders_latest_output_by_method_name():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
begin:
do:
call: code
ref: {__name__}:ChainFlow.begin
start: true
search:
do:
call: tool
ref: {__name__}:StaticSearchTool
with:
search_query: "${{outputs.begin + ' agents'}}"
listen: begin
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "search:hello agents"
def test_tool_action_uses_state_and_outputs_in_full_yaml_example():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
build_query:
do:
call: code
ref: {__name__}:ToolInputFlow.build_query
start: true
search:
do:
call: tool
ref: {__name__}:StaticSearchTool
with:
search_query: "${{outputs.build_query.query + outputs.build_query.suffix}}"
prefix: "${{state.prefix}}"
listen: build_query
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "found:ai agents news"
def test_tool_action_preserves_whole_expression_value_types():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
typed:
do:
call: tool
ref: {__name__}:TypedInputsTool
with:
count: "${{state.limit}}"
include_domains: "${{state.domains}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert (
flow.kickoff(inputs={"limit": 2, "domains": ["crewai.com", "example.com"]})
== "2:crewai.com,example.com"
)
def test_tool_action_reports_invalid_cel_expression():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
search:
do:
call: tool
ref: {__name__}:StaticSearchTool
with:
search_query: "${{state.}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(ValueError, match="failed to evaluate CEL expression"):
flow.kickoff()
def test_expression_action_round_trips():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ExpressionFlow",
"methods": {
"classify": {
"start": True,
"do": {
"call": "expression",
"expr": "state.score >= 80 ? 'qualified' : 'nurture'",
},
}
},
}
)
assert definition.to_dict()["methods"]["classify"]["do"] == {
"call": "expression",
"expr": "state.score >= 80 ? 'qualified' : 'nurture'",
}
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
def test_expression_action_can_route_like_if_else():
yaml_str = f"""
schema: crewai.flow/v1
name: ExpressionRouterFlow
methods:
begin:
do:
call: code
ref: {__name__}:ChainFlow.begin
start: true
decide:
do:
call: expression
expr: "state.direction == 'left' ? 'left' : 'right'"
listen: begin
router: true
emit: [left, right]
take_left:
do:
call: code
ref: {__name__}:RouteFlow.take_left
listen: left
take_right:
do:
call: code
ref: {__name__}:RouteFlow.take_right
listen: right
"""
definition = FlowDefinition.from_yaml(yaml_str)
assert Flow.from_definition(definition).kickoff(
inputs={"direction": "left"}
) == "took-left"
assert Flow.from_definition(definition).kickoff(
inputs={"direction": "right"}
) == "took-right"
def test_expression_action_reports_invalid_cel_expression():
yaml_str = """
schema: crewai.flow/v1
name: ExpressionFlow
methods:
classify:
do:
call: expression
expr: "state."
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(ValueError, match="failed to evaluate CEL expression"):
flow.kickoff()
def test_tool_action_requires_module_qualname_ref():
definition = FlowDefinition.from_dict(
{

View File

@@ -860,9 +860,9 @@ class TestHumanFeedbackFinalOutputPreservation:
):
flow.kickoff()
# _method_outputs should contain the real output
assert len(flow._method_outputs) == 1
assert flow._method_outputs[0] == {"data": "real output"}
# method_outputs should contain the real output
assert flow.method_outputs == [{"data": "real output"}]
assert flow._method_outputs[0]["method"] == "generate"
@patch("builtins.input", return_value="looks good")
@patch("builtins.print")

3908
uv.lock generated

File diff suppressed because it is too large Load Diff