mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-12 03:38:11 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
373dca3d04 | ||
|
|
21fa8e32d9 | ||
|
|
f18c03cd8f | ||
|
|
50b9c02272 | ||
|
|
c55334be5f | ||
|
|
05a2ba9ca4 | ||
|
|
fbafe1f0d3 | ||
|
|
5267c059f5 |
@@ -4,6 +4,74 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="11 يونيو 2026">
|
||||
## v1.14.7
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### الميزات
|
||||
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow.
|
||||
- عرض السبب الحقيقي للإنهاء، ومعلمات العينة، وresponse.id في أحداث LLM.
|
||||
- تصنيف مشغلات DSL كزخارف واعية للمسار.
|
||||
- إضافة واجهة برمجة تطبيقات الدردشة لتدفقات المحادثة.
|
||||
- جعل واجهة القفل قابلة للتجاوز.
|
||||
- بناء FlowDefinition من بيانات التعريف الخاصة بـ Flow DSL.
|
||||
- إضافة مزود LLM من Snowflake Cortex الأصلي.
|
||||
- إضافة دعم لملفات الوكلاء المدربين من crew.
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- إصلاح نقطة التحقق لإعادة بناء BaseLLM مخصص كـ LLM ملموس عند الاستعادة.
|
||||
- تقييد الاستعادة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف.
|
||||
- تحديد حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيل المتزامن.
|
||||
- إصلاح إعدادات التتبع على crewai-login.
|
||||
- احترام suppress_flow_events لأحداث تنفيذ الطريقة.
|
||||
- استعادة [project.scripts] في حزمة crewai لتثبيت أداة uv.
|
||||
- حل مشكلات CVE الخاصة بـ pip-audit لـ aiohttp وdocling وdocling-core.
|
||||
- إصلاح إدخال الملفات الذي لا يعمل بشكل موثوق.
|
||||
- إصلاح تاريخ نتائج أدوات Snowflake Claude غير المكتملة.
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.7.
|
||||
- تحديث وثائق جامع OpenTelemetry.
|
||||
- تحديث دليل NVIDIA Nemotron LLM.
|
||||
- إضافة دليل تكامل Databricks.
|
||||
- إضافة دليل تكامل Snowflake.
|
||||
|
||||
### الأداء
|
||||
- تحسين سرعة استيراد crewai من خلال تحميل مستندات docling بشكل كسول.
|
||||
|
||||
### إعادة الهيكلة
|
||||
- تبسيط تقييم شروط التدفق ليكون بلا حالة لكل حدث.
|
||||
- فصل منطق المحادثة عن وقت التشغيل وإضافة تعريف المحادثة.
|
||||
- تقسيم `flow.py` إلى DSL، وتعريف، ووقت تشغيل.
|
||||
|
||||
## المساهمون
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 يونيو 2026">
|
||||
## v1.14.7rc2
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- استعادة البوابة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.7rc1
|
||||
|
||||
## المساهمون
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 يونيو 2026">
|
||||
## v1.14.7rc1
|
||||
|
||||
|
||||
2137
docs/docs.json
2137
docs/docs.json
File diff suppressed because it is too large
Load Diff
@@ -4,6 +4,74 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="Jun 11, 2026">
|
||||
## v1.14.7
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Features
|
||||
- Add pluggable default backends for memory, knowledge, rag, and flow.
|
||||
- Surface real finish_reason, sampling params, and response.id on LLM events.
|
||||
- Type DSL triggers as route-aware decorators.
|
||||
- Add chat API for conversational flows.
|
||||
- Make locking backend overridable.
|
||||
- Build FlowDefinition from Flow DSL metadata.
|
||||
- Add native Snowflake Cortex LLM provider.
|
||||
- Add crew trained agents file support.
|
||||
|
||||
### Bug Fixes
|
||||
- Fix checkpoint to rebuild custom BaseLLM as concrete LLM on restore.
|
||||
- Gate restore on a flag to prevent live snapshots from replaying as resume.
|
||||
- Scope runtime state per run to bound growth and isolate concurrent runs.
|
||||
- Fix telemetry setup on crewai-login.
|
||||
- Respect suppress_flow_events for method-execution events.
|
||||
- Restore [project.scripts] in crewai package for uv tool install.
|
||||
- Resolve pip-audit CVEs for aiohttp, docling, and docling-core.
|
||||
- Fix file input not working reliably.
|
||||
- Fix Snowflake Claude incomplete tool result histories.
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.7.
|
||||
- Update OpenTelemetry collector documentation.
|
||||
- Update NVIDIA Nemotron LLM guide.
|
||||
- Add Databricks integration guide.
|
||||
- Add Snowflake integration guide.
|
||||
|
||||
### Performance
|
||||
- Improve crewai import speed by lazy-loading docling imports.
|
||||
|
||||
### Refactoring
|
||||
- Simplify flow condition evaluation to be stateless per event.
|
||||
- Decouple convo logic from runtime and add a conversational_definition.
|
||||
- Split `flow.py` into DSL, definition, and runtime.
|
||||
|
||||
## Contributors
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Jun 10, 2026">
|
||||
## v1.14.7rc2
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Bug Fixes
|
||||
- Gate restore on a flag to prevent live snapshots from replaying as resume
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.7rc1
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Jun 10, 2026">
|
||||
## v1.14.7rc1
|
||||
|
||||
|
||||
@@ -4,6 +4,74 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="2026년 6월 11일">
|
||||
## v1.14.7
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 기능
|
||||
- 메모리, 지식, RAG 및 흐름에 대한 플러그 가능한 기본 백엔드를 추가했습니다.
|
||||
- LLM 이벤트에서 실제 finish_reason, 샘플링 매개변수 및 response.id를 표시합니다.
|
||||
- 경로 인식 장식자로서의 타입 DSL 트리거를 설정합니다.
|
||||
- 대화 흐름을 위한 채팅 API를 추가했습니다.
|
||||
- 잠금 백엔드를 재정의 가능하도록 만듭니다.
|
||||
- Flow DSL 메타데이터에서 FlowDefinition을 빌드합니다.
|
||||
- 네이티브 Snowflake Cortex LLM 공급자를 추가했습니다.
|
||||
- 훈련된 에이전트 파일 지원을 추가했습니다.
|
||||
|
||||
### 버그 수정
|
||||
- 복원 시 사용자 정의 BaseLLM을 구체적인 LLM으로 재구성하도록 체크포인트를 수정했습니다.
|
||||
- 라이브 스냅샷이 재개로 재생되지 않도록 플래그를 사용하여 복원을 제한합니다.
|
||||
- 실행마다 런타임 상태의 범위를 설정하여 성장을 제한하고 동시 실행을 격리합니다.
|
||||
- crewai-login에서 텔레메트리 설정을 수정했습니다.
|
||||
- 메서드 실행 이벤트에 대해 suppress_flow_events를 존중합니다.
|
||||
- uv 도구 설치를 위해 crewai 패키지에서 [project.scripts]를 복원합니다.
|
||||
- aiohttp, docling 및 docling-core에 대한 pip-audit CVE를 해결합니다.
|
||||
- 파일 입력이 신뢰할 수 없게 작동하는 문제를 수정했습니다.
|
||||
- Snowflake Claude의 불완전한 도구 결과 기록을 수정했습니다.
|
||||
|
||||
### 문서
|
||||
- v1.14.7에 대한 변경 로그 및 버전을 업데이트했습니다.
|
||||
- OpenTelemetry 수집기 문서를 업데이트했습니다.
|
||||
- NVIDIA Nemotron LLM 가이드를 업데이트했습니다.
|
||||
- Databricks 통합 가이드를 추가했습니다.
|
||||
- Snowflake 통합 가이드를 추가했습니다.
|
||||
|
||||
### 성능
|
||||
- docling 가져오기를 지연 로딩하여 crewai 가져오기 속도를 개선했습니다.
|
||||
|
||||
### 리팩토링
|
||||
- 흐름 조건 평가를 이벤트별로 상태 비저장으로 단순화했습니다.
|
||||
- 대화 논리를 런타임에서 분리하고 conversational_definition을 추가했습니다.
|
||||
- `flow.py`를 DSL, 정의 및 런타임으로 분리했습니다.
|
||||
|
||||
## 기여자
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 6월 10일">
|
||||
## v1.14.7rc2
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 버그 수정
|
||||
- 라이브 스냅샷이 재개로 재생되는 것을 방지하기 위한 플래그에서 게이트 복원
|
||||
|
||||
### 문서
|
||||
- v1.14.7rc1에 대한 변경 로그 및 버전 업데이트
|
||||
|
||||
## 기여자
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 6월 10일">
|
||||
## v1.14.7rc1
|
||||
|
||||
|
||||
@@ -4,6 +4,74 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="11 jun 2026">
|
||||
## v1.14.7
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Recursos
|
||||
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e fluxo.
|
||||
- Exibir o verdadeiro finish_reason, parâmetros de amostragem e response.id em eventos LLM.
|
||||
- Tipar os gatilhos DSL como decoradores cientes de rotas.
|
||||
- Adicionar API de chat para fluxos de conversa.
|
||||
- Tornar o backend de bloqueio substituível.
|
||||
- Construir FlowDefinition a partir de metadados Flow DSL.
|
||||
- Adicionar provedor nativo Snowflake Cortex LLM.
|
||||
- Adicionar suporte a arquivos de agentes treinados pela equipe.
|
||||
|
||||
### Correções de Bugs
|
||||
- Corrigir checkpoint para reconstruir BaseLLM personalizado como LLM concreto na restauração.
|
||||
- Controlar a restauração com uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar.
|
||||
- Escopar o estado de execução por execução para limitar o crescimento e isolar execuções concorrentes.
|
||||
- Corrigir configuração de telemetria no crewai-login.
|
||||
- Respeitar suppress_flow_events para eventos de execução de método.
|
||||
- Restaurar [project.scripts] no pacote crewai para instalação da ferramenta uv.
|
||||
- Resolver CVEs de pip-audit para aiohttp, docling e docling-core.
|
||||
- Corrigir entrada de arquivo que não estava funcionando de forma confiável.
|
||||
- Corrigir histórias de resultados de ferramentas incompletas do Snowflake Claude.
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.7.
|
||||
- Atualizar documentação do coletor OpenTelemetry.
|
||||
- Atualizar guia do LLM NVIDIA Nemotron.
|
||||
- Adicionar guia de integração do Databricks.
|
||||
- Adicionar guia de integração do Snowflake.
|
||||
|
||||
### Desempenho
|
||||
- Melhorar a velocidade de importação do crewai através do carregamento preguiçoso de imports do docling.
|
||||
|
||||
### Refatoração
|
||||
- Simplificar a avaliação de condições de fluxo para ser sem estado por evento.
|
||||
- Desacoplar a lógica de conversa da execução e adicionar uma conversational_definition.
|
||||
- Dividir `flow.py` em DSL, definição e execução.
|
||||
|
||||
## Contribuidores
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 jun 2026">
|
||||
## v1.14.7rc2
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Correções de Bugs
|
||||
- Restauração de portão em uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.7rc1
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 jun 2026">
|
||||
## v1.14.7rc1
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ authors = [
|
||||
]
|
||||
requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"crewai-core==1.14.7rc1",
|
||||
"crewai-core==1.14.7",
|
||||
"click>=8.1.7,<9",
|
||||
"pydantic>=2.11.9,<2.13",
|
||||
"pydantic-settings~=2.10.1",
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.14.7rc1"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.7rc1"
|
||||
"crewai[tools]==1.14.7"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.7rc1"
|
||||
"crewai[tools]==1.14.7"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.7rc1"
|
||||
"crewai[tools]==1.14.7"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.14.7rc1"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -152,4 +152,4 @@ __all__ = [
|
||||
"wrap_file_source",
|
||||
]
|
||||
|
||||
__version__ = "1.14.7rc1"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"pytube~=15.0.0",
|
||||
"requests>=2.33.0,<3",
|
||||
"crewai==1.14.7rc1",
|
||||
"crewai==1.14.7",
|
||||
"tiktoken>=0.8.0,<0.13",
|
||||
"beautifulsoup4~=4.13.4",
|
||||
"python-docx~=1.2.0",
|
||||
|
||||
@@ -330,4 +330,4 @@ __all__ = [
|
||||
"ZapierActionTools",
|
||||
]
|
||||
|
||||
__version__ = "1.14.7rc1"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -8,8 +8,8 @@ authors = [
|
||||
]
|
||||
requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"crewai-core==1.14.7rc1",
|
||||
"crewai-cli==1.14.7rc1",
|
||||
"crewai-core==1.14.7",
|
||||
"crewai-cli==1.14.7",
|
||||
# Core Dependencies
|
||||
"pydantic>=2.11.9,<2.13",
|
||||
"openai>=2.30.0,<3",
|
||||
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = [
|
||||
"crewai-tools==1.14.7rc1",
|
||||
"crewai-tools==1.14.7",
|
||||
]
|
||||
embeddings = [
|
||||
"tiktoken>=0.8.0,<0.13"
|
||||
|
||||
@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
||||
|
||||
_suppress_pydantic_deprecation_warnings()
|
||||
|
||||
__version__ = "1.14.7rc1"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
|
||||
"Memory": ("crewai.memory.unified_memory", "Memory"),
|
||||
|
||||
@@ -82,6 +82,7 @@ _LLM_TYPE_REGISTRY: dict[str, str] = {
|
||||
def _validate_llm_ref(value: Any) -> Any:
|
||||
if isinstance(value, dict):
|
||||
import importlib
|
||||
import inspect
|
||||
|
||||
llm_type = value.get("llm_type")
|
||||
if not llm_type or llm_type not in _LLM_TYPE_REGISTRY:
|
||||
@@ -92,6 +93,12 @@ def _validate_llm_ref(value: Any) -> Any:
|
||||
dotted = _LLM_TYPE_REGISTRY[llm_type]
|
||||
mod_path, cls_name = dotted.rsplit(".", 1)
|
||||
cls = getattr(importlib.import_module(mod_path), cls_name)
|
||||
if inspect.isabstract(cls):
|
||||
from crewai.llm import LLM
|
||||
|
||||
return LLM(
|
||||
**{k: v for k, v in value.items() if v is not None and k != "llm_type"}
|
||||
)
|
||||
return cls(**value)
|
||||
return value
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ from crewai.flow.conversation import (
|
||||
receive_user_message as _receive_user_message,
|
||||
)
|
||||
from crewai.flow.dsl import listen, start
|
||||
from crewai.flow.dsl._utils import _set_flow_method_definition
|
||||
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
from crewai.utilities.types import LLMMessage
|
||||
|
||||
@@ -78,7 +78,7 @@ def _conversation_start_router(func: Callable[..., Any]) -> Any:
|
||||
wrapper = start()(func)
|
||||
_set_flow_method_definition(
|
||||
cast(Any, wrapper),
|
||||
FlowMethodDefinition(start=True, router=True),
|
||||
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
|
||||
wrapper = ListenMethod(func)
|
||||
|
||||
_set_flow_method_definition(
|
||||
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
listen=_to_definition_condition(condition),
|
||||
),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -148,6 +149,7 @@ def router(
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
listen=_to_definition_condition(condition),
|
||||
router=True,
|
||||
emit=router_events or None,
|
||||
|
||||
@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -53,13 +54,17 @@ def start(
|
||||
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
|
||||
wrapper = StartMethod(func)
|
||||
|
||||
if condition is not None:
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(start=_to_definition_condition(condition)),
|
||||
)
|
||||
else:
|
||||
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
start=(
|
||||
_to_definition_condition(condition)
|
||||
if condition is not None
|
||||
else True
|
||||
),
|
||||
),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
return cast(FlowMethodDecorator, decorator)
|
||||
|
||||
@@ -8,6 +8,7 @@ from pydantic import BaseModel
|
||||
from typing_extensions import TypeIs
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowActionDefinition,
|
||||
FlowConfigDefinition,
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
@@ -84,6 +85,10 @@ def _stamp_inherited_conversational_metadata(
|
||||
return method
|
||||
|
||||
|
||||
def _method_action(method: Any) -> FlowActionDefinition:
|
||||
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
|
||||
|
||||
|
||||
def _set_flow_method_definition(
|
||||
wrapper: FlowMethod[P, R],
|
||||
definition: FlowMethodDefinition,
|
||||
@@ -373,9 +378,11 @@ def _build_method_definition(
|
||||
) -> FlowMethodDefinition:
|
||||
fragment = _get_flow_method_definition(method)
|
||||
if fragment is None:
|
||||
method_definition = FlowMethodDefinition()
|
||||
method_definition = FlowMethodDefinition(do=_method_action(method))
|
||||
else:
|
||||
method_definition = fragment.model_copy(deep=True)
|
||||
method_definition = fragment.model_copy(
|
||||
deep=True, update={"do": _method_action(method)}
|
||||
)
|
||||
|
||||
human_feedback = _build_human_feedback_definition(
|
||||
method, diagnostics, f"{path}.human_feedback"
|
||||
|
||||
@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
|
||||
FlowDefinitionCondition = str | dict[str, Any]
|
||||
|
||||
__all__ = [
|
||||
"FlowActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
@@ -52,8 +53,9 @@ class FlowDefinitionDiagnostic(BaseModel):
|
||||
class FlowStateDefinition(BaseModel):
|
||||
"""Static description of a Flow state contract."""
|
||||
|
||||
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
|
||||
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
|
||||
ref: str | None = None
|
||||
json_schema: dict[str, Any] | None = None
|
||||
default: Any = None
|
||||
|
||||
|
||||
@@ -90,9 +92,17 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
learn_strict: bool = False
|
||||
|
||||
|
||||
class FlowActionDefinition(BaseModel):
|
||||
"""What a Flow method node executes, independent of when it fires."""
|
||||
|
||||
call: TypingLiteral["code"] = "code"
|
||||
ref: str
|
||||
|
||||
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
"""Static definition of one Flow method and its execution roles."""
|
||||
|
||||
do: FlowActionDefinition
|
||||
start: bool | FlowDefinitionCondition | None = None
|
||||
listen: FlowDefinitionCondition | None = None
|
||||
router: bool = False
|
||||
|
||||
@@ -22,6 +22,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
|
||||
import contextvars
|
||||
import copy
|
||||
import enum
|
||||
import importlib
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
@@ -95,6 +96,7 @@ from crewai.flow.flow_definition import (
|
||||
FlowDefinition,
|
||||
FlowDefinitionCondition,
|
||||
FlowMethodDefinition,
|
||||
FlowStateDefinition,
|
||||
)
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowMethod,
|
||||
@@ -105,6 +107,7 @@ from crewai.flow.flow_wrappers import (
|
||||
from crewai.flow.human_feedback import HumanFeedbackResult
|
||||
from crewai.flow.input_provider import InputProvider
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.runtime._action_resolvers import resolve_action
|
||||
from crewai.flow.types import (
|
||||
FlowExecutionData,
|
||||
FlowMethodName,
|
||||
@@ -169,6 +172,57 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -
|
||||
return combine(_condition_satisfied(branch, events) for branch in branches)
|
||||
|
||||
|
||||
def _build_definition_state_model(
|
||||
state_definition: FlowStateDefinition,
|
||||
) -> BaseModel | None:
|
||||
kwargs = (
|
||||
dict(state_definition.default)
|
||||
if isinstance(state_definition.default, dict)
|
||||
else {}
|
||||
)
|
||||
|
||||
model_class: type[BaseModel] | None = None
|
||||
if state_definition.ref:
|
||||
try:
|
||||
module_name, _, qualname = state_definition.ref.partition(":")
|
||||
resolved: Any = importlib.import_module(module_name)
|
||||
for part in qualname.split("."):
|
||||
resolved = getattr(resolved, part)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Could not import state ref %r", state_definition.ref, exc_info=True
|
||||
)
|
||||
else:
|
||||
if isinstance(resolved, type) and issubclass(resolved, BaseModel):
|
||||
model_class = resolved
|
||||
else:
|
||||
logger.warning(
|
||||
"State ref %r is not a pydantic model", state_definition.ref
|
||||
)
|
||||
|
||||
if model_class is None and state_definition.json_schema:
|
||||
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
|
||||
|
||||
try:
|
||||
model_class = create_model_from_schema(state_definition.json_schema)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Could not build a state model from the declared json_schema",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
if model_class is None:
|
||||
return None
|
||||
|
||||
if not issubclass(model_class, FlowState):
|
||||
|
||||
class StateWithId(FlowState, model_class): # type: ignore[misc, valid-type]
|
||||
pass
|
||||
|
||||
model_class = StateWithId
|
||||
return model_class(**kwargs)
|
||||
|
||||
|
||||
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
|
||||
if isinstance(condition, str):
|
||||
yield condition
|
||||
@@ -695,21 +749,24 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
return flow_definition
|
||||
|
||||
@classmethod
|
||||
def _start_method_names(cls) -> list[FlowMethodName]:
|
||||
def from_definition(cls, definition: FlowDefinition) -> Flow[Any]:
|
||||
"""Build a runnable Flow directly from a definition; no subclass required."""
|
||||
return cls.model_validate({}, context={"flow_definition": definition})
|
||||
|
||||
def _start_method_names(self) -> list[FlowMethodName]:
|
||||
return [
|
||||
FlowMethodName(method_name)
|
||||
for method_name, method_definition in cls.flow_definition().methods.items()
|
||||
for method_name, method_definition in self._definition.methods.items()
|
||||
if method_definition.is_start
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def _listener_methods(
|
||||
cls,
|
||||
self,
|
||||
) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]:
|
||||
# (name, definition, condition) for every non-start method that listens.
|
||||
# Routers are included (they listen too); callers wanting only plain
|
||||
# listeners filter on definition.router.
|
||||
for method_name, method_definition in cls.flow_definition().methods.items():
|
||||
for method_name, method_definition in self._definition.methods.items():
|
||||
if method_definition.listen is not None and not method_definition.is_start:
|
||||
yield (
|
||||
FlowMethodName(method_name),
|
||||
@@ -717,25 +774,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
method_definition.listen,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _start_condition(
|
||||
cls, method_name: FlowMethodName
|
||||
self, method_name: FlowMethodName
|
||||
) -> FlowDefinitionCondition | None:
|
||||
method_definition = cls.flow_definition().methods[str(method_name)]
|
||||
method_definition = self._definition.methods[str(method_name)]
|
||||
start = method_definition.start
|
||||
if isinstance(start, (str, dict)):
|
||||
return start
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def _listen_condition(
|
||||
cls, method_name: FlowMethodName
|
||||
self, method_name: FlowMethodName
|
||||
) -> FlowDefinitionCondition | None:
|
||||
return cls.flow_definition().methods[str(method_name)].listen
|
||||
return self._definition.methods[str(method_name)].listen
|
||||
|
||||
@classmethod
|
||||
def _is_router(cls, method_name: FlowMethodName) -> bool:
|
||||
return cls.flow_definition().methods[str(method_name)].router
|
||||
def _is_router(self, method_name: FlowMethodName) -> bool:
|
||||
return self._definition.methods[str(method_name)].router
|
||||
|
||||
initial_state: Annotated[ # type: ignore[type-arg]
|
||||
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
|
||||
@@ -862,6 +916,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self._completed_methods = {
|
||||
FlowMethodName(m) for m in self.checkpoint_completed_methods
|
||||
}
|
||||
self._restored_from_checkpoint = True
|
||||
if self.checkpoint_method_outputs is not None:
|
||||
self._method_outputs = list(self.checkpoint_method_outputs)
|
||||
if self.checkpoint_method_counts is not None:
|
||||
@@ -878,7 +933,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
restore_event_scope(())
|
||||
reset_last_event_id()
|
||||
|
||||
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
|
||||
_methods: dict[FlowMethodName, Callable[..., Any]] = PrivateAttr(
|
||||
default_factory=dict
|
||||
)
|
||||
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
|
||||
@@ -892,11 +947,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
PrivateAttr(default=None)
|
||||
)
|
||||
_method_outputs: list[Any] = PrivateAttr(default_factory=list)
|
||||
_definition: FlowDefinition = PrivateAttr()
|
||||
_state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
|
||||
_or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
|
||||
_completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set)
|
||||
_method_call_counts: dict[FlowMethodName, int] = PrivateAttr(default_factory=dict)
|
||||
_is_execution_resuming: bool = PrivateAttr(default=False)
|
||||
_restored_from_checkpoint: bool = PrivateAttr(default=False)
|
||||
_event_futures: list[Future[None]] = PrivateAttr(default_factory=list)
|
||||
_pending_feedback_context: PendingFeedbackContext | None = PrivateAttr(default=None)
|
||||
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
|
||||
@@ -920,15 +977,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
object.__setattr__(self, name, value)
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
self._flow_post_init()
|
||||
definition = (
|
||||
__context.get("flow_definition") if isinstance(__context, dict) else None
|
||||
)
|
||||
self._flow_post_init(definition)
|
||||
|
||||
def _flow_post_init(self) -> None:
|
||||
def _flow_post_init(self, definition: FlowDefinition | None = None) -> None:
|
||||
"""Heavy initialization: state creation, events, memory, method registration."""
|
||||
if getattr(self, "_flow_post_init_done", False):
|
||||
return
|
||||
object.__setattr__(self, "_flow_post_init_done", True)
|
||||
self._initialize_runtime_extension_attrs()
|
||||
|
||||
self._definition = definition or type(self).flow_definition()
|
||||
if self.name and self.name != self._definition.name:
|
||||
self._definition = self._definition.model_copy(update={"name": self.name})
|
||||
methods = (
|
||||
self._action_bound_methods()
|
||||
if definition is not None
|
||||
else self._class_bound_methods()
|
||||
)
|
||||
|
||||
if self._state is None:
|
||||
self._state = self._create_initial_state()
|
||||
|
||||
@@ -943,7 +1012,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowCreatedEvent(
|
||||
type="flow_created",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -953,17 +1022,42 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
|
||||
from crewai.memory.utils import sanitize_scope_name
|
||||
|
||||
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
|
||||
flow_name = sanitize_scope_name(self._definition.name)
|
||||
self.memory = Memory(root_scope=f"/flow/{flow_name}")
|
||||
|
||||
# Build the runtime method lookup from the static FlowDefinition.
|
||||
for method_name in type(self).flow_definition().methods:
|
||||
self._methods.update(methods)
|
||||
|
||||
def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
|
||||
def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
|
||||
try:
|
||||
return resolve_action(self, definition.do)
|
||||
except Exception as e:
|
||||
unresolved.append(f"{name}: {e}")
|
||||
return lambda *args, **kwargs: None
|
||||
|
||||
methods: dict[FlowMethodName, Callable[..., Any]] = {}
|
||||
unresolved: list[str] = []
|
||||
for method_name, method_definition in self._definition.methods.items():
|
||||
methods[FlowMethodName(method_name)] = resolve(
|
||||
method_name, method_definition
|
||||
)
|
||||
if unresolved:
|
||||
raise ValueError(
|
||||
f"Cannot build flow {self._definition.name!r} from its definition; "
|
||||
"methods with unresolvable actions: " + "; ".join(unresolved)
|
||||
)
|
||||
return methods
|
||||
|
||||
def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
|
||||
methods: dict[FlowMethodName, Callable[..., Any]] = {}
|
||||
for method_name in self._definition.methods:
|
||||
method = getattr(self, method_name, None)
|
||||
if method is None:
|
||||
continue
|
||||
if not hasattr(method, "__self__"):
|
||||
method = method.__get__(self, self.__class__)
|
||||
self._methods[FlowMethodName(method_name)] = method
|
||||
method = method.__get__(self, type(self))
|
||||
methods[FlowMethodName(method_name)] = method
|
||||
return methods
|
||||
|
||||
def recall(self, query: str, **kwargs: Any) -> Any:
|
||||
"""Recall relevant memories. Delegates to this flow's memory.
|
||||
@@ -1041,7 +1135,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def _start_condition_triggered_by(
|
||||
self, method_name: FlowMethodName, trigger: FlowMethodName
|
||||
) -> bool:
|
||||
condition = type(self)._start_condition(method_name)
|
||||
condition = self._start_condition(method_name)
|
||||
if condition is None:
|
||||
return False
|
||||
return self._condition_met(
|
||||
@@ -1069,7 +1163,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
trigger_str = str(trigger)
|
||||
to_discard: list[FlowMethodName] = []
|
||||
for listener_name in candidates:
|
||||
condition = type(self)._listen_condition(listener_name)
|
||||
condition = self._listen_condition(listener_name)
|
||||
if condition is None:
|
||||
continue
|
||||
if trigger_str in _iter_condition_events(condition):
|
||||
@@ -1091,9 +1185,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
|
||||
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
|
||||
listener_name: condition
|
||||
for listener_name, method_definition, condition in type(
|
||||
self
|
||||
)._listener_methods()
|
||||
for listener_name, method_definition, condition in self._listener_methods()
|
||||
if not method_definition.router
|
||||
}
|
||||
|
||||
@@ -1366,7 +1458,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowStartedEvent(
|
||||
type="flow_started",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
inputs=None,
|
||||
),
|
||||
)
|
||||
@@ -1442,7 +1534,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
MethodExecutionFinishedEvent(
|
||||
type="method_execution_finished",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
method_name=context.method_name,
|
||||
result=collapsed_outcome if emit else result,
|
||||
state=self._state,
|
||||
@@ -1496,7 +1588,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPausedEvent(
|
||||
type="flow_paused",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
flow_id=e.context.flow_id,
|
||||
method_name=e.context.method_name,
|
||||
state=self._copy_and_serialize_state(),
|
||||
@@ -1527,7 +1619,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowFinishedEvent(
|
||||
type="flow_finished",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
result=final_result,
|
||||
state=self._copy_and_serialize_state(),
|
||||
),
|
||||
@@ -1593,7 +1685,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
return cast(T, {"id": str(uuid4())})
|
||||
|
||||
if init_state is None:
|
||||
return cast(T, {"id": str(uuid4())})
|
||||
return cast(T, self._create_definition_state())
|
||||
|
||||
if isinstance(init_state, type):
|
||||
state_class = init_state
|
||||
@@ -1635,6 +1727,34 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
|
||||
)
|
||||
|
||||
def _create_definition_state(self) -> dict[str, Any] | BaseModel:
|
||||
state_definition = self._definition.state
|
||||
if state_definition is None:
|
||||
return {"id": str(uuid4())}
|
||||
if state_definition.type in ("pydantic", "json_schema"):
|
||||
state = _build_definition_state_model(state_definition)
|
||||
if state is not None:
|
||||
return state
|
||||
logger.error(
|
||||
"Flow %r declares %s state but neither ref nor json_schema "
|
||||
"produced a model; falling back to dict state",
|
||||
self._definition.name,
|
||||
state_definition.type,
|
||||
)
|
||||
elif state_definition.type == "unknown":
|
||||
logger.warning(
|
||||
"Flow %r declares state of unknown type; falling back to dict state",
|
||||
self._definition.name,
|
||||
)
|
||||
dict_state: dict[str, Any] = (
|
||||
dict(state_definition.default)
|
||||
if isinstance(state_definition.default, dict)
|
||||
else {}
|
||||
)
|
||||
if "id" not in dict_state:
|
||||
dict_state["id"] = str(uuid4())
|
||||
return dict_state
|
||||
|
||||
def _copy_state(self) -> T:
|
||||
"""Create a copy of the current state.
|
||||
|
||||
@@ -2058,7 +2178,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Reset flow state for fresh execution unless restoring from persistence
|
||||
is_restoring = (
|
||||
inputs and "id" in inputs and self.persistence is not None
|
||||
) or self.checkpoint_completed_methods is not None
|
||||
) or self._restored_from_checkpoint
|
||||
if not is_restoring:
|
||||
# Clear completed methods and outputs for a fresh start
|
||||
self._completed_methods.clear()
|
||||
@@ -2075,6 +2195,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if self._completed_methods:
|
||||
self._is_execution_resuming = True
|
||||
|
||||
# Restore is single-shot: a later kickoff on the same instance
|
||||
# starts fresh.
|
||||
self._restored_from_checkpoint = False
|
||||
|
||||
# Fork hydration: when restore_from_state_id is set and persistence is
|
||||
# available, hydrate self._state from the source UUID's latest snapshot
|
||||
# and reassign state.id to a fresh value so subsequent @persist writes
|
||||
@@ -2166,7 +2290,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# explicit finalization call closes the batch.
|
||||
started_event = FlowStartedEvent(
|
||||
type="flow_started",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
inputs=inputs,
|
||||
)
|
||||
future = crewai_event_bus.emit(self, started_event)
|
||||
@@ -2206,11 +2330,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Determine which start methods to execute at kickoff
|
||||
# Conditional start methods are only triggered by their conditions
|
||||
# UNLESS there are no unconditional starts (then all starts run as entry points)
|
||||
start_methods = type(self)._start_method_names()
|
||||
start_methods = self._start_method_names()
|
||||
unconditional_starts = [
|
||||
start_method
|
||||
for start_method in start_methods
|
||||
if type(self)._start_condition(start_method) is None
|
||||
if self._start_condition(start_method) is None
|
||||
]
|
||||
# If there are unconditional starts, only run those at kickoff
|
||||
# If there are NO unconditional starts, run all starts (including conditional ones)
|
||||
@@ -2258,7 +2382,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPausedEvent(
|
||||
type="flow_paused",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
flow_id=e.context.flow_id,
|
||||
method_name=e.context.method_name,
|
||||
state=self._copy_and_serialize_state(),
|
||||
@@ -2308,7 +2432,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowFinishedEvent(
|
||||
type="flow_finished",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
result=final_output,
|
||||
state=self._copy_and_serialize_state(),
|
||||
),
|
||||
@@ -2394,7 +2518,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionFailedEvent,
|
||||
)
|
||||
flow_name = self.name or self.__class__.__name__
|
||||
flow_name = self._definition.name
|
||||
nodes = sorted(
|
||||
(
|
||||
n
|
||||
@@ -2453,7 +2577,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
)
|
||||
|
||||
# If start method is a router, use its result as an additional trigger
|
||||
if type(self)._is_router(start_method_name) and result is not None:
|
||||
if self._is_router(start_method_name) and result is not None:
|
||||
# Execute listeners for the start method name first
|
||||
await self._execute_listeners(start_method_name, result, finished_event_id)
|
||||
# Then execute listeners for the router result (e.g., "approved")
|
||||
@@ -2473,15 +2597,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def _inject_trigger_payload_for_start_method(
|
||||
self, original_method: Callable[..., Any]
|
||||
) -> Callable[..., Any]:
|
||||
accepts_trigger_payload = (
|
||||
"crewai_trigger_payload" in inspect.signature(original_method).parameters
|
||||
)
|
||||
|
||||
def prepare_kwargs(
|
||||
*args: Any, **kwargs: Any
|
||||
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
||||
inputs = cast(dict[str, Any], baggage.get_baggage("flow_inputs") or {})
|
||||
trigger_payload = inputs.get("crewai_trigger_payload")
|
||||
|
||||
sig = inspect.signature(original_method)
|
||||
accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters
|
||||
|
||||
if trigger_payload is not None and accepts_trigger_payload:
|
||||
kwargs["crewai_trigger_payload"] = trigger_payload
|
||||
elif trigger_payload is not None:
|
||||
@@ -2531,7 +2656,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionStartedEvent(
|
||||
type="method_execution_started",
|
||||
method_name=method_name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
params=dumped_params,
|
||||
state=self._copy_and_serialize_state(),
|
||||
),
|
||||
@@ -2583,7 +2708,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
finished_event = MethodExecutionFinishedEvent(
|
||||
type="method_execution_finished",
|
||||
method_name=method_name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
state=self._copy_and_serialize_state(),
|
||||
result=result,
|
||||
)
|
||||
@@ -2612,7 +2737,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionPausedEvent(
|
||||
type="method_execution_paused",
|
||||
method_name=method_name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
state=self._copy_and_serialize_state(),
|
||||
flow_id=e.context.flow_id,
|
||||
message=e.context.message,
|
||||
@@ -2628,7 +2753,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionFailedEvent(
|
||||
type="method_execution_failed",
|
||||
method_name=method_name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
error=e,
|
||||
),
|
||||
)
|
||||
@@ -2760,7 +2885,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
if current_trigger in router_results:
|
||||
for method_name in type(self)._start_method_names():
|
||||
for method_name in self._start_method_names():
|
||||
if self._start_condition_triggered_by(
|
||||
method_name, current_trigger
|
||||
):
|
||||
@@ -2791,9 +2916,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
) -> list[FlowMethodName]:
|
||||
triggered: list[FlowMethodName] = []
|
||||
|
||||
for listener_name, method_definition, condition in type(
|
||||
self
|
||||
)._listener_methods():
|
||||
for listener_name, method_definition, condition in self._listener_methods():
|
||||
is_router = method_definition.router
|
||||
if router_only != is_router:
|
||||
continue
|
||||
@@ -2859,10 +2982,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
|
||||
# For routers, also check if any conditional starts they triggered are completed
|
||||
# If so, continue their chains
|
||||
if type(self)._is_router(listener_name):
|
||||
for start_method_name in type(self)._start_method_names():
|
||||
if self._is_router(listener_name):
|
||||
for start_method_name in self._start_method_names():
|
||||
if (
|
||||
type(self)._start_condition(start_method_name) is not None
|
||||
self._start_condition(start_method_name) is not None
|
||||
and start_method_name in self._completed_methods
|
||||
):
|
||||
# This conditional start was executed, continue its chain
|
||||
@@ -2881,8 +3004,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
method = self._methods[listener_name]
|
||||
|
||||
sig = inspect.signature(method)
|
||||
params = list(sig.parameters.values())
|
||||
method_params = [p for p in params if p.name != "self"]
|
||||
method_params = [p for p in sig.parameters.values() if p.name != "self"]
|
||||
|
||||
if triggering_event_id:
|
||||
with triggered_by_scope(triggering_event_id):
|
||||
@@ -3038,7 +3160,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowInputRequestedEvent(
|
||||
type="flow_input_requested",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
method_name=method_name,
|
||||
message=message,
|
||||
metadata=metadata,
|
||||
@@ -3105,7 +3227,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowInputReceivedEvent(
|
||||
type="flow_input_received",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
method_name=method_name,
|
||||
message=message,
|
||||
response=response,
|
||||
@@ -3143,7 +3265,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
HumanFeedbackRequestedEvent(
|
||||
type="human_feedback_requested",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
method_name="", # Will be set by decorator if needed
|
||||
output=output,
|
||||
message=message,
|
||||
@@ -3172,7 +3294,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
HumanFeedbackReceivedEvent(
|
||||
type="human_feedback_received",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
method_name="", # Will be set by decorator if needed
|
||||
feedback=feedback,
|
||||
outcome=None, # Will be determined after collapsing
|
||||
@@ -3347,7 +3469,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPlotEvent(
|
||||
type="flow_plot",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_name=self._definition.name,
|
||||
),
|
||||
)
|
||||
structure = build_flow_structure(cast(Any, self))
|
||||
43
lib/crewai/src/crewai/flow/runtime/_action_resolvers.py
Normal file
43
lib/crewai/src/crewai/flow/runtime/_action_resolvers.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import importlib
|
||||
from operator import attrgetter
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from crewai.flow.flow_definition import FlowActionDefinition
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
class InvalidActionRefError(ValueError):
|
||||
def __init__(self, ref: str) -> None:
|
||||
super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'")
|
||||
|
||||
|
||||
def _resolve_code_action(
|
||||
flow: Flow[Any], action: FlowActionDefinition
|
||||
) -> Callable[..., Any]:
|
||||
ref = action.ref
|
||||
module_name, _, qualname = ref.partition(":")
|
||||
if "<" in ref or not module_name or not qualname:
|
||||
raise InvalidActionRefError(ref)
|
||||
try:
|
||||
target = attrgetter(qualname)(importlib.import_module(module_name))
|
||||
except (ImportError, AttributeError) as e:
|
||||
raise InvalidActionRefError(ref) from e
|
||||
if not callable(target):
|
||||
raise InvalidActionRefError(ref)
|
||||
handler = cast(Callable[..., Any], target)
|
||||
if getattr(handler, "__self__", None) is None:
|
||||
handler = handler.__get__(flow, type(flow))
|
||||
return handler
|
||||
|
||||
|
||||
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
|
||||
"""Turn one `do:` action into the callable the flow runs for that node."""
|
||||
if action.call == "code":
|
||||
return _resolve_code_action(flow, action)
|
||||
raise ValueError(f"unknown call type {action.call!r}")
|
||||
@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
|
||||
if examples:
|
||||
schema_extra["examples"] = examples
|
||||
|
||||
default = ... if is_required else None
|
||||
default = (
|
||||
json_schema["default"]
|
||||
if "default" in json_schema
|
||||
else (... if is_required else None)
|
||||
)
|
||||
|
||||
if isinstance(type_, type) and issubclass(type_, (int, float)):
|
||||
if "minimum" in json_schema:
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
@@ -16,6 +17,7 @@ from pydantic import BaseModel
|
||||
from crewai.agent.core import Agent
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.crew import Crew
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
from crewai.flow.flow import _INITIAL_STATE_CLASS_MARKER, Flow, start
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.checkpoint_listener import (
|
||||
@@ -682,3 +684,85 @@ class TestAgentCheckpoint:
|
||||
cfg = CheckpointConfig(restore_from=loc)
|
||||
restored = Agent.from_checkpoint(cfg)
|
||||
assert restored._kickoff_event_id == "evt-456"
|
||||
|
||||
|
||||
class _FinalAnswerLLM(BaseLLM):
|
||||
"""Stub LLM that always returns a final answer without any API calls."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(model="stub")
|
||||
|
||||
def call(
|
||||
self,
|
||||
messages,
|
||||
tools=None,
|
||||
callbacks=None,
|
||||
available_functions=None,
|
||||
from_task=None,
|
||||
from_agent=None,
|
||||
response_model=None,
|
||||
):
|
||||
return "Final Answer: done."
|
||||
|
||||
def supports_function_calling(self) -> bool:
|
||||
return False
|
||||
|
||||
def supports_stop_words(self) -> bool:
|
||||
return False
|
||||
|
||||
def get_context_window_size(self) -> int:
|
||||
return 4096
|
||||
|
||||
async def acall(self, *args, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class TestCheckpointReusedExecutor:
|
||||
"""Checkpoint serialization stamps every live Flow's completed methods.
|
||||
|
||||
The agent executor is a Flow reused across a crew's tasks, so the stamp
|
||||
must not be read back as a restore signal on the next task — otherwise the
|
||||
second task replays as a resume and never reaches a final answer.
|
||||
"""
|
||||
|
||||
def test_second_task_runs_with_checkpointing_enabled(self) -> None:
|
||||
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
|
||||
task1 = Task(description="first", expected_output="x", agent=agent)
|
||||
task2 = Task(description="second", expected_output="y", agent=agent)
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task1, task2],
|
||||
verbose=False,
|
||||
checkpoint=CheckpointConfig(
|
||||
provider=JsonProvider(location=d),
|
||||
on_events=["task_started", "task_completed"],
|
||||
),
|
||||
)
|
||||
result = crew.kickoff()
|
||||
|
||||
assert len(result.tasks_output) == 2
|
||||
assert result.tasks_output[1].raw
|
||||
|
||||
|
||||
class TestCustomLLMCheckpointRestore:
|
||||
"""A custom BaseLLM subclass serializes with the inherited llm_type "base".
|
||||
|
||||
Restoring it must not try to instantiate the abstract BaseLLM; it is rebuilt
|
||||
as a concrete LLM from the saved config instead.
|
||||
"""
|
||||
|
||||
def test_restore_does_not_instantiate_abstract_base_llm(self) -> None:
|
||||
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
|
||||
task = Task(description="d", expected_output="e", agent=agent)
|
||||
crew = Crew(agents=[agent], tasks=[task], verbose=False)
|
||||
|
||||
raw = RuntimeState(root=[crew]).model_dump_json()
|
||||
restored = RuntimeState.model_validate_json(
|
||||
raw, context={"from_checkpoint": True}
|
||||
)
|
||||
|
||||
llm = restored.root[0].agents[0].llm
|
||||
assert isinstance(llm, BaseLLM)
|
||||
assert not inspect.isabstract(type(llm))
|
||||
assert llm.model == "stub"
|
||||
|
||||
@@ -1040,7 +1040,7 @@ def test_flow_plotting():
|
||||
received_events.append(event)
|
||||
event_received.set()
|
||||
|
||||
flow.plot("test_flow")
|
||||
flow.plot("test_flow", show=False)
|
||||
|
||||
assert event_received.wait(timeout=5), "Timeout waiting for plot event"
|
||||
assert len(received_events) == 1
|
||||
@@ -1157,6 +1157,26 @@ def test_flow_name():
|
||||
assert flow.name == "MyFlow"
|
||||
|
||||
|
||||
def test_flow_custom_name_overrides_class_name_in_events():
|
||||
class InternalFlowClass(Flow):
|
||||
name = "PublicName"
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "done"
|
||||
|
||||
received = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle(source, event):
|
||||
received.append(event)
|
||||
|
||||
InternalFlowClass().kickoff()
|
||||
|
||||
assert received[0].flow_name == "PublicName"
|
||||
|
||||
|
||||
def test_nested_and_or_conditions():
|
||||
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit():
|
||||
"start",
|
||||
}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
@@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
|
||||
"name": "LoadedDiagnosticsFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
@@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
|
||||
"router": True,
|
||||
"start": False,
|
||||
"emit": ["continue"],
|
||||
@@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "TypoFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"listen": "begni"},
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:TypoFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"handle": {
|
||||
"do": {"ref": "loaded_flows:TypoFlow.handle"},
|
||||
"listen": "begni",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ExplicitNonStartFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"start": False, "listen": "begin"},
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"handle": {
|
||||
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
|
||||
"start": False,
|
||||
"listen": "begin",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
|
||||
552
lib/crewai/tests/test_flow_from_definition.py
Normal file
552
lib/crewai/tests/test_flow_from_definition.py
Normal file
@@ -0,0 +1,552 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.flow_events import (
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.flow.flow import FlowState
|
||||
from crewai.flow.flow_definition import FlowDefinition
|
||||
|
||||
|
||||
class ChainFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
self.state["begin_ran"] = True
|
||||
return "hello"
|
||||
|
||||
@listen(begin)
|
||||
def shout(self, result):
|
||||
return result.upper()
|
||||
|
||||
@listen(shout)
|
||||
def confirm(self):
|
||||
self.state["confirmed"] = True
|
||||
return f"confirmed:{self.state['confirmed']}"
|
||||
|
||||
|
||||
CHAIN_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ChainFlow
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
shout:
|
||||
do:
|
||||
ref: {__name__}:ChainFlow.shout
|
||||
listen: begin
|
||||
confirm:
|
||||
do:
|
||||
ref: {__name__}:ChainFlow.confirm
|
||||
listen: shout
|
||||
"""
|
||||
|
||||
|
||||
class MergeFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "go"
|
||||
|
||||
@listen(begin)
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
@listen(begin)
|
||||
def right(self):
|
||||
return "right"
|
||||
|
||||
@listen(or_(left, right))
|
||||
def either(self):
|
||||
self.state["either_ran"] = True
|
||||
return "either"
|
||||
|
||||
@listen(and_(left, right, either))
|
||||
def join(self):
|
||||
self.state["joined"] = True
|
||||
return "joined"
|
||||
|
||||
|
||||
MERGE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: MergeFlow
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:MergeFlow.begin
|
||||
start: true
|
||||
left:
|
||||
do:
|
||||
ref: {__name__}:MergeFlow.left
|
||||
listen: begin
|
||||
right:
|
||||
do:
|
||||
ref: {__name__}:MergeFlow.right
|
||||
listen: begin
|
||||
either:
|
||||
do:
|
||||
ref: {__name__}:MergeFlow.either
|
||||
listen:
|
||||
or: [left, right]
|
||||
join:
|
||||
do:
|
||||
ref: {__name__}:MergeFlow.join
|
||||
listen:
|
||||
and: [left, right, either]
|
||||
"""
|
||||
|
||||
|
||||
class RouteFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "go"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return "left" if self.state.get("direction") == "left" else "right"
|
||||
|
||||
@listen("left")
|
||||
def take_left(self):
|
||||
return "took-left"
|
||||
|
||||
@listen("right")
|
||||
def take_right(self):
|
||||
return "took-right"
|
||||
|
||||
|
||||
ROUTE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: RouteFlow
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:RouteFlow.begin
|
||||
start: true
|
||||
decide:
|
||||
do:
|
||||
ref: {__name__}:RouteFlow.decide
|
||||
listen: begin
|
||||
router: true
|
||||
take_left:
|
||||
do:
|
||||
ref: {__name__}:RouteFlow.take_left
|
||||
listen: left
|
||||
take_right:
|
||||
do:
|
||||
ref: {__name__}:RouteFlow.take_right
|
||||
listen: right
|
||||
"""
|
||||
|
||||
|
||||
class LoopFlow(Flow):
|
||||
@start("retry")
|
||||
def step(self):
|
||||
self.state["count"] = self.state.get("count", 0) + 1
|
||||
return self.state["count"]
|
||||
|
||||
@router(step)
|
||||
def decide(self):
|
||||
if self.state["count"] < 3:
|
||||
return "retry"
|
||||
return "done"
|
||||
|
||||
@listen("done")
|
||||
def finish(self):
|
||||
return f"finished:{self.state['count']}"
|
||||
|
||||
|
||||
LOOP_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: LoopFlow
|
||||
methods:
|
||||
step:
|
||||
do:
|
||||
ref: {__name__}:LoopFlow.step
|
||||
start: retry
|
||||
decide:
|
||||
do:
|
||||
ref: {__name__}:LoopFlow.decide
|
||||
listen: step
|
||||
router: true
|
||||
finish:
|
||||
do:
|
||||
ref: {__name__}:LoopFlow.finish
|
||||
listen: done
|
||||
"""
|
||||
|
||||
|
||||
class CounterState(FlowState):
|
||||
count: int = 0
|
||||
label: str = "none"
|
||||
|
||||
|
||||
class PydanticStateFlow(Flow[CounterState]):
|
||||
@start()
|
||||
def begin(self):
|
||||
self.state.count += 1
|
||||
return self.state.count
|
||||
|
||||
@listen(begin)
|
||||
def finish(self):
|
||||
self.state.label = f"count={self.state.count}"
|
||||
return self.state.label
|
||||
|
||||
|
||||
PYDANTIC_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: PydanticStateFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: {__name__}:CounterState
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
PYDANTIC_STATE_OVERLAY_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: PydanticStateFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: {__name__}:CounterState
|
||||
default:
|
||||
count: 5
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
JSON_SCHEMA_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: JsonSchemaStateFlow
|
||||
state:
|
||||
type: json_schema
|
||||
json_schema:
|
||||
title: CounterState
|
||||
type: object
|
||||
properties:
|
||||
count:
|
||||
type: integer
|
||||
default: 0
|
||||
label:
|
||||
type: string
|
||||
default: none
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: SchemaFallbackFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: definitely_not_a_module_xyz:MissingState
|
||||
json_schema:
|
||||
title: CounterState
|
||||
type: object
|
||||
properties:
|
||||
count:
|
||||
type: integer
|
||||
default: 0
|
||||
label:
|
||||
type: string
|
||||
default: none
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
do:
|
||||
ref: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
UNRESOLVABLE_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: UnresolvableStateFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: definitely_not_a_module_xyz:MissingState
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
"""
|
||||
|
||||
DICT_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: DictStateFlow
|
||||
state:
|
||||
type: dict
|
||||
default:
|
||||
count: 5
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
"""
|
||||
|
||||
UNKNOWN_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: UnknownStateFlow
|
||||
state:
|
||||
type: unknown
|
||||
ref: somewhere:Something
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
"""
|
||||
|
||||
|
||||
def _run_with_events(flow, inputs=None):
|
||||
events = []
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def on_started(source, event):
|
||||
events.append(event)
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||
def on_finished(source, event):
|
||||
events.append(event)
|
||||
|
||||
result = flow.kickoff(inputs=inputs)
|
||||
events.sort(key=lambda e: e.timestamp)
|
||||
return result, [
|
||||
(type(e).__name__, str(e.method_name), e.flow_name) for e in events
|
||||
]
|
||||
|
||||
|
||||
def _state_without_id(flow):
|
||||
snapshot = dict(flow.state.model_dump())
|
||||
snapshot.pop("id", None)
|
||||
return snapshot
|
||||
|
||||
|
||||
def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
|
||||
class_flow = flow_cls()
|
||||
class_result, class_events = _run_with_events(class_flow, inputs)
|
||||
|
||||
definition = FlowDefinition.from_yaml(yaml_str)
|
||||
definition_flow = Flow.from_definition(definition)
|
||||
definition_result, definition_events = _run_with_events(definition_flow, inputs)
|
||||
|
||||
assert definition_result == class_result
|
||||
assert _state_without_id(definition_flow) == _state_without_id(class_flow)
|
||||
if ordered:
|
||||
assert definition_flow.method_outputs == class_flow.method_outputs
|
||||
assert definition_events == class_events
|
||||
else:
|
||||
assert sorted(map(repr, definition_flow.method_outputs)) == sorted(
|
||||
map(repr, class_flow.method_outputs)
|
||||
)
|
||||
assert sorted(definition_events) == sorted(class_events)
|
||||
return definition_flow, definition_result
|
||||
|
||||
|
||||
def test_simple_chain_parity():
|
||||
flow, result = assert_parity(ChainFlow, CHAIN_YAML)
|
||||
assert result == "confirmed:True"
|
||||
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
|
||||
|
||||
|
||||
def test_and_or_merge_parity():
|
||||
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
|
||||
assert flow.state["joined"] is True
|
||||
assert flow.state["either_ran"] is True
|
||||
|
||||
|
||||
def test_router_label_parity_for_each_branch():
|
||||
left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"})
|
||||
assert "took-left" in left_flow.method_outputs
|
||||
assert "took-right" not in left_flow.method_outputs
|
||||
|
||||
right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"})
|
||||
assert "took-right" in right_flow.method_outputs
|
||||
|
||||
|
||||
def test_cyclic_flow_parity():
|
||||
flow, result = assert_parity(LoopFlow, LOOP_YAML)
|
||||
assert result == "finished:3"
|
||||
assert flow.state["count"] == 3
|
||||
|
||||
|
||||
def test_definition_flow_events_use_definition_name():
|
||||
definition = FlowDefinition.from_yaml(CHAIN_YAML)
|
||||
flow = Flow.from_definition(definition)
|
||||
_, events = _run_with_events(flow)
|
||||
assert events
|
||||
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
|
||||
|
||||
|
||||
def test_definition_method_without_action_is_invalid():
|
||||
with pytest.raises(ValidationError, match="do"):
|
||||
FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "NoActions",
|
||||
"methods": {"begin": {"start": True}},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_from_definition_unresolvable_ref_raises():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "BadRefs",
|
||||
"methods": {
|
||||
"begin": {
|
||||
"start": True,
|
||||
"do": {"ref": "definitely_not_a_module_xyz:nope"},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="unresolvable actions.*begin"):
|
||||
Flow.from_definition(definition)
|
||||
|
||||
|
||||
def test_from_definition_malformed_ref_raises():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "MalformedRefs",
|
||||
"methods": {"begin": {"start": True, "do": {"ref": "no-colon-here"}}},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="expected 'module:qualname'"):
|
||||
Flow.from_definition(definition)
|
||||
|
||||
|
||||
def test_from_definition_local_scope_ref_raises():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "LocalRefs",
|
||||
"methods": {
|
||||
"begin": {
|
||||
"start": True,
|
||||
"do": {"ref": f"{__name__}:make.<locals>.LocalFlow.begin"},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="expected 'module:qualname'"):
|
||||
Flow.from_definition(definition)
|
||||
|
||||
|
||||
def test_flow_definition_stamps_refs():
|
||||
definition = ChainFlow.flow_definition()
|
||||
|
||||
assert definition.methods["begin"].do.ref == f"{__name__}:ChainFlow.begin"
|
||||
assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout"
|
||||
|
||||
|
||||
def test_pydantic_state_from_ref_parity():
|
||||
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
|
||||
assert result == "count=1"
|
||||
assert flow.state.count == 1
|
||||
assert flow.state.label == "count=1"
|
||||
assert flow.state.id
|
||||
|
||||
|
||||
def test_pydantic_state_default_overlay():
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
|
||||
result = flow.kickoff()
|
||||
assert result == "count=6"
|
||||
assert flow.state.count == 6
|
||||
|
||||
|
||||
def test_json_schema_state():
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
|
||||
result = flow.kickoff()
|
||||
assert result == "count=1"
|
||||
assert flow.state.count == 1
|
||||
assert flow.state.label == "count=1"
|
||||
assert flow.state.id
|
||||
|
||||
|
||||
def test_json_schema_state_validates_inputs():
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
|
||||
with pytest.raises(ValueError, match="Invalid inputs"):
|
||||
flow.kickoff(inputs={"count": "not-a-number"})
|
||||
|
||||
|
||||
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
|
||||
flow = Flow.from_definition(
|
||||
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
|
||||
)
|
||||
result = flow.kickoff()
|
||||
assert result == "count=1"
|
||||
assert flow.state.count == 1
|
||||
|
||||
|
||||
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
|
||||
with caplog.at_level("ERROR"):
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
|
||||
assert "falling back to dict state" in caplog.text
|
||||
|
||||
result = flow.kickoff()
|
||||
assert result == "hello"
|
||||
assert flow.state["begin_ran"] is True
|
||||
assert flow.state["id"]
|
||||
|
||||
|
||||
def test_dict_state_is_a_copy_of_default_plus_id():
|
||||
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
|
||||
|
||||
flow = Flow.from_definition(definition)
|
||||
assert flow.state["count"] == 5
|
||||
assert flow.state["id"]
|
||||
flow.kickoff()
|
||||
assert flow.state["begin_ran"] is True
|
||||
|
||||
second = Flow.from_definition(definition)
|
||||
assert second.state["count"] == 5
|
||||
assert "begin_ran" not in second.state
|
||||
assert second.state["id"] != flow.state["id"]
|
||||
assert definition.state.default == {"count": 5}
|
||||
|
||||
|
||||
def test_unknown_state_type_falls_back_to_dict(caplog):
|
||||
with caplog.at_level("WARNING"):
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
|
||||
assert "falling back to dict state" in caplog.text
|
||||
|
||||
result = flow.kickoff()
|
||||
assert result == "hello"
|
||||
assert flow.state["begin_ran"] is True
|
||||
@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
|
||||
return "complete"
|
||||
|
||||
|
||||
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
|
||||
def _attach_flow_definition(
|
||||
flow_class: type[Flow], methods: dict[str, dict[str, object]]
|
||||
) -> None:
|
||||
flow_class._flow_definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": flow_class.__name__,
|
||||
"methods": methods,
|
||||
"methods": {
|
||||
name: {
|
||||
"do": {
|
||||
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
|
||||
},
|
||||
**spec,
|
||||
}
|
||||
for name, spec in methods.items()
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "DefinedFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"begin": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"decide": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.decide"},
|
||||
"listen": "begin",
|
||||
"router": True,
|
||||
"emit": ["done"],
|
||||
},
|
||||
"finish": {"listen": "done"},
|
||||
"finish": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.finish"},
|
||||
"listen": "done",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""CrewAI development tools."""
|
||||
|
||||
__version__ = "1.14.7rc1"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
Reference in New Issue
Block a user