Compare commits

..

4 Commits

Author SHA1 Message Date
Greyson LaLonde
4068fa3cfd ci: diff explicit base...head shas to match files-changed 2026-06-17 17:04:16 -07:00
Greyson LaLonde
474496f928 Merge branch 'main' into ci/python-pr-size-gate 2026-06-17 17:01:11 -07:00
Greyson LaLonde
c2d28b932a ci: scope job permissions and use immutable base sha 2026-06-17 16:53:02 -07:00
Greyson LaLonde
3fcbc38280 ci: fail PRs over 1500 lines of python changes 2026-06-17 16:46:36 -07:00
26 changed files with 338 additions and 963 deletions

View File

@@ -29,4 +29,30 @@ jobs:
lib/crewai/src/crewai/cli/templates/**
**/*.json
**/test_durations/**
**/cassettes/**
**/cassettes/**
python-diff-size:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
with:
fetch-depth: 0
- name: Enforce Python diff size limit
env:
MAX: "1500"
BASE_SHA: ${{ github.event.pull_request.base.sha }}
HEAD_SHA: ${{ github.event.pull_request.head.sha }}
run: |
# Three-dot base...head == merge-base(base, head)..head: matches GitHub's
# "Files changed" diff and ignores the synthetic merge commit at HEAD.
# Sum added + deleted lines across changed .py files; skip binaries ("-").
total=$(git diff --numstat "$BASE_SHA...$HEAD_SHA" -- '*.py' \
| awk '$1 != "-" && $2 != "-" { sum += $1 + $2 } END { print sum + 0 }')
echo "Python churn: $total lines (limit $MAX)"
if [ "$total" -gt "$MAX" ]; then
echo "::error::Python changes total $total lines, over the $MAX-line limit. Split into smaller PRs."
git diff --numstat "$BASE_SHA...$HEAD_SHA" -- '*.py' | sort -rn
exit 1
fi

View File

@@ -4,43 +4,6 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="17 يونيو 2026">
## v1.14.8a
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.8a)
## ما الذي تغير
### الميزات
- إضافة إجراء كتلة نصية/كود إلى FlowDefinition
- إضافة إجراءات الطاقم إلى FlowDefinition
- إضافة إجراء مركب `each` إلى FlowDefinition
- تنفيذ دعم وضع DMN في إنشاء الطاقم وتنفيذه
- تحسين وظيفة إعادة تعيين الذاكرة ومعالجة الطاقم بتنسيق JSON
- إضافة تعبيرات إلى إجراءات FlowDefinition
- تنفيذ أدوات تشغيل تعريف التدفق بدون كود Python
- دفع التغذية الراجعة البشرية من تعريف التدفق
- توصيل التكوين والاستمرارية من FlowDefinition إلى وقت التشغيل
- إضافة `crewai run --definition` التجريبية للتدفقات
- دعم تراجع نشر ZIP وتشغيل مشاريع الطاقم بتنسيق JSON
- تقديم الطواقم بتنسيق JSON أولاً
### إصلاحات الأخطاء
- إصلاح أداة Exa المكررة
- إصلاح استخدام الرموز المجمعة عبر جميع استدعاءات LLM
- حل المشكلات المتعلقة بتحميل الطاقم ومنطق التحقق
### الوثائق
- توثيق حقول FlowDefinition في مخطط JSON
- تحديث وثائق التثبيت والبدء السريع لمشاريع الطاقم بتنسيق JSON أولاً
- تحديث سجل التغييرات والإصدار لـ v1.14.7
## المساهمون
@gabemilani, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @lucasgomide, @theCyberTech, @vinibrsl
</Update>
<Update label="11 يونيو 2026">
## v1.14.7

View File

@@ -4,43 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 17, 2026">
## v1.14.8a
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.8a)
## What's Changed
### Features
- Add script/code block action to FlowDefinition
- Add crew actions to FlowDefinition
- Add `each` composite action to FlowDefinition
- Implement DMN mode support in crew creation and execution
- Enhance memory reset functionality and JSON crew handling
- Add expressions to FlowDefinition actions
- Implement Flow definition run tools without Python code
- Drive human feedback from the flow definition
- Wire config and persistence from FlowDefinition into the runtime
- Add experimental `crewai run --definition` for flows
- Support ZIP deployment fallback and JSON crew project env runs
- Introduce JSON first crews
### Bug Fixes
- Fix duplicated Exa tool
- Fix aggregate token usage across all LLM calls
- Resolve issues with crew loading and validation logic
### Documentation
- Document FlowDefinition fields in the JSON schema
- Update installation and quickstart documentation for JSON-first crew projects
- Update changelog and version for v1.14.7
## Contributors
@gabemilani, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @lucasgomide, @theCyberTech, @vinibrsl
</Update>
<Update label="Jun 11, 2026">
## v1.14.7

View File

@@ -4,43 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 17일">
## v1.14.8a
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.8a)
## 변경 사항
### 기능
- FlowDefinition에 스크립트/코드 블록 액션 추가
- FlowDefinition에 크루 액션 추가
- FlowDefinition에 `each` 복합 액션 추가
- 크루 생성 및 실행에서 DMN 모드 지원 구현
- 메모리 재설정 기능 및 JSON 크루 처리 기능 향상
- FlowDefinition 액션에 표현식 추가
- Python 코드 없이 Flow 정의 실행 도구 구현
- Flow 정의에서 인간 피드백 유도
- FlowDefinition의 구성 및 지속성을 런타임에 연결
- 흐름을 위한 실험적 `crewai run --definition` 추가
- ZIP 배포 대체 및 JSON 크루 프로젝트 환경 실행 지원
- JSON 우선 크루 도입
### 버그 수정
- 중복된 Exa 도구 수정
- 모든 LLM 호출에서 집계 토큰 사용 수정
- 크루 로딩 및 검증 로직 관련 문제 해결
### 문서
- JSON 스키마에서 FlowDefinition 필드 문서화
- JSON 우선 크루 프로젝트에 대한 설치 및 빠른 시작 문서 업데이트
- v1.14.7에 대한 변경 로그 및 버전 업데이트
## 기여자
@gabemilani, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @lucasgomide, @theCyberTech, @vinibrsl
</Update>
<Update label="2026년 6월 11일">
## v1.14.7

View File

@@ -4,43 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="17 jun 2026">
## v1.14.8a
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.8a)
## O que Mudou
### Recursos
- Adicionar ação de bloco de script/código ao FlowDefinition
- Adicionar ações de equipe ao FlowDefinition
- Adicionar ação composta `each` ao FlowDefinition
- Implementar suporte ao modo DMN na criação e execução de equipes
- Melhorar a funcionalidade de redefinição de memória e o manuseio de equipes em JSON
- Adicionar expressões às ações do FlowDefinition
- Implementar ferramentas de execução de definição de fluxo sem código Python
- Conduzir feedback humano a partir da definição de fluxo
- Conectar configuração e persistência do FlowDefinition ao tempo de execução
- Adicionar `crewai run --definition` experimental para fluxos
- Suportar fallback de implantação ZIP e execuções de projeto de equipe em JSON
- Introduzir equipes em JSON primeiro
### Correções de Bugs
- Corrigir ferramenta Exa duplicada
- Corrigir uso de token agregado em todas as chamadas LLM
- Resolver problemas com o carregamento de equipes e lógica de validação
### Documentação
- Documentar campos do FlowDefinition no esquema JSON
- Atualizar documentação de instalação e início rápido para projetos de equipe em JSON-primeiro
- Atualizar changelog e versão para v1.14.7
## Contribuidores
@gabemilani, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @lucasgomide, @theCyberTech, @vinibrsl
</Update>
<Update label="11 jun 2026">
## v1.14.7

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.8a",
"crewai-core==1.14.7",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.8a"
__version__ = "1.14.7"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.8a"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.8a"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.8a"
"crewai[tools]==1.14.7"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.8a"
__version__ = "1.14.7"

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.8a"
__version__ = "1.14.7"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.8a",
"crewai==1.14.7",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.8a"
__version__ = "1.14.7"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.8a",
"crewai-cli==1.14.8a",
"crewai-core==1.14.7",
"crewai-cli==1.14.7",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.8a",
"crewai-tools==1.14.7",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.8a"
__version__ = "1.14.7"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -14,6 +14,7 @@ from crewai.flow.flow_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowDictStateDefinition,
FlowHumanFeedbackDefinition,
FlowMethodDefinition,
@@ -22,7 +23,6 @@ from crewai.flow.flow_definition import (
FlowStateDefinition,
FlowUnknownStateDefinition,
_object_ref,
log_flow_definition_issues,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -116,6 +116,7 @@ def _is_json_serializable(value: Any) -> bool:
def _serialize_static_value(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> Any:
if value is None or _is_json_serializable(value):
@@ -147,11 +148,12 @@ def _serialize_static_value(
)
ref = _object_ref(value)
logger.warning(
"Flow definition value at %s is not fully serializable; "
"preserved import reference %s.",
path,
ref,
diagnostics.append(
FlowDefinitionDiagnostic(
code="non_serializable_value",
path=path,
message=f"value is not fully serializable; preserved import reference {ref}",
)
)
return {"ref": ref}
@@ -167,7 +169,10 @@ def _state_ref(value: Any) -> str | None:
return None
def _build_state_definition(flow_class: type) -> FlowStateDefinition | None:
def _build_state_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowStateDefinition | None:
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
@@ -182,23 +187,29 @@ def _build_state_definition(flow_class: type) -> FlowStateDefinition | None:
if state_value is dict or isinstance(state_value, dict):
default = None
if isinstance(state_value, dict):
default = _serialize_static_value(state_value, "state.default")
default = _serialize_static_value(state_value, diagnostics, "state.default")
return FlowDictStateDefinition(default=default)
if isinstance(state_value, type) and issubclass(state_value, PydanticBaseModel):
return FlowPydanticStateDefinition(ref=_state_ref(state_value))
if isinstance(state_value, PydanticBaseModel):
return FlowPydanticStateDefinition(
ref=_state_ref(state_value),
default=_serialize_static_value(state_value, "state.default"),
default=_serialize_static_value(state_value, diagnostics, "state.default"),
)
diagnostics.append(
FlowDefinitionDiagnostic(
code="unknown_state_type",
path="state",
message=f"could not serialize state type {_object_ref(state_value)}",
)
logger.warning(
"Flow definition state could not serialize state type %s.",
_object_ref(state_value),
)
return FlowUnknownStateDefinition(ref=_state_ref(state_value))
def _build_config_definition(flow_class: type) -> FlowConfigDefinition:
def _build_config_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConfigDefinition:
config_field_names = set(FlowConfigDefinition.model_fields)
field_defaults = {
name: field.get_default(call_default_factory=True)
@@ -214,12 +225,15 @@ def _build_config_definition(flow_class: type) -> FlowConfigDefinition:
value if value is None or isinstance(value, str) else _object_ref(value)
)
else:
values[field_name] = _serialize_static_value(value, f"config.{field_name}")
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
return FlowConfigDefinition(**values)
def _build_human_feedback_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowHumanFeedbackDefinition | None:
config = getattr(method, "__human_feedback_config__", None)
@@ -234,7 +248,7 @@ def _build_human_feedback_definition(
llm=getattr(config, "llm", None),
default_outcome=getattr(config, "default_outcome", None),
metadata=_serialize_static_value(
getattr(config, "metadata", None), f"{path}.metadata"
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
),
provider=getattr(config, "provider", None),
learn=bool(getattr(config, "learn", False)),
@@ -259,6 +273,7 @@ def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | Non
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
@@ -269,9 +284,12 @@ def _build_conversational_router_definition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(getattr(router_config, "llm", None), f"{path}.llm"),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
@@ -282,6 +300,7 @@ def _build_conversational_router_definition(
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
@@ -305,9 +324,12 @@ def _build_conversational_definition(
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(getattr(config, "llm", None), "conversational.llm"),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
@@ -318,10 +340,12 @@ def _build_conversational_definition(
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
@@ -341,6 +365,7 @@ def _build_conversational_definition(
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
@@ -351,7 +376,9 @@ def _build_method_definition(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(method, f"{path}.human_feedback")
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
)
if human_feedback is not None:
method_definition.human_feedback = human_feedback
if human_feedback.emit:
@@ -417,6 +444,7 @@ def _build_flow_definition_from_class(
flow_class: type,
namespace: dict[str, Any] | None = None,
) -> FlowDefinition:
diagnostics: list[FlowDefinitionDiagnostic] = []
methods: dict[str, FlowMethodDefinition] = {}
flow_methods = _iter_flow_methods(flow_class)
if namespace is not None:
@@ -428,7 +456,7 @@ def _build_flow_definition_from_class(
for method_name, method in flow_methods.items():
methods[method_name] = _build_method_definition(
method, f"methods.{method_name}"
method, diagnostics, f"methods.{method_name}"
)
description = None
@@ -439,13 +467,15 @@ def _build_flow_definition_from_class(
definition = FlowDefinition(
name=getattr(flow_class, "__name__", "Flow"),
description=description,
state=_build_state_definition(flow_class),
config=_build_config_definition(flow_class),
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class),
conversational=_build_conversational_definition(flow_class),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,
)
log_flow_definition_issues(definition)
definition.diagnostics.extend(definition.validate_contract())
definition.log_diagnostics()
return definition

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import json
import logging
import re
from typing import Annotated, Any, Literal, TypeAlias
from typing import Annotated, Any, Literal as TypingLiteral, TypeAlias
from pydantic import (
BaseModel,
@@ -45,6 +45,7 @@ __all__ = [
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
@@ -54,7 +55,6 @@ __all__ = [
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",
@@ -69,12 +69,21 @@ def _object_ref(value: Any) -> str:
return f"{module}:{qualname}" if module and qualname else repr(value)
class FlowDefinitionDiagnostic(BaseModel):
"""A non-fatal Flow Definition build or validation diagnostic."""
code: str
message: str
severity: TypingLiteral["warning", "error"] = "warning"
path: str | None = None
class FlowDictStateDefinition(BaseModel):
"""Static description of a plain dictionary Flow state contract."""
model_config = ConfigDict(extra="forbid")
type: Literal["dict"] = Field(
type: TypingLiteral["dict"] = Field(
default="dict",
description="Plain dictionary state with optional default values.",
examples=["dict"],
@@ -91,7 +100,7 @@ class FlowPydanticStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["pydantic"] = Field(
type: TypingLiteral["pydantic"] = Field(
default="pydantic",
description="Importable Pydantic model used as the Flow state type.",
examples=["pydantic"],
@@ -126,7 +135,7 @@ class FlowJsonSchemaStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["json_schema"] = Field(
type: TypingLiteral["json_schema"] = Field(
default="json_schema",
description="Inline JSON Schema used as the Flow state contract.",
examples=["json_schema"],
@@ -153,7 +162,7 @@ class FlowUnknownStateDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["unknown"] = Field(
type: TypingLiteral["unknown"] = Field(
default="unknown",
description="Unknown state representation; runtime falls back to dictionary state.",
examples=["unknown"],
@@ -182,46 +191,14 @@ FlowStateDefinition: TypeAlias = Annotated[
class FlowConfigDefinition(BaseModel):
"""Serializable Flow-level configuration."""
tracing: bool | None = Field(
default=None,
description="Override for flow tracing; when omitted, runtime defaults apply.",
examples=[True],
)
stream: bool = Field(
default=False,
description="Whether the flow should emit streaming events when supported.",
examples=[True],
)
memory: dict[str, Any] | None = Field(
default=None,
description="Serializable memory configuration passed to flow execution.",
examples=[{"enabled": True}],
)
input_provider: str | None = Field(
default=None,
description="Import reference or provider key used to supply flow inputs.",
examples=["my_project.inputs:load_inputs"],
)
suppress_flow_events: bool = Field(
default=False,
description="Disable flow event emission for this definition.",
examples=[False],
)
max_method_calls: int = Field(
default=100,
description="Maximum number of method executions allowed during one kickoff.",
examples=[20],
)
defer_trace_finalization: bool = Field(
default=False,
description="Defer trace finalization so callers can complete tracing later.",
examples=[False],
)
checkpoint: bool | dict[str, Any] | None = Field(
default=None,
description="Checkpointing configuration, or true to use default checkpointing.",
examples=[True, {"enabled": True}],
)
tracing: bool | None = None
stream: bool = False
memory: dict[str, Any] | None = None
input_provider: str | None = None
suppress_flow_events: bool = False
max_method_calls: int = 100
defer_trace_finalization: bool = False
checkpoint: bool | dict[str, Any] | None = None
class FlowPersistenceDefinition(BaseModel):
@@ -233,21 +210,9 @@ class FlowPersistenceDefinition(BaseModel):
serialized config.
"""
enabled: bool = Field(
default=False,
description="Whether persistence is enabled for this flow or method.",
examples=[True],
)
verbose: bool = Field(
default=False,
description="Whether persistence should emit verbose diagnostic output.",
examples=[False],
)
persistence: Any = Field(
default=None,
description="Persistence backend configuration or import reference.",
examples=[{"ref": "my_project.persistence:FlowStore"}],
)
enabled: bool = False
verbose: bool = False
persistence: Any = None
@field_serializer("persistence", when_used="json")
def _serialize_persistence(self, value: Any) -> Any:
@@ -273,53 +238,15 @@ class FlowHumanFeedbackDefinition(BaseModel):
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
"""
message: str = Field(
description="Prompt shown to the human reviewer when feedback is requested.",
examples=["Review the research summary before publishing."],
)
emit: list[str] | None = Field(
default=None,
description=(
"Allowed feedback outcomes. When set, the method routes like a router "
"using the selected outcome."
),
examples=[["approved", "revise"]],
)
llm: Any = Field(
default="gpt-4o-mini",
description="LLM configuration used to assist or process human feedback.",
examples=["gpt-4o-mini"],
)
default_outcome: str | None = Field(
default=None,
description="Outcome to use when feedback cannot be collected.",
examples=["revise"],
)
metadata: dict[str, Any] | None = Field(
default=None,
description="Serializable metadata attached to the feedback request.",
examples=[{"team": "research"}],
)
provider: Any = Field(
default=None,
description="Feedback provider configuration or import reference.",
examples=["my_project.feedback:provider"],
)
learn: bool = Field(
default=False,
description="Whether feedback should be recorded for later learning workflows.",
examples=[True],
)
learn_source: str = Field(
default="hitl",
description="Source label attached to learned feedback records.",
examples=["hitl"],
)
learn_strict: bool = Field(
default=False,
description="Whether learning should enforce strict validation of feedback data.",
examples=[False],
)
message: str
emit: list[str] | None = None
llm: Any = "gpt-4o-mini"
default_outcome: str | None = None
metadata: dict[str, Any] | None = None
provider: Any = None
learn: bool = False
learn_source: str = "hitl"
learn_strict: bool = False
@field_serializer("llm", when_used="json")
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
@@ -339,89 +266,30 @@ class FlowHumanFeedbackDefinition(BaseModel):
class FlowCodeActionDefinition(BaseModel):
"""A Flow method action that executes importable Python code."""
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: Literal["code"] = Field(
default="code",
description="Action discriminator. Use code to call importable Python.",
examples=["code"],
)
ref: str = Field(
description="Import reference for the callable, formatted as module:qualname.",
examples=["my_project.flows:normalize_topic"],
)
with_: dict[str, Any] | None = Field(
default=None,
alias="with",
description="Keyword arguments passed to the callable after expression rendering.",
examples=[{"topic": "${state.topic}"}],
)
call: TypingLiteral["code"] = "code"
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
class FlowToolActionDefinition(BaseModel):
"""A Flow method action that invokes a CrewAI tool."""
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: Literal["tool"] = Field(
description="Action discriminator. Use tool to instantiate and run a CrewAI tool.",
examples=["tool"],
)
ref: str = Field(
description="Import reference for a BaseTool class, formatted as module:qualname.",
examples=["my_project.tools:SearchTool"],
)
with_: dict[str, Any] | None = Field(
default=None,
alias="with",
description="Tool input arguments after expression rendering.",
examples=[{"query": "${outputs.normalize_topic}", "limit": 5}],
)
call: TypingLiteral["tool"]
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
class FlowCrewActionDefinition(BaseModel):
"""A Flow method action that builds and kicks off a CrewAI crew."""
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: Literal["crew"] = Field(
description="Action discriminator. Use crew to run an inline Crew definition.",
examples=["crew"],
)
with_: CrewDefinition = Field(
alias="with",
description="Inline Crew definition to load and execute for this action.",
examples=[
{
"name": "inline_research",
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows the domain.",
}
},
"tasks": [
{
"name": "research_task",
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
"inputs": {"topic": "${state.topic}"},
}
],
)
call: TypingLiteral["crew"]
with_: CrewDefinition = Field(alias="with")
class FlowExpressionActionDefinition(BaseModel):
@@ -429,41 +297,8 @@ class FlowExpressionActionDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
call: Literal["expression"] = Field(
description="Action discriminator. Use expression to evaluate a CEL expression.",
examples=["expression"],
)
expr: str = Field(
description="CEL expression evaluated against state, outputs, and local context.",
examples=["state.topic", "outputs.normalize_topic"],
)
class FlowScriptActionDefinition(BaseModel):
"""A Flow method action that executes trusted inline Python."""
model_config = ConfigDict(extra="forbid")
call: Literal["script"] = Field(
description="Action discriminator. Use script to execute trusted inline Python.",
examples=["script"],
)
code: str = Field(
description=(
"Trusted Python source executed as a generated function. Runtime values are "
"passed as state, outputs, input, and item; they are not interpolated into "
"the source. This is not sandboxed."
),
examples=[
"state['normalized_topic'] = input.strip()\n"
"return state['normalized_topic']"
],
)
language: Literal["python"] = Field(
default="python",
description="Script language. Only python is currently supported.",
examples=["python"],
)
call: TypingLiteral["expression"]
expr: str
FlowInnerActionDefinition = (
@@ -471,18 +306,12 @@ FlowInnerActionDefinition = (
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
)
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
"""One named action inside an ``each`` composite action."""
root: dict[str, FlowInnerActionDefinition] = Field(
description="Single-entry mapping from an inner action name to its action.",
examples=[{"clean": {"call": "script", "code": "return item.strip()"}}],
)
@model_validator(mode="after")
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
if len(self.root) != 1:
@@ -502,35 +331,11 @@ class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinitio
class FlowEachActionDefinition(BaseModel):
"""A composite action that runs a sequential mini-pipeline for each item."""
model_config = ConfigDict(
populate_by_name=True,
extra="forbid",
)
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: Literal["each"] = Field(
description=(
"Action discriminator. Use each to run a sequence of actions for every "
"item in an input list."
),
examples=["each"],
)
in_: str = Field(
alias="in",
description="CEL expression that must evaluate to the list to iterate.",
examples=["state.rows"],
)
do: list[FlowEachInnerActionDefinition] = Field(
description=(
"Ordered inner actions to run for each item. Each entry must be a "
"single-key mapping naming that inner action."
),
examples=[
[
{"clean": {"call": "script", "code": "return item.strip()"}},
{"tag": {"call": "expression", "expr": "outputs.clean"}},
]
],
)
call: TypingLiteral["each"]
in_: str = Field(alias="in")
do: list[FlowEachInnerActionDefinition]
@model_validator(mode="after")
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
@@ -552,7 +357,6 @@ FlowActionDefinition = (
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowScriptActionDefinition
| FlowEachActionDefinition
)
@@ -560,48 +364,14 @@ FlowActionDefinition = (
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
description: str | None = Field(
default=None,
description="Human-readable summary of what this method does.",
examples=["Normalize the incoming topic."],
)
do: FlowActionDefinition = Field(
description="Action executed when this method runs.",
examples=[{"call": "script", "code": "return input.strip()"}],
)
start: bool | FlowDefinitionCondition | None = Field(
default=None,
description=(
"Marks a start method. True starts unconditionally; a condition starts "
"when the kickoff inputs or events satisfy it."
),
examples=[True],
)
listen: FlowDefinitionCondition | None = Field(
default=None,
description="Trigger condition that runs this method after upstream events.",
examples=["seed", {"or": ["approved", "revise"]}],
)
router: bool = Field(
default=False,
description="Whether the method output should be treated as the next event name.",
examples=[True],
)
emit: list[str] | None = Field(
default=None,
description="Declared router events this method may emit.",
examples=[["approved", "revise"]],
)
human_feedback: FlowHumanFeedbackDefinition | None = Field(
default=None,
description="Optional human feedback step applied after the method action.",
examples=[{"message": "Review the research summary before publishing."}],
)
persist: FlowPersistenceDefinition | None = Field(
default=None,
description="Method-level persistence override.",
examples=[{"enabled": True}],
)
description: str | None = None
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
emit: list[str] | None = None
human_feedback: FlowHumanFeedbackDefinition | None = None
persist: FlowPersistenceDefinition | None = None
@model_validator(mode="after")
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
@@ -627,57 +397,19 @@ class FlowMethodDefinition(BaseModel):
class FlowDefinition(BaseModel):
"""Static, serializable definition of a Flow."""
model_config = ConfigDict(
populate_by_name=True,
arbitrary_types_allowed=True,
)
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
schema_: Literal["crewai.flow/v1"] = Field(
default="crewai.flow/v1",
alias="schema",
description="Flow Definition schema identifier and version.",
examples=["crewai.flow/v1"],
)
name: str = Field(
description="Unique flow name used in logs, events, and traces.",
examples=["ResearchFlow"],
)
description: str | None = Field(
default=None,
description="Human-readable summary of the flow.",
examples=["Normalize a topic and prepare it for research."],
)
state: FlowStateDefinition | None = Field(
default=None,
description="State contract for kickoff inputs and runtime state.",
examples=[{"type": "dict", "default": {"topic": "AI agents"}}],
)
config: FlowConfigDefinition = Field(
default_factory=FlowConfigDefinition,
description="Serializable flow-level runtime configuration.",
examples=[{"stream": True, "max_method_calls": 20}],
)
persist: FlowPersistenceDefinition | None = Field(
default=None,
description="Flow-level persistence configuration.",
examples=[{"enabled": True}],
)
conversational: FlowConversationalDefinition | None = Field(
default=None,
description="Conversational flow configuration, when the flow supports chat.",
)
methods: dict[str, FlowMethodDefinition] = Field(
default_factory=dict,
description="Mapping of method names to method definitions.",
examples=[
{
"seed": {
"start": True,
"do": {"call": "expression", "expr": "state.topic"},
}
}
],
schema_: TypingLiteral["crewai.flow/v1"] = Field(
default="crewai.flow/v1", alias="schema"
)
name: str
description: str | None = None
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
@model_validator(mode="after")
def _validate_method_names(self) -> FlowDefinition:
@@ -704,9 +436,13 @@ class FlowDefinition(BaseModel):
@classmethod
def from_dict(cls, data: dict[str, Any]) -> FlowDefinition:
"""Load a definition from a dictionary."""
"""Load a definition from a dictionary and attach diagnostics."""
serialized_diagnostics = _deserialize_diagnostics(data.get("diagnostics", []))
definition = cls.model_validate(data)
log_flow_definition_issues(definition)
definition.diagnostics = _merge_diagnostics(
serialized_diagnostics, definition.validate_contract()
)
definition.log_diagnostics()
return definition
@classmethod
@@ -727,81 +463,122 @@ class FlowDefinition(BaseModel):
"""Return the JSON Schema for the Flow Definition contract."""
return cls.model_json_schema(by_alias=True)
def validate_contract(self) -> list[FlowDefinitionDiagnostic]:
"""Validate the static contract without rejecting dynamic routing."""
diagnostics: list[FlowDefinitionDiagnostic] = []
for method_name, method in self.methods.items():
path = f"methods.{method_name}"
if method.router and not method.is_start and method.listen is None:
diagnostics.append(
FlowDefinitionDiagnostic(
code="router_without_trigger",
severity="error",
path=path,
message="router: true requires either start or listen",
)
)
if method.emit and not method.router:
diagnostics.append(
FlowDefinitionDiagnostic(
code="emit_without_router",
path=f"{path}.emit",
message="emit is only used by routers to declare downstream events",
)
)
if method.human_feedback:
human_feedback_config = method.human_feedback
if human_feedback_config.emit and not human_feedback_config.llm:
diagnostics.append(
FlowDefinitionDiagnostic(
code="human_feedback_llm_required",
severity="error",
path=f"{path}.human_feedback.llm",
message="llm is required when human_feedback.emit is set",
)
)
if (
human_feedback_config.default_outcome is not None
and not human_feedback_config.emit
):
diagnostics.append(
FlowDefinitionDiagnostic(
code="human_feedback_default_requires_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome requires human_feedback.emit",
)
)
elif (
human_feedback_config.default_outcome is not None
and human_feedback_config.emit
):
if (
human_feedback_config.default_outcome
not in human_feedback_config.emit
):
diagnostics.append(
FlowDefinitionDiagnostic(
code="human_feedback_default_not_in_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome must be one of human_feedback.emit",
)
)
return diagnostics
def with_diagnostics(self) -> FlowDefinition:
"""Attach fresh diagnostics and return this definition."""
self.diagnostics = self.validate_contract()
self.log_diagnostics()
return self
def log_diagnostics(self) -> None:
"""Emit all attached diagnostics through the flow definition logger."""
_log_flow_definition_diagnostics(self.name, self.diagnostics)
def _log_flow_definition_diagnostics(
definition_name: str,
diagnostics: list[FlowDefinitionDiagnostic],
) -> None:
for diagnostic in diagnostics:
level = logging.ERROR if diagnostic.severity == "error" else logging.WARNING
path = f" at {diagnostic.path}" if diagnostic.path else ""
logger.log(
level,
"Flow definition diagnostic for %s%s [%s]: %s",
definition_name,
path,
diagnostic.code,
diagnostic.message,
)
def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]:
return [FlowDefinitionDiagnostic.model_validate(item) for item in value or []]
def _validate_step_name(name: str, *, field: str) -> None:
if not isinstance(name, str) or not _STEP_NAME_PATTERN.fullmatch(name):
raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
def log_flow_definition_issues(definition: FlowDefinition) -> None:
for method_name, method in definition.methods.items():
path = f"methods.{method_name}"
if method.router and not method.is_start and method.listen is None:
_log_flow_definition_issue(
definition.name,
code="router_without_trigger",
severity="error",
path=path,
message="router: true requires either start or listen",
def _merge_diagnostics(
*diagnostic_groups: list[FlowDefinitionDiagnostic],
) -> list[FlowDefinitionDiagnostic]:
diagnostics: list[FlowDefinitionDiagnostic] = []
seen: set[tuple[str, str, str | None, str]] = set()
for group in diagnostic_groups:
for diagnostic in group:
key = (
diagnostic.code,
diagnostic.severity,
diagnostic.path,
diagnostic.message,
)
if method.emit and not method.router:
_log_flow_definition_issue(
definition.name,
code="emit_without_router",
path=f"{path}.emit",
message="emit is only used by routers to declare downstream events",
)
if method.human_feedback:
human_feedback_config = method.human_feedback
if human_feedback_config.emit and not human_feedback_config.llm:
_log_flow_definition_issue(
definition.name,
code="human_feedback_llm_required",
severity="error",
path=f"{path}.human_feedback.llm",
message="llm is required when human_feedback.emit is set",
)
if (
human_feedback_config.default_outcome is not None
and not human_feedback_config.emit
):
_log_flow_definition_issue(
definition.name,
code="human_feedback_default_requires_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome requires human_feedback.emit",
)
elif (
human_feedback_config.default_outcome is not None
and human_feedback_config.emit
and human_feedback_config.default_outcome
not in human_feedback_config.emit
):
_log_flow_definition_issue(
definition.name,
code="human_feedback_default_not_in_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome must be one of human_feedback.emit",
)
def _log_flow_definition_issue(
definition_name: str,
*,
code: str,
message: str,
severity: Literal["warning", "error"] = "warning",
path: str | None = None,
) -> None:
level = logging.ERROR if severity == "error" else logging.WARNING
location = f" at {path}" if path else ""
logger.log(
level,
"Flow definition issue for %s%s [%s]: %s",
definition_name,
location,
code,
message,
)
if key in seen:
continue
seen.add(key)
diagnostics.append(diagnostic)
return diagnostics

View File

@@ -121,7 +121,7 @@ from crewai.flow.human_feedback import (
)
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError, build_action
from crewai.flow.runtime._actions import build_action
from crewai.flow.runtime._refs import resolve_instance_ref, resolve_ref
from crewai.flow.types import (
FlowExecutionData,
@@ -1090,8 +1090,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
try:
return build_action(self, definition.do)
except FlowScriptExecutionDisabledError:
raise
except Exception as e:
unresolved.append(f"{name}: {e}")
return lambda *args, **kwargs: None

View File

@@ -2,12 +2,10 @@
from __future__ import annotations
import ast
import asyncio
from collections.abc import Callable
import contextvars
import inspect
import os
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
@@ -17,11 +15,9 @@ from crewai.flow.flow_definition import (
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowExpressionActionDefinition,
FlowScriptActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
@@ -29,16 +25,10 @@ if TYPE_CHECKING:
from crewai.flow.runtime import Flow
__all__ = ["FlowScriptExecutionDisabledError", "build_action"]
__all__ = ["build_action"]
LocalContext = dict[str, Any]
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
_ALLOW_SCRIPT_EXECUTION_ENV_VAR = "CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION"
_TRUSTED_SCRIPT_EXECUTION_VALUES = frozenset({"1", "true", "yes"})
class FlowScriptExecutionDisabledError(RuntimeError):
"""Raised when a flow definition tries to execute inline script code."""
class _BuiltAction(Protocol):
@@ -150,67 +140,6 @@ class ExpressionAction:
)
class ScriptAction:
definition_type = FlowScriptActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowScriptActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.handler = self._compile_handler()
def run(self, *args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return self.handler(
state=self.flow.state,
outputs=outputs_by_name(
self.flow._method_outputs,
local_outputs=local_context.get("outputs") if local_context else None,
),
input=args[0] if args else None,
item=local_context.get("item") if local_context else None,
)
def _compile_handler(self) -> Callable[..., Any]:
raw = os.environ.get(_ALLOW_SCRIPT_EXECUTION_ENV_VAR, "")
if raw.strip().lower() not in _TRUSTED_SCRIPT_EXECUTION_VALUES:
raise FlowScriptExecutionDisabledError(
"Flow script execution is disabled by default. "
f"Set {_ALLOW_SCRIPT_EXECUTION_ENV_VAR}=1 to enable it only for "
"trusted flow definitions."
)
filename = f"crewai.flow.script.{self.flow._definition.name}"
module = ast.parse(self.definition.code, filename=filename)
function = ast.FunctionDef(
name="_flow_script",
args=ast.arguments(
posonlyargs=[],
args=[ast.arg(arg) for arg in ("state", "outputs", "input", "item")],
vararg=None,
kwonlyargs=[],
kw_defaults=[],
kwarg=None,
defaults=[],
),
body=module.body or [ast.Pass()],
decorator_list=[],
returns=None,
type_comment=None,
type_params=[],
)
module.body = [function]
ast.fix_missing_locations(module)
# The YAML here is trusted project source authored by the code owner,
# so this has the same trust boundary as using custom tools. We
# intentionally do not interpolate user input and runtime values are passed
# as function arguments. This is still arbitrary trusted Python execution,
# so it remains disabled by default behind `CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION`
namespace: dict[str, Any] = {"__name__": filename}
exec(compile(module, filename, "exec"), namespace) # nosec B102 # noqa: S102
return cast(Callable[..., Any], namespace["_flow_script"])
class EachAction:
definition_type = FlowEachActionDefinition
@@ -270,7 +199,6 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
ToolAction,
CrewAction,
ExpressionAction,
ScriptAction,
)

View File

@@ -7,7 +7,6 @@ import json
import re
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.runtime._outputs import outputs_by_name
from crewai.utilities.serialization import to_serializable
@@ -45,12 +44,7 @@ def evaluate_expression(
def _expression_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
local_outputs = local_context.get("outputs") if local_context else None
outputs = outputs_by_name(
flow._method_outputs,
local_outputs=local_outputs,
serialize=True,
)
outputs = _outputs_by_name(flow._method_outputs)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": outputs,
@@ -59,12 +53,29 @@ def _expression_context(
local_values = {
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
if key not in {"outputs", "state"}
}
local_outputs = local_values.pop("outputs", None)
local_values.pop("state", None)
context.update(local_values)
if local_outputs is not None:
if not isinstance(local_outputs, dict):
raise TypeError("flow definition local outputs must be a mapping")
context["outputs"] = {**outputs, **local_outputs}
return context
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
method = ""
output = entry
if isinstance(entry, dict) and "output" in entry:
method = str(entry.get("method", ""))
output = entry["output"]
outputs[method] = to_serializable(output, max_depth=0)
return outputs
def _render_value(value: Any, context: dict[str, Any]) -> Any:
if isinstance(value, str):
return _render_string(value, context)

View File

@@ -1,40 +0,0 @@
"""Shared FlowDefinition runtime output helpers."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, TypedDict
from crewai.utilities.serialization import to_serializable
class _MethodOutput(TypedDict):
method: str
output: Any
def outputs_by_name(
method_outputs: list[_MethodOutput],
*,
local_outputs: Mapping[str, Any] | None = None,
serialize: bool = False,
) -> dict[str, Any]:
outputs: dict[str, Any] = {}
for entry in method_outputs:
outputs[entry["method"]] = _output_value(entry["output"], serialize=serialize)
if local_outputs is not None:
outputs.update(
{
key: _output_value(output, serialize=serialize)
for key, output in local_outputs.items()
}
)
return outputs
def _output_value(value: Any, *, serialize: bool) -> Any:
if not serialize:
return value
return to_serializable(value, max_depth=0)

View File

@@ -645,11 +645,14 @@ class TestLegacyMethodOutputsRestore:
context = _expression_context(restored)
assert context["outputs"] == {"": "legacy"}
def test_raw_legacy_outputs_property_remains_readable(self) -> None:
def test_raw_legacy_outputs_remain_readable(self) -> None:
from crewai.flow.runtime._expressions import _expression_context
flow = Flow()
flow._method_outputs = ["legacy"]
assert flow.method_outputs == ["legacy"]
assert _expression_context(flow)["outputs"] == {"": "legacy"}
class TestAgentCheckpoint:

View File

@@ -44,6 +44,7 @@ def test_flow_public_exports_are_explicit():
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
@@ -53,7 +54,6 @@ def test_flow_public_exports_are_explicit():
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowPydanticStateDefinition",
"FlowScriptActionDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
"FlowUnknownStateDefinition",
@@ -62,108 +62,6 @@ def test_flow_public_exports_are_explicit():
assert "calculate_node_levels" not in flow_visualization.__all__
def test_flow_definition_json_schema_carries_reference_descriptions():
schema = flow_definition.FlowDefinition.json_schema()
defs = schema["$defs"]
assert schema["properties"]["schema"]["description"]
assert schema["properties"]["methods"]["description"]
assert "diagnostics" not in schema["properties"]
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["do"]["description"] == "Action executed when this method runs."
assert "Trigger condition" in method_properties["listen"]["description"]
script_properties = defs["FlowScriptActionDefinition"]["properties"]
assert "trusted inline Python" in script_properties["call"]["description"]
assert "not interpolated" in script_properties["code"]["description"]
assert "not sandboxed" in script_properties["code"]["description"]
state_schema = next(
branch
for branch in schema["properties"]["state"]["anyOf"]
if "discriminator" in branch
)
assert state_schema["discriminator"]["propertyName"] == "type"
assert state_schema["discriminator"]["mapping"] == {
"dict": "#/$defs/FlowDictStateDefinition",
"json_schema": "#/$defs/FlowJsonSchemaStateDefinition",
"pydantic": "#/$defs/FlowPydanticStateDefinition",
"unknown": "#/$defs/FlowUnknownStateDefinition",
}
dict_state_properties = defs["FlowDictStateDefinition"]["properties"]
assert dict_state_properties["type"]["description"]
assert "ref" not in dict_state_properties
json_schema_state_properties = defs["FlowJsonSchemaStateDefinition"]["properties"]
assert json_schema_state_properties["json_schema"]["description"]
assert "json_schema" in defs["FlowJsonSchemaStateDefinition"]["required"]
pydantic_state_properties = defs["FlowPydanticStateDefinition"]["properties"]
assert "Fallback JSON Schema" in pydantic_state_properties["json_schema"][
"description"
]
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert "list to iterate" in each_properties["in"]["description"]
assert "Ordered inner actions" in each_properties["do"]["description"]
def test_flow_definition_json_schema_carries_field_examples_only():
schema = flow_definition.FlowDefinition.json_schema()
defs = schema["$defs"]
for model_name in [
"FlowDefinition",
"FlowCodeActionDefinition",
"FlowToolActionDefinition",
"FlowCrewActionDefinition",
"FlowExpressionActionDefinition",
"FlowScriptActionDefinition",
"FlowEachActionDefinition",
"FlowMethodDefinition",
"FlowDictStateDefinition",
"FlowJsonSchemaStateDefinition",
"FlowPydanticStateDefinition",
"FlowUnknownStateDefinition",
"FlowConfigDefinition",
"FlowPersistenceDefinition",
"FlowHumanFeedbackDefinition",
]:
model_schema = schema if model_name == "FlowDefinition" else defs[model_name]
assert "examples" not in model_schema
assert schema["properties"]["name"]["examples"] == ["ResearchFlow"]
assert schema["properties"]["schema"]["examples"] == ["crewai.flow/v1"]
assert schema["properties"]["methods"]["examples"][0]["seed"]["do"] == {
"call": "expression",
"expr": "state.topic",
}
script_properties = defs["FlowScriptActionDefinition"]["properties"]
assert script_properties["call"]["examples"] == ["script"]
assert "input.strip()" in script_properties["code"]["examples"][0]
assert script_properties["language"]["examples"] == ["python"]
action_properties = defs["FlowCodeActionDefinition"]["properties"]
assert action_properties["ref"]["examples"] == [
"my_project.flows:normalize_topic"
]
assert action_properties["with"]["examples"] == [{"topic": "${state.topic}"}]
each_properties = defs["FlowEachActionDefinition"]["properties"]
assert each_properties["in"]["examples"] == ["state.rows"]
assert each_properties["do"]["examples"][0][0]["clean"]["call"] == "script"
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["listen"]["examples"] == [
"seed",
{"or": ["approved", "revise"]},
]
assert method_properties["emit"]["examples"] == [["approved", "revise"]]
def test_flow_state_definition_uses_discriminated_branches():
definition = flow_definition.FlowDefinition.model_validate(
{
@@ -335,7 +233,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert review.human_feedback.learn_strict is True
assert definition.methods["audit"].listen == {"and": ["begin", "process"]}
assert "diagnostics" not in definition.to_dict()
assert definition.diagnostics == []
def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
@@ -417,8 +315,7 @@ def test_flow_definition_uses_collapsed_conversational_router_start():
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.dsl._utils")
def test_flow_definition_serializes_human_feedback_metadata():
marker = object()
class MetadataFlow(Flow):
@@ -437,9 +334,9 @@ def test_flow_definition_serializes_human_feedback_metadata(caplog):
assert review.human_feedback is not None
assert review.human_feedback.metadata == {"ref": "builtins:dict"}
assert any(
"methods.review.human_feedback.metadata" in record.message
and "not fully serializable" in record.message
for record in caplog.records
diagnostic.code == "non_serializable_value"
and diagnostic.path == "methods.review.human_feedback.metadata"
for diagnostic in definition.diagnostics
)
definition.to_json()
@@ -678,6 +575,7 @@ def test_flow_definition_allows_dynamic_router_emit():
definition = DynamicRouterFlow.flow_definition()
assert definition.methods["decide"].emit is None
assert definition.diagnostics == []
def test_flow_definition_infers_literal_router_emit():
@@ -830,15 +728,16 @@ def test_flow_definition_accepts_explicit_router_events():
assert definition.methods["decide"].emit == ["left", "right"]
def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
def test_flow_definition_preserves_diagnostics_loaded_from_contract():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedDiagnosticsFlow",
"methods": {
"begin": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.begin"},
"start": True,
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
}
},
"diagnostics": [
@@ -858,13 +757,13 @@ def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
}
)
assert "diagnostics" not in definition.to_dict()
codes = [diagnostic.code for diagnostic in definition.diagnostics]
assert "serialized_warning" in codes
assert codes.count("router_without_trigger") == 1
def test_router_start_false_without_listen_logs_missing_trigger(caplog):
caplog.set_level(logging.ERROR, logger="crewai.flow.flow_definition")
flow_definition.FlowDefinition.from_dict(
def test_router_start_false_without_listen_reports_missing_trigger():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -880,10 +779,9 @@ def test_router_start_false_without_listen_logs_missing_trigger(caplog):
)
assert any(
record.levelno == logging.ERROR
and "router_without_trigger" in record.message
and "methods.decision" in record.message
for record in caplog.records
diagnostic.code == "router_without_trigger"
and diagnostic.path == "methods.decision"
for diagnostic in definition.diagnostics
)
@@ -911,7 +809,7 @@ def test_router_human_feedback_preserves_existing_router_metadata():
assert method.human_feedback is not None
def test_dynamic_router_flow_definition_allows_dynamic_emit():
def test_dynamic_router_flow_definition_has_no_diagnostics():
class LazyDynamicRouterFlow(Flow):
@start()
def begin(self):
@@ -922,7 +820,7 @@ def test_dynamic_router_flow_definition_allows_dynamic_emit():
return self.state["dynamic_event"]
definition = LazyDynamicRouterFlow.flow_definition()
assert definition.methods["decide"].emit is None
assert definition.diagnostics == []
def test_dynamic_router_string_listener_is_valid_contract():
@@ -941,7 +839,7 @@ def test_dynamic_router_string_listener_is_valid_contract():
definition = DynamicRouterListenerFlow.flow_definition()
assert definition.methods["handle"].listen == "dynamic_event"
assert definition.diagnostics == []
def test_static_string_listener_is_allowed_by_contract():
@@ -961,7 +859,7 @@ def test_static_string_listener_is_allowed_by_contract():
},
}
)
assert definition.methods["handle"].listen == "begni"
assert definition.diagnostics == []
def test_start_false_not_classified_as_start_method():
@@ -1026,10 +924,10 @@ def test_flow_definition_cache_is_not_reused_by_subclasses():
assert set(child_definition.methods) == {"child_step"}
def test_flow_definition_logs_validation_issues_when_loaded_from_contract(caplog):
def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
flow_definition.FlowDefinition.from_dict(
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -1043,6 +941,10 @@ def test_flow_definition_logs_validation_issues_when_loaded_from_contract(caplog
}
)
assert any(
diagnostic.code == "router_without_trigger"
for diagnostic in definition.diagnostics
)
assert any(
record.levelno == logging.ERROR
and "LoadedFlow" in record.message

View File

@@ -26,7 +26,6 @@ from crewai.flow.flow_config import flow_config
from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition
from crewai.flow.persistence import persist
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.tools import BaseTool
from crewai.types.streaming import FlowStreamingOutput
@@ -1146,117 +1145,6 @@ methods:
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
def test_script_action_requires_explicit_opt_in():
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
return "blocked"
start: true
"""
with pytest.raises(
FlowScriptExecutionDisabledError,
match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1",
) as exc_info:
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert "methods with unresolvable actions" not in str(exc_info.value)
def test_script_action_runs_python_imports_mutates_state_and_returns_value(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
normalize:
do:
call: script
code: |
import math
state["rounded"] = math.ceil(state["raw_score"])
return f"rounded:{state['rounded']}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
assert flow.state["rounded"] == 4
def test_script_listener_reads_trigger_input_and_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptFlow
methods:
seed:
do:
call: expression
expr: "'alpha'"
start: true
combine:
do:
call: script
code: |
state["input_matches_output"] = input == outputs["seed"]
return f"{outputs['seed']}:{input}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "alpha:alpha"
assert flow.state["input_matches_output"] is True
def test_script_each_action_reads_item_and_inner_outputs(
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setenv("CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION", "1")
yaml_str = """
schema: crewai.flow/v1
name: ScriptEachFlow
methods:
seed:
do:
call: expression
expr: "'global'"
start: true
process_rows:
do:
call: each
in: state.rows
do:
- clean:
call: script
code: |
return item.strip()
- tag:
call: script
code: |
return f"{outputs['seed']}:{outputs['clean']}"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
def test_each_action_uses_iteration_outputs_between_nested_actions():
yaml_str = f"""
schema: crewai.flow/v1

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.8a"
__version__ = "1.14.7"