Support templated Flow action inputs (#6426)

Flow action inputs now support `${...}` inside strings, not only
strings that are fully wrapped in one expression. This lets authored
flows use simple prompt-like values such as:

* `query: "News about ${state.topic}"`
* `input: "Ticket ${text(state, "ticket.id", "unknown")}"`
* `sources: ["${state.primary_source}", "archive-${state.topic}"]`

Whole-expression values still preserve their runtime type, so
`${state.limit}` remains a number and `${state.domains}` remains a list.
Mixed literal and expression strings render as text.

This removes the need to build labeled strings with CEL concatenation,
which was hard to read, easy to quote incorrectly in YAML, and a poor
fit for the Flow authoring skill examples.
This commit is contained in:
Vinicius Brasil
2026-07-02 08:57:50 -07:00
committed by GitHub
parent 559a9c65c4
commit 24901cd4f6
8 changed files with 582 additions and 81 deletions

View File

@@ -57,7 +57,33 @@ Use agents and crews for larger work or side effects.
Use these expression forms correctly:
- Raw CEL: use in `expr`. Do not wrap raw CEL in `${...}`.
- Action mappings: wrap the whole string in `${...}`.
- Use `${...}` inside action mapping strings to read Flow data with CEL. Example value: `Ticket: ${state.ticket_id}`.
- Use `state` for input data. Use `outputs.step_name` for a completed method result.
- If a value is only one `${...}` expression, the result keeps its type. Use this for numbers, booleans, objects, and lists.
- If the string has other text, the final value is text. Non-text values become JSON. `null` becomes empty text.
- Use `text(root, "path", "default")` for values that may be missing or null. The default is optional and is `""`.
Expression examples:
Mix text and Flow data:
```yaml
query: "News about ${state.topic}"
```
Keep a list or number type:
```yaml
domains: "${state.domains}"
limit: "${state.limit}"
```
Use a default for missing text:
```yaml
input: "Ticket ${text(state, \"ticket.id\", \"unknown\")}"
```
- Crew text: use `{name}` placeholders from crew inputs. Example: `Research {topic}`.
- Crew inputs become prompt text only when agent or task text references matching `{name}` placeholders.
- Passing an input that is not referenced by any `{name}` placeholder does not ground the crew. If the crew needs a field, put that placeholder in an agent `goal`, task `description`, or task `expected_output`.
@@ -69,13 +95,8 @@ Available CEL variables:
Dynamic value rules:
- A string is dynamic only when the whole trimmed string is wrapped in `${...}`.
- `Ticket: ${state.ticket}` stays literal. For computed strings, build the string inside the CEL expression.
- For prompt text, use CrewAI's CEL helper `text(root, "path", "default")` to safely read missing or null values as strings. The default argument is optional and defaults to `""`.
- When an agent needs multiple fields, build one single-line CEL string with labels and separators. Example: `input: "${'Ticket ID: ' + text(state, 'ticket_id') + '; Message: ' + text(state, 'message')}"`.
- In YAML, do not put `\n` escapes inside `${...}` strings. Prefer plain `${state.field}` values or the single-line labeled pattern above.
- `${...}` keeps the value type. It does not always make text.
- Crew action-level `inputs` are the actual Crew kickoff inputs. Use CEL-wrapped values there for runtime data from `state` or `outputs`.
- When an agent needs multiple fields, write one text value with labels and separators. Example value: `Ticket ID: ${state.ticket_id}; Message: ${state.message}`.
- Crew action-level `inputs` are the actual Crew kickoff inputs. Use `${...}` values there for runtime data from `state` or `outputs`.
- Crew action-level `inputs` alone are not grounding. Include placeholders for the facts the model must use.
- Crew outputs are objects. Use `${outputs.research_brief.raw}` for text.
- For structured crew output, use fields like `${outputs.research_brief.json_dict.field}` or `${outputs.research_brief.pydantic.field}`.
@@ -162,7 +183,7 @@ methods:
role: Follow-up router
goal: 'Return exactly one bare value: followup or done. Do not include explanation.'
backstory: Skilled at routing reviewed research briefs.
input: "${'Reviewed research: ' + text(outputs, 'research_brief.raw')}"
input: "Reviewed research: ${outputs.research_brief.raw}"
write_followup:
listen: followup
do:
@@ -225,7 +246,7 @@ Shape:
Fields:
- `call` (required): must be `crew`. Action discriminator. Use crew to run an inline Crew definition. Example: `crew`
- `with` (required): inline crew definition. Inline Crew definition to load and execute for this action. Example: `{"agents": {"researcher": {"backstory": "Knows the domain.", "goal": "Research {topic}", "role": "Researcher"}}, "name": "inline_research", "tasks": [{"agent": "researcher", "description": "Research {topic}", "expected_output": "Findings about {topic}", "name": "research_task"}]}`
- `inputs` (optional): map of string to expression data | null; default `null`. Actual kickoff inputs passed to the Crew. Use CEL-wrapped values here, for example `${state.topic}` or `${outputs.research_brief}`. The evaluated values are available to crew agent and task interpolation as `{name}` placeholders; reference each input the crew needs in agent or task text. Example: `{"topic": "${state.topic}"}`
- `inputs` (optional): map of string to expression data | null; default `null`. Actual kickoff inputs passed to the Crew. Use `${...}` inside action mapping strings to read Flow data with CEL. Example value: `Ticket: ${state.ticket_id}`. Use `state` for input data. Use `outputs.step_name` for a completed method result. If a value is only one `${...}` expression, the result keeps its type. Use this for numbers, booleans, objects, and lists. If the string has other text, the final value is text. Non-text values become JSON. `null` becomes empty text. Use `text(root, "path", "default")` for values that may be missing or null. The default is optional and is `""`. The evaluated values are available to crew agent and task interpolation as `{name}` placeholders; reference each input the crew needs in agent or task text. Example: `{"topic": "${state.topic}"}`
#### Crew Definition (`methods.<name>.do[call=crew].with`)
@@ -290,7 +311,7 @@ Fields:
- `tools` (optional): list[string | map of string to any] | null; default `null`. Tool refs or serialized tool definitions available to this agent. String refs can use CrewAI tool names, `custom:<name>`, or fully qualified `module:Class` references. Example: `["crewai_tools:SerperDevTool", "custom:file_read"]`
- `apps` (optional): list[string] | null; default `null`. Platform apps available to this agent. Can contain app names such as `gmail` or app/action refs such as `gmail/send_email`. Example: `["gmail", "slack/send_message"]`
- `mcps` (optional): list[string | map of string to any] | null; default `null`. MCP server refs or serialized MCP server configs available to this agent. String refs can use HTTPS URLs, connected MCP integration slugs, or refs with a `#tool_name` suffix for specific tools. Example: `["https://api.weather.com/mcp#get_current_weather", "snowflake", "stripe#list_invoices", {"cache_tools_list": true, "headers": {"Authorization": "Bearer your_token"}, "streamable": true, "url": "https://api.example.com/mcp"}]`
- `input` (required): string. Input passed to the individual agent kickoff outside of a crew. Use a single string value, often a dynamic `${...}` expression. When an agent needs multiple fields, build one single-line CEL string with labels and separators, using `text(root, 'path')` for values that may be missing or null, for example `${'Ticket ID: ' + text(state, 'ticket_id') + '; Message: ' + text(state, 'message')}`. In YAML, avoid `\n` escapes inside `${...}` strings. Example: `${state.ticket.body}`
- `input` (required): string. Input passed to the individual agent kickoff outside of a crew. Use one string. Use `${...}` inside action mapping strings to read Flow data with CEL. Example value: `Ticket: ${state.ticket_id}`. Use `state` for input data. Use `outputs.step_name` for a completed method result. If a value is only one `${...}` expression, the result keeps its type. Use this for numbers, booleans, objects, and lists. If the string has other text, the final value is text. Non-text values become JSON. `null` becomes empty text. Use `text(root, "path", "default")` for values that may be missing or null. The default is optional and is `""`. When an agent needs multiple fields, write one string with labels and separators, for example `Ticket ID: ${state.ticket_id}; Message: ${state.message}`. Example: `${state.ticket.body}`
#### LLM Definition

View File

@@ -3,8 +3,9 @@
from __future__ import annotations
from collections.abc import Iterable
from functools import lru_cache
import json
from typing import TYPE_CHECKING, Any, TypeAlias, cast
from typing import TYPE_CHECKING, Any, NamedTuple, TypeAlias, cast
from crewai.utilities.serialization import to_serializable
@@ -26,7 +27,6 @@ _CEL_MACROS_WITH_LOCAL_BINDINGS = frozenset(
def _handle_text_custom_expression(
root: Any, path: Any, default: Any = ""
) -> StringType:
from celpy.adapter import CELJSONEncoder
from celpy.celtypes import StringType
fallback = StringType("" if default is None else str(default))
@@ -44,14 +44,119 @@ def _handle_text_custom_expression(
if current is None:
return fallback
if isinstance(current, str):
return StringType(current)
return StringType(json.dumps(current, cls=CELJSONEncoder, ensure_ascii=False))
return StringType(_stringify_cel_value(current))
def _stringify_cel_value(value: Any) -> str:
from celpy.adapter import CELJSONEncoder
if isinstance(value, str):
return value
return json.dumps(value, cls=CELJSONEncoder, ensure_ascii=False)
class _ExpressionSegment(NamedTuple):
source: str
def _marker_end(value: str, start: int) -> int:
from celpy.celparser import CELParser
CELParser()
parser: Any = CELParser.CEL_PARSER
depth = 1
try:
for token in parser.lex(value[start:]):
if token.type == "LBRACE":
depth += 1
elif token.type == "RBRACE":
depth -= 1
if depth == 0:
return start + int(token.start_pos)
except Exception as e:
raise ExpressionError(
f"unterminated or invalid ${{...}} expression in {value!r}: {e}"
) from e
raise ExpressionError(f"unterminated ${{...}} expression in {value!r}")
@lru_cache(maxsize=256)
def _parse_template_segments(value: str) -> tuple[str | _ExpressionSegment, ...]:
segments: list[str | _ExpressionSegment] = []
index = 0
while (start := value.find("${", index)) != -1:
if start > index:
segments.append(value[index:start])
end = _marker_end(value, start + 2)
source = value[start + 2 : end].strip()
if not source:
raise ExpressionError(f"empty CEL expression in {value!r}")
segments.append(_ExpressionSegment(source))
index = end + 1
if index < len(value) or not segments:
segments.append(value[index:])
return tuple(segments)
_EXPRESSION_FUNCTIONS: dict[str, CELFunction] = {
"text": _handle_text_custom_expression,
}
FLOW_TEMPLATE_EXPRESSION_RULES: tuple[str, ...] = (
"Use `${...}` inside action mapping strings to read Flow data with CEL. "
"Example value: `Ticket: ${state.ticket_id}`.",
"Use `state` for input data. Use `outputs.step_name` for a completed "
"method result.",
"If a value is only one `${...}` expression, the result keeps its type. "
"Use this for numbers, booleans, objects, and lists.",
"If the string has other text, the final value is text. Non-text values "
"become JSON. `null` becomes empty text.",
'Use `text(root, "path", "default")` for values that may be missing '
'or null. The default is optional and is `""`.',
)
FLOW_TEMPLATE_EXPRESSION_CONTRACT = " ".join(FLOW_TEMPLATE_EXPRESSION_RULES)
FLOW_TEMPLATE_EXPRESSION_EXAMPLES: dict[str, tuple[dict[str, str], ...]] = {
"yaml": (
{
"title": "Mix text and Flow data",
"code": 'query: "News about ${state.topic}"',
},
{
"title": "Keep a list or number type",
"code": 'domains: "${state.domains}"\nlimit: "${state.limit}"',
},
{
"title": "Use a default for missing text",
"code": 'input: "Ticket ${text(state, \\"ticket.id\\", \\"unknown\\")}"',
},
),
"json": (
{
"title": "Mix text and Flow data",
"code": '{\n "query": "News about ${state.topic}"\n}',
},
{
"title": "Keep a list or number type",
"code": (
'{\n "domains": "${state.domains}",\n "limit": "${state.limit}"\n}'
),
},
{
"title": "Use a default for missing text",
"code": (
"{\n"
' "input": "Ticket ${text(state, \\"ticket.id\\", \\"unknown\\")}"\n'
"}"
),
},
),
}
def flow_template_expression_description(prefix: str) -> str:
return f"{prefix} {FLOW_TEMPLATE_EXPRESSION_CONTRACT}"
if TYPE_CHECKING:
ExpressionData: TypeAlias = (
str
@@ -75,9 +180,13 @@ else:
)
__all__ = [
"FLOW_TEMPLATE_EXPRESSION_CONTRACT",
"FLOW_TEMPLATE_EXPRESSION_EXAMPLES",
"FLOW_TEMPLATE_EXPRESSION_RULES",
"Expression",
"ExpressionData",
"ExpressionError",
"flow_template_expression_description",
]
@@ -133,7 +242,7 @@ class Expression:
allowed_roots: Iterable[str],
source: str = "with block",
) -> None:
"""Validate nested strings fully wrapped in ``${...}`` as CEL."""
"""Validate ``${...}`` expressions inside nested strings as CEL."""
self._validate_template_value(
self.value, allowed_roots=allowed_roots, source=source
)
@@ -147,7 +256,11 @@ class Expression:
)
def render_template(self, context: dict[str, Any] | None = None) -> Any:
"""Evaluate nested strings fully wrapped in ``${...}`` as CEL."""
"""Interpolate ``${...}`` expressions inside nested strings as CEL.
A string that is exactly one ``${...}`` keeps the evaluated value's
type; strings mixing literals and expressions render as text.
"""
resolved_context = self.context if context is None else context
return self._render_template_value(self.value, resolved_context or {})
@@ -159,10 +272,23 @@ class Expression:
source: str,
) -> None:
if isinstance(value, str):
expression = Expression._expression_marker_source(value, source=source)
if expression is not None:
Expression(expression).validate_expression(
allowed_roots=allowed_roots, source=source
try:
segments = _parse_template_segments(value)
except ExpressionError as e:
raise ExpressionError(f"{e} at {source}") from None
expressions = [
segment
for segment in segments
if isinstance(segment, _ExpressionSegment)
]
for index, segment in enumerate(expressions):
segment_source = (
source
if len(expressions) == 1
else f"{source} (expression {index + 1})"
)
Expression(segment.source).validate_expression(
allowed_roots=allowed_roots, source=segment_source
)
return
if isinstance(value, dict):
@@ -221,28 +347,23 @@ class Expression:
@staticmethod
def _render_template_string(value: str, context: dict[str, Any]) -> Any:
expression = Expression._expression_marker_source(value)
if expression is None:
segments = _parse_template_segments(value)
expressions = [
segment for segment in segments if isinstance(segment, _ExpressionSegment)
]
if not expressions:
return value
return Expression._evaluate_cel(expression, context)
@staticmethod
def _expression_marker_source(
value: str, *, source: str | None = None
) -> str | None:
"""Return CEL source when the trimmed string starts with ``${`` and ends with ``}``."""
stripped = value.strip()
if not stripped.startswith("${"):
return None
if not stripped.endswith("}"):
return None
expression = stripped[2:-1].strip()
if not expression:
if source is None:
raise ExpressionError("empty CEL expression in with block")
raise ExpressionError(f"empty CEL expression at {source}")
return expression
literals = [segment for segment in segments if isinstance(segment, str)]
if len(expressions) == 1 and all(not literal.strip() for literal in literals):
return Expression._evaluate_cel(expressions[0].source, context)
rendered: list[str] = []
for segment in segments:
if isinstance(segment, str):
rendered.append(segment)
continue
result = Expression._evaluate_cel(segment.source, context)
rendered.append("" if result is None else _stringify_cel_value(result))
return "".join(rendered)
@staticmethod
def _evaluate_cel(expression: str, context: dict[str, Any]) -> Any:

View File

@@ -29,7 +29,10 @@ from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
from crewai.flow.expressions import ExpressionData
from crewai.flow.expressions import (
ExpressionData,
flow_template_expression_description,
)
from crewai.project.crew_definition import AgentDefinition, CrewDefinition
@@ -362,12 +365,10 @@ class FlowCodeActionDefinition(BaseModel):
with_: dict[str, ExpressionData] | None = Field(
default=None,
alias="with",
description=(
"Keyword arguments passed to the callable. String values are evaluated "
"as CEL only when the trimmed value starts with ${ and ends with }; "
"all other values are literal."
description=flow_template_expression_description(
"Keyword arguments passed to the callable."
),
examples=[{"topic": "${state.topic}"}],
examples=[{"topic": "${state.topic}", "query": "News about ${state.topic}"}],
)
@@ -390,11 +391,7 @@ class FlowToolActionDefinition(BaseModel):
with_: dict[str, ExpressionData] | None = Field(
default=None,
alias="with",
description=(
"Tool input arguments. String values are evaluated as CEL only when "
"the trimmed value starts with ${ and ends with }; all other values "
"are literal."
),
description=flow_template_expression_description("Tool input arguments."),
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
)
@@ -446,11 +443,12 @@ class FlowCrewActionDefinition(BaseModel):
)
inputs: dict[str, ExpressionData] | None = Field(
default=None,
description=(
"Input overrides passed to the Crew. String values are evaluated as CEL "
"only when the trimmed value starts with ${ and ends with }; all other "
"values are literal. The resulting values are available to crew agent "
"and task interpolation as `{name}` placeholders."
description=flow_template_expression_description(
"Input overrides passed to the Crew."
)
+ (
" The resulting values are available to crew agent and task "
"interpolation as `{name}` placeholders."
),
examples=[{"topic": "${state.topic}"}],
)

View File

@@ -10,6 +10,11 @@ from typing import Any, Literal
from jinja2 import Environment, FileSystemLoader
import yaml
from crewai.flow.expressions import (
FLOW_TEMPLATE_EXPRESSION_CONTRACT,
FLOW_TEMPLATE_EXPRESSION_EXAMPLES,
FLOW_TEMPLATE_EXPRESSION_RULES,
)
from crewai.flow.flow_definition import FlowDefinition
@@ -73,6 +78,10 @@ def template_context(
"include_expression_action": "expression_action" not in skips,
"include_script_action": "script_action" not in skips,
"include_tool_action": "tool_action" not in skips,
"expression_contract_examples": FLOW_TEMPLATE_EXPRESSION_EXAMPLES[
examples_format
],
"expression_contract_rules": FLOW_TEMPLATE_EXPRESSION_RULES,
"sections": FlowSkillReferenceExtractor(skips=skips).extract(),
}
@@ -185,7 +194,7 @@ MODEL_SPECS: tuple[ModelSpec, ...] = (
examples=True,
descriptions={
"call": "Action discriminator. Use crew to run an inline Crew definition.",
"inputs": "Actual kickoff inputs passed to the Crew. Use CEL-wrapped values here, for example `${state.topic}` or `${outputs.research_brief}`. The evaluated values are available to crew agent and task interpolation as `{name}` placeholders; reference each input the crew needs in agent or task text.",
"inputs": f"Actual kickoff inputs passed to the Crew. {FLOW_TEMPLATE_EXPRESSION_CONTRACT} The evaluated values are available to crew agent and task interpolation as `{{name}}` placeholders; reference each input the crew needs in agent or task text.",
},
),
ModelSpec(
@@ -253,7 +262,7 @@ MODEL_SPECS: tuple[ModelSpec, ...] = (
hidden=True,
examples=True,
descriptions={
"input": "Input passed to the individual agent kickoff outside of a crew. Use a single string value, often a dynamic `${...}` expression. When an agent needs multiple fields, build one single-line CEL string with labels and separators, using `text(root, 'path')` for values that may be missing or null, for example `${'Ticket ID: ' + text(state, 'ticket_id') + '; Message: ' + text(state, 'message')}`. In YAML, avoid `\\n` escapes inside `${...}` strings.",
"input": f"Input passed to the individual agent kickoff outside of a crew. Use one string. {FLOW_TEMPLATE_EXPRESSION_CONTRACT} When an agent needs multiple fields, write one string with labels and separators, for example `Ticket ID: ${{state.ticket_id}}; Message: ${{state.message}}`.",
"llm": "Language model that runs this agent. Use an object when setting LLM options such as `max_tokens`.",
"planning_config": "Agent planning configuration. Set `max_attempts` to limit planning refinement attempts before task execution.",
},

View File

@@ -57,7 +57,7 @@ methods:
role: Follow-up router
goal: 'Return exactly one bare value: followup or done. Do not include explanation.'
backstory: Skilled at routing reviewed research briefs.
input: "${'Reviewed research: ' + text(outputs, 'research_brief.raw')}"
input: "Reviewed research: ${outputs.research_brief.raw}"
write_followup:
listen: followup
do:

View File

@@ -89,7 +89,20 @@ Use these expression forms correctly:
{% if include_expression_action or include_each_action %}
- Raw CEL: use in {% if include_expression_action and include_each_action %}`expr`, `in`, and `if`{% elif include_expression_action %}`expr`{% else %}`in` and `if`{% endif %}. Do not wrap raw CEL in `${...}`.
{% endif %}
- Action mappings: wrap the whole string in `${...}`.
{% for rule in expression_contract_rules %}
- {{ rule }}
{% endfor %}
Expression examples:
{% for example in expression_contract_examples %}
{{ example.title }}:
```{{ example_language }}
{{ example.code }}
```
{% endfor %}
- Crew text: use `{name}` placeholders from crew inputs. Example: `Research {topic}`.
- Crew inputs become prompt text only when agent or task text references matching `{name}` placeholders.
- Passing an input that is not referenced by any `{name}` placeholder does not ground the crew. If the crew needs a field, put that placeholder in an agent `goal`, task `description`, or task `expected_output`.
@@ -105,13 +118,8 @@ Available CEL variables:
Dynamic value rules:
- A string is dynamic only when the whole trimmed string is wrapped in `${...}`.
- `Ticket: ${state.ticket}` stays literal. For computed strings, build the string inside the CEL expression.
- For prompt text, use CrewAI's CEL helper `text(root, "path", "default")` to safely read missing or null values as strings. The default argument is optional and defaults to `""`.
- When an agent needs multiple fields, build one single-line CEL string with labels and separators. Example: `input: "${'Ticket ID: ' + text(state, 'ticket_id') + '; Message: ' + text(state, 'message')}"`.
- In YAML, do not put `\n` escapes inside `${...}` strings. Prefer plain `${state.field}` values or the single-line labeled pattern above.
- `${...}` keeps the value type. It does not always make text.
- Crew action-level `inputs` are the actual Crew kickoff inputs. Use CEL-wrapped values there for runtime data from `state` or `outputs`.
- When an agent needs multiple fields, write one text value with labels and separators. Example value: `Ticket ID: ${state.ticket_id}; Message: ${state.message}`.
- Crew action-level `inputs` are the actual Crew kickoff inputs. Use `${...}` values there for runtime data from `state` or `outputs`.
- Crew action-level `inputs` alone are not grounding. Include placeholders for the facts the model must use.
- Crew outputs are objects. Use `${outputs.research_brief.raw}` for text.
- For structured crew output, use fields like `${outputs.research_brief.json_dict.field}` or `${outputs.research_brief.pydantic.field}`.

View File

@@ -14,6 +14,10 @@ import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.experimental import ConversationConfig, RouterConfig
from crewai.flow.expressions import (
FLOW_TEMPLATE_EXPRESSION_EXAMPLES,
FLOW_TEMPLATE_EXPRESSION_RULES,
)
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
@@ -86,6 +90,14 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
assert "outside of a crew" in agent_properties["with"]["description"]
assert "individual inline Agent" in agent_properties["call"]["description"]
expression_rule = FLOW_TEMPLATE_EXPRESSION_RULES[0]
code_properties = defs["FlowCodeActionDefinition"]["properties"]
tool_properties = defs["FlowToolActionDefinition"]["properties"]
crew_properties = defs["FlowCrewActionDefinition"]["properties"]
assert expression_rule in code_properties["with"]["description"]
assert expression_rule in tool_properties["with"]["description"]
assert expression_rule in crew_properties["inputs"]["description"]
state_schema = next(
branch
for branch in schema["properties"]["state"]["anyOf"]
@@ -162,7 +174,9 @@ def test_flow_definition_json_schema_carries_field_examples_only():
assert action_properties["ref"]["examples"] == [
"my_project.flows:normalize_topic"
]
assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}]
assert action_properties["with"]["examples"] == [
{"topic": "${state.topic}", "query": "News about ${state.topic}"}
]
agent_properties = defs["FlowAgentActionDefinition"]["properties"]
assert agent_properties["call"]["examples"] == ["agent"]
@@ -1333,7 +1347,7 @@ def test_skill_documents_flow_wiring():
assert isinstance(skill, str)
assert "```yaml" in skill
assert "[Method](#method-methods)" in skill
assert "input: \"${'Reviewed research: ' + text(outputs, 'research_brief.raw')}\"" in skill
assert 'input: "Reviewed research: ${outputs.research_brief.raw}"' in skill
assert 'text(root, "path", "default")' in skill
assert "trust CrewAI defaults and omit them" in skill
assert "#### LLM Definition" in skill
@@ -1346,6 +1360,11 @@ def test_skill_documents_flow_wiring():
assert "`max_rpm` (optional): integer | null; default `null`" in skill
assert "`max_execution_time` (optional): integer | null; default `null`" in skill
assert "Maximum execution time in seconds for an agent" in skill
for rule in FLOW_TEMPLATE_EXPRESSION_RULES:
assert rule in skill
for example in FLOW_TEMPLATE_EXPRESSION_EXAMPLES["yaml"]:
assert example["title"] in skill
assert example["code"] in skill
def test_skill_can_render_json_examples():
@@ -1353,6 +1372,10 @@ def test_skill_can_render_json_examples():
assert "```json" in skill
assert '"schema": "crewai.flow/v1"' in skill
for example in FLOW_TEMPLATE_EXPRESSION_EXAMPLES["json"]:
assert example["title"] in skill
assert example["code"] in skill
assert FLOW_TEMPLATE_EXPRESSION_EXAMPLES["yaml"][0]["code"] not in skill
assert "```yaml" not in skill

View File

@@ -100,6 +100,11 @@ class TypedInputsTool(BaseTool):
return f"{count}:{','.join(include_domains)}"
class TemplateInputFlow(Flow):
def capture_inputs(self, prompt: str, domains: list[str]) -> dict[str, Any]:
return {"prompt": prompt, "domains": domains}
class AsyncResultTool(BaseTool):
name: str = "AsyncResultTool"
description: str = "Returns an async result from its sync entrypoint."
@@ -733,7 +738,7 @@ methods:
assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents"
def test_tool_action_treats_embedded_cel_marker_as_literal():
def test_tool_action_interpolates_cel_string_literals():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
@@ -754,10 +759,10 @@ def test_tool_action_treats_embedded_cel_marker_as_literal():
}
)
assert Flow.from_declaration(contents=definition).kickoff() == "p}x:wrapped ${'a}b'} value"
assert Flow.from_declaration(contents=definition).kickoff() == "p}x:wrapped a}b value"
def test_tool_action_treats_marker_with_trailing_text_as_literal():
def test_tool_action_interpolates_expression_with_surrounding_text():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
@@ -778,11 +783,177 @@ def test_tool_action_treats_marker_with_trailing_text_as_literal():
}
)
assert Flow.from_declaration(contents=definition).kickoff() == "p:${state.topic} extra"
flow = Flow.from_declaration(contents=definition)
assert flow.kickoff(inputs={"topic": "ai"}) == "p:ai extra"
def test_tool_action_rejects_adjacent_markers_as_invalid_cel():
with pytest.raises(ValidationError, match="invalid CEL expression"):
def test_tool_action_interpolates_adjacent_expressions():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "${'a'}${'b'}",
"prefix": "p",
},
},
},
},
}
)
assert Flow.from_declaration(contents=definition).kickoff() == "p:ab"
def test_tool_action_interpolates_multiple_expressions_with_literals():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "here's ${state.a} and another ${state.b}!",
"prefix": "p",
},
},
},
},
}
)
flow = Flow.from_declaration(contents=definition)
assert flow.kickoff(inputs={"a": "one", "b": "two"}) == "p:here's one and another two!"
def test_tool_action_interpolates_non_string_values_as_json():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "n=${state.n}; ok=${state.ok}; d=${state.d}",
"prefix": "p",
},
},
},
},
}
)
flow = Flow.from_declaration(contents=definition)
assert (
flow.kickoff(inputs={"n": 3, "ok": True, "d": {"a": 1}})
== 'p:n=3; ok=true; d={"a": 1}'
)
def test_tool_action_interpolates_null_as_empty_string():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "note:${state.note};",
"prefix": "p",
},
},
},
},
}
)
flow = Flow.from_declaration(contents=definition)
assert flow.kickoff(inputs={"note": None}) == "p:note:;"
def test_tool_action_interpolates_object_literal_fields():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "result: ${ {'k': 'v'}.k } end",
"prefix": "p",
},
},
},
},
}
)
assert Flow.from_declaration(contents=definition).kickoff() == "p:result: v end"
def test_tool_action_keeps_plain_dollar_signs_literal():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "$5 or $more, escaped ${'${'}x",
"prefix": "p",
},
},
},
},
}
)
assert Flow.from_declaration(contents=definition).kickoff() == "p:$5 or $more, escaped ${x"
@pytest.mark.parametrize(
("search_query", "error"),
[
("cost ${state.a", "unterminated"),
("x ${} y", "empty CEL expression"),
("a ${foo.bar} b", "unknown CEL root"),
],
)
def test_tool_action_rejects_invalid_interpolated_inputs(
search_query: str,
error: str,
):
with pytest.raises(ValidationError, match=error):
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
@@ -794,7 +965,7 @@ def test_tool_action_rejects_adjacent_markers_as_invalid_cel():
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {
"search_query": "${'a'}${'b'}",
"search_query": search_query,
"prefix": "p",
},
},
@@ -804,7 +975,7 @@ def test_tool_action_rejects_adjacent_markers_as_invalid_cel():
)
def test_tool_action_accepts_braces_in_full_cel_marker():
def test_tool_action_preserves_type_for_object_literal_expression():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
@@ -900,6 +1071,37 @@ methods:
)
def test_tool_action_interpolates_values_inside_list_inputs():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
typed:
do:
call: tool
ref: {__name__}:TypedInputsTool
with:
count: "${{state.limit}}"
include_domains:
- "${{state.primary_domain}}"
- "docs.${{state.domain_suffix}}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
assert (
flow.kickoff(
inputs={
"limit": 2,
"primary_domain": "crewai.com",
"domain_suffix": "example.com",
}
)
== "2:crewai.com,docs.example.com"
)
def test_tool_action_renders_text_custom_expression_inputs():
yaml_str = f"""
schema: crewai.flow/v1
@@ -984,7 +1186,7 @@ methods:
role: Analyst
goal: Answer questions
backstory: Knows things.
input: "${'Ticket ID: ' + text(state, 'ticket.id') + '; Subject: ' + text(state, 'ticket.subject')}"
input: "Ticket ID: ${text(state, 'ticket.id')}; Subject: ${text(state, 'ticket.subject')}"
start: true
"""
@@ -1183,6 +1385,58 @@ methods:
}
def test_crew_action_interpolates_runtime_strings_and_lists(
monkeypatch: pytest.MonkeyPatch,
):
from crewai import Crew
async def fake_kickoff_async(
self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any
) -> dict[str, Any] | None:
return inputs
monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: CrewFlow
methods:
research:
do:
call: crew
with:
name: inline_research
agents:
researcher:
role: Researcher
goal: Research {topic}
backstory: Knows things.
tasks:
- name: research_task
description: Research {topic} using {sources}
expected_output: Findings about {topic}
agent: researcher
inputs:
topic: "News about ${state.topic}"
sources:
- "${state.primary_source}"
- "archive-${state.topic}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(
inputs={
"topic": "AI",
"primary_source": "crewai.com",
}
) == {
"topic": "News about AI",
"sources": ["crewai.com", "archive-AI"],
}
def test_crew_action_runs_crew_from_declaration(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
):
@@ -1598,6 +1852,37 @@ methods:
assert flow.kickoff(inputs={"name": "hello"}) == "hello!"
def test_code_action_interpolates_strings_and_lists():
yaml_str = f"""
schema: crewai.flow/v1
name: CodeTemplateFlow
methods:
capture:
do:
call: code
ref: {__name__}:TemplateInputFlow.capture_inputs
with:
prompt: "Ticket ${{state.ticket.id}}: ${{state.ticket.subject}}"
domains:
- "${{state.primary_domain}}"
- "docs.${{state.domain_suffix}}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(
inputs={
"ticket": {"id": 123, "subject": "Login issue"},
"primary_domain": "crewai.com",
"domain_suffix": "example.com",
}
) == {
"prompt": "Ticket 123: Login issue",
"domains": ["crewai.com", "docs.example.com"],
}
def test_code_action_supports_callable_instance_refs():
yaml_str = f"""
schema: crewai.flow/v1
@@ -1644,6 +1929,42 @@ methods:
]
def test_each_action_interpolates_item_values_in_step_inputs():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- name: normalize
action:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
row: "Row ${{item.id}}: ${{item.value}}"
prefix: "${{state.prefix}}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(
inputs={
"prefix": "normalized",
"rows": [
{"id": 1, "value": "alpha"},
{"id": 2, "value": "beta"},
],
}
) == [
"normalized:Row 1: alpha",
"normalized:Row 2: beta",
]
def test_each_action_runs_sync_steps_off_event_loop_with_context():
yaml_str = f"""
schema: crewai.flow/v1