diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 853dd3b77..a566fcadd 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -28,7 +28,7 @@ from crewai.flow.conversational_definition import ( FlowConversationalRouterDefinition, ) from crewai.flow.expressions import ExpressionData -from crewai.project.crew_definition import CrewDefinition +from crewai.project.crew_definition import AgentDefinition, CrewDefinition logger = logging.getLogger(__name__) @@ -40,6 +40,7 @@ _EACH_STEP_CEL_ROOTS = frozenset({"item", "outputs", "state"}) __all__ = [ "FlowActionDefinition", + "FlowAgentActionDefinition", "FlowAtomicActionDefinition", "FlowCodeActionDefinition", "FlowConfigDefinition", @@ -435,6 +436,33 @@ class FlowCrewActionDefinition(BaseModel): ) +class FlowAgentActionDefinition(BaseModel): + """A Flow method action that builds and kicks off a CrewAI agent.""" + + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) + + call: Literal["agent"] = Field( + description="Action discriminator. Use agent to run an inline Agent definition.", + examples=["agent"], + ) + with_: AgentDefinition = Field( + alias="with", + description="Inline Agent definition to load and execute for this action.", + examples=[ + { + "role": "Analyst", + "goal": "Answer user questions", + "backstory": "Precise and concise.", + "settings": {"llm": "openai/gpt-4o-mini"}, + "input": "${state.question}", + } + ], + ) + + class FlowExpressionActionDefinition(BaseModel): """A Flow method action that evaluates a CEL expression.""" @@ -481,6 +509,7 @@ FlowAtomicActionDefinition: TypeAlias = Annotated[ FlowCodeActionDefinition | FlowToolActionDefinition | FlowCrewActionDefinition + | FlowAgentActionDefinition | FlowExpressionActionDefinition | FlowScriptActionDefinition, Field(discriminator="call"), @@ -573,6 +602,7 @@ FlowActionDefinition: TypeAlias = ( FlowCodeActionDefinition | FlowToolActionDefinition | FlowCrewActionDefinition + | FlowAgentActionDefinition | FlowExpressionActionDefinition | FlowScriptActionDefinition | FlowEachActionDefinition @@ -802,6 +832,13 @@ def _validate_action_cel( ) return + if isinstance(action, FlowAgentActionDefinition): + Expression(cast(ExpressionData, action.with_.input)).validate_template( + allowed_roots=allowed_roots, + source=f"{path}.with.input", + ) + return + if isinstance(action, FlowEachActionDefinition): Expression(action.in_).validate_expression( allowed_roots=_BASE_CEL_ROOTS, @@ -821,6 +858,14 @@ def _validate_action_cel( ) return + if isinstance(action, FlowScriptActionDefinition): + return + + raise TypeError( + f"no CEL validation defined for action type {type(action).__name__} at " + f"{path}; add a branch to _validate_action_cel for it." + ) + def log_flow_definition_issues(definition: FlowDefinition) -> None: for method_name, method in definition.methods.items(): diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 79944c43e..c437e274b 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Protocol, cast from crewai.flow.expressions import Expression, ExpressionData from crewai.flow.flow_definition import ( FlowActionDefinition, + FlowAgentActionDefinition, FlowCodeActionDefinition, FlowCrewActionDefinition, FlowEachActionDefinition, @@ -140,6 +141,35 @@ class CrewAction: return await crew.kickoff_async(inputs=inputs) +class AgentAction: + definition_type = FlowAgentActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowAgentActionDefinition) -> None: + self.flow = flow + self.definition = definition + + async def run(self, *_args: Any, **kwargs: Any) -> Any: + from crewai.project.json_loader import load_agent_from_definition + + local_context = _pop_local_context(kwargs) + rendered_input = Expression.from_flow( + cast(ExpressionData, self.definition.with_.input), + self.flow, + local_context=local_context, + ).render_template() + if not isinstance(rendered_input, str): + raise ValueError("agent input must render to a string") + + agent, response_format = load_agent_from_definition( + self.definition.with_, + source="agent action", + ) + return await agent.kickoff_async( + rendered_input, + response_format=response_format, + ) + + class ExpressionAction: definition_type = FlowExpressionActionDefinition @@ -284,6 +314,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = ( EachAction, CodeAction, ToolAction, + AgentAction, CrewAction, ExpressionAction, ScriptAction, diff --git a/lib/crewai/src/crewai/project/__init__.py b/lib/crewai/src/crewai/project/__init__.py index 151b616f9..cc9286596 100644 --- a/lib/crewai/src/crewai/project/__init__.py +++ b/lib/crewai/src/crewai/project/__init__.py @@ -15,16 +15,22 @@ from crewai.project.annotations import ( ) from crewai.project.crew_base import CrewBase from crewai.project.crew_definition import ( + AgentDefinition, CrewAgentDefinition, CrewDefinition, CrewTaskDefinition, PythonReferenceDefinition, ) from crewai.project.crew_loader import load_crew, load_crew_and_kickoff -from crewai.project.json_loader import load_agent, strip_jsonc_comments +from crewai.project.json_loader import ( + load_agent, + load_agent_from_definition, + strip_jsonc_comments, +) __all__ = [ + "AgentDefinition", "CrewAgentDefinition", "CrewBase", "CrewDefinition", @@ -38,6 +44,7 @@ __all__ = [ "crew", "llm", "load_agent", + "load_agent_from_definition", "load_crew", "load_crew_and_kickoff", "output_json", diff --git a/lib/crewai/src/crewai/project/crew_definition.py b/lib/crewai/src/crewai/project/crew_definition.py index 059878b5c..9e88c2ad6 100644 --- a/lib/crewai/src/crewai/project/crew_definition.py +++ b/lib/crewai/src/crewai/project/crew_definition.py @@ -8,6 +8,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator, model_valida __all__ = [ + "AgentDefinition", "CrewAgentDefinition", "CrewDefinition", "CrewTaskDefinition", @@ -53,6 +54,20 @@ class CrewAgentDefinition(BaseModel): return value or {} +class AgentDefinition(CrewAgentDefinition): + """Inline agent definition used by a Flow agent action.""" + + input: str + response_format: PythonReferenceDefinition | None = None + + @field_validator("input", mode="before") + @classmethod + def _validate_input(cls, value: Any) -> Any: + if not isinstance(value, str): + raise ValueError("agent.input must be a string") + return value + + class CrewTaskDefinition(BaseModel): """Task definition used by a crew definition.""" diff --git a/lib/crewai/src/crewai/project/json_loader.py b/lib/crewai/src/crewai/project/json_loader.py index 107eb8c0c..0c6d7cbba 100644 --- a/lib/crewai/src/crewai/project/json_loader.py +++ b/lib/crewai/src/crewai/project/json_loader.py @@ -207,19 +207,18 @@ def load_jsonc_file(source: str | Path) -> Any: return parse_jsonc(path.read_text(encoding="utf-8"), source=path) -def load_agent(source: str | Path) -> Any: - """Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file.""" - path = Path(source) - defn = _expect_object(load_jsonc_file(path), path) - root = path.parent.parent if path.parent.name == "agents" else path.parent +def _instantiate_agent_from_data( + defn: dict[str, Any], source_label: str, root: Path +) -> Any: + """Resolve the agent class and kwargs from definition data and instantiate it.""" agent_class = _agent_class_from_definition( defn, - f"{path}: type", + f"{source_label}: type", project_root=root, ) agent_kwargs = _agent_kwargs_from_definition( defn, - path, + source_label, agent_class=agent_class, project_root=root, ) @@ -227,9 +226,50 @@ def load_agent(source: str | Path) -> Any: try: return agent_class(**agent_kwargs) except ValidationError as exc: - raise JSONProjectError(_format_validation_error(path, exc)) from exc + raise JSONProjectError(_format_validation_error(source_label, exc)) from exc except Exception as exc: - raise JSONProjectError(f"{path}: failed to load agent: {exc}") from exc + raise JSONProjectError(f"{source_label}: failed to load agent: {exc}") from exc + + +def load_agent(source: str | Path) -> Any: + """Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file.""" + path = Path(source) + defn = _expect_object(load_jsonc_file(path), path) + root = path.parent.parent if path.parent.name == "agents" else path.parent + return _instantiate_agent_from_data(defn, str(path), root) + + +def load_agent_from_definition( + definition: dict[str, Any] | Any, + *, + source: str | Path = "", + project_root: str | Path | None = None, +) -> tuple[Any, type[BaseModel] | None]: + """Load an ``Agent`` and optional kickoff response model from an inline definition.""" + from crewai.project.crew_definition import AgentDefinition + + root = Path(project_root) if project_root is not None else Path.cwd() + source_label = str(source) + agent_definition = ( + definition + if isinstance(definition, AgentDefinition) + else AgentDefinition.model_validate(definition) + ) + definition_data = agent_definition.model_dump(mode="python", exclude_none=True) + response_format_ref = definition_data.pop("response_format", None) + definition_data.pop("input", None) + + agent = _instantiate_agent_from_data(definition_data, source_label, root) + + response_format = None + if response_format_ref is not None: + response_format = _resolve_model_class( + response_format_ref, + f"{source_label}: response_format", + root, + ) + + return agent, response_format def validate_crew_project( diff --git a/lib/crewai/tests/project/test_json_loader.py b/lib/crewai/tests/project/test_json_loader.py index 3acfabf5d..a5a8f85fa 100644 --- a/lib/crewai/tests/project/test_json_loader.py +++ b/lib/crewai/tests/project/test_json_loader.py @@ -7,6 +7,7 @@ from pathlib import Path import sys import pytest +from pydantic import BaseModel from crewai.llms.base_llm import BaseLLM from crewai.project.json_loader import ( @@ -14,6 +15,7 @@ from crewai.project.json_loader import ( _looks_like_windows_absolute_path, find_json_project_file, load_agent, + load_agent_from_definition, strip_jsonc_comments, ) @@ -358,6 +360,30 @@ class TestLoadAgent: load_agent(Path("/nonexistent/agent.json")) +class TestLoadAgentFromDefinition: + def test_resolves_response_format_from_project_module(self, tmp_path: Path): + (tmp_path / "models.py").write_text( + "from pydantic import BaseModel\n" + "class AnswerModel(BaseModel):\n" + " answer: str\n" + ) + + _, response_format = load_agent_from_definition( + { + "role": "Analyst", + "goal": "Analyze data", + "backstory": "Data expert.", + "input": "Summarize this", + "response_format": {"python": "models.AnswerModel"}, + }, + source="agent action", + project_root=tmp_path, + ) + + assert issubclass(response_format, BaseModel) + assert response_format.__name__ == "AnswerModel" + + class TestResolveTools: def test_unknown_tool_raises_with_guidance(self): from crewai.project.json_loader import JSONProjectError, _resolve_tools diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 73120e7db..4b5b8d37b 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit(): } assert set(flow_definition.__all__) == { "FlowActionDefinition", + "FlowAgentActionDefinition", "FlowAtomicActionDefinition", "FlowCodeActionDefinition", "FlowConfigDefinition", @@ -80,6 +81,10 @@ def test_flow_definition_json_schema_carries_reference_descriptions(): assert "not interpolated" in script_properties["code"]["description"] assert "not sandboxed" in script_properties["code"]["description"] + agent_properties = defs["FlowAgentActionDefinition"]["properties"] + assert "Inline Agent definition" in agent_properties["with"]["description"] + assert "run an inline Agent" in agent_properties["call"]["description"] + state_schema = next( branch for branch in schema["properties"]["state"]["anyOf"] @@ -122,6 +127,7 @@ def test_flow_definition_json_schema_carries_field_examples_only(): "FlowDefinition", "FlowCodeActionDefinition", "FlowToolActionDefinition", + "FlowAgentActionDefinition", "FlowCrewActionDefinition", "FlowExpressionActionDefinition", "FlowScriptActionDefinition", @@ -157,6 +163,10 @@ def test_flow_definition_json_schema_carries_field_examples_only(): ] assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}] + agent_properties = defs["FlowAgentActionDefinition"]["properties"] + assert agent_properties["call"]["examples"] == ["agent"] + assert agent_properties["with"]["examples"][0]["input"] == "${state.question}" + each_properties = defs["FlowEachActionDefinition"]["properties"] assert each_properties["in"]["examples"] == ["state.rows"] assert each_properties["do"]["examples"][0][0]["name"] == "clean" diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 67fa7b843..1ed8dbcf9 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -811,6 +811,166 @@ methods: ) +def test_agent_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch): + from crewai import Agent + + async def fake_kickoff_async( + self: Agent, messages: str, **_kwargs: Any + ) -> dict[str, Any]: + return {"agent": self.role, "input": messages} + + monkeypatch.setattr(Agent, "kickoff_async", fake_kickoff_async) + + yaml_str = """ +schema: crewai.flow/v1 +name: AgentFlow +methods: + answer: + do: + call: agent + with: + role: Analyst + goal: Answer questions + backstory: Knows things. + input: "${state.question}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"question": "What is CrewAI?"}) == { + "agent": "Analyst", + "input": "What is CrewAI?", + } + + +def test_agent_action_runs_inside_each(monkeypatch: pytest.MonkeyPatch): + from crewai import Agent + + async def fake_kickoff_async( + self: Agent, messages: str, **_kwargs: Any + ) -> str: + return f"{self.role}:{messages}" + + monkeypatch.setattr(Agent, "kickoff_async", fake_kickoff_async) + + yaml_str = """ +schema: crewai.flow/v1 +name: AgentEachFlow +methods: + answer_each: + do: + call: each + in: state.questions + do: + - name: answer + action: + call: agent + with: + role: Analyst + goal: Answer questions + backstory: Knows things. + input: "${item}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"questions": ["one", "two"]}) == [ + "Analyst:one", + "Analyst:two", + ] + + +def test_agent_action_round_trips_with_inline_definition(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "AgentFlow", + "methods": { + "answer": { + "start": True, + "do": { + "call": "agent", + "with": { + "role": "Analyst", + "goal": "Answer questions", + "backstory": "Knows things.", + "settings": {"verbose": True}, + "input": "${state.question}", + }, + }, + } + }, + } + ) + + round_trip = FlowDefinition.from_yaml(definition.to_yaml()) + action = round_trip.to_dict()["methods"]["answer"]["do"] + + assert action["call"] == "agent" + assert action["with"]["role"] == "Analyst" + assert action["with"]["input"] == "${state.question}" + assert action["with"]["settings"] == {"verbose": True} + + +def test_agent_action_json_schema_describes_inline_agent_definitions(): + schema_defs = FlowDefinition.json_schema()["$defs"] + + assert set(schema_defs["AgentDefinition"]["properties"]) >= { + "role", + "goal", + "backstory", + "settings", + "input", + "response_format", + } + + +def test_agent_action_rejects_non_string_input_in_definition(): + with pytest.raises(ValidationError, match="agent.input must be a string"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "AgentFlow", + "methods": { + "answer": { + "start": True, + "do": { + "call": "agent", + "with": { + "role": "Analyst", + "goal": "Answer questions", + "backstory": "Knows things.", + "input": 123, + }, + }, + } + }, + } + ) + + +def test_agent_action_reports_invalid_cel_expression(): + yaml_str = """ +schema: crewai.flow/v1 +name: AgentFlow +methods: + answer: + do: + call: agent + with: + role: Analyst + goal: Answer questions + backstory: Knows things. + input: "${state.}" + start: true +""" + + with pytest.raises(ValidationError, match="invalid CEL expression"): + FlowDefinition.from_yaml(yaml_str) + + def test_crew_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch): from crewai import Crew