mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-17 06:08:22 +00:00
Compare commits
2 Commits
flow-scrip
...
flow-defin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51d0ce4cc5 | ||
|
|
b69e5ccc10 |
@@ -12,7 +12,7 @@ from __future__ import annotations
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import Annotated, Any, Literal as TypingLiteral, TypeAlias
|
||||
from typing import Annotated, Any, Literal, TypeAlias
|
||||
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
@@ -73,10 +73,24 @@ def _object_ref(value: Any) -> str:
|
||||
class FlowDefinitionDiagnostic(BaseModel):
|
||||
"""A non-fatal Flow Definition build or validation diagnostic."""
|
||||
|
||||
code: str
|
||||
message: str
|
||||
severity: TypingLiteral["warning", "error"] = "warning"
|
||||
path: str | None = None
|
||||
code: str = Field(
|
||||
description="Stable diagnostic identifier for tooling and tests.",
|
||||
examples=["router_without_trigger"],
|
||||
)
|
||||
message: str = Field(
|
||||
description="Human-readable explanation of the diagnostic.",
|
||||
examples=["router: true requires either start or listen"],
|
||||
)
|
||||
severity: Literal["warning", "error"] = Field(
|
||||
default="warning",
|
||||
description="Diagnostic severity. Errors indicate an invalid or incomplete contract.",
|
||||
examples=["error"],
|
||||
)
|
||||
path: str | None = Field(
|
||||
default=None,
|
||||
description="Dot path to the definition field that produced the diagnostic.",
|
||||
examples=["methods.decide"],
|
||||
)
|
||||
|
||||
|
||||
class FlowDictStateDefinition(BaseModel):
|
||||
@@ -84,7 +98,7 @@ class FlowDictStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["dict"] = Field(
|
||||
type: Literal["dict"] = Field(
|
||||
default="dict",
|
||||
description="Plain dictionary state with optional default values.",
|
||||
examples=["dict"],
|
||||
@@ -101,7 +115,7 @@ class FlowPydanticStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["pydantic"] = Field(
|
||||
type: Literal["pydantic"] = Field(
|
||||
default="pydantic",
|
||||
description="Importable Pydantic model used as the Flow state type.",
|
||||
examples=["pydantic"],
|
||||
@@ -136,7 +150,7 @@ class FlowJsonSchemaStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["json_schema"] = Field(
|
||||
type: Literal["json_schema"] = Field(
|
||||
default="json_schema",
|
||||
description="Inline JSON Schema used as the Flow state contract.",
|
||||
examples=["json_schema"],
|
||||
@@ -163,7 +177,7 @@ class FlowUnknownStateDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
type: TypingLiteral["unknown"] = Field(
|
||||
type: Literal["unknown"] = Field(
|
||||
default="unknown",
|
||||
description="Unknown state representation; runtime falls back to dictionary state.",
|
||||
examples=["unknown"],
|
||||
@@ -192,14 +206,46 @@ FlowStateDefinition: TypeAlias = Annotated[
|
||||
class FlowConfigDefinition(BaseModel):
|
||||
"""Serializable Flow-level configuration."""
|
||||
|
||||
tracing: bool | None = None
|
||||
stream: bool = False
|
||||
memory: dict[str, Any] | None = None
|
||||
input_provider: str | None = None
|
||||
suppress_flow_events: bool = False
|
||||
max_method_calls: int = 100
|
||||
defer_trace_finalization: bool = False
|
||||
checkpoint: bool | dict[str, Any] | None = None
|
||||
tracing: bool | None = Field(
|
||||
default=None,
|
||||
description="Override for flow tracing; when omitted, runtime defaults apply.",
|
||||
examples=[True],
|
||||
)
|
||||
stream: bool = Field(
|
||||
default=False,
|
||||
description="Whether the flow should emit streaming events when supported.",
|
||||
examples=[True],
|
||||
)
|
||||
memory: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Serializable memory configuration passed to flow execution.",
|
||||
examples=[{"enabled": True}],
|
||||
)
|
||||
input_provider: str | None = Field(
|
||||
default=None,
|
||||
description="Import reference or provider key used to supply flow inputs.",
|
||||
examples=["my_project.inputs:load_inputs"],
|
||||
)
|
||||
suppress_flow_events: bool = Field(
|
||||
default=False,
|
||||
description="Disable flow event emission for this definition.",
|
||||
examples=[False],
|
||||
)
|
||||
max_method_calls: int = Field(
|
||||
default=100,
|
||||
description="Maximum number of method executions allowed during one kickoff.",
|
||||
examples=[20],
|
||||
)
|
||||
defer_trace_finalization: bool = Field(
|
||||
default=False,
|
||||
description="Defer trace finalization so callers can complete tracing later.",
|
||||
examples=[False],
|
||||
)
|
||||
checkpoint: bool | dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Checkpointing configuration, or true to use default checkpointing.",
|
||||
examples=[True, {"enabled": True}],
|
||||
)
|
||||
|
||||
|
||||
class FlowPersistenceDefinition(BaseModel):
|
||||
@@ -211,9 +257,21 @@ class FlowPersistenceDefinition(BaseModel):
|
||||
serialized config.
|
||||
"""
|
||||
|
||||
enabled: bool = False
|
||||
verbose: bool = False
|
||||
persistence: Any = None
|
||||
enabled: bool = Field(
|
||||
default=False,
|
||||
description="Whether persistence is enabled for this flow or method.",
|
||||
examples=[True],
|
||||
)
|
||||
verbose: bool = Field(
|
||||
default=False,
|
||||
description="Whether persistence should emit verbose diagnostic output.",
|
||||
examples=[False],
|
||||
)
|
||||
persistence: Any = Field(
|
||||
default=None,
|
||||
description="Persistence backend configuration or import reference.",
|
||||
examples=[{"ref": "my_project.persistence:FlowStore"}],
|
||||
)
|
||||
|
||||
@field_serializer("persistence", when_used="json")
|
||||
def _serialize_persistence(self, value: Any) -> Any:
|
||||
@@ -239,15 +297,53 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
|
||||
"""
|
||||
|
||||
message: str
|
||||
emit: list[str] | None = None
|
||||
llm: Any = "gpt-4o-mini"
|
||||
default_outcome: str | None = None
|
||||
metadata: dict[str, Any] | None = None
|
||||
provider: Any = None
|
||||
learn: bool = False
|
||||
learn_source: str = "hitl"
|
||||
learn_strict: bool = False
|
||||
message: str = Field(
|
||||
description="Prompt shown to the human reviewer when feedback is requested.",
|
||||
examples=["Review the research summary before publishing."],
|
||||
)
|
||||
emit: list[str] | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Allowed feedback outcomes. When set, the method routes like a router "
|
||||
"using the selected outcome."
|
||||
),
|
||||
examples=[["approved", "revise"]],
|
||||
)
|
||||
llm: Any = Field(
|
||||
default="gpt-4o-mini",
|
||||
description="LLM configuration used to assist or process human feedback.",
|
||||
examples=["gpt-4o-mini"],
|
||||
)
|
||||
default_outcome: str | None = Field(
|
||||
default=None,
|
||||
description="Outcome to use when feedback cannot be collected.",
|
||||
examples=["revise"],
|
||||
)
|
||||
metadata: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Serializable metadata attached to the feedback request.",
|
||||
examples=[{"team": "research"}],
|
||||
)
|
||||
provider: Any = Field(
|
||||
default=None,
|
||||
description="Feedback provider configuration or import reference.",
|
||||
examples=["my_project.feedback:provider"],
|
||||
)
|
||||
learn: bool = Field(
|
||||
default=False,
|
||||
description="Whether feedback should be recorded for later learning workflows.",
|
||||
examples=[True],
|
||||
)
|
||||
learn_source: str = Field(
|
||||
default="hitl",
|
||||
description="Source label attached to learned feedback records.",
|
||||
examples=["hitl"],
|
||||
)
|
||||
learn_strict: bool = Field(
|
||||
default=False,
|
||||
description="Whether learning should enforce strict validation of feedback data.",
|
||||
examples=[False],
|
||||
)
|
||||
|
||||
@field_serializer("llm", when_used="json")
|
||||
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
|
||||
@@ -267,30 +363,89 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
class FlowCodeActionDefinition(BaseModel):
|
||||
"""A Flow method action that executes importable Python code."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["code"] = "code"
|
||||
ref: str
|
||||
with_: dict[str, Any] | None = Field(default=None, alias="with")
|
||||
call: Literal["code"] = Field(
|
||||
default="code",
|
||||
description="Action discriminator. Use code to call importable Python.",
|
||||
examples=["code"],
|
||||
)
|
||||
ref: str = Field(
|
||||
description="Import reference for the callable, formatted as module:qualname.",
|
||||
examples=["my_project.flows:normalize_topic"],
|
||||
)
|
||||
with_: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
alias="with",
|
||||
description="Keyword arguments passed to the callable after expression rendering.",
|
||||
examples=[{"topic": "${state.topic}"}],
|
||||
)
|
||||
|
||||
|
||||
class FlowToolActionDefinition(BaseModel):
|
||||
"""A Flow method action that invokes a CrewAI tool."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["tool"]
|
||||
ref: str
|
||||
with_: dict[str, Any] | None = Field(default=None, alias="with")
|
||||
call: Literal["tool"] = Field(
|
||||
description="Action discriminator. Use tool to instantiate and run a CrewAI tool.",
|
||||
examples=["tool"],
|
||||
)
|
||||
ref: str = Field(
|
||||
description="Import reference for a BaseTool class, formatted as module:qualname.",
|
||||
examples=["my_project.tools:SearchTool"],
|
||||
)
|
||||
with_: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
alias="with",
|
||||
description="Tool input arguments after expression rendering.",
|
||||
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
|
||||
)
|
||||
|
||||
|
||||
class FlowCrewActionDefinition(BaseModel):
|
||||
"""A Flow method action that builds and kicks off a CrewAI crew."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["crew"]
|
||||
with_: CrewDefinition = Field(alias="with")
|
||||
call: Literal["crew"] = Field(
|
||||
description="Action discriminator. Use crew to run an inline Crew definition.",
|
||||
examples=["crew"],
|
||||
)
|
||||
with_: CrewDefinition = Field(
|
||||
alias="with",
|
||||
description="Inline Crew definition to load and execute for this action.",
|
||||
examples=[
|
||||
{
|
||||
"name": "inline_research",
|
||||
"agents": {
|
||||
"researcher": {
|
||||
"role": "Researcher",
|
||||
"goal": "Research {topic}",
|
||||
"backstory": "Knows the domain.",
|
||||
}
|
||||
},
|
||||
"tasks": [
|
||||
{
|
||||
"name": "research_task",
|
||||
"description": "Research {topic}",
|
||||
"expected_output": "Findings about {topic}",
|
||||
"agent": "researcher",
|
||||
}
|
||||
],
|
||||
"inputs": {"topic": "${state.topic}"},
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class FlowExpressionActionDefinition(BaseModel):
|
||||
@@ -298,8 +453,14 @@ class FlowExpressionActionDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
call: TypingLiteral["expression"]
|
||||
expr: str
|
||||
call: Literal["expression"] = Field(
|
||||
description="Action discriminator. Use expression to evaluate a CEL expression.",
|
||||
examples=["expression"],
|
||||
)
|
||||
expr: str = Field(
|
||||
description="CEL expression evaluated against state, outputs, and local context.",
|
||||
examples=["state.topic", "outputs.normalize_topic"],
|
||||
)
|
||||
|
||||
|
||||
class FlowScriptActionDefinition(BaseModel):
|
||||
@@ -307,7 +468,7 @@ class FlowScriptActionDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
call: TypingLiteral["script"] = Field(
|
||||
call: Literal["script"] = Field(
|
||||
description="Action discriminator. Use script to execute trusted inline Python.",
|
||||
examples=["script"],
|
||||
)
|
||||
@@ -322,7 +483,7 @@ class FlowScriptActionDefinition(BaseModel):
|
||||
"return state['normalized_topic']"
|
||||
],
|
||||
)
|
||||
language: TypingLiteral["python"] = Field(
|
||||
language: Literal["python"] = Field(
|
||||
default="python",
|
||||
description="Script language. Only python is currently supported.",
|
||||
examples=["python"],
|
||||
@@ -341,6 +502,11 @@ FlowInnerActionDefinition = (
|
||||
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
|
||||
"""One named action inside an ``each`` composite action."""
|
||||
|
||||
root: dict[str, FlowInnerActionDefinition] = Field(
|
||||
description="Single-entry mapping from an inner action name to its action.",
|
||||
examples=[{"clean": {"call": "script", "code": "return item.strip()"}}],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
|
||||
if len(self.root) != 1:
|
||||
@@ -360,11 +526,35 @@ class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinitio
|
||||
class FlowEachActionDefinition(BaseModel):
|
||||
"""A composite action that runs a sequential mini-pipeline for each item."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, extra="forbid")
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
extra="forbid",
|
||||
)
|
||||
|
||||
call: TypingLiteral["each"]
|
||||
in_: str = Field(alias="in")
|
||||
do: list[FlowEachInnerActionDefinition]
|
||||
call: Literal["each"] = Field(
|
||||
description=(
|
||||
"Action discriminator. Use each to run a sequence of actions for every "
|
||||
"item in an input list."
|
||||
),
|
||||
examples=["each"],
|
||||
)
|
||||
in_: str = Field(
|
||||
alias="in",
|
||||
description="CEL expression that must evaluate to the list to iterate.",
|
||||
examples=["state.rows"],
|
||||
)
|
||||
do: list[FlowEachInnerActionDefinition] = Field(
|
||||
description=(
|
||||
"Ordered inner actions to run for each item. Each entry must be a "
|
||||
"single-key mapping naming that inner action."
|
||||
),
|
||||
examples=[
|
||||
[
|
||||
{"clean": {"call": "script", "code": "return item.strip()"}},
|
||||
{"tag": {"call": "expression", "expr": "outputs.clean"}},
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
|
||||
@@ -394,14 +584,48 @@ FlowActionDefinition = (
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
"""Static definition of one Flow method and its execution roles."""
|
||||
|
||||
description: str | None = None
|
||||
do: FlowActionDefinition
|
||||
start: bool | FlowDefinitionCondition | None = None
|
||||
listen: FlowDefinitionCondition | None = None
|
||||
router: bool = False
|
||||
emit: list[str] | None = None
|
||||
human_feedback: FlowHumanFeedbackDefinition | None = None
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
description: str | None = Field(
|
||||
default=None,
|
||||
description="Human-readable summary of what this method does.",
|
||||
examples=["Normalize the incoming topic."],
|
||||
)
|
||||
do: FlowActionDefinition = Field(
|
||||
description="Action executed when this method runs.",
|
||||
examples=[{"call": "script", "code": "return input.strip()"}],
|
||||
)
|
||||
start: bool | FlowDefinitionCondition | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Marks a start method. True starts unconditionally; a condition starts "
|
||||
"when the kickoff inputs or events satisfy it."
|
||||
),
|
||||
examples=[True],
|
||||
)
|
||||
listen: FlowDefinitionCondition | None = Field(
|
||||
default=None,
|
||||
description="Trigger condition that runs this method after upstream events.",
|
||||
examples=["seed", {"or": ["approved", "revise"]}],
|
||||
)
|
||||
router: bool = Field(
|
||||
default=False,
|
||||
description="Whether the method output should be treated as the next event name.",
|
||||
examples=[True],
|
||||
)
|
||||
emit: list[str] | None = Field(
|
||||
default=None,
|
||||
description="Declared router events this method may emit.",
|
||||
examples=[["approved", "revise"]],
|
||||
)
|
||||
human_feedback: FlowHumanFeedbackDefinition | None = Field(
|
||||
default=None,
|
||||
description="Optional human feedback step applied after the method action.",
|
||||
examples=[{"message": "Review the research summary before publishing."}],
|
||||
)
|
||||
persist: FlowPersistenceDefinition | None = Field(
|
||||
default=None,
|
||||
description="Method-level persistence override.",
|
||||
examples=[{"enabled": True}],
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
|
||||
@@ -427,19 +651,71 @@ class FlowMethodDefinition(BaseModel):
|
||||
class FlowDefinition(BaseModel):
|
||||
"""Static, serializable definition of a Flow."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
|
||||
|
||||
schema_: TypingLiteral["crewai.flow/v1"] = Field(
|
||||
default="crewai.flow/v1", alias="schema"
|
||||
model_config = ConfigDict(
|
||||
populate_by_name=True,
|
||||
arbitrary_types_allowed=True,
|
||||
)
|
||||
|
||||
schema_: Literal["crewai.flow/v1"] = Field(
|
||||
default="crewai.flow/v1",
|
||||
alias="schema",
|
||||
description="Flow Definition schema identifier and version.",
|
||||
examples=["crewai.flow/v1"],
|
||||
)
|
||||
name: str = Field(
|
||||
description="Unique flow name used in logs, events, and traces.",
|
||||
examples=["ResearchFlow"],
|
||||
)
|
||||
description: str | None = Field(
|
||||
default=None,
|
||||
description="Human-readable summary of the flow.",
|
||||
examples=["Normalize a topic and prepare it for research."],
|
||||
)
|
||||
state: FlowStateDefinition | None = Field(
|
||||
default=None,
|
||||
description="State contract for kickoff inputs and runtime state.",
|
||||
examples=[{"type": "dict", "default": {"topic": "AI agents"}}],
|
||||
)
|
||||
config: FlowConfigDefinition = Field(
|
||||
default_factory=FlowConfigDefinition,
|
||||
description="Serializable flow-level runtime configuration.",
|
||||
examples=[{"stream": True, "max_method_calls": 20}],
|
||||
)
|
||||
persist: FlowPersistenceDefinition | None = Field(
|
||||
default=None,
|
||||
description="Flow-level persistence configuration.",
|
||||
examples=[{"enabled": True}],
|
||||
)
|
||||
conversational: FlowConversationalDefinition | None = Field(
|
||||
default=None,
|
||||
description="Conversational flow configuration, when the flow supports chat.",
|
||||
)
|
||||
methods: dict[str, FlowMethodDefinition] = Field(
|
||||
default_factory=dict,
|
||||
description="Mapping of method names to method definitions.",
|
||||
examples=[
|
||||
{
|
||||
"seed": {
|
||||
"start": True,
|
||||
"do": {"call": "expression", "expr": "state.topic"},
|
||||
}
|
||||
}
|
||||
],
|
||||
)
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = Field(
|
||||
default_factory=list,
|
||||
description="Validation diagnostics attached to this definition.",
|
||||
examples=[
|
||||
[
|
||||
{
|
||||
"code": "router_without_trigger",
|
||||
"message": "router: true requires either start or listen",
|
||||
"severity": "error",
|
||||
"path": "methods.decide",
|
||||
}
|
||||
]
|
||||
],
|
||||
)
|
||||
name: str
|
||||
description: str | None = None
|
||||
state: FlowStateDefinition | None = None
|
||||
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
conversational: FlowConversationalDefinition | None = None
|
||||
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_method_names(self) -> FlowDefinition:
|
||||
|
||||
@@ -1090,8 +1090,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
|
||||
try:
|
||||
return build_action(self, definition.do)
|
||||
except RuntimeError:
|
||||
raise
|
||||
except Exception as e:
|
||||
unresolved.append(f"{name}: {e}")
|
||||
return lambda *args, **kwargs: None
|
||||
|
||||
@@ -2,12 +2,11 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
import contextvars
|
||||
import inspect
|
||||
import os
|
||||
import textwrap
|
||||
from typing import TYPE_CHECKING, Any, Protocol, cast
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
@@ -33,8 +32,6 @@ __all__ = ["build_action"]
|
||||
|
||||
LocalContext = dict[str, Any]
|
||||
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
|
||||
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
|
||||
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
|
||||
|
||||
|
||||
class _BuiltAction(Protocol):
|
||||
@@ -167,39 +164,14 @@ class ScriptAction:
|
||||
)
|
||||
|
||||
def _compile_handler(self) -> Callable[..., Any]:
|
||||
raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "")
|
||||
if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
|
||||
raise RuntimeError(
|
||||
"Flow script execution is disabled by default. "
|
||||
f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
|
||||
"trusted flow definitions."
|
||||
)
|
||||
|
||||
filename = f"crewai.flow.script.{self.flow._definition.name}"
|
||||
module = ast.parse(self.definition.code, filename=filename)
|
||||
function = ast.FunctionDef(
|
||||
name="_flow_script",
|
||||
args=ast.arguments(
|
||||
posonlyargs=[],
|
||||
args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")],
|
||||
vararg=None,
|
||||
kwonlyargs=[],
|
||||
kw_defaults=[],
|
||||
kwarg=None,
|
||||
defaults=[],
|
||||
),
|
||||
body=module.body or [ast.Pass()],
|
||||
decorator_list=[],
|
||||
returns=None,
|
||||
type_comment=None,
|
||||
type_params=[],
|
||||
namespace: dict[str, Any] = {
|
||||
"__name__": f"crewai.flow.script.{self.flow._definition.name}",
|
||||
}
|
||||
source = _script_function_source(self.definition.code)
|
||||
exec( # nosec B102 # noqa: S102
|
||||
compile(source, namespace["__name__"], "exec"), namespace
|
||||
)
|
||||
module.body = [function]
|
||||
ast.fix_missing_locations(module)
|
||||
|
||||
namespace: dict[str, Any] = {"__name__": filename}
|
||||
exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102
|
||||
return cast(Callable[..., Any], namespace["_flow_script"])
|
||||
return cast(Callable[..., Any], namespace["__flow_script__"])
|
||||
|
||||
|
||||
class EachAction:
|
||||
@@ -303,3 +275,14 @@ def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None:
|
||||
if not isinstance(local_context, dict):
|
||||
raise TypeError("flow definition local context must be a mapping")
|
||||
return cast(LocalContext, local_context)
|
||||
|
||||
|
||||
def _script_function_source(code: str) -> str:
|
||||
body = code if code.strip() else "pass"
|
||||
source = (
|
||||
"def __flow_script__(state, outputs, input, item):\n"
|
||||
f"{textwrap.indent(body, ' ')}"
|
||||
)
|
||||
if not source.endswith("\n"):
|
||||
source += "\n"
|
||||
return source
|
||||
|
||||
@@ -3,27 +3,29 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, TypedDict
|
||||
from typing import Any
|
||||
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
|
||||
|
||||
class _MethodOutput(TypedDict):
|
||||
method: str
|
||||
output: Any
|
||||
|
||||
|
||||
def outputs_by_name(
|
||||
method_outputs: list[_MethodOutput],
|
||||
method_outputs: list[Any],
|
||||
*,
|
||||
local_outputs: Mapping[str, Any] | None = None,
|
||||
serialize: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
outputs: dict[str, Any] = {}
|
||||
for entry in method_outputs:
|
||||
outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize)
|
||||
method = ""
|
||||
output = entry
|
||||
if isinstance(entry, dict) and "output" in entry:
|
||||
method = str(entry.get("method", ""))
|
||||
output = entry["output"]
|
||||
outputs[method] = _output_value(output, serialize=serialize)
|
||||
|
||||
if local_outputs is not None:
|
||||
if not isinstance(local_outputs, Mapping):
|
||||
raise TypeError("flow definition local outputs must be a mapping")
|
||||
outputs.update(
|
||||
{
|
||||
key: _output_value(output, serialize=serialize)
|
||||
|
||||
@@ -645,11 +645,14 @@ class TestLegacyMethodOutputsRestore:
|
||||
context = _expression_context(restored)
|
||||
assert context["outputs"] == {"": "legacy"}
|
||||
|
||||
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
|
||||
def test_raw_legacy_outputs_remain_readable(self) -> None:
|
||||
from crewai.flow.runtime._expressions import _expression_context
|
||||
|
||||
flow = Flow()
|
||||
flow._method_outputs = ["legacy"]
|
||||
|
||||
assert flow.method_outputs == ["legacy"]
|
||||
assert _expression_context(flow)["outputs"] == {"": "legacy"}
|
||||
|
||||
|
||||
class TestAgentCheckpoint:
|
||||
|
||||
@@ -63,6 +63,104 @@ def test_flow_public_exports_are_explicit():
|
||||
assert "calculate_node_levels" not in flow_visualization.__all__
|
||||
|
||||
|
||||
def test_flow_definition_json_schema_carries_reference_descriptions():
|
||||
schema = flow_definition.FlowDefinition.json_schema()
|
||||
defs = schema["$defs"]
|
||||
|
||||
assert schema["properties"]["schema"]["description"]
|
||||
assert schema["properties"]["methods"]["description"]
|
||||
|
||||
method_properties = defs["FlowMethodDefinition"]["properties"]
|
||||
assert method_properties["do"]["description"] == "Action executed when this method runs."
|
||||
assert "Trigger condition" in method_properties["listen"]["description"]
|
||||
|
||||
script_properties = defs["FlowScriptActionDefinition"]["properties"]
|
||||
assert "trusted inline Python" in script_properties["call"]["description"]
|
||||
assert "not interpolated" in script_properties["code"]["description"]
|
||||
assert "not sandboxed" in script_properties["code"]["description"]
|
||||
|
||||
state_schema = schema["properties"]["state"]["anyOf"][0]
|
||||
assert state_schema["discriminator"]["propertyName"] == "type"
|
||||
assert state_schema["discriminator"]["mapping"] == {
|
||||
"dict": "#/$defs/FlowDictStateDefinition",
|
||||
"json_schema": "#/$defs/FlowJsonSchemaStateDefinition",
|
||||
"pydantic": "#/$defs/FlowPydanticStateDefinition",
|
||||
"unknown": "#/$defs/FlowUnknownStateDefinition",
|
||||
}
|
||||
|
||||
dict_state_properties = defs["FlowDictStateDefinition"]["properties"]
|
||||
assert dict_state_properties["type"]["description"]
|
||||
assert "ref" not in dict_state_properties
|
||||
|
||||
json_schema_state_properties = defs["FlowJsonSchemaStateDefinition"]["properties"]
|
||||
assert json_schema_state_properties["json_schema"]["description"]
|
||||
assert "json_schema" in defs["FlowJsonSchemaStateDefinition"]["required"]
|
||||
|
||||
pydantic_state_properties = defs["FlowPydanticStateDefinition"]["properties"]
|
||||
assert "Fallback JSON Schema" in pydantic_state_properties["json_schema"][
|
||||
"description"
|
||||
]
|
||||
|
||||
each_properties = defs["FlowEachActionDefinition"]["properties"]
|
||||
assert "list to iterate" in each_properties["in"]["description"]
|
||||
assert "Ordered inner actions" in each_properties["do"]["description"]
|
||||
|
||||
|
||||
def test_flow_definition_json_schema_carries_field_examples_only():
|
||||
schema = flow_definition.FlowDefinition.json_schema()
|
||||
defs = schema["$defs"]
|
||||
|
||||
for model_name in [
|
||||
"FlowDefinition",
|
||||
"FlowCodeActionDefinition",
|
||||
"FlowToolActionDefinition",
|
||||
"FlowCrewActionDefinition",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowScriptActionDefinition",
|
||||
"FlowEachActionDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowDictStateDefinition",
|
||||
"FlowJsonSchemaStateDefinition",
|
||||
"FlowPydanticStateDefinition",
|
||||
"FlowUnknownStateDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
]:
|
||||
model_schema = schema if model_name == "FlowDefinition" else defs[model_name]
|
||||
assert "examples" not in model_schema
|
||||
|
||||
assert schema["properties"]["name"]["examples"] == ["ResearchFlow"]
|
||||
assert schema["properties"]["schema"]["examples"] == ["crewai.flow/v1"]
|
||||
assert schema["properties"]["methods"]["examples"][0]["seed"]["do"] == {
|
||||
"call": "expression",
|
||||
"expr": "state.topic",
|
||||
}
|
||||
|
||||
script_properties = defs["FlowScriptActionDefinition"]["properties"]
|
||||
assert script_properties["call"]["examples"] == ["script"]
|
||||
assert "input.strip()" in script_properties["code"]["examples"][0]
|
||||
assert script_properties["language"]["examples"] == ["python"]
|
||||
|
||||
action_properties = defs["FlowCodeActionDefinition"]["properties"]
|
||||
assert action_properties["ref"]["examples"] == [
|
||||
"my_project.flows:normalize_topic"
|
||||
]
|
||||
assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}]
|
||||
|
||||
each_properties = defs["FlowEachActionDefinition"]["properties"]
|
||||
assert each_properties["in"]["examples"] == ["state.rows"]
|
||||
assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script"
|
||||
|
||||
method_properties = defs["FlowMethodDefinition"]["properties"]
|
||||
assert method_properties["listen"]["examples"] == [
|
||||
"seed",
|
||||
{"or": ["approved", "revise"]},
|
||||
]
|
||||
assert method_properties["emit"]["examples"] == [["approved", "revise"]]
|
||||
|
||||
|
||||
def test_flow_state_definition_uses_discriminated_branches():
|
||||
definition = flow_definition.FlowDefinition.model_validate(
|
||||
{
|
||||
|
||||
@@ -1145,31 +1145,7 @@ methods:
|
||||
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
|
||||
|
||||
|
||||
def test_script_action_requires_explicit_opt_in():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
methods:
|
||||
normalize:
|
||||
do:
|
||||
call: script
|
||||
code: |
|
||||
return "blocked"
|
||||
start: true
|
||||
"""
|
||||
|
||||
with pytest.raises(
|
||||
RuntimeError, match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1"
|
||||
) as exc_info:
|
||||
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
assert "methods with unresolvable actions" not in str(exc_info.value)
|
||||
|
||||
|
||||
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
def test_script_action_runs_python_imports_mutates_state_and_returns_value():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
@@ -1191,11 +1167,7 @@ methods:
|
||||
assert flow.state["rounded"] == 4
|
||||
|
||||
|
||||
def test_script_listener_reads_trigger_input_and_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
def test_script_listener_reads_trigger_input_and_outputs():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptFlow
|
||||
@@ -1220,11 +1192,7 @@ methods:
|
||||
assert flow.state["input_matches_output"] is True
|
||||
|
||||
|
||||
def test_script_each_action_reads_item_and_inner_outputs(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
|
||||
|
||||
def test_script_each_action_reads_item_and_inner_outputs():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ScriptEachFlow
|
||||
|
||||
Reference in New Issue
Block a user