mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-18 06:38:11 +00:00
Compare commits
5 Commits
ci/python-
...
flow-route
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2105a0ebf8 | ||
|
|
4f8d5cf7cb | ||
|
|
218dc82bf7 | ||
|
|
7374486f00 | ||
|
|
5bd10ee2c4 |
@@ -14,7 +14,6 @@ from crewai.flow.flow_definition import (
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
FlowDefinition,
|
||||
FlowDefinitionDiagnostic,
|
||||
FlowDictStateDefinition,
|
||||
FlowHumanFeedbackDefinition,
|
||||
FlowMethodDefinition,
|
||||
@@ -23,6 +22,7 @@ from crewai.flow.flow_definition import (
|
||||
FlowStateDefinition,
|
||||
FlowUnknownStateDefinition,
|
||||
_object_ref,
|
||||
log_flow_definition_issues,
|
||||
)
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowMethod,
|
||||
@@ -116,7 +116,6 @@ def _is_json_serializable(value: Any) -> bool:
|
||||
|
||||
def _serialize_static_value(
|
||||
value: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> Any:
|
||||
if value is None or _is_json_serializable(value):
|
||||
@@ -148,12 +147,11 @@ def _serialize_static_value(
|
||||
)
|
||||
|
||||
ref = _object_ref(value)
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="non_serializable_value",
|
||||
path=path,
|
||||
message=f"value is not fully serializable; preserved import reference {ref}",
|
||||
)
|
||||
logger.warning(
|
||||
"Flow definition value at %s is not fully serializable; "
|
||||
"preserved import reference %s.",
|
||||
path,
|
||||
ref,
|
||||
)
|
||||
return {"ref": ref}
|
||||
|
||||
@@ -169,10 +167,7 @@ def _state_ref(value: Any) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
def _build_state_definition(
|
||||
flow_class: type,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
) -> FlowStateDefinition | None:
|
||||
def _build_state_definition(flow_class: type) -> FlowStateDefinition | None:
|
||||
from pydantic import BaseModel as PydanticBaseModel
|
||||
|
||||
state_value = getattr(flow_class, "_initial_state_t", None)
|
||||
@@ -187,29 +182,23 @@ def _build_state_definition(
|
||||
if state_value is dict or isinstance(state_value, dict):
|
||||
default = None
|
||||
if isinstance(state_value, dict):
|
||||
default = _serialize_static_value(state_value, diagnostics, "state.default")
|
||||
default = _serialize_static_value(state_value, "state.default")
|
||||
return FlowDictStateDefinition(default=default)
|
||||
if isinstance(state_value, type) and issubclass(state_value, PydanticBaseModel):
|
||||
return FlowPydanticStateDefinition(ref=_state_ref(state_value))
|
||||
if isinstance(state_value, PydanticBaseModel):
|
||||
return FlowPydanticStateDefinition(
|
||||
ref=_state_ref(state_value),
|
||||
default=_serialize_static_value(state_value, diagnostics, "state.default"),
|
||||
)
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="unknown_state_type",
|
||||
path="state",
|
||||
message=f"could not serialize state type {_object_ref(state_value)}",
|
||||
default=_serialize_static_value(state_value, "state.default"),
|
||||
)
|
||||
logger.warning(
|
||||
"Flow definition state could not serialize state type %s.",
|
||||
_object_ref(state_value),
|
||||
)
|
||||
return FlowUnknownStateDefinition(ref=_state_ref(state_value))
|
||||
|
||||
|
||||
def _build_config_definition(
|
||||
flow_class: type,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
) -> FlowConfigDefinition:
|
||||
def _build_config_definition(flow_class: type) -> FlowConfigDefinition:
|
||||
config_field_names = set(FlowConfigDefinition.model_fields)
|
||||
field_defaults = {
|
||||
name: field.get_default(call_default_factory=True)
|
||||
@@ -225,15 +214,12 @@ def _build_config_definition(
|
||||
value if value is None or isinstance(value, str) else _object_ref(value)
|
||||
)
|
||||
else:
|
||||
values[field_name] = _serialize_static_value(
|
||||
value, diagnostics, f"config.{field_name}"
|
||||
)
|
||||
values[field_name] = _serialize_static_value(value, f"config.{field_name}")
|
||||
return FlowConfigDefinition(**values)
|
||||
|
||||
|
||||
def _build_human_feedback_definition(
|
||||
method: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowHumanFeedbackDefinition | None:
|
||||
config = getattr(method, "__human_feedback_config__", None)
|
||||
@@ -248,7 +234,7 @@ def _build_human_feedback_definition(
|
||||
llm=getattr(config, "llm", None),
|
||||
default_outcome=getattr(config, "default_outcome", None),
|
||||
metadata=_serialize_static_value(
|
||||
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
|
||||
getattr(config, "metadata", None), f"{path}.metadata"
|
||||
),
|
||||
provider=getattr(config, "provider", None),
|
||||
learn=bool(getattr(config, "learn", False)),
|
||||
@@ -273,7 +259,6 @@ def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | Non
|
||||
|
||||
def _build_conversational_router_definition(
|
||||
router_config: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowConversationalRouterDefinition | None:
|
||||
if router_config is None:
|
||||
@@ -284,12 +269,9 @@ def _build_conversational_router_definition(
|
||||
prompt=getattr(router_config, "prompt", None),
|
||||
response_format=_serialize_static_value(
|
||||
getattr(router_config, "response_format", None),
|
||||
diagnostics,
|
||||
f"{path}.response_format",
|
||||
),
|
||||
llm=_serialize_static_value(
|
||||
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
|
||||
),
|
||||
llm=_serialize_static_value(getattr(router_config, "llm", None), f"{path}.llm"),
|
||||
routes=[str(route) for route in routes] if routes is not None else None,
|
||||
route_descriptions=getattr(router_config, "route_descriptions", None),
|
||||
default_intent=getattr(router_config, "default_intent", "converse"),
|
||||
@@ -300,7 +282,6 @@ def _build_conversational_router_definition(
|
||||
|
||||
def _build_conversational_definition(
|
||||
flow_class: type,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
) -> FlowConversationalDefinition | None:
|
||||
if not _is_conversational_flow(flow_class):
|
||||
return None
|
||||
@@ -324,12 +305,9 @@ def _build_conversational_definition(
|
||||
return FlowConversationalDefinition(
|
||||
enabled=True,
|
||||
system_prompt=getattr(config, "system_prompt", None),
|
||||
llm=_serialize_static_value(
|
||||
getattr(config, "llm", None), diagnostics, "conversational.llm"
|
||||
),
|
||||
llm=_serialize_static_value(getattr(config, "llm", None), "conversational.llm"),
|
||||
router=_build_conversational_router_definition(
|
||||
getattr(config, "router", None),
|
||||
diagnostics,
|
||||
"conversational.router",
|
||||
),
|
||||
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
|
||||
@@ -340,12 +318,10 @@ def _build_conversational_definition(
|
||||
),
|
||||
intent_llm=_serialize_static_value(
|
||||
getattr(config, "intent_llm", None),
|
||||
diagnostics,
|
||||
"conversational.intent_llm",
|
||||
),
|
||||
answer_from_history_llm=_serialize_static_value(
|
||||
getattr(config, "answer_from_history_llm", None),
|
||||
diagnostics,
|
||||
"conversational.answer_from_history_llm",
|
||||
),
|
||||
visible_agent_outputs=(
|
||||
@@ -365,7 +341,6 @@ def _build_conversational_definition(
|
||||
|
||||
def _build_method_definition(
|
||||
method: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowMethodDefinition:
|
||||
fragment = _get_flow_method_definition(method)
|
||||
@@ -376,9 +351,7 @@ def _build_method_definition(
|
||||
deep=True, update={"do": _method_action(method)}
|
||||
)
|
||||
|
||||
human_feedback = _build_human_feedback_definition(
|
||||
method, diagnostics, f"{path}.human_feedback"
|
||||
)
|
||||
human_feedback = _build_human_feedback_definition(method, f"{path}.human_feedback")
|
||||
if human_feedback is not None:
|
||||
method_definition.human_feedback = human_feedback
|
||||
if human_feedback.emit:
|
||||
@@ -444,7 +417,6 @@ def _build_flow_definition_from_class(
|
||||
flow_class: type,
|
||||
namespace: dict[str, Any] | None = None,
|
||||
) -> FlowDefinition:
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = []
|
||||
methods: dict[str, FlowMethodDefinition] = {}
|
||||
flow_methods = _iter_flow_methods(flow_class)
|
||||
if namespace is not None:
|
||||
@@ -456,7 +428,7 @@ def _build_flow_definition_from_class(
|
||||
|
||||
for method_name, method in flow_methods.items():
|
||||
methods[method_name] = _build_method_definition(
|
||||
method, diagnostics, f"methods.{method_name}"
|
||||
method, f"methods.{method_name}"
|
||||
)
|
||||
|
||||
description = None
|
||||
@@ -467,15 +439,13 @@ def _build_flow_definition_from_class(
|
||||
definition = FlowDefinition(
|
||||
name=getattr(flow_class, "__name__", "Flow"),
|
||||
description=description,
|
||||
state=_build_state_definition(flow_class, diagnostics),
|
||||
config=_build_config_definition(flow_class, diagnostics),
|
||||
state=_build_state_definition(flow_class),
|
||||
config=_build_config_definition(flow_class),
|
||||
persist=_build_persistence_definition(flow_class),
|
||||
conversational=_build_conversational_definition(flow_class, diagnostics),
|
||||
conversational=_build_conversational_definition(flow_class),
|
||||
methods=methods,
|
||||
diagnostics=diagnostics,
|
||||
)
|
||||
definition.diagnostics.extend(definition.validate_contract())
|
||||
definition.log_diagnostics()
|
||||
log_flow_definition_issues(definition)
|
||||
return definition
|
||||
|
||||
|
||||
|
||||
@@ -12,13 +12,12 @@ from __future__ import annotations
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import Annotated, Any, Literal as TypingLiteral, TypeAlias
|
||||
from typing import Annotated, Any, Literal, TypeAlias
|
||||
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
RootModel,
|
||||
field_serializer,
|
||||
model_validator,
|
||||
)
|
||||
@@ -38,6 +37,7 @@ _STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||
|
||||
__all__ = [
|
||||
"FlowActionDefinition",
|
||||
"FlowAtomicActionDefinition",
|
||||
"FlowCodeActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
@@ -45,16 +45,16 @@ __all__ = [
|
||||
"FlowCrewActionDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowEachInnerActionDefinition",
|
||||
"FlowEachStepDefinition",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowPydanticStateDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowStateDefinition",
|
||||
"FlowToolActionDefinition",
|
||||
"FlowUnknownStateDefinition",
|
||||
@@ -69,21 +69,12 @@ def _object_ref(value: Any) -> str:
|
||||
return f"{module}:{qualname}" if module and qualname else repr(value)
|
||||
|
||||
|
||||
class FlowDefinitionDiagnostic(BaseModel):
|
||||
"""A non-fatal Flow Definition build or validation diagnostic."""
|
||||
|
||||
code: str
|
||||
message: str
|
||||
severity: TypingLiteral["warning", "error"] = "warning"
|
||||
path: str | None = None
|
||||
|
||||
|
||||
class FlowDictStateDefinition(BaseModel):
|
||||
"""Static description of a plain dictionary Flow state contract."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["dict"] = Field(
|
||||
type: Literal["dict"] = Field(
|
||||
default="dict",
|
||||
description="Plain dictionary state with optional default values.",
|
||||
examples=["dict"],
|
||||
@@ -100,7 +91,7 @@ class FlowPydanticStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["pydantic"] = Field(
|
||||
type: Literal["pydantic"] = Field(
|
||||
default="pydantic",
|
||||
description="Importable Pydantic model used as the Flow state type.",
|
||||
examples=["pydantic"],
|
||||
@@ -135,7 +126,7 @@ class FlowJsonSchemaStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["json_schema"] = Field(
|
||||
type: Literal["json_schema"] = Field(
|
||||
default="json_schema",
|
||||
description="Inline JSON Schema used as the Flow state contract.",
|
||||
examples=["json_schema"],
|
||||
@@ -162,7 +153,7 @@ class FlowUnknownStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["unknown"] = Field(
|
||||
type: Literal["unknown"] = Field(
|
||||
default="unknown",
|
||||
description="Unknown state representation; runtime falls back to dictionary state.",
|
||||
examples=["unknown"],
|
||||
@@ -191,14 +182,46 @@ FlowStateDefinition: TypeAlias = Annotated[
|
||||
class FlowConfigDefinition(BaseModel):
|
||||
"""Serializable Flow-level configuration."""
|
||||
|
||||
tracing: bool | None = None
|
||||
stream: bool = False
|
||||
memory: dict[str, Any] | None = None
|
||||
input_provider: str | None = None
|
||||
suppress_flow_events: bool = False
|
||||
max_method_calls: int = 100
|
||||
defer_trace_finalization: bool = False
|
||||
checkpoint: bool | dict[str, Any] | None = None
|
||||
tracing: bool | None = Field(
|
||||
default=None,
|
||||
description="Override for flow tracing; when omitted, runtime defaults apply.",
|
||||
examples=[True],
|
||||
)
|
||||
stream: bool = Field(
|
||||
default=False,
|
||||
description="Whether the flow should emit streaming events when supported.",
|
||||
examples=[True],
|
||||
)
|
||||
memory: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Serializable memory configuration passed to flow execution.",
|
||||
examples=[{"enabled": True}],
|
||||
)
|
||||
input_provider: str | None = Field(
|
||||
default=None,
|
||||
description="Import reference or provider key used to supply flow inputs.",
|
||||
examples=["my_project.inputs:load_inputs"],
|
||||
)
|
||||
suppress_flow_events: bool = Field(
|
||||
default=False,
|
||||
description="Disable flow event emission for this definition.",
|
||||
examples=[False],
|
||||
)
|
||||
max_method_calls: int = Field(
|
||||
default=100,
|
||||
description="Maximum number of method executions allowed during one kickoff.",
|
||||
examples=[20],
|
||||
)
|
||||
defer_trace_finalization: bool = Field(
|
||||
default=False,
|
||||
description="Defer trace finalization so callers can complete tracing later.",
|
||||
examples=[False],
|
||||
)
|
||||
checkpoint: bool | dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Checkpointing configuration, or true to use default checkpointing.",
|
||||
examples=[True, {"enabled": True}],
|
||||
)
|
||||
|
||||
|
||||
class FlowPersistenceDefinition(BaseModel):
|
||||
@@ -210,9 +233,21 @@ class FlowPersistenceDefinition(BaseModel):
|
||||
serialized config.
|
||||
"""
|
||||
|
||||
enabled: bool = False
|
||||
verbose: bool = False
|
||||
persistence: Any = None
|
||||
enabled: bool = Field(
|
||||
default=False,
|
||||
description="Whether persistence is enabled for this flow or method.",
|
||||
examples=[True],
|
||||
)
|
||||
verbose: bool = Field(
|
||||
default=False,
|
||||
description="Whether persistence should emit verbose diagnostic output.",
|
||||
examples=[False],
|
||||
)
|
||||
persistence: Any = Field(
|
||||
default=None,
|
||||
description="Persistence backend configuration or import reference.",
|
||||
examples=[{"ref": "my_project.persistence:FlowStore"}],
|
||||
)
|
||||
|
||||
@field_serializer("persistence", when_used="json")
|
||||
def _serialize_persistence(self, value: Any) -> Any:
|
||||
@@ -238,15 +273,53 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
|
||||
"""
|
||||
|
||||
message: str
|
||||
emit: list[str] | None = None
|
||||
llm: Any = "gpt-4o-mini"
|
||||
default_outcome: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
provider: Any = None
|
||||
learn: bool = False
|
||||
learn_source: str = "hitl"
|
||||
learn_strict: bool = False
|
||||
message: str = Field(
|
||||
description="Prompt shown to the human reviewer when feedback is requested.",
|
||||
examples=["Review the research summary before publishing."],
|
||||
)
|
||||
emit: list[str] | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Allowed feedback outcomes. When set, the method routes like a router "
|
||||
"using the selected outcome."
|
||||
),
|
||||
examples=[["approved", "revise"]],
|
||||
)
|
||||
llm: Any = Field(
|
||||
default="gpt-4o-mini",
|
||||
description="LLM configuration used to assist or process human feedback.",
|
||||
examples=["gpt-4o-mini"],
|
||||
)
|
||||
default_outcome: str | None = Field(
|
||||
default=None,
|
||||
description="Outcome to use when feedback cannot be collected.",
|
||||
examples=["revise"],
|
||||
)
|
||||
metadata: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Serializable metadata attached to the feedback request.",
|
||||
examples=[{"team": "research"}],
|
||||
)
|
||||
provider: Any = Field(
|
||||
default=None,
|
||||
description="Feedback provider configuration or import reference.",
|
||||
examples=["my_project.feedback:provider"],
|
||||
)
|
||||
learn: bool = Field(
|
||||
default=False,
|
||||
description="Whether feedback should be recorded for later learning workflows.",
|
||||
examples=[True],
|
||||
)
|
||||
learn_source: str = Field(
|
||||
default="hitl",
|
||||
description="Source label attached to learned feedback records.",
|
||||
examples=["hitl"],
|
||||
)
|
||||
learn_strict: bool = Field(
|
||||
default=False,
|
||||
description="Whether learning should enforce strict validation of feedback data.",
|
||||
examples=[False],
|
||||
)
|
||||
|
||||
@field_serializer("llm", when_used="json")
|
||||
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
|
||||
@@ -266,30 +339,89 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
class FlowCodeActionDefinition(BaseModel):
|
||||
"""A Flow method action that executes importable Python code."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["code"] = "code"
|
||||
ref: str
|
||||
with_: dict[str, Any] | None = Field(default=None, alias="with")
|
||||
call: Literal["code"] = Field(
|
||||
default="code",
|
||||
description="Action discriminator. Use code to call importable Python.",
|
||||
examples=["code"],
|
||||
)
|
||||
ref: str = Field(
|
||||
description="Import reference for the callable, formatted as module:qualname.",
|
||||
examples=["my_project.flows:normalize_topic"],
|
||||
)
|
||||
with_: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
alias="with",
|
||||
description="Keyword arguments passed to the callable after expression rendering.",
|
||||
examples=[{"topic": "${state.topic}"}],
|
||||
)
|
||||
|
||||
|
||||
class FlowToolActionDefinition(BaseModel):
|
||||
"""A Flow method action that invokes a CrewAI tool."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["tool"]
|
||||
ref: str
|
||||
with_: dict[str, Any] | None = Field(default=None, alias="with")
|
||||
call: Literal["tool"] = Field(
|
||||
description="Action discriminator. Use tool to instantiate and run a CrewAI tool.",
|
||||
examples=["tool"],
|
||||
)
|
||||
ref: str = Field(
|
||||
description="Import reference for a BaseTool class, formatted as module:qualname.",
|
||||
examples=["my_project.tools:SearchTool"],
|
||||
)
|
||||
with_: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
alias="with",
|
||||
description="Tool input arguments after expression rendering.",
|
||||
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
|
||||
)
|
||||
|
||||
|
||||
class FlowCrewActionDefinition(BaseModel):
|
||||
"""A Flow method action that builds and kicks off a CrewAI crew."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["crew"]
|
||||
with_: CrewDefinition = Field(alias="with")
|
||||
call: Literal["crew"] = Field(
|
||||
description="Action discriminator. Use crew to run an inline Crew definition.",
|
||||
examples=["crew"],
|
||||
)
|
||||
with_: CrewDefinition = Field(
|
||||
alias="with",
|
||||
description="Inline Crew definition to load and execute for this action.",
|
||||
examples=[
|
||||
{
|
||||
"name": "inline_research",
|
||||
"agents": {
|
||||
"researcher": {
|
||||
"role": "Researcher",
|
||||
"goal": "Research {topic}",
|
||||
"backstory": "Knows the domain.",
|
||||
}
|
||||
},
|
||||
"tasks": [
|
||||
{
|
||||
"name": "research_task",
|
||||
"description": "Research {topic}",
|
||||
"expected_output": "Findings about {topic}",
|
||||
"agent": "researcher",
|
||||
}
|
||||
],
|
||||
"inputs": {"topic": "${state.topic}"},
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class FlowExpressionActionDefinition(BaseModel):
|
||||
@@ -297,66 +429,141 @@ class FlowExpressionActionDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
call: TypingLiteral["expression"]
|
||||
expr: str
|
||||
call: Literal["expression"] = Field(
|
||||
description="Action discriminator. Use expression to evaluate a CEL expression.",
|
||||
examples=["expression"],
|
||||
)
|
||||
expr: str = Field(
|
||||
description="CEL expression evaluated against state, outputs, and local context.",
|
||||
examples=["state.topic", "outputs.normalize_topic"],
|
||||
)
|
||||
|
||||
|
||||
FlowInnerActionDefinition = (
|
||||
class FlowScriptActionDefinition(BaseModel):
|
||||
"""A Flow method action that executes trusted inline Python."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
call: Literal["script"] = Field(
|
||||
description="Action discriminator. Use script to execute trusted inline Python.",
|
||||
examples=["script"],
|
||||
)
|
||||
code: str = Field(
|
||||
description=(
|
||||
"Trusted Python source executed as a generated function. Runtime values are "
|
||||
"passed as state, outputs, input, and item; they are not interpolated into "
|
||||
"the source. This is not sandboxed."
|
||||
),
|
||||
examples=[
|
||||
"state['normalized_topic'] = input.strip()\n"
|
||||
"return state['normalized_topic']"
|
||||
],
|
||||
)
|
||||
language: Literal["python"] = Field(
|
||||
default="python",
|
||||
description="Script language. Only python is currently supported.",
|
||||
examples=["python"],
|
||||
)
|
||||
|
||||
|
||||
FlowAtomicActionDefinition: TypeAlias = Annotated[
|
||||
FlowCodeActionDefinition
|
||||
| FlowToolActionDefinition
|
||||
| FlowCrewActionDefinition
|
||||
| FlowExpressionActionDefinition
|
||||
)
|
||||
| FlowScriptActionDefinition,
|
||||
Field(discriminator="call"),
|
||||
]
|
||||
|
||||
|
||||
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
|
||||
"""One named action inside an ``each`` composite action."""
|
||||
class FlowEachStepDefinition(BaseModel):
|
||||
"""One named step inside an ``each`` composite action."""
|
||||
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
name: str = Field(
|
||||
description="Step name used to reference this step's output.",
|
||||
examples=["clean"],
|
||||
)
|
||||
if_: str | None = Field(
|
||||
default=None,
|
||||
alias="if",
|
||||
description=(
|
||||
"Optional CEL expression evaluated against state, outputs, and local "
|
||||
"context. When present, the step runs only if the expression evaluates "
|
||||
"to true."
|
||||
),
|
||||
examples=["item.kind == 'invoice'"],
|
||||
)
|
||||
action: FlowAtomicActionDefinition = Field(
|
||||
description="Atomic action to run for this step.",
|
||||
examples=[{"call": "script", "code": "return item.strip()"}],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
|
||||
if len(self.root) != 1:
|
||||
raise ValueError("each.do entries must be one-key mappings")
|
||||
_validate_step_name(self.name, field="each.do action names")
|
||||
def _validate_step_name(self) -> FlowEachStepDefinition:
|
||||
_validate_step_name(self.name, field="each.do step names")
|
||||
return self
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return next(iter(self.root))
|
||||
|
||||
@property
|
||||
def action(self) -> FlowInnerActionDefinition:
|
||||
return next(iter(self.root.values()))
|
||||
|
||||
|
||||
class FlowEachActionDefinition(BaseModel):
|
||||
"""A composite action that runs a sequential mini-pipeline for each item."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["each"]
|
||||
in_: str = Field(alias="in")
|
||||
do: list[FlowEachInnerActionDefinition]
|
||||
call: Literal["each"] = Field(
|
||||
description=(
|
||||
"Action discriminator. Use each to run a sequence of actions for every "
|
||||
"item in an input list."
|
||||
),
|
||||
examples=["each"],
|
||||
)
|
||||
in_: str = Field(
|
||||
alias="in",
|
||||
description="CEL expression that must evaluate to the list to iterate.",
|
||||
examples=["state.rows"],
|
||||
)
|
||||
do: list[FlowEachStepDefinition] = Field(
|
||||
description=(
|
||||
"Ordered steps to run for each item. Each step has a name, optional "
|
||||
"if expression, and atomic action."
|
||||
),
|
||||
examples=[
|
||||
[
|
||||
{
|
||||
"name": "clean",
|
||||
"action": {"call": "script", "code": "return item.strip()"},
|
||||
},
|
||||
{
|
||||
"name": "tag",
|
||||
"if": "outputs.clean != ''",
|
||||
"action": {"call": "expression", "expr": "outputs.clean"},
|
||||
},
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
|
||||
def _validate_step_list(self) -> FlowEachActionDefinition:
|
||||
if not self.do:
|
||||
raise ValueError("each.do must contain at least one action")
|
||||
|
||||
seen: set[str] = set()
|
||||
for inner_action in self.do:
|
||||
name = inner_action.name
|
||||
if name in seen:
|
||||
raise ValueError(f"each.do action names must be unique: {name!r}")
|
||||
seen.add(name)
|
||||
raise ValueError("each.do must contain at least one step")
|
||||
|
||||
_validate_step_list(self.do, field="each.do")
|
||||
return self
|
||||
|
||||
|
||||
FlowActionDefinition = (
|
||||
FlowActionDefinition: TypeAlias = (
|
||||
FlowCodeActionDefinition
|
||||
| FlowToolActionDefinition
|
||||
| FlowCrewActionDefinition
|
||||
| FlowExpressionActionDefinition
|
||||
| FlowScriptActionDefinition
|
||||
| FlowEachActionDefinition
|
||||
)
|
||||
|
||||
@@ -364,14 +571,48 @@ FlowActionDefinition = (
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
"""Static definition of one Flow method and its execution roles."""
|
||||
|
||||
description: str | None = None
|
||||
do: FlowActionDefinition
|
||||
start: bool | FlowDefinitionCondition | None = None
|
||||
listen: FlowDefinitionCondition | None = None
|
||||
router: bool = False
|
||||
emit: list[str] | None = None
|
||||
human_feedback: FlowHumanFeedbackDefinition | None = None
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
description: str | None = Field(
|
||||
default=None,
|
||||
description="Human-readable summary of what this method does.",
|
||||
examples=["Normalize the incoming topic."],
|
||||
)
|
||||
do: FlowActionDefinition = Field(
|
||||
description="Action executed when this method runs.",
|
||||
examples=[{"call": "script", "code": "return input.strip()"}],
|
||||
)
|
||||
start: bool | FlowDefinitionCondition | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Marks a start method. True starts unconditionally; a condition starts "
|
||||
"when the kickoff inputs or events satisfy it."
|
||||
),
|
||||
examples=[True],
|
||||
)
|
||||
listen: FlowDefinitionCondition | None = Field(
|
||||
default=None,
|
||||
description="Trigger condition that runs this method after upstream events.",
|
||||
examples=["seed", {"or": ["approved", "revise"]}],
|
||||
)
|
||||
router: bool = Field(
|
||||
default=False,
|
||||
description="Whether the method output should be treated as the next event name.",
|
||||
examples=[True],
|
||||
)
|
||||
emit: list[str] | None = Field(
|
||||
default=None,
|
||||
description="Declared router events this method may emit.",
|
||||
examples=[["approved", "revise"]],
|
||||
)
|
||||
human_feedback: FlowHumanFeedbackDefinition | None = Field(
|
||||
default=None,
|
||||
description="Optional human feedback step applied after the method action.",
|
||||
examples=[{"message": "Review the research summary before publishing."}],
|
||||
)
|
||||
persist: FlowPersistenceDefinition | None = Field(
|
||||
default=None,
|
||||
description="Method-level persistence override.",
|
||||
examples=[{"enabled": True}],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
|
||||
@@ -397,19 +638,57 @@ class FlowMethodDefinition(BaseModel):
|
||||
class FlowDefinition(BaseModel):
|
||||
"""Static, serializable definition of a Flow."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
|
||||
|
||||
schema_: TypingLiteral["crewai.flow/v1"] = Field(
|
||||
default="crewai.flow/v1", alias="schema"
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
arbitrary_types_allowed=True,
|
||||
)
|
||||
|
||||
schema_: Literal["crewai.flow/v1"] = Field(
|
||||
default="crewai.flow/v1",
|
||||
alias="schema",
|
||||
description="Flow Definition schema identifier and version.",
|
||||
examples=["crewai.flow/v1"],
|
||||
)
|
||||
name: str = Field(
|
||||
description="Unique flow name used in logs, events, and traces.",
|
||||
examples=["ResearchFlow"],
|
||||
)
|
||||
description: str | None = Field(
|
||||
default=None,
|
||||
description="Human-readable summary of the flow.",
|
||||
examples=["Normalize a topic and prepare it for research."],
|
||||
)
|
||||
state: FlowStateDefinition | None = Field(
|
||||
default=None,
|
||||
description="State contract for kickoff inputs and runtime state.",
|
||||
examples=[{"type": "dict", "default": {"topic": "AI agents"}}],
|
||||
)
|
||||
config: FlowConfigDefinition = Field(
|
||||
default_factory=FlowConfigDefinition,
|
||||
description="Serializable flow-level runtime configuration.",
|
||||
examples=[{"stream": True, "max_method_calls": 20}],
|
||||
)
|
||||
persist: FlowPersistenceDefinition | None = Field(
|
||||
default=None,
|
||||
description="Flow-level persistence configuration.",
|
||||
examples=[{"enabled": True}],
|
||||
)
|
||||
conversational: FlowConversationalDefinition | None = Field(
|
||||
default=None,
|
||||
description="Conversational flow configuration, when the flow supports chat.",
|
||||
)
|
||||
methods: dict[str, FlowMethodDefinition] = Field(
|
||||
default_factory=dict,
|
||||
description="Mapping of method names to method definitions.",
|
||||
examples=[
|
||||
{
|
||||
"seed": {
|
||||
"start": True,
|
||||
"do": {"call": "expression", "expr": "state.topic"},
|
||||
}
|
||||
}
|
||||
],
|
||||
)
|
||||
name: str
|
||||
description: str | None = None
|
||||
state: FlowStateDefinition | None = None
|
||||
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
conversational: FlowConversationalDefinition | None = None
|
||||
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_method_names(self) -> FlowDefinition:
|
||||
@@ -436,13 +715,9 @@ class FlowDefinition(BaseModel):
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[str, Any]) -> FlowDefinition:
|
||||
"""Load a definition from a dictionary and attach diagnostics."""
|
||||
serialized_diagnostics = _deserialize_diagnostics(data.get("diagnostics", []))
|
||||
"""Load a definition from a dictionary."""
|
||||
definition = cls.model_validate(data)
|
||||
definition.diagnostics = _merge_diagnostics(
|
||||
serialized_diagnostics, definition.validate_contract()
|
||||
)
|
||||
definition.log_diagnostics()
|
||||
log_flow_definition_issues(definition)
|
||||
return definition
|
||||
|
||||
@classmethod
|
||||
@@ -463,122 +738,90 @@ class FlowDefinition(BaseModel):
|
||||
"""Return the JSON Schema for the Flow Definition contract."""
|
||||
return cls.model_json_schema(by_alias=True)
|
||||
|
||||
def validate_contract(self) -> list[FlowDefinitionDiagnostic]:
|
||||
"""Validate the static contract without rejecting dynamic routing."""
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = []
|
||||
for method_name, method in self.methods.items():
|
||||
path = f"methods.{method_name}"
|
||||
if method.router and not method.is_start and method.listen is None:
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="router_without_trigger",
|
||||
severity="error",
|
||||
path=path,
|
||||
message="router: true requires either start or listen",
|
||||
)
|
||||
)
|
||||
if method.emit and not method.router:
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="emit_without_router",
|
||||
path=f"{path}.emit",
|
||||
message="emit is only used by routers to declare downstream events",
|
||||
)
|
||||
)
|
||||
if method.human_feedback:
|
||||
human_feedback_config = method.human_feedback
|
||||
if human_feedback_config.emit and not human_feedback_config.llm:
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="human_feedback_llm_required",
|
||||
severity="error",
|
||||
path=f"{path}.human_feedback.llm",
|
||||
message="llm is required when human_feedback.emit is set",
|
||||
)
|
||||
)
|
||||
if (
|
||||
human_feedback_config.default_outcome is not None
|
||||
and not human_feedback_config.emit
|
||||
):
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="human_feedback_default_requires_emit",
|
||||
severity="error",
|
||||
path=f"{path}.human_feedback.default_outcome",
|
||||
message="default_outcome requires human_feedback.emit",
|
||||
)
|
||||
)
|
||||
elif (
|
||||
human_feedback_config.default_outcome is not None
|
||||
and human_feedback_config.emit
|
||||
):
|
||||
if (
|
||||
human_feedback_config.default_outcome
|
||||
not in human_feedback_config.emit
|
||||
):
|
||||
diagnostics.append(
|
||||
FlowDefinitionDiagnostic(
|
||||
code="human_feedback_default_not_in_emit",
|
||||
severity="error",
|
||||
path=f"{path}.human_feedback.default_outcome",
|
||||
message="default_outcome must be one of human_feedback.emit",
|
||||
)
|
||||
)
|
||||
|
||||
return diagnostics
|
||||
|
||||
def with_diagnostics(self) -> FlowDefinition:
|
||||
"""Attach fresh diagnostics and return this definition."""
|
||||
self.diagnostics = self.validate_contract()
|
||||
self.log_diagnostics()
|
||||
return self
|
||||
|
||||
def log_diagnostics(self) -> None:
|
||||
"""Emit all attached diagnostics through the flow definition logger."""
|
||||
_log_flow_definition_diagnostics(self.name, self.diagnostics)
|
||||
|
||||
|
||||
def _log_flow_definition_diagnostics(
|
||||
definition_name: str,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
) -> None:
|
||||
for diagnostic in diagnostics:
|
||||
level = logging.ERROR if diagnostic.severity == "error" else logging.WARNING
|
||||
path = f" at {diagnostic.path}" if diagnostic.path else ""
|
||||
logger.log(
|
||||
level,
|
||||
"Flow definition diagnostic for %s%s [%s]: %s",
|
||||
definition_name,
|
||||
path,
|
||||
diagnostic.code,
|
||||
diagnostic.message,
|
||||
)
|
||||
|
||||
|
||||
def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]:
|
||||
return [FlowDefinitionDiagnostic.model_validate(item) for item in value or []]
|
||||
|
||||
|
||||
def _validate_step_name(name: str, *, field: str) -> None:
|
||||
if not isinstance(name, str) or not _STEP_NAME_PATTERN.fullmatch(name):
|
||||
raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
|
||||
|
||||
|
||||
def _merge_diagnostics(
|
||||
*diagnostic_groups: list[FlowDefinitionDiagnostic],
|
||||
) -> list[FlowDefinitionDiagnostic]:
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = []
|
||||
seen: set[tuple[str, str, str | None, str]] = set()
|
||||
for group in diagnostic_groups:
|
||||
for diagnostic in group:
|
||||
key = (
|
||||
diagnostic.code,
|
||||
diagnostic.severity,
|
||||
diagnostic.path,
|
||||
diagnostic.message,
|
||||
def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> None:
|
||||
seen: set[str] = set()
|
||||
for step in steps:
|
||||
name = step.name
|
||||
if name in seen:
|
||||
raise ValueError(f"{field} step names must be unique: {name!r}")
|
||||
seen.add(name)
|
||||
|
||||
|
||||
def log_flow_definition_issues(definition: FlowDefinition) -> None:
|
||||
for method_name, method in definition.methods.items():
|
||||
path = f"methods.{method_name}"
|
||||
if method.router and not method.is_start and method.listen is None:
|
||||
_log_flow_definition_issue(
|
||||
definition.name,
|
||||
code="router_without_trigger",
|
||||
severity="error",
|
||||
path=path,
|
||||
message="router: true requires either start or listen",
|
||||
)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
diagnostics.append(diagnostic)
|
||||
return diagnostics
|
||||
if method.emit and not method.router:
|
||||
_log_flow_definition_issue(
|
||||
definition.name,
|
||||
code="emit_without_router",
|
||||
path=f"{path}.emit",
|
||||
message="emit is only used by routers to declare downstream events",
|
||||
)
|
||||
if method.human_feedback:
|
||||
human_feedback_config = method.human_feedback
|
||||
if human_feedback_config.emit and not human_feedback_config.llm:
|
||||
_log_flow_definition_issue(
|
||||
definition.name,
|
||||
code="human_feedback_llm_required",
|
||||
severity="error",
|
||||
path=f"{path}.human_feedback.llm",
|
||||
message="llm is required when human_feedback.emit is set",
|
||||
)
|
||||
if (
|
||||
human_feedback_config.default_outcome is not None
|
||||
and not human_feedback_config.emit
|
||||
):
|
||||
_log_flow_definition_issue(
|
||||
definition.name,
|
||||
code="human_feedback_default_requires_emit",
|
||||
severity="error",
|
||||
path=f"{path}.human_feedback.default_outcome",
|
||||
message="default_outcome requires human_feedback.emit",
|
||||
)
|
||||
elif (
|
||||
human_feedback_config.default_outcome is not None
|
||||
and human_feedback_config.emit
|
||||
and human_feedback_config.default_outcome
|
||||
not in human_feedback_config.emit
|
||||
):
|
||||
_log_flow_definition_issue(
|
||||
definition.name,
|
||||
code="human_feedback_default_not_in_emit",
|
||||
severity="error",
|
||||
path=f"{path}.human_feedback.default_outcome",
|
||||
message="default_outcome must be one of human_feedback.emit",
|
||||
)
|
||||
|
||||
|
||||
def _log_flow_definition_issue(
|
||||
definition_name: str,
|
||||
*,
|
||||
code: str,
|
||||
message: str,
|
||||
severity: Literal["warning", "error"] = "warning",
|
||||
path: str | None = None,
|
||||
) -> None:
|
||||
level = logging.ERROR if severity == "error" else logging.WARNING
|
||||
location = f" at {path}" if path else ""
|
||||
logger.log(
|
||||
level,
|
||||
"Flow definition issue for %s%s [%s]: %s",
|
||||
definition_name,
|
||||
location,
|
||||
code,
|
||||
message,
|
||||
)
|
||||
|
||||
@@ -121,7 +121,7 @@ from crewai.flow.human_feedback import (
|
||||
)
|
||||
from crewai.flow.input_provider import InputProvider
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.runtime._actions import build_action
|
||||
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError, build_action
|
||||
from crewai.flow.runtime._refs import resolve_instance_ref, resolve_ref
|
||||
from crewai.flow.types import (
|
||||
FlowExecutionData,
|
||||
@@ -1090,6 +1090,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
|
||||
try:
|
||||
return build_action(self, definition.do)
|
||||
except FlowScriptExecutionDisabledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
unresolved.append(f"{name}: {e}")
|
||||
return lambda *args, **kwargs: None
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Awaitable, Callable
|
||||
import contextvars
|
||||
import inspect
|
||||
import os
|
||||
from typing import TYPE_CHECKING, Any, Protocol, cast
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
@@ -13,11 +15,13 @@ from crewai.flow.flow_definition import (
|
||||
FlowCodeActionDefinition,
|
||||
FlowCrewActionDefinition,
|
||||
FlowEachActionDefinition,
|
||||
FlowEachInnerActionDefinition,
|
||||
FlowEachStepDefinition,
|
||||
FlowExpressionActionDefinition,
|
||||
FlowScriptActionDefinition,
|
||||
FlowToolActionDefinition,
|
||||
)
|
||||
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
|
||||
from crewai.flow.runtime._outputs import outputs_by_name
|
||||
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
|
||||
|
||||
|
||||
@@ -25,10 +29,18 @@ if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
__all__ = ["build_action"]
|
||||
__all__ = ["FlowScriptExecutionDisabledError", "build_action"]
|
||||
|
||||
LocalContext = dict[str, Any]
|
||||
NestedStepRunner = Callable[[LocalContext], Awaitable[Any]]
|
||||
NestedStep = tuple[str, str | None, NestedStepRunner]
|
||||
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
|
||||
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
|
||||
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
|
||||
|
||||
|
||||
class FlowScriptExecutionDisabledError(RuntimeError):
|
||||
"""Raised when a flow definition tries to execute inline script code."""
|
||||
|
||||
|
||||
class _BuiltAction(Protocol):
|
||||
@@ -140,15 +152,76 @@ class ExpressionAction:
|
||||
)
|
||||
|
||||
|
||||
class ScriptAction:
|
||||
definition_type = FlowScriptActionDefinition
|
||||
|
||||
def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None:
|
||||
self.flow = flow
|
||||
self.definition = definition
|
||||
self.handler = self._compile_handler()
|
||||
|
||||
def run(self, *args: Any, **kwargs: Any) -> Any:
|
||||
local_context = _pop_local_context(kwargs)
|
||||
return self.handler(
|
||||
state=self.flow.state,
|
||||
outputs=outputs_by_name(
|
||||
self.flow._method_outputs,
|
||||
local_outputs=local_context.get("outputs") if local_context else None,
|
||||
),
|
||||
input=args[0] if args else None,
|
||||
item=local_context.get("item") if local_context else None,
|
||||
)
|
||||
|
||||
def _compile_handler(self) -> Callable[..., Any]:
|
||||
raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "")
|
||||
if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
|
||||
raise FlowScriptExecutionDisabledError(
|
||||
"Flow script execution is disabled by default. "
|
||||
f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
|
||||
"trusted flow definitions."
|
||||
)
|
||||
|
||||
filename = f"crewai.flow.script.{self.flow._definition.name}"
|
||||
module = ast.parse(self.definition.code, filename=filename)
|
||||
function = ast.FunctionDef(
|
||||
name="_flow_script",
|
||||
args=ast.arguments(
|
||||
posonlyargs=[],
|
||||
args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")],
|
||||
vararg=None,
|
||||
kwonlyargs=[],
|
||||
kw_defaults=[],
|
||||
kwarg=None,
|
||||
defaults=[],
|
||||
),
|
||||
body=module.body or [ast.Pass()],
|
||||
decorator_list=[],
|
||||
returns=None,
|
||||
type_comment=None,
|
||||
type_params=[],
|
||||
)
|
||||
module.body = [function]
|
||||
ast.fix_missing_locations(module)
|
||||
|
||||
# The YAML here is trusted project source authored by the code owner,
|
||||
# so this has the same trust boundary as using custom tools. We
|
||||
# intentionally do not interpolate user input and runtime values are passed
|
||||
# as function arguments. This is still arbitrary trusted Python execution,
|
||||
# so it remains disabled by default behind `CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION`
|
||||
namespace: dict[str, Any] = {"__name__": filename}
|
||||
exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102
|
||||
return cast(Callable[..., Any], namespace["_flow_script"])
|
||||
|
||||
|
||||
class EachAction:
|
||||
definition_type = FlowEachActionDefinition
|
||||
|
||||
def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None:
|
||||
self.flow = flow
|
||||
self.definition = definition
|
||||
self.inner_actions = [
|
||||
(inner_action.name, self._build_inner_action(inner_action))
|
||||
for inner_action in definition.do
|
||||
self.steps: list[NestedStep] = [
|
||||
(step.name, step.if_, self._build_step_action(step))
|
||||
for step in definition.do
|
||||
]
|
||||
|
||||
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
|
||||
@@ -160,22 +233,30 @@ class EachAction:
|
||||
|
||||
for item in items:
|
||||
local_outputs: dict[str, Any] = {}
|
||||
local_context = {"item": item, "outputs": local_outputs}
|
||||
last_output: Any = None
|
||||
for name, run_inner_action in self.inner_actions:
|
||||
last_output = await run_inner_action(
|
||||
{"item": item, "outputs": local_outputs}
|
||||
)
|
||||
for name, condition, run_step_action in self.steps:
|
||||
if condition is not None and not self._condition_matches(
|
||||
condition, local_context
|
||||
):
|
||||
continue
|
||||
|
||||
last_output = await run_step_action(local_context)
|
||||
local_outputs[name] = last_output
|
||||
results.append(last_output)
|
||||
|
||||
return results
|
||||
|
||||
def _build_inner_action(
|
||||
self, inner_action: FlowEachInnerActionDefinition
|
||||
) -> Callable[[LocalContext], Any]:
|
||||
run_action = build_action(self.flow, inner_action.action)
|
||||
def _condition_matches(self, condition: str, local_context: LocalContext) -> bool:
|
||||
result = evaluate_expression(self.flow, condition, local_context=local_context)
|
||||
if not isinstance(result, bool):
|
||||
raise ValueError("if expression must evaluate to a boolean")
|
||||
return result
|
||||
|
||||
async def run_inner_action(local_context: LocalContext) -> Any:
|
||||
def _build_step_action(self, step: FlowEachStepDefinition) -> NestedStepRunner:
|
||||
run_action = build_action(self.flow, step.action)
|
||||
|
||||
async def run_step_action(local_context: LocalContext) -> Any:
|
||||
kwargs = {_LOCAL_CONTEXT_KWARG: local_context}
|
||||
if inspect.iscoroutinefunction(run_action):
|
||||
result = run_action(**kwargs)
|
||||
@@ -190,7 +271,7 @@ class EachAction:
|
||||
result = await result
|
||||
return result
|
||||
|
||||
return run_inner_action
|
||||
return run_step_action
|
||||
|
||||
|
||||
_ACTION_TYPES: tuple[_ActionType, ...] = (
|
||||
@@ -199,6 +280,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
|
||||
ToolAction,
|
||||
CrewAction,
|
||||
ExpressionAction,
|
||||
ScriptAction,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import json
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from crewai.flow.runtime._outputs import outputs_by_name
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
|
||||
|
||||
@@ -44,7 +45,12 @@ def evaluate_expression(
|
||||
def _expression_context(
|
||||
flow: Flow[Any], local_context: dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
outputs = _outputs_by_name(flow._method_outputs)
|
||||
local_outputs = local_context.get("outputs") if local_context else None
|
||||
outputs = outputs_by_name(
|
||||
flow._method_outputs,
|
||||
local_outputs=local_outputs,
|
||||
serialize=True,
|
||||
)
|
||||
context: dict[str, Any] = {
|
||||
"state": flow._copy_and_serialize_state(),
|
||||
"outputs": outputs,
|
||||
@@ -53,29 +59,12 @@ def _expression_context(
|
||||
local_values = {
|
||||
key: to_serializable(value, max_depth=0)
|
||||
for key, value in local_context.items()
|
||||
if key not in {"outputs", "state"}
|
||||
}
|
||||
local_outputs = local_values.pop("outputs", None)
|
||||
local_values.pop("state", None)
|
||||
context.update(local_values)
|
||||
if local_outputs is not None:
|
||||
if not isinstance(local_outputs, dict):
|
||||
raise TypeError("flow definition local outputs must be a mapping")
|
||||
context["outputs"] = {**outputs, **local_outputs}
|
||||
return context
|
||||
|
||||
|
||||
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
|
||||
outputs: dict[str, Any] = {}
|
||||
for entry in method_outputs:
|
||||
method = ""
|
||||
output = entry
|
||||
if isinstance(entry, dict) and "output" in entry:
|
||||
method = str(entry.get("method", ""))
|
||||
output = entry["output"]
|
||||
outputs[method] = to_serializable(output, max_depth=0)
|
||||
return outputs
|
||||
|
||||
|
||||
def _render_value(value: Any, context: dict[str, Any]) -> Any:
|
||||
if isinstance(value, str):
|
||||
return _render_string(value, context)
|
||||
|
||||
40
lib/crewai/src/crewai/flow/runtime/_outputs.py
Normal file
40
lib/crewai/src/crewai/flow/runtime/_outputs.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Shared FlowDefinition runtime output helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, TypedDict
|
||||
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
|
||||
|
||||
class _MethodOutput(TypedDict):
|
||||
method: str
|
||||
output: Any
|
||||
|
||||
|
||||
def outputs_by_name(
|
||||
method_outputs: list[_MethodOutput],
|
||||
*,
|
||||
local_outputs: Mapping[str, Any] | None = None,
|
||||
serialize: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
outputs: dict[str, Any] = {}
|
||||
for entry in method_outputs:
|
||||
outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize)
|
||||
|
||||
if local_outputs is not None:
|
||||
outputs.update(
|
||||
{
|
||||
key: _output_value(output, serialize=serialize)
|
||||
for key, output in local_outputs.items()
|
||||
}
|
||||
)
|
||||
|
||||
return outputs
|
||||
|
||||
|
||||
def _output_value(value: Any, *, serialize: bool) -> Any:
|
||||
if not serialize:
|
||||
return value
|
||||
return to_serializable(value, max_depth=0)
|
||||
@@ -645,14 +645,11 @@ class TestLegacyMethodOutputsRestore:
|
||||
context = _expression_context(restored)
|
||||
assert context["outputs"] == {"": "legacy"}
|
||||
|
||||
def test_raw_legacy_outputs_remain_readable(self) -> None:
|
||||
from crewai.flow.runtime._expressions import _expression_context
|
||||
|
||||
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
|
||||
flow = Flow()
|
||||
flow._method_outputs = ["legacy"]
|
||||
|
||||
assert flow.method_outputs == ["legacy"]
|
||||
assert _expression_context(flow)["outputs"] == {"": "legacy"}
|
||||
|
||||
|
||||
class TestAgentCheckpoint:
|
||||
|
||||
@@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit():
|
||||
}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowActionDefinition",
|
||||
"FlowAtomicActionDefinition",
|
||||
"FlowCodeActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
@@ -44,16 +45,16 @@ def test_flow_public_exports_are_explicit():
|
||||
"FlowCrewActionDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowEachInnerActionDefinition",
|
||||
"FlowEachStepDefinition",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowPydanticStateDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowStateDefinition",
|
||||
"FlowToolActionDefinition",
|
||||
"FlowUnknownStateDefinition",
|
||||
@@ -62,6 +63,117 @@ def test_flow_public_exports_are_explicit():
|
||||
assert "calculate_node_levels" not in flow_visualization.__all__
|
||||
|
||||
|
||||
def test_flow_definition_json_schema_carries_reference_descriptions():
|
||||
schema = flow_definition.FlowDefinition.json_schema()
|
||||
defs = schema["$defs"]
|
||||
|
||||
assert schema["properties"]["schema"]["description"]
|
||||
assert schema["properties"]["methods"]["description"]
|
||||
assert "diagnostics" not in schema["properties"]
|
||||
|
||||
method_properties = defs["FlowMethodDefinition"]["properties"]
|
||||
assert method_properties["do"]["description"] == "Action executed when this method runs."
|
||||
assert "Trigger condition" in method_properties["listen"]["description"]
|
||||
|
||||
script_properties = defs["FlowScriptActionDefinition"]["properties"]
|
||||
assert "trusted inline Python" in script_properties["call"]["description"]
|
||||
assert "not interpolated" in script_properties["code"]["description"]
|
||||
assert "not sandboxed" in script_properties["code"]["description"]
|
||||
|
||||
state_schema = next(
|
||||
branch
|
||||
for branch in schema["properties"]["state"]["anyOf"]
|
||||
if "discriminator" in branch
|
||||
)
|
||||
assert state_schema["discriminator"]["propertyName"] == "type"
|
||||
assert state_schema["discriminator"]["mapping"] == {
|
||||
"dict": "#/$defs/FlowDictStateDefinition",
|
||||
"json_schema": "#/$defs/FlowJsonSchemaStateDefinition",
|
||||
"pydantic": "#/$defs/FlowPydanticStateDefinition",
|
||||
"unknown": "#/$defs/FlowUnknownStateDefinition",
|
||||
}
|
||||
|
||||
dict_state_properties = defs["FlowDictStateDefinition"]["properties"]
|
||||
assert dict_state_properties["type"]["description"]
|
||||
assert "ref" not in dict_state_properties
|
||||
|
||||
json_schema_state_properties = defs["FlowJsonSchemaStateDefinition"]["properties"]
|
||||
assert json_schema_state_properties["json_schema"]["description"]
|
||||
assert "json_schema" in defs["FlowJsonSchemaStateDefinition"]["required"]
|
||||
|
||||
pydantic_state_properties = defs["FlowPydanticStateDefinition"]["properties"]
|
||||
assert "Fallback JSON Schema" in pydantic_state_properties["json_schema"][
|
||||
"description"
|
||||
]
|
||||
|
||||
each_properties = defs["FlowEachActionDefinition"]["properties"]
|
||||
assert "list to iterate" in each_properties["in"]["description"]
|
||||
assert "Ordered steps" in each_properties["do"]["description"]
|
||||
|
||||
step_properties = defs["FlowEachStepDefinition"]["properties"]
|
||||
assert "runs only if" in step_properties["if"]["description"]
|
||||
|
||||
|
||||
def test_flow_definition_json_schema_carries_field_examples_only():
|
||||
schema = flow_definition.FlowDefinition.json_schema()
|
||||
defs = schema["$defs"]
|
||||
|
||||
for model_name in [
|
||||
"FlowDefinition",
|
||||
"FlowCodeActionDefinition",
|
||||
"FlowToolActionDefinition",
|
||||
"FlowCrewActionDefinition",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowEachStepDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
"FlowPydanticStateDefinition",
|
||||
"FlowUnknownStateDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
]:
|
||||
model_schema = schema if model_name == "FlowDefinition" else defs[model_name]
|
||||
assert "examples" not in model_schema
|
||||
|
||||
assert schema["properties"]["name"]["examples"] == ["ResearchFlow"]
|
||||
assert schema["properties"]["schema"]["examples"] == ["crewai.flow/v1"]
|
||||
assert schema["properties"]["methods"]["examples"][0]["seed"]["do"] == {
|
||||
"call": "expression",
|
||||
"expr": "state.topic",
|
||||
}
|
||||
|
||||
script_properties = defs["FlowScriptActionDefinition"]["properties"]
|
||||
assert script_properties["call"]["examples"] == ["script"]
|
||||
assert "input.strip()" in script_properties["code"]["examples"][0]
|
||||
assert script_properties["language"]["examples"] == ["python"]
|
||||
|
||||
action_properties = defs["FlowCodeActionDefinition"]["properties"]
|
||||
assert action_properties["ref"]["examples"] == [
|
||||
"my_project.flows:normalize_topic"
|
||||
]
|
||||
assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}]
|
||||
|
||||
each_properties = defs["FlowEachActionDefinition"]["properties"]
|
||||
assert each_properties["in"]["examples"] == ["state.rows"]
|
||||
assert each_properties["do"]["examples"][0][0]["name"] == "clean"
|
||||
assert each_properties["do"]["examples"][0][0]["action"]["call"] == "script"
|
||||
assert each_properties["do"]["examples"][0][1]["if"] == "outputs.clean != ''"
|
||||
|
||||
step_properties = defs["FlowEachStepDefinition"]["properties"]
|
||||
assert step_properties["if"]["examples"] == ["item.kind == 'invoice'"]
|
||||
|
||||
method_properties = defs["FlowMethodDefinition"]["properties"]
|
||||
assert method_properties["listen"]["examples"] == [
|
||||
"seed",
|
||||
{"or": ["approved", "revise"]},
|
||||
]
|
||||
assert method_properties["emit"]["examples"] == [["approved", "revise"]]
|
||||
|
||||
|
||||
def test_flow_state_definition_uses_discriminated_branches():
|
||||
definition = flow_definition.FlowDefinition.model_validate(
|
||||
{
|
||||
@@ -233,7 +345,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
|
||||
assert review.human_feedback.learn_strict is True
|
||||
|
||||
assert definition.methods["audit"].listen == {"and": ["begin", "process"]}
|
||||
assert definition.diagnostics == []
|
||||
assert "diagnostics" not in definition.to_dict()
|
||||
|
||||
|
||||
def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
|
||||
@@ -315,7 +427,8 @@ def test_flow_definition_uses_collapsed_conversational_router_start():
|
||||
assert methods["route_conversation"].router is True
|
||||
|
||||
|
||||
def test_flow_definition_serializes_human_feedback_metadata():
|
||||
def test_flow_definition_serializes_human_feedback_metadata(caplog):
|
||||
caplog.set_level(logging.WARNING, logger="crewai.flow.dsl._utils")
|
||||
marker = object()
|
||||
|
||||
class MetadataFlow(Flow):
|
||||
@@ -334,9 +447,9 @@ def test_flow_definition_serializes_human_feedback_metadata():
|
||||
assert review.human_feedback is not None
|
||||
assert review.human_feedback.metadata == {"ref": "builtins:dict"}
|
||||
assert any(
|
||||
diagnostic.code == "non_serializable_value"
|
||||
and diagnostic.path == "methods.review.human_feedback.metadata"
|
||||
for diagnostic in definition.diagnostics
|
||||
"methods.review.human_feedback.metadata" in record.message
|
||||
and "not fully serializable" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
definition.to_json()
|
||||
|
||||
@@ -481,14 +594,16 @@ def test_each_action_round_trips_json_and_yaml():
|
||||
"in": "state.rows",
|
||||
"do": [
|
||||
{
|
||||
"normalize": {
|
||||
"name": "normalize",
|
||||
"action": {
|
||||
"call": "tool",
|
||||
"ref": "my_tools:NormalizeRowTool",
|
||||
"with": {"row": "${ item }"},
|
||||
}
|
||||
},
|
||||
{
|
||||
"save": {
|
||||
"name": "save",
|
||||
"action": {
|
||||
"call": "code",
|
||||
"ref": "my_flow:save_row",
|
||||
"with": {
|
||||
@@ -575,7 +690,6 @@ def test_flow_definition_allows_dynamic_router_emit():
|
||||
definition = DynamicRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit is None
|
||||
assert definition.diagnostics == []
|
||||
|
||||
|
||||
def test_flow_definition_infers_literal_router_emit():
|
||||
@@ -728,16 +842,15 @@ def test_flow_definition_accepts_explicit_router_events():
|
||||
assert definition.methods["decide"].emit == ["left", "right"]
|
||||
|
||||
|
||||
def test_flow_definition_preserves_diagnostics_loaded_from_contract():
|
||||
def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "LoadedDiagnosticsFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.begin"},
|
||||
"start": True,
|
||||
}
|
||||
},
|
||||
"diagnostics": [
|
||||
@@ -757,13 +870,13 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
|
||||
}
|
||||
)
|
||||
|
||||
codes = [diagnostic.code for diagnostic in definition.diagnostics]
|
||||
assert "serialized_warning" in codes
|
||||
assert codes.count("router_without_trigger") == 1
|
||||
assert "diagnostics" not in definition.to_dict()
|
||||
|
||||
|
||||
def test_router_start_false_without_listen_reports_missing_trigger():
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
def test_router_start_false_without_listen_logs_missing_trigger(caplog):
|
||||
caplog.set_level(logging.ERROR, logger="crewai.flow.flow_definition")
|
||||
|
||||
flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "LoadedFlow",
|
||||
@@ -779,9 +892,10 @@ def test_router_start_false_without_listen_reports_missing_trigger():
|
||||
)
|
||||
|
||||
assert any(
|
||||
diagnostic.code == "router_without_trigger"
|
||||
and diagnostic.path == "methods.decision"
|
||||
for diagnostic in definition.diagnostics
|
||||
record.levelno == logging.ERROR
|
||||
and "router_without_trigger" in record.message
|
||||
and "methods.decision" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
|
||||
|
||||
@@ -809,7 +923,7 @@ def test_router_human_feedback_preserves_existing_router_metadata():
|
||||
assert method.human_feedback is not None
|
||||
|
||||
|
||||
def test_dynamic_router_flow_definition_has_no_diagnostics():
|
||||
def test_dynamic_router_flow_definition_allows_dynamic_emit():
|
||||
class LazyDynamicRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
@@ -820,7 +934,7 @@ def test_dynamic_router_flow_definition_has_no_diagnostics():
|
||||
return self.state["dynamic_event"]
|
||||
|
||||
definition = LazyDynamicRouterFlow.flow_definition()
|
||||
assert definition.diagnostics == []
|
||||
assert definition.methods["decide"].emit is None
|
||||
|
||||
|
||||
def test_dynamic_router_string_listener_is_valid_contract():
|
||||
@@ -839,7 +953,7 @@ def test_dynamic_router_string_listener_is_valid_contract():
|
||||
|
||||
definition = DynamicRouterListenerFlow.flow_definition()
|
||||
|
||||
assert definition.diagnostics == []
|
||||
assert definition.methods["handle"].listen == "dynamic_event"
|
||||
|
||||
|
||||
def test_static_string_listener_is_allowed_by_contract():
|
||||
@@ -859,7 +973,7 @@ def test_static_string_listener_is_allowed_by_contract():
|
||||
},
|
||||
}
|
||||
)
|
||||
assert definition.diagnostics == []
|
||||
assert definition.methods["handle"].listen == "begni"
|
||||
|
||||
|
||||
def test_start_false_not_classified_as_start_method():
|
||||
@@ -924,10 +1038,10 @@ def test_flow_definition_cache_is_not_reused_by_subclasses():
|
||||
assert set(child_definition.methods) == {"child_step"}
|
||||
|
||||
|
||||
def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
|
||||
def test_flow_definition_logs_validation_issues_when_loaded_from_contract(caplog):
|
||||
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
|
||||
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "LoadedFlow",
|
||||
@@ -941,10 +1055,6 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
|
||||
}
|
||||
)
|
||||
|
||||
assert any(
|
||||
diagnostic.code == "router_without_trigger"
|
||||
for diagnostic in definition.diagnostics
|
||||
)
|
||||
assert any(
|
||||
record.levelno == logging.ERROR
|
||||
and "LoadedFlow" in record.message
|
||||
|
||||
@@ -26,6 +26,7 @@ from crewai.flow.flow_config import flow_config
|
||||
from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition
|
||||
from crewai.flow.persistence import persist
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.tools import BaseTool
|
||||
from crewai.types.streaming import FlowStreamingOutput
|
||||
@@ -113,7 +114,7 @@ class EachActionFlow(Flow):
|
||||
except RuntimeError:
|
||||
pass
|
||||
else:
|
||||
raise RuntimeError("inner action ran on the event loop")
|
||||
raise RuntimeError("each step ran on the event loop")
|
||||
|
||||
from crewai.flow.flow_context import current_flow_method_name
|
||||
|
||||
@@ -1080,7 +1081,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- normalize:
|
||||
- name: normalize
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.normalize_row
|
||||
with:
|
||||
@@ -1096,7 +1098,7 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_runs_sync_inner_actions_off_event_loop_with_context():
|
||||
def test_each_action_runs_sync_steps_off_event_loop_with_context():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1106,7 +1108,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- threaded:
|
||||
- name: threaded
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.require_threaded_context
|
||||
with:
|
||||
@@ -1122,7 +1125,7 @@ methods:
|
||||
assert flow.inner_thread_id != caller_thread_id
|
||||
|
||||
|
||||
def test_each_action_runs_async_tool_results_from_sync_inner_actions():
|
||||
def test_each_action_runs_async_tool_results_from_sync_steps():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1132,7 +1135,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- async_tool:
|
||||
- name: async_tool
|
||||
action:
|
||||
call: tool
|
||||
ref: {__name__}:AsyncResultTool
|
||||
with:
|
||||
@@ -1145,7 +1149,120 @@ methods:
|
||||
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
|
||||
|
||||
|
||||
def test_each_action_uses_iteration_outputs_between_nested_actions():
|
||||
def test_script_action_requires_explicit_opt_in():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
normalize:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
return "blocked"
|
||||
start: true
|
||||
"""
|
||||
|
||||
with pytest.raises(
|
||||
FlowScriptExecutionDisabledError,
|
||||
match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1",
|
||||
) as exc_info:
|
||||
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
assert "methods with unresolvable actions" not in str(exc_info.value)
|
||||
|
||||
|
||||
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
normalize:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
import math
|
||||
|
||||
state["rounded"] = math.ceil(state["raw_score"])
|
||||
return f"rounded:{state['rounded']}"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
|
||||
assert flow.state["rounded"] == 4
|
||||
|
||||
|
||||
def test_script_listener_reads_trigger_input_and_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
seed:
|
||||
do:
|
||||
call: expression
|
||||
expr: "'alpha'"
|
||||
start: true
|
||||
combine:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
state["input_matches_output"] = input == outputs["seed"]
|
||||
return f"{outputs['seed']}:{input}"
|
||||
listen: seed
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff() == "alpha:alpha"
|
||||
assert flow.state["input_matches_output"] is True
|
||||
|
||||
|
||||
def test_script_each_action_reads_item_and_step_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptEachFlow
|
||||
methods:
|
||||
seed:
|
||||
do:
|
||||
call: expression
|
||||
expr: "'global'"
|
||||
start: true
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: clean
|
||||
action:
|
||||
call: script
|
||||
code: |
|
||||
return item.strip()
|
||||
- name: tag
|
||||
action:
|
||||
call: script
|
||||
code: |
|
||||
return f"{outputs['seed']}:{outputs['clean']}"
|
||||
listen: seed
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
|
||||
|
||||
|
||||
def test_each_action_uses_iteration_outputs_between_steps():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1155,13 +1272,15 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- normalize:
|
||||
- name: normalize
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.normalize_row
|
||||
with:
|
||||
row: "${{item}}"
|
||||
prefix: saved
|
||||
- save:
|
||||
- name: save
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.save_row
|
||||
with:
|
||||
@@ -1178,7 +1297,7 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_resets_inner_outputs_between_iterations():
|
||||
def test_each_action_resets_step_outputs_between_iterations():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1188,10 +1307,12 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- leak_check:
|
||||
- name: leak_check
|
||||
action:
|
||||
call: expression
|
||||
expr: "has(outputs.previous) ? outputs.previous : 'empty'"
|
||||
- previous:
|
||||
- name: previous
|
||||
action:
|
||||
call: expression
|
||||
expr: item
|
||||
start: true
|
||||
@@ -1205,7 +1326,7 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs():
|
||||
def test_each_action_preserves_flow_outputs_and_prefers_step_outputs():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachFlow
|
||||
@@ -1220,13 +1341,16 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- before_shadow:
|
||||
- name: before_shadow
|
||||
action:
|
||||
call: expression
|
||||
expr: "outputs.seed + ':' + item"
|
||||
- seed:
|
||||
- name: seed
|
||||
action:
|
||||
call: expression
|
||||
expr: "'local:' + item"
|
||||
- after_shadow:
|
||||
- name: after_shadow
|
||||
action:
|
||||
call: expression
|
||||
expr: "outputs.seed"
|
||||
listen: seed
|
||||
@@ -1244,6 +1368,103 @@ methods:
|
||||
]
|
||||
|
||||
|
||||
def test_each_action_runs_simple_if_clauses():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachIfFlow
|
||||
methods:
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: kind
|
||||
action:
|
||||
call: expression
|
||||
expr: item.kind
|
||||
- name: kept
|
||||
if: "outputs.kind == 'keep'"
|
||||
action:
|
||||
call: expression
|
||||
expr: "'kept:' + item.value"
|
||||
- name: skipped
|
||||
if: "outputs.kind != 'keep'"
|
||||
action:
|
||||
call: expression
|
||||
expr: "'skipped:' + item.value"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(
|
||||
inputs={
|
||||
"rows": [
|
||||
{"kind": "keep", "value": "a"},
|
||||
{"kind": "drop", "value": "b"},
|
||||
]
|
||||
}
|
||||
) == ["kept:a", "skipped:b"]
|
||||
|
||||
|
||||
def test_each_action_skipped_if_keeps_previous_output():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachIfFlow
|
||||
methods:
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: original
|
||||
action:
|
||||
call: expression
|
||||
expr: item.value
|
||||
- name: maybe_included
|
||||
if: item.include
|
||||
action:
|
||||
call: expression
|
||||
expr: "'included:' + item.value"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(
|
||||
inputs={
|
||||
"rows": [
|
||||
{"include": True, "value": "a"},
|
||||
{"include": False, "value": "b"},
|
||||
]
|
||||
}
|
||||
) == ["included:a", "b"]
|
||||
|
||||
|
||||
def test_each_action_if_condition_must_be_boolean():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: EachIfFlow
|
||||
methods:
|
||||
process_rows:
|
||||
do:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- name: value
|
||||
if: item.value
|
||||
action:
|
||||
call: expression
|
||||
expr: item.value
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
with pytest.raises(ValueError, match="if expression must evaluate to a boolean"):
|
||||
flow.kickoff(inputs={"rows": [{"value": "truthy"}]})
|
||||
|
||||
|
||||
def test_each_action_empty_list_returns_empty_and_listener_runs_once():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
@@ -1254,7 +1475,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- normalize:
|
||||
- name: normalize
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.normalize_row
|
||||
with:
|
||||
@@ -1303,7 +1525,12 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
|
||||
"do": {
|
||||
"call": "each",
|
||||
"in": expr,
|
||||
"do": [{"value": {"call": "expression", "expr": "item"}}],
|
||||
"do": [
|
||||
{
|
||||
"name": "value",
|
||||
"action": {"call": "expression", "expr": "item"},
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
},
|
||||
@@ -1319,15 +1546,25 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
|
||||
"action_do",
|
||||
[
|
||||
[],
|
||||
[{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}],
|
||||
[{"1bad": {"call": "expression", "expr": "item"}}],
|
||||
[{"value": {"call": "expression", "expr": "item"}}],
|
||||
[{"name": "1bad", "action": {"call": "expression", "expr": "item"}}],
|
||||
[{"name": "missing_action"}],
|
||||
[{"action": {"call": "expression", "expr": "item"}}],
|
||||
[
|
||||
{"same": {"call": "expression", "expr": "item"}},
|
||||
{"same": {"call": "expression", "expr": "item"}},
|
||||
{
|
||||
"name": "value",
|
||||
"if": "true",
|
||||
"then": [],
|
||||
"action": {"call": "expression", "expr": "item"},
|
||||
}
|
||||
],
|
||||
[
|
||||
{"name": "same", "action": {"call": "expression", "expr": "item"}},
|
||||
{"name": "same", "action": {"call": "expression", "expr": "item"}},
|
||||
],
|
||||
],
|
||||
)
|
||||
def test_each_action_validates_inner_action_shape(action_do):
|
||||
def test_each_action_validates_step_shape(action_do):
|
||||
with pytest.raises(ValidationError):
|
||||
FlowDefinition.from_dict(
|
||||
{
|
||||
@@ -1347,6 +1584,26 @@ def test_each_action_validates_inner_action_shape(action_do):
|
||||
)
|
||||
|
||||
|
||||
def test_if_clauses_are_rejected_at_method_level():
|
||||
with pytest.raises(ValidationError):
|
||||
FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "TopLevelIfFlow",
|
||||
"methods": {
|
||||
"process": {
|
||||
"start": True,
|
||||
"do": {
|
||||
"call": "expression",
|
||||
"if": "true",
|
||||
"expr": "'ok'",
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_each_action_rejects_nested_each_actions():
|
||||
with pytest.raises(ValidationError):
|
||||
FlowDefinition.from_dict(
|
||||
@@ -1361,12 +1618,14 @@ def test_each_action_rejects_nested_each_actions():
|
||||
"in": "state.rows",
|
||||
"do": [
|
||||
{
|
||||
"nested": {
|
||||
"name": "nested",
|
||||
"action": {
|
||||
"call": "each",
|
||||
"in": "state.children",
|
||||
"do": [
|
||||
{
|
||||
"child": {
|
||||
"name": "child",
|
||||
"action": {
|
||||
"call": "expression",
|
||||
"expr": "item",
|
||||
}
|
||||
@@ -1392,7 +1651,8 @@ methods:
|
||||
call: each
|
||||
in: state.rows
|
||||
do:
|
||||
- validate:
|
||||
- name: validate
|
||||
action:
|
||||
call: code
|
||||
ref: {__name__}:EachActionFlow.fail_on_bad_row
|
||||
with:
|
||||
|
||||
Reference in New Issue
Block a user