From 51d0ce4cc50393a9b2fb303cde7a34cd3fea636e Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 16 Jun 2026 21:21:41 -0700 Subject: [PATCH] Document FlowDefinition fields in the JSON schema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a description and examples to every FlowDefinition field and standardize on `typing.Literal`, so the generated JSON schema documents itself — each action discriminator, state branch, and config option explains what it is and shows a realistic value. Examples live on individual fields only, never at the model level, which keeps the schema readable for tooling that renders field-level help. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/crewai/src/crewai/flow/flow_definition.py | 412 +++++++++++++++--- lib/crewai/tests/test_flow_definition.py | 98 +++++ 2 files changed, 442 insertions(+), 68 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index b1edef2cd..9897cdc1e 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -12,7 +12,7 @@ from __future__ import annotations import json import logging import re -from typing import Annotated, Any, Literal as TypingLiteral, TypeAlias +from typing import Annotated, Any, Literal, TypeAlias from pydantic import ( BaseModel, @@ -73,10 +73,24 @@ def _object_ref(value: Any) -> str: class FlowDefinitionDiagnostic(BaseModel): """A non-fatal Flow Definition build or validation diagnostic.""" - code: str - message: str - severity: TypingLiteral["warning", "error"] = "warning" - path: str | None = None + code: str = Field( + description="Stable diagnostic identifier for tooling and tests.", + examples=["router_without_trigger"], + ) + message: str = Field( + description="Human-readable explanation of the diagnostic.", + examples=["router: true requires either start or listen"], + ) + severity: Literal["warning", "error"] = Field( + default="warning", + description="Diagnostic severity. Errors indicate an invalid or incomplete contract.", + examples=["error"], + ) + path: str | None = Field( + default=None, + description="Dot path to the definition field that produced the diagnostic.", + examples=["methods.decide"], + ) class FlowDictStateDefinition(BaseModel): @@ -84,7 +98,7 @@ class FlowDictStateDefinition(BaseModel): model_config = ConfigDict(extra="forbid") - type: TypingLiteral["dict"] = Field( + type: Literal["dict"] = Field( default="dict", description="Plain dictionary state with optional default values.", examples=["dict"], @@ -101,7 +115,7 @@ class FlowPydanticStateDefinition(BaseModel): model_config = ConfigDict(extra="forbid") - type: TypingLiteral["pydantic"] = Field( + type: Literal["pydantic"] = Field( default="pydantic", description="Importable Pydantic model used as the Flow state type.", examples=["pydantic"], @@ -136,7 +150,7 @@ class FlowJsonSchemaStateDefinition(BaseModel): model_config = ConfigDict(extra="forbid") - type: TypingLiteral["json_schema"] = Field( + type: Literal["json_schema"] = Field( default="json_schema", description="Inline JSON Schema used as the Flow state contract.", examples=["json_schema"], @@ -163,7 +177,7 @@ class FlowUnknownStateDefinition(BaseModel): model_config = ConfigDict(extra="forbid") - type: TypingLiteral["unknown"] = Field( + type: Literal["unknown"] = Field( default="unknown", description="Unknown state representation; runtime falls back to dictionary state.", examples=["unknown"], @@ -192,14 +206,46 @@ FlowStateDefinition: TypeAlias = Annotated[ class FlowConfigDefinition(BaseModel): """Serializable Flow-level configuration.""" - tracing: bool | None = None - stream: bool = False - memory: dict[str, Any] | None = None - input_provider: str | None = None - suppress_flow_events: bool = False - max_method_calls: int = 100 - defer_trace_finalization: bool = False - checkpoint: bool | dict[str, Any] | None = None + tracing: bool | None = Field( + default=None, + description="Override for flow tracing; when omitted, runtime defaults apply.", + examples=[True], + ) + stream: bool = Field( + default=False, + description="Whether the flow should emit streaming events when supported.", + examples=[True], + ) + memory: dict[str, Any] | None = Field( + default=None, + description="Serializable memory configuration passed to flow execution.", + examples=[{"enabled": True}], + ) + input_provider: str | None = Field( + default=None, + description="Import reference or provider key used to supply flow inputs.", + examples=["my_project.inputs:load_inputs"], + ) + suppress_flow_events: bool = Field( + default=False, + description="Disable flow event emission for this definition.", + examples=[False], + ) + max_method_calls: int = Field( + default=100, + description="Maximum number of method executions allowed during one kickoff.", + examples=[20], + ) + defer_trace_finalization: bool = Field( + default=False, + description="Defer trace finalization so callers can complete tracing later.", + examples=[False], + ) + checkpoint: bool | dict[str, Any] | None = Field( + default=None, + description="Checkpointing configuration, or true to use default checkpointing.", + examples=[True, {"enabled": True}], + ) class FlowPersistenceDefinition(BaseModel): @@ -211,9 +257,21 @@ class FlowPersistenceDefinition(BaseModel): serialized config. """ - enabled: bool = False - verbose: bool = False - persistence: Any = None + enabled: bool = Field( + default=False, + description="Whether persistence is enabled for this flow or method.", + examples=[True], + ) + verbose: bool = Field( + default=False, + description="Whether persistence should emit verbose diagnostic output.", + examples=[False], + ) + persistence: Any = Field( + default=None, + description="Persistence backend configuration or import reference.", + examples=[{"ref": "my_project.persistence:FlowStore"}], + ) @field_serializer("persistence", when_used="json") def _serialize_persistence(self, value: Any) -> Any: @@ -239,15 +297,53 @@ class FlowHumanFeedbackDefinition(BaseModel): a serialized config (``llm``) or a ``module:qualname`` ref (``provider``). """ - message: str - emit: list[str] | None = None - llm: Any = "gpt-4o-mini" - default_outcome: str | None = None - metadata: dict[str, Any] | None = None - provider: Any = None - learn: bool = False - learn_source: str = "hitl" - learn_strict: bool = False + message: str = Field( + description="Prompt shown to the human reviewer when feedback is requested.", + examples=["Review the research summary before publishing."], + ) + emit: list[str] | None = Field( + default=None, + description=( + "Allowed feedback outcomes. When set, the method routes like a router " + "using the selected outcome." + ), + examples=[["approved", "revise"]], + ) + llm: Any = Field( + default="gpt-4o-mini", + description="LLM configuration used to assist or process human feedback.", + examples=["gpt-4o-mini"], + ) + default_outcome: str | None = Field( + default=None, + description="Outcome to use when feedback cannot be collected.", + examples=["revise"], + ) + metadata: dict[str, Any] | None = Field( + default=None, + description="Serializable metadata attached to the feedback request.", + examples=[{"team": "research"}], + ) + provider: Any = Field( + default=None, + description="Feedback provider configuration or import reference.", + examples=["my_project.feedback:provider"], + ) + learn: bool = Field( + default=False, + description="Whether feedback should be recorded for later learning workflows.", + examples=[True], + ) + learn_source: str = Field( + default="hitl", + description="Source label attached to learned feedback records.", + examples=["hitl"], + ) + learn_strict: bool = Field( + default=False, + description="Whether learning should enforce strict validation of feedback data.", + examples=[False], + ) @field_serializer("llm", when_used="json") def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None: @@ -267,30 +363,89 @@ class FlowHumanFeedbackDefinition(BaseModel): class FlowCodeActionDefinition(BaseModel): """A Flow method action that executes importable Python code.""" - model_config = ConfigDict(populate_by_name=True, extra="forbid") + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) - call: TypingLiteral["code"] = "code" - ref: str - with_: dict[str, Any] | None = Field(default=None, alias="with") + call: Literal["code"] = Field( + default="code", + description="Action discriminator. Use code to call importable Python.", + examples=["code"], + ) + ref: str = Field( + description="Import reference for the callable, formatted as module:qualname.", + examples=["my_project.flows:normalize_topic"], + ) + with_: dict[str, Any] | None = Field( + default=None, + alias="with", + description="Keyword arguments passed to the callable after expression rendering.", + examples=[{"topic": "${state.topic}"}], + ) class FlowToolActionDefinition(BaseModel): """A Flow method action that invokes a CrewAI tool.""" - model_config = ConfigDict(populate_by_name=True, extra="forbid") + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) - call: TypingLiteral["tool"] - ref: str - with_: dict[str, Any] | None = Field(default=None, alias="with") + call: Literal["tool"] = Field( + description="Action discriminator. Use tool to instantiate and run a CrewAI tool.", + examples=["tool"], + ) + ref: str = Field( + description="Import reference for a BaseTool class, formatted as module:qualname.", + examples=["my_project.tools:SearchTool"], + ) + with_: dict[str, Any] | None = Field( + default=None, + alias="with", + description="Tool input arguments after expression rendering.", + examples=[{"query": "${outputs.normalize_topic}", "limit": 5}], + ) class FlowCrewActionDefinition(BaseModel): """A Flow method action that builds and kicks off a CrewAI crew.""" - model_config = ConfigDict(populate_by_name=True, extra="forbid") + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) - call: TypingLiteral["crew"] - with_: CrewDefinition = Field(alias="with") + call: Literal["crew"] = Field( + description="Action discriminator. Use crew to run an inline Crew definition.", + examples=["crew"], + ) + with_: CrewDefinition = Field( + alias="with", + description="Inline Crew definition to load and execute for this action.", + examples=[ + { + "name": "inline_research", + "agents": { + "researcher": { + "role": "Researcher", + "goal": "Research {topic}", + "backstory": "Knows the domain.", + } + }, + "tasks": [ + { + "name": "research_task", + "description": "Research {topic}", + "expected_output": "Findings about {topic}", + "agent": "researcher", + } + ], + "inputs": {"topic": "${state.topic}"}, + } + ], + ) class FlowExpressionActionDefinition(BaseModel): @@ -298,8 +453,14 @@ class FlowExpressionActionDefinition(BaseModel): model_config = ConfigDict(extra="forbid") - call: TypingLiteral["expression"] - expr: str + call: Literal["expression"] = Field( + description="Action discriminator. Use expression to evaluate a CEL expression.", + examples=["expression"], + ) + expr: str = Field( + description="CEL expression evaluated against state, outputs, and local context.", + examples=["state.topic", "outputs.normalize_topic"], + ) class FlowScriptActionDefinition(BaseModel): @@ -307,7 +468,7 @@ class FlowScriptActionDefinition(BaseModel): model_config = ConfigDict(extra="forbid") - call: TypingLiteral["script"] = Field( + call: Literal["script"] = Field( description="Action discriminator. Use script to execute trusted inline Python.", examples=["script"], ) @@ -322,7 +483,7 @@ class FlowScriptActionDefinition(BaseModel): "return state['normalized_topic']" ], ) - language: TypingLiteral["python"] = Field( + language: Literal["python"] = Field( default="python", description="Script language. Only python is currently supported.", examples=["python"], @@ -341,6 +502,11 @@ FlowInnerActionDefinition = ( class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]): """One named action inside an ``each`` composite action.""" + root: dict[str, FlowInnerActionDefinition] = Field( + description="Single-entry mapping from an inner action name to its action.", + examples=[{"clean": {"call": "script", "code": "return item.strip()"}}], + ) + @model_validator(mode="after") def _validate_action_mapping(self) -> FlowEachInnerActionDefinition: if len(self.root) != 1: @@ -360,11 +526,35 @@ class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinitio class FlowEachActionDefinition(BaseModel): """A composite action that runs a sequential mini-pipeline for each item.""" - model_config = ConfigDict(populate_by_name=True, extra="forbid") + model_config = ConfigDict( + populate_by_name=True, + extra="forbid", + ) - call: TypingLiteral["each"] - in_: str = Field(alias="in") - do: list[FlowEachInnerActionDefinition] + call: Literal["each"] = Field( + description=( + "Action discriminator. Use each to run a sequence of actions for every " + "item in an input list." + ), + examples=["each"], + ) + in_: str = Field( + alias="in", + description="CEL expression that must evaluate to the list to iterate.", + examples=["state.rows"], + ) + do: list[FlowEachInnerActionDefinition] = Field( + description=( + "Ordered inner actions to run for each item. Each entry must be a " + "single-key mapping naming that inner action." + ), + examples=[ + [ + {"clean": {"call": "script", "code": "return item.strip()"}}, + {"tag": {"call": "expression", "expr": "outputs.clean"}}, + ] + ], + ) @model_validator(mode="after") def _validate_inner_action_list(self) -> FlowEachActionDefinition: @@ -394,14 +584,48 @@ FlowActionDefinition = ( class FlowMethodDefinition(BaseModel): """Static definition of one Flow method and its execution roles.""" - description: str | None = None - do: FlowActionDefinition - start: bool | FlowDefinitionCondition | None = None - listen: FlowDefinitionCondition | None = None - router: bool = False - emit: list[str] | None = None - human_feedback: FlowHumanFeedbackDefinition | None = None - persist: FlowPersistenceDefinition | None = None + description: str | None = Field( + default=None, + description="Human-readable summary of what this method does.", + examples=["Normalize the incoming topic."], + ) + do: FlowActionDefinition = Field( + description="Action executed when this method runs.", + examples=[{"call": "script", "code": "return input.strip()"}], + ) + start: bool | FlowDefinitionCondition | None = Field( + default=None, + description=( + "Marks a start method. True starts unconditionally; a condition starts " + "when the kickoff inputs or events satisfy it." + ), + examples=[True], + ) + listen: FlowDefinitionCondition | None = Field( + default=None, + description="Trigger condition that runs this method after upstream events.", + examples=["seed", {"or": ["approved", "revise"]}], + ) + router: bool = Field( + default=False, + description="Whether the method output should be treated as the next event name.", + examples=[True], + ) + emit: list[str] | None = Field( + default=None, + description="Declared router events this method may emit.", + examples=[["approved", "revise"]], + ) + human_feedback: FlowHumanFeedbackDefinition | None = Field( + default=None, + description="Optional human feedback step applied after the method action.", + examples=[{"message": "Review the research summary before publishing."}], + ) + persist: FlowPersistenceDefinition | None = Field( + default=None, + description="Method-level persistence override.", + examples=[{"enabled": True}], + ) @model_validator(mode="after") def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition: @@ -427,19 +651,71 @@ class FlowMethodDefinition(BaseModel): class FlowDefinition(BaseModel): """Static, serializable definition of a Flow.""" - model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) - - schema_: TypingLiteral["crewai.flow/v1"] = Field( - default="crewai.flow/v1", alias="schema" + model_config = ConfigDict( + populate_by_name=True, + arbitrary_types_allowed=True, + ) + + schema_: Literal["crewai.flow/v1"] = Field( + default="crewai.flow/v1", + alias="schema", + description="Flow Definition schema identifier and version.", + examples=["crewai.flow/v1"], + ) + name: str = Field( + description="Unique flow name used in logs, events, and traces.", + examples=["ResearchFlow"], + ) + description: str | None = Field( + default=None, + description="Human-readable summary of the flow.", + examples=["Normalize a topic and prepare it for research."], + ) + state: FlowStateDefinition | None = Field( + default=None, + description="State contract for kickoff inputs and runtime state.", + examples=[{"type": "dict", "default": {"topic": "AI agents"}}], + ) + config: FlowConfigDefinition = Field( + default_factory=FlowConfigDefinition, + description="Serializable flow-level runtime configuration.", + examples=[{"stream": True, "max_method_calls": 20}], + ) + persist: FlowPersistenceDefinition | None = Field( + default=None, + description="Flow-level persistence configuration.", + examples=[{"enabled": True}], + ) + conversational: FlowConversationalDefinition | None = Field( + default=None, + description="Conversational flow configuration, when the flow supports chat.", + ) + methods: dict[str, FlowMethodDefinition] = Field( + default_factory=dict, + description="Mapping of method names to method definitions.", + examples=[ + { + "seed": { + "start": True, + "do": {"call": "expression", "expr": "state.topic"}, + } + } + ], + ) + diagnostics: list[FlowDefinitionDiagnostic] = Field( + default_factory=list, + description="Validation diagnostics attached to this definition.", + examples=[ + [ + { + "code": "router_without_trigger", + "message": "router: true requires either start or listen", + "severity": "error", + "path": "methods.decide", + } + ] + ], ) - name: str - description: str | None = None - state: FlowStateDefinition | None = None - config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition) - persist: FlowPersistenceDefinition | None = None - conversational: FlowConversationalDefinition | None = None - methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict) - diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list) @model_validator(mode="after") def _validate_method_names(self) -> FlowDefinition: diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 7be04fc48..7aaa4d31a 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -63,6 +63,104 @@ def test_flow_public_exports_are_explicit(): assert "calculate_node_levels" not in flow_visualization.__all__ +def test_flow_definition_json_schema_carries_reference_descriptions(): + schema = flow_definition.FlowDefinition.json_schema() + defs = schema["$defs"] + + assert schema["properties"]["schema"]["description"] + assert schema["properties"]["methods"]["description"] + + method_properties = defs["FlowMethodDefinition"]["properties"] + assert method_properties["do"]["description"] == "Action executed when this method runs." + assert "Trigger condition" in method_properties["listen"]["description"] + + script_properties = defs["FlowScriptActionDefinition"]["properties"] + assert "trusted inline Python" in script_properties["call"]["description"] + assert "not interpolated" in script_properties["code"]["description"] + assert "not sandboxed" in script_properties["code"]["description"] + + state_schema = schema["properties"]["state"]["anyOf"][0] + assert state_schema["discriminator"]["propertyName"] == "type" + assert state_schema["discriminator"]["mapping"] == { + "dict": "#/$defs/FlowDictStateDefinition", + "json_schema": "#/$defs/FlowJsonSchemaStateDefinition", + "pydantic": "#/$defs/FlowPydanticStateDefinition", + "unknown": "#/$defs/FlowUnknownStateDefinition", + } + + dict_state_properties = defs["FlowDictStateDefinition"]["properties"] + assert dict_state_properties["type"]["description"] + assert "ref" not in dict_state_properties + + json_schema_state_properties = defs["FlowJsonSchemaStateDefinition"]["properties"] + assert json_schema_state_properties["json_schema"]["description"] + assert "json_schema" in defs["FlowJsonSchemaStateDefinition"]["required"] + + pydantic_state_properties = defs["FlowPydanticStateDefinition"]["properties"] + assert "Fallback JSON Schema" in pydantic_state_properties["json_schema"][ + "description" + ] + + each_properties = defs["FlowEachActionDefinition"]["properties"] + assert "list to iterate" in each_properties["in"]["description"] + assert "Ordered inner actions" in each_properties["do"]["description"] + + +def test_flow_definition_json_schema_carries_field_examples_only(): + schema = flow_definition.FlowDefinition.json_schema() + defs = schema["$defs"] + + for model_name in [ + "FlowDefinition", + "FlowCodeActionDefinition", + "FlowToolActionDefinition", + "FlowCrewActionDefinition", + "FlowExpressionActionDefinition", + "FlowScriptActionDefinition", + "FlowEachActionDefinition", + "FlowMethodDefinition", + "FlowDictStateDefinition", + "FlowJsonSchemaStateDefinition", + "FlowPydanticStateDefinition", + "FlowUnknownStateDefinition", + "FlowConfigDefinition", + "FlowPersistenceDefinition", + "FlowHumanFeedbackDefinition", + "FlowDefinitionDiagnostic", + ]: + model_schema = schema if model_name == "FlowDefinition" else defs[model_name] + assert "examples" not in model_schema + + assert schema["properties"]["name"]["examples"] == ["ResearchFlow"] + assert schema["properties"]["schema"]["examples"] == ["crewai.flow/v1"] + assert schema["properties"]["methods"]["examples"][0]["seed"]["do"] == { + "call": "expression", + "expr": "state.topic", + } + + script_properties = defs["FlowScriptActionDefinition"]["properties"] + assert script_properties["call"]["examples"] == ["script"] + assert "input.strip()" in script_properties["code"]["examples"][0] + assert script_properties["language"]["examples"] == ["python"] + + action_properties = defs["FlowCodeActionDefinition"]["properties"] + assert action_properties["ref"]["examples"] == [ + "my_project.flows:normalize_topic" + ] + assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}] + + each_properties = defs["FlowEachActionDefinition"]["properties"] + assert each_properties["in"]["examples"] == ["state.rows"] + assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script" + + method_properties = defs["FlowMethodDefinition"]["properties"] + assert method_properties["listen"]["examples"] == [ + "seed", + {"or": ["approved", "revise"]}, + ] + assert method_properties["emit"]["examples"] == [["approved", "revise"]] + + def test_flow_state_definition_uses_discriminated_branches(): definition = flow_definition.FlowDefinition.model_validate( {