Add single agent action to Flow definitions (#6226)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled

* Add single agent action to Flow definitions

Lets a flow method build and run a single CrewAI agent directly, without
wrapping it in a crew. Same idea as the existing `crew` action, but for
one agent.

  methods:
    answer:
      do:
        call: agent
        with:
          role: Analyst
          goal: Answer questions
          backstory: Knows things.
          input: "${state.question}"
      start: true

* `input` is required and interpolated from flow state, like
  `${state.question}` or `${item}` inside an `each` loop
* optional `response_format` points at a Pydantic model (`{"python":
  "models.AnswerModel"}`) to get structured output
* `input` must be a string and its CEL is validated at load time, so bad
  expressions like `${state.}` fail early

* Simplify test code
This commit is contained in:
Vinicius Brasil
2026-06-18 14:53:33 -07:00
committed by GitHub
parent fa89ac428e
commit bc2c2a858c
8 changed files with 345 additions and 11 deletions

View File

@@ -28,7 +28,7 @@ from crewai.flow.conversational_definition import (
FlowConversationalRouterDefinition,
)
from crewai.flow.expressions import ExpressionData
from crewai.project.crew_definition import CrewDefinition
from crewai.project.crew_definition import AgentDefinition, CrewDefinition
logger = logging.getLogger(__name__)
@@ -40,6 +40,7 @@ _EACH_STEP_CEL_ROOTS = frozenset({"item", "outputs", "state"})
__all__ = [
"FlowActionDefinition",
"FlowAgentActionDefinition",
"FlowAtomicActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
@@ -435,6 +436,33 @@ class FlowCrewActionDefinition(BaseModel):
)
class FlowAgentActionDefinition(BaseModel):
"""A Flow method action that builds and kicks off a CrewAI agent."""
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
call: Literal["agent"] = Field(
description="Action discriminator. Use agent to run an inline Agent definition.",
examples=["agent"],
)
with_: AgentDefinition = Field(
alias="with",
description="Inline Agent definition to load and execute for this action.",
examples=[
{
"role": "Analyst",
"goal": "Answer user questions",
"backstory": "Precise and concise.",
"settings": {"llm": "openai/gpt-4o-mini"},
"input": "${state.question}",
}
],
)
class FlowExpressionActionDefinition(BaseModel):
"""A Flow method action that evaluates a CEL expression."""
@@ -481,6 +509,7 @@ FlowAtomicActionDefinition: TypeAlias = Annotated[
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowAgentActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition,
Field(discriminator="call"),
@@ -573,6 +602,7 @@ FlowActionDefinition: TypeAlias = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowAgentActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
| FlowEachActionDefinition
@@ -802,6 +832,13 @@ def _validate_action_cel(
)
return
if isinstance(action, FlowAgentActionDefinition):
Expression(cast(ExpressionData, action.with_.input)).validate_template(
allowed_roots=allowed_roots,
source=f"{path}.with.input",
)
return
if isinstance(action, FlowEachActionDefinition):
Expression(action.in_).validate_expression(
allowed_roots=_BASE_CEL_ROOTS,
@@ -821,6 +858,14 @@ def _validate_action_cel(
)
return
if isinstance(action, FlowScriptActionDefinition):
return
raise TypeError(
f"no CEL validation defined for action type {type(action).__name__} at "
f"{path}; add a branch to _validate_action_cel for it."
)
def log_flow_definition_issues(definition: FlowDefinition) -> None:
for method_name, method in definition.methods.items():

View File

@@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.expressions import Expression, ExpressionData
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowAgentActionDefinition,
FlowCodeActionDefinition,
FlowCrewActionDefinition,
FlowEachActionDefinition,
@@ -140,6 +141,35 @@ class CrewAction:
return await crew.kickoff_async(inputs=inputs)
class AgentAction:
definition_type = FlowAgentActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowAgentActionDefinition) -> None:
self.flow = flow
self.definition = definition
async def run(self, *_args: Any, **kwargs: Any) -> Any:
from crewai.project.json_loader import load_agent_from_definition
local_context = _pop_local_context(kwargs)
rendered_input = Expression.from_flow(
cast(ExpressionData, self.definition.with_.input),
self.flow,
local_context=local_context,
).render_template()
if not isinstance(rendered_input, str):
raise ValueError("agent input must render to a string")
agent, response_format = load_agent_from_definition(
self.definition.with_,
source="agent action",
)
return await agent.kickoff_async(
rendered_input,
response_format=response_format,
)
class ExpressionAction:
definition_type = FlowExpressionActionDefinition
@@ -284,6 +314,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
EachAction,
CodeAction,
ToolAction,
AgentAction,
CrewAction,
ExpressionAction,
ScriptAction,

View File

@@ -15,16 +15,22 @@ from crewai.project.annotations import (
)
from crewai.project.crew_base import CrewBase
from crewai.project.crew_definition import (
AgentDefinition,
CrewAgentDefinition,
CrewDefinition,
CrewTaskDefinition,
PythonReferenceDefinition,
)
from crewai.project.crew_loader import load_crew, load_crew_and_kickoff
from crewai.project.json_loader import load_agent, strip_jsonc_comments
from crewai.project.json_loader import (
load_agent,
load_agent_from_definition,
strip_jsonc_comments,
)
__all__ = [
"AgentDefinition",
"CrewAgentDefinition",
"CrewBase",
"CrewDefinition",
@@ -38,6 +44,7 @@ __all__ = [
"crew",
"llm",
"load_agent",
"load_agent_from_definition",
"load_crew",
"load_crew_and_kickoff",
"output_json",

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator, model_valida
__all__ = [
"AgentDefinition",
"CrewAgentDefinition",
"CrewDefinition",
"CrewTaskDefinition",
@@ -53,6 +54,20 @@ class CrewAgentDefinition(BaseModel):
return value or {}
class AgentDefinition(CrewAgentDefinition):
"""Inline agent definition used by a Flow agent action."""
input: str
response_format: PythonReferenceDefinition | None = None
@field_validator("input", mode="before")
@classmethod
def _validate_input(cls, value: Any) -> Any:
if not isinstance(value, str):
raise ValueError("agent.input must be a string")
return value
class CrewTaskDefinition(BaseModel):
"""Task definition used by a crew definition."""

View File

@@ -207,19 +207,18 @@ def load_jsonc_file(source: str | Path) -> Any:
return parse_jsonc(path.read_text(encoding="utf-8"), source=path)
def load_agent(source: str | Path) -> Any:
"""Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file."""
path = Path(source)
defn = _expect_object(load_jsonc_file(path), path)
root = path.parent.parent if path.parent.name == "agents" else path.parent
def _instantiate_agent_from_data(
defn: dict[str, Any], source_label: str, root: Path
) -> Any:
"""Resolve the agent class and kwargs from definition data and instantiate it."""
agent_class = _agent_class_from_definition(
defn,
f"{path}: type",
f"{source_label}: type",
project_root=root,
)
agent_kwargs = _agent_kwargs_from_definition(
defn,
path,
source_label,
agent_class=agent_class,
project_root=root,
)
@@ -227,9 +226,50 @@ def load_agent(source: str | Path) -> Any:
try:
return agent_class(**agent_kwargs)
except ValidationError as exc:
raise JSONProjectError(_format_validation_error(path, exc)) from exc
raise JSONProjectError(_format_validation_error(source_label, exc)) from exc
except Exception as exc:
raise JSONProjectError(f"{path}: failed to load agent: {exc}") from exc
raise JSONProjectError(f"{source_label}: failed to load agent: {exc}") from exc
def load_agent(source: str | Path) -> Any:
"""Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file."""
path = Path(source)
defn = _expect_object(load_jsonc_file(path), path)
root = path.parent.parent if path.parent.name == "agents" else path.parent
return _instantiate_agent_from_data(defn, str(path), root)
def load_agent_from_definition(
definition: dict[str, Any] | Any,
*,
source: str | Path = "<inline agent>",
project_root: str | Path | None = None,
) -> tuple[Any, type[BaseModel] | None]:
"""Load an ``Agent`` and optional kickoff response model from an inline definition."""
from crewai.project.crew_definition import AgentDefinition
root = Path(project_root) if project_root is not None else Path.cwd()
source_label = str(source)
agent_definition = (
definition
if isinstance(definition, AgentDefinition)
else AgentDefinition.model_validate(definition)
)
definition_data = agent_definition.model_dump(mode="python", exclude_none=True)
response_format_ref = definition_data.pop("response_format", None)
definition_data.pop("input", None)
agent = _instantiate_agent_from_data(definition_data, source_label, root)
response_format = None
if response_format_ref is not None:
response_format = _resolve_model_class(
response_format_ref,
f"{source_label}: response_format",
root,
)
return agent, response_format
def validate_crew_project(

View File

@@ -7,6 +7,7 @@ from pathlib import Path
import sys
import pytest
from pydantic import BaseModel
from crewai.llms.base_llm import BaseLLM
from crewai.project.json_loader import (
@@ -14,6 +15,7 @@ from crewai.project.json_loader import (
_looks_like_windows_absolute_path,
find_json_project_file,
load_agent,
load_agent_from_definition,
strip_jsonc_comments,
)
@@ -358,6 +360,30 @@ class TestLoadAgent:
load_agent(Path("/nonexistent/agent.json"))
class TestLoadAgentFromDefinition:
def test_resolves_response_format_from_project_module(self, tmp_path: Path):
(tmp_path / "models.py").write_text(
"from pydantic import BaseModel\n"
"class AnswerModel(BaseModel):\n"
" answer: str\n"
)
_, response_format = load_agent_from_definition(
{
"role": "Analyst",
"goal": "Analyze data",
"backstory": "Data expert.",
"input": "Summarize this",
"response_format": {"python": "models.AnswerModel"},
},
source="agent action",
project_root=tmp_path,
)
assert issubclass(response_format, BaseModel)
assert response_format.__name__ == "AnswerModel"
class TestResolveTools:
def test_unknown_tool_raises_with_guidance(self):
from crewai.project.json_loader import JSONProjectError, _resolve_tools

View File

@@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowAgentActionDefinition",
"FlowAtomicActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
@@ -80,6 +81,10 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
assert "not interpolated" in script_properties["code"]["description"]
assert "not sandboxed" in script_properties["code"]["description"]
agent_properties = defs["FlowAgentActionDefinition"]["properties"]
assert "Inline Agent definition" in agent_properties["with"]["description"]
assert "run an inline Agent" in agent_properties["call"]["description"]
state_schema = next(
branch
for branch in schema["properties"]["state"]["anyOf"]
@@ -122,6 +127,7 @@ def test_flow_definition_json_schema_carries_field_examples_only():
"FlowDefinition",
"FlowCodeActionDefinition",
"FlowToolActionDefinition",
"FlowAgentActionDefinition",
"FlowCrewActionDefinition",
"FlowExpressionActionDefinition",
"FlowScriptActionDefinition",
@@ -157,6 +163,10 @@ def test_flow_definition_json_schema_carries_field_examples_only():
]
assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}]
agent_properties = defs["FlowAgentActionDefinition"]["properties"]
assert agent_properties["call"]["examples"] == ["agent"]
assert agent_properties["with"]["examples"][0]["input"] == "${state.question}"
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert each_properties["in"]["examples"] == ["state.rows"]
assert each_properties["do"]["examples"][0][0]["name"] == "clean"

View File

@@ -811,6 +811,166 @@ methods:
)
def test_agent_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch):
from crewai import Agent
async def fake_kickoff_async(
self: Agent, messages: str, **_kwargs: Any
) -> dict[str, Any]:
return {"agent": self.role, "input": messages}
monkeypatch.setattr(Agent, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: AgentFlow
methods:
answer:
do:
call: agent
with:
role: Analyst
goal: Answer questions
backstory: Knows things.
input: "${state.question}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"question": "What is CrewAI?"}) == {
"agent": "Analyst",
"input": "What is CrewAI?",
}
def test_agent_action_runs_inside_each(monkeypatch: pytest.MonkeyPatch):
from crewai import Agent
async def fake_kickoff_async(
self: Agent, messages: str, **_kwargs: Any
) -> str:
return f"{self.role}:{messages}"
monkeypatch.setattr(Agent, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: AgentEachFlow
methods:
answer_each:
do:
call: each
in: state.questions
do:
- name: answer
action:
call: agent
with:
role: Analyst
goal: Answer questions
backstory: Knows things.
input: "${item}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"questions": ["one", "two"]}) == [
"Analyst:one",
"Analyst:two",
]
def test_agent_action_round_trips_with_inline_definition():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "AgentFlow",
"methods": {
"answer": {
"start": True,
"do": {
"call": "agent",
"with": {
"role": "Analyst",
"goal": "Answer questions",
"backstory": "Knows things.",
"settings": {"verbose": True},
"input": "${state.question}",
},
},
}
},
}
)
round_trip = FlowDefinition.from_yaml(definition.to_yaml())
action = round_trip.to_dict()["methods"]["answer"]["do"]
assert action["call"] == "agent"
assert action["with"]["role"] == "Analyst"
assert action["with"]["input"] == "${state.question}"
assert action["with"]["settings"] == {"verbose": True}
def test_agent_action_json_schema_describes_inline_agent_definitions():
schema_defs = FlowDefinition.json_schema()["$defs"]
assert set(schema_defs["AgentDefinition"]["properties"]) >= {
"role",
"goal",
"backstory",
"settings",
"input",
"response_format",
}
def test_agent_action_rejects_non_string_input_in_definition():
with pytest.raises(ValidationError, match="agent.input must be a string"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "AgentFlow",
"methods": {
"answer": {
"start": True,
"do": {
"call": "agent",
"with": {
"role": "Analyst",
"goal": "Answer questions",
"backstory": "Knows things.",
"input": 123,
},
},
}
},
}
)
def test_agent_action_reports_invalid_cel_expression():
yaml_str = """
schema: crewai.flow/v1
name: AgentFlow
methods:
answer:
do:
call: agent
with:
role: Analyst
goal: Answer questions
backstory: Knows things.
input: "${state.}"
start: true
"""
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_yaml(yaml_str)
def test_crew_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch):
from crewai import Crew