Compare commits

...

10 Commits

Author SHA1 Message Date
Vinicius Brasil
acdec7cd83 Build flow state from FlowDefinition
Definition-driven flows previously always started with a bare dict
state.
2026-06-10 15:46:24 -07:00
Vinicius Brasil
615f5683de Add Flow.from_definition to run flows without a subclass
A FlowDefinition (e.g. loaded from YAML) was only usable for dispatch on
decorator-authored subclasses. Now each method definition records an
importable `module:qualname` handler ref, and `Flow.from_definition`
resolves and binds those handlers to build a runnable flow directly.
2026-06-10 12:31:48 -07:00
Vinicius Brasil
3517115221 Read flow dispatch from FlowDefinition
Store the definition in a `_definition` PrivateAttr at post-init and
convert the dispatch helpers (`_start_method_names`, `_listener_methods`,
`_start_condition`, `_listen_condition`, `_is_router`) from classmethods
to instance methods that read it. Event names now fall back to
`self._definition.name` instead of `self.__class__.__name__`.

Behavior is identical for decorator subclasses, but the engine no longer
assumes the definition comes from the class. This is the seam for
`Flow.from_definition`, where an instance runs a definition that was
loaded rather than built from a Python subclass.
2026-06-10 12:19:37 -07:00
Lucas Gomide
b3f175b56f docs: update otel images (#6103)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
2026-06-10 14:34:30 -04:00
Lucas Gomide
f523a7d029 docs: udpate docs to reflect new state of OpenTelemetry collector (#6100)
* docs: udpate docs to reflect new state of OpenTelemetry collector

* docs: add OTel collector and Datadog screenshots

These images are referenced by the capture_telemetry_logs guides but were
missing from the tree, which broke the link checker across all locales.

* docs: address PR review on OTel collector guide

- Clarify that OpenTelemetry Traces and Logs are separate integrations
  sharing the same fields (resolves Traces/Logs wording inconsistency)
- List regional Datadog OTLP hosts (US1/US3/US5/EU1/AP1) so users outside
  US5 can copy the right domain
2026-06-10 14:26:35 -04:00
Lorenze Jay
f214ff4b7b decouple convo logic from runtime and added a conversational_definition (#6091)
* decouple convo logic from runtime and added a conversational_definition

* type check fix

* always defer traces for convo and so fix tests to reflect that
2026-06-10 10:49:39 -07:00
Vini Brasil
a9e7c3a44f Simplify flow condition evaluation to be stateless per event (#6097)
Re-evaluate the whole `@listen`/`@router` condition tree against the set
of events seen so far, instead of tracking which AND sub-branches remain
pending.

Net effect:
* Fixes a regression where `or_()` short-circuited at the first
  satisfied branch, leaving a sibling `and_()` half-complete so a later
  trigger could spuriously re-fire the listener
* Removes the fragile per-branch pending state and `id()`-based keys
* Shrinks the evaluator to one readable predicate
2026-06-10 10:35:25 -07:00
Lucas Gomide
da8fe8c715 fix: respect suppress_flow_events for method-execution events (#6095)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* fix: respect suppress_flow_events for method-execution events

* test: align suppressed-flow test with new method-event behavior
2026-06-09 17:19:25 -04:00
Greyson LaLonde
ce42994ae3 docs: update changelog and version for v1.14.7a4 2026-06-09 12:58:38 -07:00
Greyson LaLonde
820c3905e3 feat: bump versions to 1.14.7a4 2026-06-09 12:51:55 -07:00
38 changed files with 1698 additions and 400 deletions

View File

@@ -4,6 +4,26 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="9 يونيو 2026">
## v1.14.7a4
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
## ما الذي تغير
### الميزات
- نقل وقت التشغيل @listen/@router لقراءة من FlowDefinition
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.7a3
## المساهمون
@greysonlalonde, @mattatcha, @vinibrsl
</Update>
<Update label="8 يونيو 2026">
## v1.14.7a3

View File

@@ -24,15 +24,39 @@ mode: "wide"
1. في CrewAI AMP، انتقل إلى **Settings** > **OpenTelemetry Collectors**.
2. انقر على **Add Collector**.
3. اختر نوع التكامل — **OpenTelemetry Traces** أو **OpenTelemetry Logs**.
4. هيّئ الاتصال:
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
5. انقر على **Save**.
3. اختر تكاملاً:
- **OpenTelemetry Traces** و**OpenTelemetry Logs** — صدّر إلى أي مجمّع أو واجهة خلفية متوافقة مع OTLP.
- **Datadog** — أرسل التتبعات مباشرة إلى استقبال OTLP الخاص بـ Datadog، دون الحاجة إلى مجمّع منفصل أو Datadog Agent.
4. هيّئ الاتصال. تعتمد الحقول على التكامل الذي اخترته:
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
إن **OpenTelemetry Traces** و**OpenTelemetry Logs** تكاملان منفصلان يتشاركان نفس الحقول — اختر التكامل المطابق للإشارة التي تريد تصديرها.
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — مضيف OTLP لموقع Datadog الخاص بك فقط، دون بروتوكول أو مسار. يقوم CrewAI ببناء نقطة نهاية HTTPS OTLP الكاملة نيابةً عنك. استخدم المضيف المطابق لـ [موقع Datadog](https://docs.datadoghq.com/getting_started/site/) الخاص بك:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — مفتاح واجهة برمجة تطبيقات Datadog الخاص بك. راجع [كيفية إنشاء واحد](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
يصدّر تكامل Datadog **التتبعات**.
<Frame>![تهيئة مجمّع Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(اختياري)* انقر على **Test Connection** للتحقق من قدرة CrewAI على الوصول إلى نقطة النهاية باستخدام بيانات الاعتماد التي قدمتها.
6. انقر على **Save**.
<Tip>
يمكنك إضافة مجمّعات متعددة — على سبيل المثال، واحد للتتبعات وآخر للسجلات، أو الإرسال إلى واجهات خلفية مختلفة لأغراض مختلفة.

View File

@@ -4,6 +4,26 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 09, 2026">
## v1.14.7a4
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
## What's Changed
### Features
- Migrate @listen/@router runtime to read from FlowDefinition
- Add pluggable default backends for memory, knowledge, rag, and flow
### Documentation
- Update changelog and version for v1.14.7a3
## Contributors
@greysonlalonde, @mattatcha, @vinibrsl
</Update>
<Update label="Jun 08, 2026">
## v1.14.7a3

View File

@@ -24,15 +24,39 @@ Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://op
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
2. Click **Add Collector**.
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
4. Configure the connection:
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
5. Click **Save**.
3. Select an integration:
- **OpenTelemetry Traces** and **OpenTelemetry Logs** — export to any OTLP-compatible collector or backend.
- **Datadog** — send traces straight to Datadog's OTLP intake, no separate collector or Datadog Agent required.
4. Configure the connection. The fields depend on the integration you selected:
<Frame>![OpenTelemetry Collector Configuration](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** and **OpenTelemetry Logs** are separate integrations that share the same fields — pick the one matching the signal you want to export.
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
<Frame>![OpenTelemetry collector configuration](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Your Datadog site's OTLP host only, with no protocol or path. CrewAI builds the full HTTPS OTLP endpoint for you. Use the host that matches your [Datadog site](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Your Datadog API key. See [how to create one](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
The Datadog integration exports **traces**.
<Frame>![Datadog collector configuration](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(optional)* Click **Test Connection** to verify CrewAI can reach the endpoint with the credentials you provided.
6. Click **Save**.
<Tip>
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.

Binary file not shown.

After

Width:  |  Height:  |  Size: 455 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 420 KiB

View File

@@ -4,6 +4,26 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 9일">
## v1.14.7a4
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
## 변경 사항
### 기능
- @listen/@router 런타임을 FlowDefinition에서 읽도록 마이그레이션
- 메모리, 지식, rag 및 flow에 대한 플러그형 기본 백엔드 추가
### 문서
- v1.14.7a3에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde, @mattatcha, @vinibrsl
</Update>
<Update label="2026년 6월 8일">
## v1.14.7a3

View File

@@ -24,15 +24,39 @@ CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
2. **Add Collector**를 클릭합니다.
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
4. 연결을 구성합니다:
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
5. **Save**를 클릭합니다.
3. 통합을 선택합니다:
- **OpenTelemetry Traces** 및 **OpenTelemetry Logs** — OTLP 호환 수집기 또는 백엔드로 내보냅니다.
- **Datadog** — 별도의 수집기나 Datadog Agent 없이 트레이스를 Datadog의 OTLP 인테이크로 직접 전송합니다.
4. 연결을 구성합니다. 필드는 선택한 통합에 따라 달라집니다:
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces**와 **OpenTelemetry Logs**는 동일한 필드를 공유하는 별개의 통합입니다 — 내보내려는 신호에 맞는 것을 선택하세요.
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Datadog 사이트의 OTLP 호스트만 입력합니다 (프로토콜이나 경로 제외). CrewAI가 전체 HTTPS OTLP 엔드포인트를 자동으로 구성합니다. [Datadog 사이트](https://docs.datadoghq.com/getting_started/site/)에 맞는 호스트를 사용하세요:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Datadog API 키입니다. [키 생성 방법](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys)을 참고하세요.
Datadog 통합은 **트레이스**를 내보냅니다.
<Frame>![Datadog 수집기 구성](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(선택 사항)* **Test Connection**을 클릭하여 제공한 자격 증명으로 CrewAI가 엔드포인트에 연결할 수 있는지 확인합니다.
6. **Save**를 클릭합니다.
<Tip>
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.

View File

@@ -4,6 +4,26 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="09 jun 2026">
## v1.14.7a4
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
## O Que Mudou
### Funcionalidades
- Migrar a execução @listen/@router para ler a partir de FlowDefinition
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e flow
### Documentação
- Atualizar changelog e versão para v1.14.7a3
## Contributors
@greysonlalonde, @mattatcha, @vinibrsl
</Update>
<Update label="08 jun 2026">
## v1.14.7a3

View File

@@ -24,15 +24,39 @@ Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetr
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
2. Clique em **Add Collector**.
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
4. Configure a conexão:
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
5. Clique em **Save**.
3. Selecione uma integração:
- **OpenTelemetry Traces** e **OpenTelemetry Logs** — exporte para qualquer coletor ou backend compatível com OTLP.
- **Datadog** — envie traces diretamente para a ingestão OTLP do Datadog, sem precisar de um coletor separado ou do Datadog Agent.
4. Configure a conexão. Os campos dependem da integração selecionada:
<Frame>![Configuração do Coletor OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** e **OpenTelemetry Logs** são integrações separadas que compartilham os mesmos campos — escolha a que corresponde ao sinal que você quer exportar.
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
<Frame>![Configuração do coletor OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Apenas o host OTLP do seu site Datadog, sem protocolo ou caminho. O CrewAI monta o endpoint HTTPS OTLP completo para você. Use o host correspondente ao seu [site Datadog](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Sua chave de API do Datadog. Veja [como criar uma](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
A integração com o Datadog exporta **traces**.
<Frame>![Configuração do coletor Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(opcional)* Clique em **Test Connection** para verificar se o CrewAI consegue acessar o endpoint com as credenciais fornecidas.
6. Clique em **Save**.
<Tip>
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a3",
"crewai-core==1.14.7a4",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7a3"
__version__ = "1.14.7a4"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a3"
"crewai[tools]==1.14.7a4"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a3"
"crewai[tools]==1.14.7a4"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a3"
"crewai[tools]==1.14.7a4"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.7a3"
__version__ = "1.14.7a4"

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.7a3"
__version__ = "1.14.7a4"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.7a3",
"crewai==1.14.7a4",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.7a3"
__version__ = "1.14.7a4"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a3",
"crewai-cli==1.14.7a3",
"crewai-core==1.14.7a4",
"crewai-cli==1.14.7a4",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.7a3",
"crewai-tools==1.14.7a4",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.7a3"
__version__ = "1.14.7a4"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -780,10 +780,11 @@ class TraceCollectionListener(BaseEventListener):
def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
"""Claim a flow trace batch when an action event fires inside kickoff.
When ``suppress_flow_events=True``, console panels are hidden but
``FlowStartedEvent`` and method lifecycle events still emit; if no
batch exists yet, LLM/tool events must not fall back to implicit crew
batches.
When ``suppress_flow_events=True`` (infrastructure flows such as
``AgentExecutor`` and the memory flows), flow and method lifecycle
events are not emitted, so the batch is claimed from the flow context
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name

View File

@@ -1,15 +1,17 @@
"""Conversational graph + helpers as a mixin for ``Flow`` (experimental).
"""Conversational graph + helpers as an experimental Flow extension.
The experimental conversational chat surface lives here as a mixin so that
``crewai.flow.runtime`` stays focused on the execution engine. ``Flow``
inherits from ``_ConversationalMixin``; the methods only register on
subclasses that opt in via ``conversational = True`` (enforced by the
``_conversational_only`` marker + ``FlowMeta`` gating in
``crewai.flow.runtime``).
The conversational chat surface remains experimental and may change before the
v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused
on the execution engine. ``crewai.flow.flow`` composes this mixin onto the
public ``Flow`` class for backwards compatibility.
The built-in conversational graph only registers for subclasses that opt in
with ``conversational = True``. Static conversational metadata is projected
into ``FlowDefinition.conversational`` via the Python DSL builder.
Import surface:
- :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users
don't import it directly.
- :class:`_ConversationalMixin` — internal; the public ``Flow`` class
composes it in. Users don't import it directly.
- The data types this mixin uses live in
:mod:`crewai.experimental.conversational`.
"""
@@ -20,7 +22,7 @@ from collections.abc import Callable, Mapping, Sequence
from enum import Enum
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast
from pydantic import BaseModel, Field, create_model
@@ -49,21 +51,56 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
from crewai.llms.base_llm import BaseLLM
logger = logging.getLogger(__name__)
class _ConversationalMixin:
"""Built-in conversational graph for ``Flow`` (gated on ``conversational``).
def _iter_condition_labels(condition: Any) -> set[str]:
if isinstance(condition, str):
return {condition}
if isinstance(condition, dict):
labels: set[str] = set()
for value in condition.values():
if isinstance(value, list):
for item in value:
labels.update(_iter_condition_labels(item))
else:
labels.update(_iter_condition_labels(value))
return labels
return set()
Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused
on running graphs. The methods here only register on subclasses that set
``conversational = True``; non-chat flows see them as inert attributes.
class _ConversationalMixin:
"""Experimental conversational graph for ``Flow``.
This mixin owns chat behavior and runtime hooks. Non-chat flows see these
methods as inert attributes unless they opt in with ``conversational = True``.
"""
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a Flow subclass, this mixin's built-in
# graph registers and ``handle_turn`` / ``chat`` become chat entry points.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
# The metaclass + state attributes referenced below live on ``Flow`` —
# this mixin is never instantiated standalone. These type-only
# declarations exist so static analyzers don't flag attribute access.
@@ -71,22 +108,15 @@ class _ConversationalMixin:
# (otherwise mypy flags "Cannot override instance variable with class
# variable" when Flow declares them as ``ClassVar``).
if TYPE_CHECKING:
conversational: ClassVar[bool]
conversational_config: ClassVar[ConversationConfig | None]
builtin_routes: ClassVar[tuple[str, ...]]
internal_routes: ClassVar[tuple[str, ...]]
builtin_route_descriptions: ClassVar[dict[str, str]]
# Registry ClassVars populated by ``FlowMeta`` at class creation.
_listeners: ClassVar[dict[Any, Any]]
# Instance attrs from ``Flow``.
state: Any
name: str | None
_completed_methods: set[Any]
_method_outputs: list[Any]
_pending_and_listeners: dict[Any, Any]
_pending_events: dict[Any, Any]
_method_call_counts: dict[Any, int]
_is_execution_resuming: bool
_conversation_messages: list[LLMMessage]
_pending_user_message: str | dict[str, Any] | None
_pending_intents: Sequence[str] | None
_pending_intent_llm: str | BaseLLM | None
@@ -97,8 +127,8 @@ class _ConversationalMixin:
def _collapse_to_outcome(
self,
feedback: str,
outcomes: tuple[str, ...],
llm: str | BaseLLM | Any,
outcomes: Sequence[str],
llm: str | BaseLLM,
) -> str:
pass
@@ -238,8 +268,8 @@ class _ConversationalMixin:
state = cast(ConversationState, self.state)
sid = session_id or state.id
# Stash the pending turn so ``_apply_pending_conversational_turn``
# picks it up AFTER persist restore.
# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
@@ -286,7 +316,7 @@ class _ConversationalMixin:
callers can customize prompts or exercise the loop without patching
builtins.
"""
if not getattr(type(self), "conversational", False):
if not self._is_conversational_enabled():
raise ValueError("Flow.chat() is only available on conversational flows")
exit_set = {command.lower() for command in exit_commands}
@@ -491,14 +521,14 @@ class _ConversationalMixin:
**extra: Any,
) -> None:
"""Append a message to conversation history (legacy ChatState path)."""
_append_conversation_message(cast("Flow[Any]", self), role, content, **extra)
_append_conversation_message(cast(Any, self), role, content, **extra)
@property
def conversation_messages(self) -> list[LLMMessage]:
"""Message history from state, coerced to LLM-shaped dicts."""
return [
message_to_llm_dict(message)
for message in get_conversation_messages(cast("Flow[Any]", self))
for message in get_conversation_messages(cast(Any, self))
]
def receive_user_message(
@@ -514,7 +544,7 @@ class _ConversationalMixin:
``state.messages`` and preserve ``last_intent`` across turns.
Non-conversational flows fall through to the legacy helper.
"""
if self.conversational:
if self._is_conversational_enabled():
state = cast(ConversationState, self.state)
state.messages.append(ConversationMessage(role="user", content=text))
self._emit_conversation_message_added(
@@ -535,9 +565,7 @@ class _ConversationalMixin:
return intent
return text
return _receive_user_message(
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
)
return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm)
def classify_intent(
self,
@@ -561,27 +589,104 @@ class _ConversationalMixin:
def _conversation_config(self) -> ConversationConfig | None:
return getattr(type(self), "conversational_config", None)
@property
def _conversation_definition(self) -> Any | None:
return self._conversation_flow_definition().conversational
def _conversation_flow_definition(self) -> Any:
flow_definition = getattr(type(self), "flow_definition", None)
if not callable(flow_definition):
raise AttributeError(
f"{type(self).__name__} does not expose flow_definition()"
)
return flow_definition()
@classmethod
def _conversational_definition(cls) -> Any | None:
flow_definition = getattr(cls, "flow_definition", None)
if not callable(flow_definition):
return None
return flow_definition().conversational
@classmethod
def _is_conversational(cls) -> bool:
definition = cls._conversational_definition()
return bool(definition and definition.enabled)
def _is_conversational_enabled(self) -> bool:
definition = self._conversation_definition
return bool(definition and definition.enabled)
def _initialize_runtime_extension_attrs(self) -> None:
if not isinstance(getattr(self, "_conversation_messages", None), list):
object.__setattr__(self, "_conversation_messages", [])
if not hasattr(self, "_pending_user_message"):
object.__setattr__(self, "_pending_user_message", None)
if not hasattr(self, "_pending_intents"):
object.__setattr__(self, "_pending_intents", None)
if not hasattr(self, "_pending_intent_llm"):
object.__setattr__(self, "_pending_intent_llm", None)
def _create_default_extension_state(self) -> ConversationState | None:
initial_state_t = getattr(self, "_initial_state_t", None)
if type(self)._is_conversational() and (
not hasattr(self, "_initial_state_t")
or isinstance(initial_state_t, TypeVar)
):
return ConversationState()
return None
def _should_apply_pending_kickoff_context(self) -> bool:
return (
type(self)._is_conversational() and self._pending_user_message is not None
)
def _apply_pending_kickoff_context(self) -> None:
self._apply_pending_conversational_turn()
def _order_start_methods_for_kickoff(
self,
start_methods: list[Any],
) -> tuple[list[Any], bool]:
if not type(self)._is_conversational():
return start_methods, False
conversation_start = "conversation_start"
if conversation_start not in {str(method) for method in start_methods}:
return start_methods, False
ordered_starts = [
method for method in start_methods if str(method) != conversation_start
]
ordered_starts.append(
next(
method for method in start_methods if str(method) == conversation_start
)
)
return ordered_starts, True
def _should_defer_trace_finalization(self) -> bool:
"""Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped.
True when either:
- ``flow.defer_trace_finalization`` is set on the instance, OR
- the class-level ``ConversationConfig.defer_trace_finalization``
on a conversational subclass is True.
- the static conversational definition enables deferred finalization.
Either source enables the deferred-session pattern. The caller
eventually invokes ``finalize_session_traces()`` to close the batch.
"""
if getattr(self, "defer_trace_finalization", False):
return True
config = self._conversation_config
return bool(config and config.defer_trace_finalization)
definition = self._conversation_definition
return bool(
definition and definition.enabled and definition.defer_trace_finalization
)
def _reset_turn_execution_state(self) -> None:
"""Clear per-execution tracking so the next turn re-runs the graph."""
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._pending_events.clear()
self._method_call_counts.clear()
self._clear_or_listeners()
self._is_execution_resuming = False
@@ -733,11 +838,12 @@ class _ConversationalMixin:
router_config: RouterConfig | None,
) -> dict[str, str]:
label_to_method: dict[str, str] = {}
for listener_name, condition in self._listeners.items():
if isinstance(condition, tuple):
_, trigger_labels = condition
for trigger_label in trigger_labels:
label_to_method.setdefault(str(trigger_label), str(listener_name))
flow_definition = self._conversation_flow_definition()
for listener_name, method_definition in flow_definition.methods.items():
if method_definition.listen is None or method_definition.router:
continue
for trigger_label in _iter_condition_labels(method_definition.listen):
label_to_method.setdefault(trigger_label, listener_name)
routes = self._effective_routes(router_config)
overrides = (
@@ -788,21 +894,31 @@ class _ConversationalMixin:
def _valid_route_labels(self) -> set[str]:
labels: set[str] = set()
for condition in self._listeners.values():
if isinstance(condition, tuple):
_, methods = condition
labels.update(str(method) for method in methods)
flow_definition = self._conversation_flow_definition()
for method_definition in flow_definition.methods.values():
if method_definition.listen is None or method_definition.router:
continue
labels.update(_iter_condition_labels(method_definition.listen))
return labels
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
custom_routes = set(router_config.routes or ()) if router_config else set()
definition = self._conversation_definition
builtin_routes = (
tuple(definition.builtin_routes)
if definition is not None
else self.builtin_routes
)
internal_routes = (
tuple(definition.internal_routes)
if definition is not None
else self.internal_routes
)
if not custom_routes:
custom_routes = (
self._valid_route_labels()
- set(self.builtin_routes)
- set(self.internal_routes)
self._valid_route_labels() - set(builtin_routes) - set(internal_routes)
)
return custom_routes | set(self.builtin_routes)
return custom_routes | set(builtin_routes)
def _default_conversation_llm(self) -> Any | None:
config = self._conversation_config

View File

@@ -0,0 +1,50 @@
"""Static conversational Flow definition models.
This module is part of the serializable Flow Definition contract. It should
only contain static data shapes. Experimental conversational runtime behavior
continues to live in ``crewai.experimental.conversational_mixin``.
"""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
class FlowConversationalRouterDefinition(BaseModel):
"""Static conversational router configuration."""
prompt: str | None = None
response_format: Any = None
llm: Any = None
routes: list[str] | None = None
route_descriptions: dict[str, str] | None = None
default_intent: str | None = "converse"
fallback_intent: str | None = "converse"
intent_field: str = "intent"
class FlowConversationalDefinition(BaseModel):
"""Static conversational Flow configuration."""
enabled: bool = False
system_prompt: str | None = None
llm: Any = None
router: FlowConversationalRouterDefinition | None = None
answer_from_history_prompt: str | None = None
default_intents: list[str] | None = None
intent_llm: Any = None
answer_from_history_llm: Any = None
visible_agent_outputs: list[str] | Literal["all"] | None = None
defer_trace_finalization: bool = True
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
internal_routes: list[str] = Field(
default_factory=lambda: ["answer_from_history", "conversation_start"]
)
__all__ = [
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
]

View File

@@ -9,6 +9,8 @@ from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowHumanFeedbackDefinition,
@@ -27,6 +29,13 @@ R = TypeVar("R")
logger = logging.getLogger(__name__)
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
_FLOW_METHOD_METADATA_ATTRS = [
"__conversational_only__",
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
"_human_feedback_llm",
]
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
@@ -42,6 +51,39 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
return True
def _is_conversational_flow(flow_class: type) -> bool:
return bool(getattr(flow_class, "conversational", False))
def _get_inherited_conversational_method(
flow_class: type,
attr_name: str,
) -> Any | None:
if not _is_conversational_flow(flow_class):
return None
for base in flow_class.__mro__[1:]:
inherited = base.__dict__.get(attr_name)
if inherited is None:
continue
if getattr(inherited, "__conversational_only__", False) and is_flow_method(
inherited
):
return inherited
return None
def _stamp_inherited_conversational_metadata(
method: Any,
inherited: Any,
) -> Any:
for attr in _FLOW_METHOD_METADATA_ATTRS:
if hasattr(inherited, attr):
setattr(method, attr, getattr(inherited, attr))
method.__is_flow_method__ = True
return method
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -135,6 +177,8 @@ def _build_state_definition(
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
if isinstance(state_value, TypeVar):
state_value = None
initial_state = getattr(flow_class, "initial_state", None)
if initial_state is not None:
state_value = initial_state
@@ -230,6 +274,98 @@ def _build_persistence_definition(
)
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
return None
routes = getattr(router_config, "routes", None)
return FlowConversationalRouterDefinition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
fallback_intent=getattr(router_config, "fallback_intent", "converse"),
intent_field=str(getattr(router_config, "intent_field", "intent")),
)
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
config = getattr(flow_class, "conversational_config", None)
builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end"))
internal_routes = getattr(
flow_class,
"internal_routes",
("answer_from_history", "conversation_start"),
)
if config is None:
return FlowConversationalDefinition(
enabled=True,
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
default_intents = getattr(config, "default_intents", None)
visible_agent_outputs = getattr(config, "visible_agent_outputs", None)
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
default_intents=(
[str(intent) for intent in default_intents]
if default_intents is not None
else None
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
"all"
if visible_agent_outputs == "all"
else [str(output) for output in visible_agent_outputs]
if visible_agent_outputs is not None
else None
),
defer_trace_finalization=bool(
getattr(config, "defer_trace_finalization", True)
),
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
@@ -241,6 +377,11 @@ def _build_method_definition(
else:
method_definition = fragment.model_copy(deep=True)
# Skip <locals>/<lambda> qualnames: they can never be re-imported, so a
# missing handler is more honest than a dead reference.
if "<" not in method.__qualname__:
method_definition.handler = f"{method.__module__}:{method.__qualname__}"
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
)
@@ -270,6 +411,29 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
flow_class, attr_value
):
methods[attr_name] = attr_value
continue
inherited = _get_inherited_conversational_method(flow_class, attr_name)
if inherited is not None and callable(attr_value):
methods[attr_name] = _stamp_inherited_conversational_metadata(
attr_value, inherited
)
if _is_conversational_flow(flow_class):
for base in reversed(flow_class.__mro__[1:]):
for attr_name, raw_value in base.__dict__.items():
if attr_name.startswith("_") or attr_name in methods:
continue
if not getattr(raw_value, "__conversational_only__", False):
continue
try:
attr_value = getattr(flow_class, attr_name)
except AttributeError:
continue
if is_flow_method(attr_value) and _should_include_flow_method(
flow_class, attr_value
):
methods[attr_name] = attr_value
# A wrapped method whose name collides with a base Flow model field
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
@@ -314,6 +478,7 @@ def _build_flow_definition_from_class(
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,
)

View File

@@ -6,15 +6,22 @@ The implementation now lives in three modules, split by concern:
``@router``, ``or_`` / ``and_``) and Python Flow class projection
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
- ``crewai.flow.runtime`` -- the Flow execution engine and state
- ``crewai.experimental.conversational_mixin`` -- experimental conversational
runtime extension composed onto the public ``Flow`` class
Prefer importing from those modules in new code; this module preserves the
historical ``crewai.flow.flow`` import path.
"""
from typing import Any, TypeVar
from pydantic import BaseModel
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl import and_, listen, or_, router, start
from crewai.flow.runtime import (
_INITIAL_STATE_CLASS_MARKER,
Flow,
Flow as RuntimeFlow,
FlowMeta,
FlowState,
LockedDictProxy,
@@ -23,6 +30,13 @@ from crewai.flow.runtime import (
)
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
class Flow(_ConversationalMixin, RuntimeFlow[T]):
"""Public Flow class with experimental conversational extension behavior."""
__all__ = [
"_INITIAL_STATE_CLASS_MARKER",
"Flow",

View File

@@ -16,6 +16,11 @@ from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field
import yaml
from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
logger = logging.getLogger(__name__)
@@ -23,6 +28,8 @@ FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -45,8 +52,9 @@ class FlowDefinitionDiagnostic(BaseModel):
class FlowStateDefinition(BaseModel):
"""Static description of a Flow state contract."""
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
ref: str | None = None
json_schema: dict[str, Any] | None = None
default: Any = None
@@ -86,6 +94,7 @@ class FlowHumanFeedbackDefinition(BaseModel):
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
handler: str | None = None
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
@@ -115,6 +124,7 @@ class FlowDefinition(BaseModel):
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)

View File

@@ -22,6 +22,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
import enum
import importlib
import inspect
import logging
import threading
@@ -84,17 +85,13 @@ from crewai.events.types.flow_events import (
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
FlowMethodDefinition,
FlowStateDefinition,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -139,7 +136,6 @@ from crewai.utilities.streaming import (
signal_end,
signal_error,
)
from crewai.utilities.types import LLMMessage
# Runtime alias so Pydantic can resolve the ``execution_context`` field's
@@ -154,14 +150,108 @@ ExecutionContext = Any # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
def _condition_branches(
condition: dict[str, Any],
) -> tuple[Literal["and", "or"], list[FlowDefinitionCondition]]:
if "and" in condition:
return "and", condition["and"]
return "or", condition["or"]
def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -> bool:
if isinstance(condition, str):
return condition in events
operator, branches = _condition_branches(condition)
combine = all if operator == "and" else any
return combine(_condition_satisfied(branch, events) for branch in branches)
def _resolve_handler(ref: str) -> Callable[..., Any]:
module_name, separator, qualname = ref.partition(":")
if not separator or not module_name or not qualname:
raise ValueError(
f"invalid handler reference {ref!r}; expected 'module:qualname'"
)
module = importlib.import_module(module_name)
target: Any = module
for part in qualname.split("."):
target = getattr(target, part)
if not callable(target):
raise TypeError(
f"handler reference {ref!r} resolved to a non-callable "
f"{type(target).__name__}"
)
return cast(Callable[..., Any], target)
def _build_definition_state_model(
state_definition: FlowStateDefinition,
) -> BaseModel | None:
kwargs = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
model_class: type[BaseModel] | None = None
if state_definition.ref:
try:
resolved = _resolve_handler(state_definition.ref)
except Exception:
logger.warning(
"Could not import state ref %r", state_definition.ref, exc_info=True
)
else:
if isinstance(resolved, type) and issubclass(resolved, BaseModel):
model_class = resolved
else:
logger.warning(
"State ref %r is not a pydantic model", state_definition.ref
)
if model_class is None and state_definition.json_schema:
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
try:
model_class = create_model_from_schema(state_definition.json_schema)
except Exception:
logger.warning(
"Could not build a state model from the declared json_schema",
exc_info=True,
)
if model_class is None:
return None
if not issubclass(model_class, FlowState):
class StateWithId(FlowState, model_class): # type: ignore[misc, valid-type]
pass
model_class = StateWithId
return model_class(**kwargs)
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
sub_conditions = condition["and"] if "and" in condition else condition["or"]
for sub_condition in sub_conditions:
yield from _iter_condition_events(sub_condition)
_, branches = _condition_branches(condition)
for branch in branches:
yield from _iter_condition_events(branch)
def _or_alternative_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
operator, branches = _condition_branches(condition)
if operator != "or":
return
for branch in branches:
yield from _or_alternative_events(branch)
def _is_multi_event_or(
@@ -170,7 +260,8 @@ def _is_multi_event_or(
if isinstance(condition, str):
return False
return "or" in condition and len(condition["or"]) > 1
operator, branches = _condition_branches(condition)
return operator == "or" and len(branches) > 1
def _resolve_persistence(value: Any) -> Any:
@@ -616,7 +707,7 @@ class FlowMeta(ModelMetaclass):
return super().__new__(mcs, name, bases, namespace)
class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"""Base class for all flows.
type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
@@ -630,41 +721,33 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
_flow_definition: ClassVar[FlowDefinition | None] = None
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a subclass, the built-in conversational
# graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn``
# / ``end_conversation`` / ``answer_from_history_turn``) registers and
# ``handle_turn`` / ``chat`` become the chat entry points. When ``False``
# (default), the methods exist as inert attributes and never register or
# fire — non-chat flows pay no runtime cost.
#
# ⚠ EXPERIMENTAL FEATURE. The whole conversational surface
# (``conversational`` ClassVar, ``handle_turn``, ``chat``,
# ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the
# built-in graph + helpers) lives under ``crewai.experimental`` and may
# change shape before graduating. Pin your CrewAI version if you depend on
# specific behavior, and watch the changelog for breaking updates.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
entity_type: Literal["flow"] = "flow"
def _initialize_runtime_extension_attrs(self) -> None:
"""Initialize optional runtime-extension attributes."""
def _create_default_extension_state(self) -> Any | None:
"""Return a default state supplied by an optional runtime extension."""
return None
def _should_apply_pending_kickoff_context(self) -> bool:
"""Whether an optional runtime extension has pending kickoff context."""
return False
def _apply_pending_kickoff_context(self) -> None:
"""Apply optional runtime-extension kickoff context."""
def _order_start_methods_for_kickoff(
self,
start_methods: list[FlowMethodName],
) -> tuple[list[FlowMethodName], bool]:
"""Allow an optional runtime extension to order kickoff start methods."""
return start_methods, False
def _should_defer_trace_finalization(self) -> bool:
"""Whether this kickoff should defer final flow trace finalization."""
return bool(getattr(self, "defer_trace_finalization", False))
@classmethod
def flow_definition(cls) -> FlowDefinition:
"""Return the static Flow Definition built from this Flow class."""
@@ -675,21 +758,24 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
return flow_definition
@classmethod
def _start_method_names(cls) -> list[FlowMethodName]:
def from_definition(cls, definition: FlowDefinition) -> Flow[Any]:
"""Build a runnable Flow directly from a definition; no subclass required."""
return cls.model_validate({}, context={"flow_definition": definition})
def _start_method_names(self) -> list[FlowMethodName]:
return [
FlowMethodName(method_name)
for method_name, method_definition in cls.flow_definition().methods.items()
for method_name, method_definition in self._definition.methods.items()
if method_definition.is_start
]
@classmethod
def _listener_methods(
cls,
self,
) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]:
# (name, definition, condition) for every non-start method that listens.
# Routers are included (they listen too); callers wanting only plain
# listeners filter on definition.router.
for method_name, method_definition in cls.flow_definition().methods.items():
for method_name, method_definition in self._definition.methods.items():
if method_definition.listen is not None and not method_definition.is_start:
yield (
FlowMethodName(method_name),
@@ -697,25 +783,22 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
method_definition.listen,
)
@classmethod
def _start_condition(
cls, method_name: FlowMethodName
self, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
method_definition = cls.flow_definition().methods[str(method_name)]
method_definition = self._definition.methods[str(method_name)]
start = method_definition.start
if isinstance(start, (str, dict)):
return start
return None
@classmethod
def _listen_condition(
cls, method_name: FlowMethodName
self, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
return cls.flow_definition().methods[str(method_name)].listen
return self._definition.methods[str(method_name)].listen
@classmethod
def _is_router(cls, method_name: FlowMethodName) -> bool:
return cls.flow_definition().methods[str(method_name)].router
def _is_router(self, method_name: FlowMethodName) -> bool:
return self._definition.methods[str(method_name)].router
initial_state: Annotated[ # type: ignore[type-arg]
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
@@ -858,13 +941,13 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
restore_event_scope(())
reset_last_event_id()
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
_methods: dict[FlowMethodName, Callable[..., Any]] = PrivateAttr(
default_factory=dict
)
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
default_factory=dict
)
_pending_and_listeners: dict[PendingListenerKey, set[int]] = PrivateAttr(
_pending_events: dict[PendingListenerKey, set[str]] = PrivateAttr(
default_factory=dict
)
_fired_or_listeners: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -872,6 +955,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
PrivateAttr(default=None)
)
_method_outputs: list[Any] = PrivateAttr(default_factory=list)
_definition: FlowDefinition = PrivateAttr()
_state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -882,10 +966,6 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_conversation_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
_pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
@@ -904,13 +984,26 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
object.__setattr__(self, name, value)
def model_post_init(self, __context: Any) -> None:
self._flow_post_init()
definition = (
__context.get("flow_definition") if isinstance(__context, dict) else None
)
self._flow_post_init(definition)
def _flow_post_init(self) -> None:
def _flow_post_init(self, definition: FlowDefinition | None = None) -> None:
"""Heavy initialization: state creation, events, memory, method registration."""
if getattr(self, "_flow_post_init_done", False):
return
object.__setattr__(self, "_flow_post_init_done", True)
self._initialize_runtime_extension_attrs()
self._definition = definition or type(self).flow_definition()
if self.name and self.name != self._definition.name:
self._definition = self._definition.model_copy(update={"name": self.name})
methods = (
self._handler_bound_methods()
if definition is not None
else self._class_bound_methods()
)
if self._state is None:
self._state = self._create_initial_state()
@@ -926,7 +1019,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
),
)
@@ -936,17 +1029,44 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
flow_name = sanitize_scope_name(self._definition.name)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
# Build the runtime method lookup from the static FlowDefinition.
for method_name in type(self).flow_definition().methods:
self._methods.update(methods)
def _handler_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
methods: dict[FlowMethodName, Callable[..., Any]] = {}
unresolved: list[str] = []
for method_name, method_definition in self._definition.methods.items():
if method_definition.handler is None:
unresolved.append(f"{method_name}: no handler")
continue
try:
handler = _resolve_handler(method_definition.handler)
except Exception as e:
unresolved.append(f"{method_name}: {e}")
continue
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(self, type(self))
methods[FlowMethodName(method_name)] = handler
if unresolved:
raise ValueError(
f"Cannot build flow {self._definition.name!r} from its definition; "
"methods with missing or unresolvable handlers: "
+ "; ".join(unresolved)
)
return methods
def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
methods: dict[FlowMethodName, Callable[..., Any]] = {}
for method_name in self._definition.methods:
method = getattr(self, method_name, None)
if method is None:
continue
if not hasattr(method, "__self__"):
method = method.__get__(self, self.__class__)
self._methods[FlowMethodName(method_name)] = method
method = method.__get__(self, type(self))
methods[FlowMethodName(method_name)] = method
return methods
def recall(self, query: str, **kwargs: Any) -> Any:
"""Recall relevant memories. Delegates to this flow's memory.
@@ -1024,14 +1144,11 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
def _start_condition_triggered_by(
self, method_name: FlowMethodName, trigger: FlowMethodName
) -> bool:
condition = type(self)._start_condition(method_name)
condition = self._start_condition(method_name)
if condition is None:
return False
return self._evaluate_condition(
condition,
trigger,
method_name,
pending_key_prefix=f"start:{method_name}",
return self._condition_met(
condition, trigger, PendingListenerKey(f"start:{method_name}")
)
def _rearm_or_listeners_for_trigger(
@@ -1055,7 +1172,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
trigger_str = str(trigger)
to_discard: list[FlowMethodName] = []
for listener_name in candidates:
condition = type(self)._listen_condition(listener_name)
condition = self._listen_condition(listener_name)
if condition is None:
continue
if trigger_str in _iter_condition_events(condition):
@@ -1071,12 +1188,13 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# Only events that EXCLUSIVELY feed one OR listener race; an event that
# also feeds another listener (e.g. an AND) is left alone when a sibling
# wins. e.g. @listen(or_(a, b)) on handler -> {frozenset({a, b}): handler}.
# Events nested under an and_() branch (e.g. or_(and_(a, b), c)) are not
# alternatives and never race -- cancelling one would make the AND
# unsatisfiable.
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
listener_name: condition
for listener_name, method_definition, condition in type(
self
)._listener_methods()
for listener_name, method_definition, condition in self._listener_methods()
if not method_definition.router
}
@@ -1093,14 +1211,14 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
for listener_name, condition in listener_conditions.items():
if not isinstance(condition, dict):
continue
events = events_by_listener[listener_name]
if "or" not in condition or len(events) <= 1:
alternatives = set(_or_alternative_events(condition))
if len(alternatives) <= 1:
continue
exclusive_events = {
event
for event in events
if listeners_by_event.get(event, set()) == {listener_name}
for event in alternatives
if listeners_by_event[event] == {listener_name}
}
if len(exclusive_events) > 1:
# Racing only applies to method-completion events: each member is
@@ -1349,7 +1467,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
inputs=None,
),
)
@@ -1420,16 +1538,17 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if self.persistence is not None:
self.persistence.clear_pending_feedback(context.flow_id)
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
if not self.suppress_flow_events:
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self._definition.name,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
# Clear resumption flag before triggering listeners
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
@@ -1478,7 +1597,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -1506,7 +1625,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
result=final_result,
state=self._copy_and_serialize_state(),
),
@@ -1539,20 +1658,15 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
"""
init_state = self.initial_state
# Conversational subclasses default to ``ConversationState`` if the
# user didn't supply an explicit type parameter (``Flow[...]``) or an
# ``initial_state``. This makes ``class MyChat(Flow): conversational
# = True`` work without forcing every user to import and parameterize
# ``ConversationState`` themselves.
if (
init_state is None
and getattr(type(self), "conversational", False)
and not hasattr(self, "_initial_state_t")
):
return cast(T, ConversationState())
if init_state is None:
extension_state = self._create_default_extension_state()
if extension_state is not None:
return cast(T, extension_state)
if init_state is None and hasattr(self, "_initial_state_t"):
state_type = self._initial_state_t
if isinstance(state_type, TypeVar):
state_type = None
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
instance = state_type()
@@ -1572,7 +1686,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
return cast(T, {"id": str(uuid4())})
if init_state is None:
return cast(T, {"id": str(uuid4())})
return cast(T, self._create_definition_state())
if isinstance(init_state, type):
state_class = init_state
@@ -1614,6 +1728,34 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
)
def _create_definition_state(self) -> dict[str, Any] | BaseModel:
state_definition = self._definition.state
if state_definition is None:
return {"id": str(uuid4())}
if state_definition.type in ("pydantic", "json_schema"):
state = _build_definition_state_model(state_definition)
if state is not None:
return state
logger.error(
"Flow %r declares %s state but neither ref nor json_schema "
"produced a model; falling back to dict state",
self._definition.name,
state_definition.type,
)
elif state_definition.type == "unknown":
logger.warning(
"Flow %r declares state of unknown type; falling back to dict state",
self._definition.name,
)
dict_state: dict[str, Any] = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
if "id" not in dict_state:
dict_state["id"] = str(uuid4())
return dict_state
def _copy_state(self) -> T:
"""Create a copy of the current state.
@@ -2027,7 +2169,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._pending_events.clear()
self._clear_or_listeners()
self._method_call_counts.clear()
else:
@@ -2122,12 +2264,11 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if should_emit_flow_started:
# In normal flows, each kickoff owns its own flow lifecycle.
# Deferred conversational sessions are different: the first
# turn opens the flow scope and later turns reuse it until
# ``finalize_session_traces()`` emits the single finish event.
# Deferred sessions reuse the first flow scope until an
# explicit finalization call closes the batch.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
@@ -2154,16 +2295,8 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# with implicit "crew" execution_type.
get_env_context()
# Conversational hook: apply the pending user message AFTER state
# restore and AFTER flow scope initialization, so transcript events
# are parented under the current conversation trace.
# ``handle_turn`` stashes the message on ``self._pending_user_message``
# before calling ``kickoff``; this drains it.
if (
getattr(type(self), "conversational", False)
and self._pending_user_message is not None
):
self._apply_pending_conversational_turn()
if self._should_apply_pending_kickoff_context():
self._apply_pending_kickoff_context()
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
@@ -2175,22 +2308,29 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# Determine which start methods to execute at kickoff
# Conditional start methods are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
start_methods = type(self)._start_method_names()
start_methods = self._start_method_names()
unconditional_starts = [
start_method
for start_method in start_methods
if type(self)._start_condition(start_method) is None
if self._start_condition(start_method) is None
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts if unconditional_starts else start_methods
)
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
starts_to_execute, run_starts_sequentially = (
self._order_start_methods_for_kickoff(starts_to_execute)
)
if run_starts_sequentially:
for start_method in starts_to_execute:
await self._execute_start_method(start_method)
else:
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
from crewai.flow.async_feedback.types import HumanFeedbackPending
@@ -2220,7 +2360,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -2262,16 +2402,15 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# When ``defer_trace_finalization`` is set, skip both per-turn
# ``FlowFinishedEvent`` AND trace-batch finalization. The caller
# invokes ``finalize_session_traces()`` once at session end to
# close out the whole conversation as one trace. The flag is
# read from EITHER the instance attribute (set by user code) OR
# the class-level ``ConversationConfig.defer_trace_finalization``.
# invokes the matching finalization hook once at session end. The
# flag is read from either the instance attribute or an extension
# definition.
if not self._should_defer_trace_finalization():
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
result=final_output,
state=self._copy_and_serialize_state(),
),
@@ -2345,7 +2484,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
flow_name = self.name or self.__class__.__name__
flow_name = self._definition.name
nodes = sorted(
(
n
@@ -2404,7 +2543,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
)
# If start method is a router, use its result as an additional trigger
if type(self)._is_router(start_method_name) and result is not None:
if self._is_router(start_method_name) and result is not None:
# Execute listeners for the start method name first
await self._execute_listeners(start_method_name, result, finished_event_id)
# Then execute listeners for the router result (e.g., "approved")
@@ -2424,15 +2563,16 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
def _inject_trigger_payload_for_start_method(
self, original_method: Callable[..., Any]
) -> Callable[..., Any]:
accepts_trigger_payload = (
"crewai_trigger_payload" in inspect.signature(original_method).parameters
)
def prepare_kwargs(
*args: Any, **kwargs: Any
) -> tuple[tuple[Any, ...], dict[str, Any]]:
inputs = cast(dict[str, Any], baggage.get_baggage("flow_inputs") or {})
trigger_payload = inputs.get("crewai_trigger_payload")
sig = inspect.signature(original_method)
accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters
if trigger_payload is not None and accepts_trigger_payload:
kwargs["crewai_trigger_payload"] = trigger_payload
elif trigger_payload is not None:
@@ -2476,20 +2616,19 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
kwargs or {}
)
# MethodExecution events always fire — ``suppress_flow_events``
# only hides the Rich console panel, not observability events.
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self._definition.name,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
# Set method name in context so ask() can read it without
# stack inspection. Must happen before copy_context() so the
@@ -2531,19 +2670,18 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
finished_event_id: str | None = None
# MethodExecution events always fire even when console panels are
# suppressed; tracing depends on them.
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
if not self.suppress_flow_events:
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self._definition.name,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
return result, finished_event_id
except Exception as e:
@@ -2565,7 +2703,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
@@ -2581,7 +2719,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
error=e,
),
)
@@ -2713,7 +2851,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
if current_trigger in router_results:
for method_name in type(self)._start_method_names():
for method_name in self._start_method_names():
if self._start_condition_triggered_by(
method_name, current_trigger
):
@@ -2726,72 +2864,25 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
else:
await self._execute_start_method(method_name)
def _evaluate_condition(
def _condition_met(
self,
condition: FlowDefinitionCondition,
trigger_method: FlowMethodName,
listener_name: FlowMethodName,
pending_key_prefix: str | None = None,
subscription_key: PendingListenerKey,
) -> bool:
if isinstance(condition, str):
return condition == str(trigger_method)
def _sub_prefix(index: int) -> str | None:
if pending_key_prefix is None:
return None
return f"{pending_key_prefix}:{index}"
if "or" in condition:
# Evaluate every sub-condition (no short-circuit): a nested and_()
# branch needs the chance to clear its pending state in
# _pending_and_listeners even when an earlier branch already matched.
any_matched = False
for index, sub_condition in enumerate(condition["or"]):
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
any_matched = True
return any_matched
sub_conditions = condition["and"]
pending_key = PendingListenerKey(
pending_key_prefix
if pending_key_prefix is not None
else f"{listener_name}:{id(condition)}"
)
if pending_key not in self._pending_and_listeners:
self._pending_and_listeners[pending_key] = set(range(len(sub_conditions)))
pending_conditions = self._pending_and_listeners[pending_key]
for index, sub_condition in enumerate(sub_conditions):
if index not in pending_conditions:
continue
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
pending_conditions.discard(index)
if not pending_conditions:
self._pending_and_listeners.pop(pending_key, None)
return True
return False
seen = self._pending_events.setdefault(subscription_key, set())
seen.add(str(trigger_method))
if not _condition_satisfied(condition, seen):
return False
del self._pending_events[subscription_key]
return True
def _find_triggered_methods(
self, trigger_method: FlowMethodName, router_only: bool
) -> list[FlowMethodName]:
triggered: list[FlowMethodName] = []
for listener_name, method_definition, condition in type(
self
)._listener_methods():
for listener_name, method_definition, condition in self._listener_methods():
is_router = method_definition.router
if router_only != is_router:
continue
@@ -2800,10 +2891,8 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._evaluate_condition(
condition,
trigger_method,
listener_name,
if self._condition_met(
condition, trigger_method, PendingListenerKey(str(listener_name))
):
triggered.append(listener_name)
if should_check_fired:
@@ -2859,10 +2948,10 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if type(self)._is_router(listener_name):
for start_method_name in type(self)._start_method_names():
if self._is_router(listener_name):
for start_method_name in self._start_method_names():
if (
type(self)._start_condition(start_method_name) is not None
self._start_condition(start_method_name) is not None
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
@@ -2881,8 +2970,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
method = self._methods[listener_name]
sig = inspect.signature(method)
params = list(sig.parameters.values())
method_params = [p for p in params if p.name != "self"]
method_params = [p for p in sig.parameters.values() if p.name != "self"]
if triggering_event_id:
with triggered_by_scope(triggering_event_id):
@@ -2938,7 +3026,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
return self.input_provider
if flow_config.input_provider is not None:
return flow_config.input_provider
return ConsoleProvider()
return cast(InputProvider, ConsoleProvider())
def _checkpoint_state_for_ask(self) -> None:
"""Auto-checkpoint flow state before waiting for user input.
@@ -3038,7 +3126,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputRequestedEvent(
type="flow_input_requested",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name=method_name,
message=message,
metadata=metadata,
@@ -3057,7 +3145,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
executor = ThreadPoolExecutor(max_workers=1)
ctx = contextvars.copy_context()
future = executor.submit(
ctx.run, provider.request_input, message, self, metadata
ctx.run, provider.request_input, message, cast(Any, self), metadata
)
try:
raw = future.result(timeout=timeout)
@@ -3070,7 +3158,9 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# cancel_futures=True cleans up any queued-but-not-started tasks.
executor.shutdown(wait=False, cancel_futures=True)
else:
raw = provider.request_input(message, self, metadata=metadata)
raw = provider.request_input(
message, cast(Any, self), metadata=metadata
)
except KeyboardInterrupt:
raise
except Exception:
@@ -3103,7 +3193,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputReceivedEvent(
type="flow_input_received",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name=method_name,
message=message,
response=response,
@@ -3141,7 +3231,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackRequestedEvent(
type="human_feedback_requested",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name="", # Will be set by decorator if needed
output=output,
message=message,
@@ -3170,7 +3260,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackReceivedEvent(
type="human_feedback_received",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name="", # Will be set by decorator if needed
feedback=feedback,
outcome=None, # Will be determined after collapsing
@@ -3345,10 +3435,10 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
),
)
structure = build_flow_structure(self)
structure = build_flow_structure(cast(Any, self))
return render_interactive(structure, filename=filename, show=show)
@staticmethod

View File

@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
FlowMethodName = NewType("FlowMethodName", str)
PendingListenerKey = NewType(
"PendingListenerKey",
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
)

View File

@@ -259,8 +259,9 @@ class RecallFlow(Flow[RecallState]):
candidates = []
if not candidates:
candidates = [scope_prefix]
self.state.candidate_scopes = candidates[:20]
return self.state.candidate_scopes
selected_scopes = candidates[:20]
self.state.candidate_scopes = selected_scopes
return selected_scopes
@listen(filter_and_chunk)
def search_chunks(self) -> list[Any]:
@@ -368,9 +369,10 @@ class RecallFlow(Flow[RecallState]):
)
)
matches.sort(key=lambda m: m.score, reverse=True)
self.state.final_results = matches[: self.state.limit]
final_results = matches[: self.state.limit]
self.state.final_results = final_results
if self.state.evidence_gaps and self.state.final_results:
self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps)
return self.state.final_results
return final_results

View File

@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
if examples:
schema_extra["examples"] = examples
default = ... if is_required else None
default = (
json_schema["default"]
if "default" in json_schema
else (... if is_required else None)
)
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:

View File

@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
executor._method_outputs = []
executor._completed_methods = set()
executor._fired_or_listeners = set()
executor._pending_and_listeners = {}
executor._pending_events = {}
executor._method_execution_counts = {}
executor._method_call_counts = {}
executor._event_futures = []

View File

@@ -1157,6 +1157,25 @@ def test_flow_name():
assert flow.name == "MyFlow"
def test_flow_custom_name_overrides_class_name_in_events():
class InternalFlowClass(Flow):
name = "PublicName"
@start()
def begin(self):
return "done"
received = []
@crewai_event_bus.on(FlowStartedEvent)
def handle(source, event):
received.append(event)
InternalFlowClass().kickoff()
assert received[0].flow_name == "PublicName"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
@@ -1542,40 +1561,63 @@ def test_deeply_nested_conditions():
def test_or_branch_does_not_leave_stale_and_state():
"""or_() over nested and_() branches must not leave stale pending AND state.
Regression: evaluating an or_() condition stopped at the first branch that was
satisfied, so a later and_() branch that the *same* trigger would have completed
never cleared its pending state. On the next cycle that trigger alone then
spuriously re-satisfied the whole condition. Both branches share the final
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
completes branch ``(c AND x)`` and both must be cleared together.
"""
fired = []
class StaleStateFlow(Flow):
@start()
def begin(self):
pass
@listen(or_(and_("a", "x"), and_("c", "x")))
def joined(self):
@listen(begin)
def a(self):
pass
flow = StaleStateFlow()
condition = type(flow)._listen_condition("joined")
@listen(begin)
def c(self):
pass
def fires(trigger):
return flow._evaluate_condition(condition, trigger, "joined")
@listen(and_(a, c))
def x(self):
pass
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
assert fires("a") is False
assert fires("c") is False
assert fires("x") is True
@listen(or_(and_("a", "x"), and_("c", "y")))
def joined(self):
fired.append("joined")
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
# previous cycle was consumed when "joined" fired, so neither branch is half
# complete and "x" by itself is insufficient.
assert fires("x") is False
@router(joined)
def emit_y(self):
return "y"
StaleStateFlow().kickoff()
assert fired == ["joined"]
def test_and_branch_inside_or_does_not_race():
execution_order = []
class DiamondWithFallbackFlow(Flow):
@start()
def go(self):
execution_order.append("go")
@listen(go)
def a(self):
execution_order.append("a")
@listen(go)
def b(self):
execution_order.append("b")
@listen(or_(and_(a, b), "fallback"))
def done(self):
execution_order.append("done")
DiamondWithFallbackFlow().kickoff()
assert "done" in execution_order
assert execution_order.index("done") > execution_order.index("a")
assert execution_order.index("done") > execution_order.index("b")
def test_mixed_sync_async_execution_order():

View File

@@ -169,9 +169,6 @@ class TestConversationalFlow:
)
@pytest.mark.skip(
reason="Experimental conversational registry behavior is out of scope for the definition-first start migration."
)
def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None:
@ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini")
class ResearchFlow(ConversationalFlow):
@@ -595,9 +592,6 @@ class TestConversationalFlow:
assert result == "legacy-searched"
assert flow.state.last_intent == "search"
@pytest.mark.skip(
reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration."
)
def test_user_start_methods_run_sequentially_before_router_in_conversational_mode(
self,
) -> None:
@@ -649,9 +643,6 @@ class TestConversationalFlow:
assert "attach_bus" in order # still fires every turn
assert "route_turn" in order
@pytest.mark.skip(
reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration."
)
def test_subclass_can_override_conversation_start_without_redecorating(
self,
) -> None:
@@ -1281,7 +1272,11 @@ class TestFlowTracingWhenSuppressed:
assert started == ["QuietFlow"]
def test_method_execution_emitted_when_panel_events_suppressed(self) -> None:
def test_method_execution_suppressed_when_flow_events_suppressed(self) -> None:
"""``suppress_flow_events=True`` silences MethodExecution events so
infrastructure flows (AgentExecutor, memory) don't emit one trace span
per internal control-flow method."""
class QuietFlow(Flow[ChatState]):
suppress_flow_events = True
@@ -1303,8 +1298,8 @@ class TestFlowTracingWhenSuppressed:
with patch.object(crewai_event_bus, "emit", side_effect=track_emit):
QuietFlow().kickoff()
assert started == ["begin"]
assert finished == ["begin"]
assert started == []
assert finished == []
def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None:
listener = TraceCollectionListener()
@@ -1338,6 +1333,12 @@ class TestFlowTracingWhenSuppressed:
class TestDeferTraceFinalization:
def test_bare_conversational_flow_defers_by_default(self) -> None:
class BareChat(ConversationalFlow):
pass
assert BareChat()._should_defer_trace_finalization() is True
def test_conversation_config_drives_defer_flag(self) -> None:
"""``ConversationConfig(defer_trace_finalization=...)`` controls whether
a conversational subclass defers per-turn trace finalization."""

View File

@@ -13,6 +13,7 @@ from pydantic import BaseModel
import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.experimental import ConversationConfig, RouterConfig
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
@@ -36,6 +37,8 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -169,6 +172,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert definition.state.ref and "ContractState" in definition.state.ref
assert definition.config.stream is True
assert definition.config.max_method_calls == 7
assert definition.conversational is None
assert definition.methods["begin"].start is True
assert definition.methods["process"].listen == "begin"
@@ -201,27 +205,74 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
methods = RegularFlow.flow_definition().methods
assert RegularFlow.flow_definition().conversational is None
assert set(methods) == {"begin"}
assert "conversation_start" not in methods
assert "route_conversation" not in methods
assert "converse_turn" not in methods
@pytest.mark.skip(
reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration."
)
def test_flow_definition_includes_conversational_builtins_when_enabled():
class ChatFlow(Flow):
conversational = True
methods = ChatFlow.flow_definition().methods
definition = ChatFlow.flow_definition()
methods = definition.methods
assert definition.conversational is not None
assert definition.conversational.enabled is True
assert definition.conversational.defer_trace_finalization is True
assert definition.conversational.builtin_routes == ["converse", "end"]
assert "conversation_start" in methods
assert "route_conversation" in methods
assert "converse_turn" in methods
assert methods["conversation_start"].start is True
def test_flow_definition_serializes_conversational_config():
@ConversationConfig(
system_prompt="Be concise.",
llm="gpt-4o-mini",
router=RouterConfig(
prompt="Pick a route.",
routes=["research"],
default_intent="converse",
fallback_intent="end",
),
default_intents=["research"],
visible_agent_outputs=["researcher"],
defer_trace_finalization=False,
)
class ChatFlow(Flow):
conversational = True
conversational = ChatFlow.flow_definition().conversational
assert conversational is not None
assert conversational.system_prompt == "Be concise."
assert conversational.llm == "gpt-4o-mini"
assert conversational.default_intents == ["research"]
assert conversational.visible_agent_outputs == ["researcher"]
assert conversational.defer_trace_finalization is False
assert conversational.router is not None
assert conversational.router.prompt == "Pick a route."
assert conversational.router.routes == ["research"]
assert conversational.router.fallback_intent == "end"
def test_flow_definition_preserves_undecorated_conversational_override():
class ChatFlow(Flow):
conversational = True
def conversation_start(self) -> str | None:
return "custom"
methods = ChatFlow.flow_definition().methods
assert methods["conversation_start"].start is True
assert "route_conversation" in methods
def test_flow_definition_serializes_human_feedback_metadata():
marker = object()

View File

@@ -0,0 +1,508 @@
from __future__ import annotations
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow import FlowState
from crewai.flow.flow_definition import FlowDefinition
class ChainFlow(Flow):
@start()
def begin(self):
self.state["begin_ran"] = True
return "hello"
@listen(begin)
def shout(self, result):
return result.upper()
@listen(shout)
def confirm(self):
self.state["confirmed"] = True
return f"confirmed:{self.state['confirmed']}"
CHAIN_YAML = f"""
schema: crewai.flow/v1
name: ChainFlow
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
shout:
handler: {__name__}:ChainFlow.shout
listen: begin
confirm:
handler: {__name__}:ChainFlow.confirm
listen: shout
"""
class MergeFlow(Flow):
@start()
def begin(self):
return "go"
@listen(begin)
def left(self):
return "left"
@listen(begin)
def right(self):
return "right"
@listen(or_(left, right))
def either(self):
self.state["either_ran"] = True
return "either"
@listen(and_(left, right, either))
def join(self):
self.state["joined"] = True
return "joined"
MERGE_YAML = f"""
schema: crewai.flow/v1
name: MergeFlow
methods:
begin:
handler: {__name__}:MergeFlow.begin
start: true
left:
handler: {__name__}:MergeFlow.left
listen: begin
right:
handler: {__name__}:MergeFlow.right
listen: begin
either:
handler: {__name__}:MergeFlow.either
listen:
or: [left, right]
join:
handler: {__name__}:MergeFlow.join
listen:
and: [left, right, either]
"""
class RouteFlow(Flow):
@start()
def begin(self):
return "go"
@router(begin)
def decide(self):
return "left" if self.state.get("direction") == "left" else "right"
@listen("left")
def take_left(self):
return "took-left"
@listen("right")
def take_right(self):
return "took-right"
ROUTE_YAML = f"""
schema: crewai.flow/v1
name: RouteFlow
methods:
begin:
handler: {__name__}:RouteFlow.begin
start: true
decide:
handler: {__name__}:RouteFlow.decide
listen: begin
router: true
take_left:
handler: {__name__}:RouteFlow.take_left
listen: left
take_right:
handler: {__name__}:RouteFlow.take_right
listen: right
"""
class LoopFlow(Flow):
@start("retry")
def step(self):
self.state["count"] = self.state.get("count", 0) + 1
return self.state["count"]
@router(step)
def decide(self):
if self.state["count"] < 3:
return "retry"
return "done"
@listen("done")
def finish(self):
return f"finished:{self.state['count']}"
LOOP_YAML = f"""
schema: crewai.flow/v1
name: LoopFlow
methods:
step:
handler: {__name__}:LoopFlow.step
start: retry
decide:
handler: {__name__}:LoopFlow.decide
listen: step
router: true
finish:
handler: {__name__}:LoopFlow.finish
listen: done
"""
class CounterState(FlowState):
count: int = 0
label: str = "none"
class PydanticStateFlow(Flow[CounterState]):
@start()
def begin(self):
self.state.count += 1
return self.state.count
@listen(begin)
def finish(self):
self.state.label = f"count={self.state.count}"
return self.state.label
PYDANTIC_STATE_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_STATE_OVERLAY_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
default:
count: 5
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
JSON_SCHEMA_STATE_YAML = f"""
schema: crewai.flow/v1
name: JsonSchemaStateFlow
state:
type: json_schema
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f"""
schema: crewai.flow/v1
name: SchemaFallbackFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
UNRESOLVABLE_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnresolvableStateFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
"""
DICT_STATE_YAML = f"""
schema: crewai.flow/v1
name: DictStateFlow
state:
type: dict
default:
count: 5
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
"""
UNKNOWN_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnknownStateFlow
state:
type: unknown
ref: somewhere:Something
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
"""
def _run_with_events(flow, inputs=None):
events = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_finished(source, event):
events.append(event)
result = flow.kickoff(inputs=inputs)
events.sort(key=lambda e: e.timestamp)
return result, [
(type(e).__name__, str(e.method_name), e.flow_name) for e in events
]
def _state_without_id(flow):
snapshot = dict(flow.state.model_dump())
snapshot.pop("id", None)
return snapshot
def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
class_flow = flow_cls()
class_result, class_events = _run_with_events(class_flow, inputs)
definition = FlowDefinition.from_yaml(yaml_str)
definition_flow = Flow.from_definition(definition)
definition_result, definition_events = _run_with_events(definition_flow, inputs)
assert definition_result == class_result
assert _state_without_id(definition_flow) == _state_without_id(class_flow)
if ordered:
assert definition_flow.method_outputs == class_flow.method_outputs
assert definition_events == class_events
else:
assert sorted(map(repr, definition_flow.method_outputs)) == sorted(
map(repr, class_flow.method_outputs)
)
assert sorted(definition_events) == sorted(class_events)
return definition_flow, definition_result
def test_simple_chain_parity():
flow, result = assert_parity(ChainFlow, CHAIN_YAML)
assert result == "confirmed:True"
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_and_or_merge_parity():
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
assert flow.state["joined"] is True
assert flow.state["either_ran"] is True
def test_router_label_parity_for_each_branch():
left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"})
assert "took-left" in left_flow.method_outputs
assert "took-right" not in left_flow.method_outputs
right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"})
assert "took-right" in right_flow.method_outputs
def test_cyclic_flow_parity():
flow, result = assert_parity(LoopFlow, LOOP_YAML)
assert result == "finished:3"
assert flow.state["count"] == 3
def test_definition_flow_events_use_definition_name():
definition = FlowDefinition.from_yaml(CHAIN_YAML)
flow = Flow.from_definition(definition)
_, events = _run_with_events(flow)
assert events
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
def test_from_definition_missing_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "NoHandlers",
"methods": {"begin": {"start": True}},
}
)
with pytest.raises(ValueError, match="begin: no handler"):
Flow.from_definition(definition)
def test_from_definition_unresolvable_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "BadHandlers",
"methods": {
"begin": {
"start": True,
"handler": "definitely_not_a_module_xyz:nope",
}
},
}
)
with pytest.raises(ValueError, match="missing or unresolvable handlers.*begin"):
Flow.from_definition(definition)
def test_from_definition_malformed_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "MalformedHandlers",
"methods": {"begin": {"start": True, "handler": "no-colon-here"}},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_flow_definition_stamps_handler_refs():
definition = ChainFlow.flow_definition()
assert definition.methods["begin"].handler == f"{__name__}:ChainFlow.begin"
assert definition.methods["shout"].handler == f"{__name__}:ChainFlow.shout"
def test_pydantic_state_from_ref_parity():
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_pydantic_state_default_overlay():
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
result = flow.kickoff()
assert result == "count=6"
assert flow.state.count == 6
def test_json_schema_state():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_json_schema_state_validates_inputs():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
with pytest.raises(ValueError, match="Invalid inputs"):
flow.kickoff(inputs={"count": "not-a-number"})
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
flow = Flow.from_definition(
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
)
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
with caplog.at_level("ERROR"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True
assert flow.state["id"]
def test_dict_state_is_a_copy_of_default_plus_id():
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
flow = Flow.from_definition(definition)
assert flow.state["count"] == 5
assert flow.state["id"]
flow.kickoff()
assert flow.state["begin_ran"] is True
second = Flow.from_definition(definition)
assert second.state["count"] == 5
assert "begin_ran" not in second.state.model_dump()
assert second.state["id"] != flow.state["id"]
assert definition.state.default == {"count": 5}
def test_unknown_state_type_falls_back_to_dict(caplog):
with caplog.at_level("WARNING"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True

View File

@@ -838,6 +838,74 @@ def test_flow_method_execution_finished_includes_serialized_state():
assert final_output == "final_result"
def test_suppress_flow_events_silences_method_lifecycle_events():
"""``suppress_flow_events=True`` emits no MethodExecution* events on the
bus (used by infrastructure flows like AgentExecutor so their control-flow
methods don't pollute traces), while default flows still emit them."""
captured: list[tuple[str, str]] = []
class SuppressedFlow(Flow):
suppress_flow_events: bool = True
@start()
def begin(self):
return "started"
@listen("begin")
def process(self):
return "done"
class ControlFlow(Flow):
@start()
def begin(self):
return "started"
@listen("begin")
def process(self):
return "done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _on_started(source, event):
captured.append(("started", type(source).__name__))
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _on_finished(source, event):
captured.append(("finished", type(source).__name__))
SuppressedFlow().kickoff()
wait_for_event_handlers()
assert [e for e in captured if e[1] == "SuppressedFlow"] == [], (
"suppress_flow_events=True must emit no MethodExecution* events"
)
captured.clear()
ControlFlow().kickoff()
wait_for_event_handlers()
control = [e for e in captured if e[1] == "ControlFlow"]
assert ("started", "ControlFlow") in control
assert ("finished", "ControlFlow") in control
def test_infrastructure_flows_suppress_flow_events_by_default():
"""Pin the infra flows that must stay silent in traces.
The gating in ``_execute_method`` only helps if these flows actually set
``suppress_flow_events=True``; without this guard, removing the flag from
AgentExecutor would silently bring back the verbose per-method trace spans.
"""
from crewai.experimental.agent_executor import AgentExecutor
from crewai.memory.encoding_flow import EncodingFlow
from crewai.memory.recall_flow import RecallFlow
assert AgentExecutor.model_fields["suppress_flow_events"].default is True
for flow_cls in (EncodingFlow, RecallFlow):
flow = flow_cls(storage=None, llm=None, embedder=None)
assert flow.suppress_flow_events is True
@pytest.mark.vcr()
def test_llm_emits_call_started_event():
started_events: list[LLMCallStartedEvent] = []

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.7a3"
__version__ = "1.14.7a4"