Compare commits

..

2 Commits

Author SHA1 Message Date
Iris Clawd
d883c7999c docs: address review feedback on Discovery page
- Remove Connect Data Sources / Data Source Integration (doesn't exist)
- Replace 'production data' references with 'knowledge' / 'world model'
- Merge Getting Started into How It Works (were redundant)
- Remove Best Practices and FAQ sections
2026-06-10 18:01:43 +00:00
Iris Clawd
e5d37196c7 docs: add Discovery section to documentation
Add comprehensive documentation for the Discovery feature under
enterprise/features/. Covers overview, how it works, key features,
getting started guide, use cases, best practices, and FAQ.

Screenshot placeholders are included for future visual updates.

Closes DIS-73
2026-06-10 16:59:08 +00:00
55 changed files with 475 additions and 1627 deletions

View File

@@ -64,7 +64,6 @@ jobs:
--ignore-vuln PYSEC-2025-197 \
--ignore-vuln PYSEC-2025-210 \
--ignore-vuln PYSEC-2026-139 \
--ignore-vuln GHSA-rrmf-rvhw-rf47 \
--ignore-vuln PYSEC-2025-211 \
--ignore-vuln PYSEC-2025-212 \
--ignore-vuln PYSEC-2025-213 \
@@ -82,7 +81,6 @@ jobs:
# PYSEC-2025-183 - pyjwt 2.12.1: disputed weak-encryption claim; key length is application-chosen
# PYSEC-2025-189..197 - torch 2.11.0: memory-corruption/DoS in functions only reachable via untrusted models; no fix available
# PYSEC-2025-210, PYSEC-2026-139 - torch 2.11.0: profiler/deserialization issues; no fix available
# GHSA-rrmf-rvhw-rf47 - torch 2.11.0 (CVE-2025-3000, alias of PYSEC-2025-194): memory corruption in torch.jit.script, CVSS 1.9, local-only; affected <=2.12.0, no fix available. pip-audit reports it under the GHSA id so the PYSEC ignore above does not catch it.
# PYSEC-2025-211..218 - transformers 5.5.4: deserialization/code injection via malicious model checkpoints; no fix available
# GHSA-f4j7-r4q5-qw2c - chromadb 1.1.1 (CVE-2026-45829): pre-auth RCE via /api/v2/tenants/{tenant}/databases/{db}/collections when trust_remote_code=true.
# Advisory: vulnerable >=1.0.0,<=1.5.9, firstPatchedVersion=none. We only use chromadb.PersistentClient (lib/crewai/src/crewai/rag/chromadb/factory.py)

View File

@@ -47,7 +47,6 @@ repos:
--ignore-vuln PYSEC-2025-197
--ignore-vuln PYSEC-2025-210
--ignore-vuln PYSEC-2026-139
--ignore-vuln GHSA-rrmf-rvhw-rf47
--ignore-vuln PYSEC-2025-211
--ignore-vuln PYSEC-2025-212
--ignore-vuln PYSEC-2025-213

View File

@@ -4,38 +4,6 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="10 يونيو 2026">
## v1.14.7rc1
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## ما الذي تغير
### الميزات
- إضافة `reset_runtime_state` لإطلاق حالة الحافلة المتراكمة
- التعامل مع دعم كل من الموجهات المخصصة
- فصل منطق المحادثة عن وقت التشغيل وإضافة `conversational_definition`
### إصلاحات الأخطاء
- إصلاح نطاق حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيلات المتزامنة
- إصلاح إعدادات القياس عن بُعد على `crewai-login`
- إصلاح احترام `suppress_flow_events` لفعاليات تنفيذ الأساليب
### الوثائق
- تحديث صور OpenTelemetry
- تحديث الوثائق لتعكس الحالة الجديدة لجمع بيانات OpenTelemetry
- تحديث سجل التغييرات والإصدار لـ v1.14.7a4
### إعادة الهيكلة
- تبسيط تقييم شرط التدفق ليكون بلا حالة لكل حدث
- تحسين دورة توجيه المحادثة مع تقليل مسار واحد
## المساهمون
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="9 يونيو 2026">
## v1.14.7a4

View File

@@ -24,39 +24,15 @@ mode: "wide"
1. في CrewAI AMP، انتقل إلى **Settings** > **OpenTelemetry Collectors**.
2. انقر على **Add Collector**.
3. اختر تكاملاً:
- **OpenTelemetry Traces** و**OpenTelemetry Logs** — صدّر إلى أي مجمّع أو واجهة خلفية متوافقة مع OTLP.
- **Datadog** — أرسل التتبعات مباشرة إلى استقبال OTLP الخاص بـ Datadog، دون الحاجة إلى مجمّع منفصل أو Datadog Agent.
4. هيّئ الاتصال. تعتمد الحقول على التكامل الذي اخترته:
3. اختر نوع التكامل — **OpenTelemetry Traces** أو **OpenTelemetry Logs**.
4. هيّئ الاتصال:
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
5. انقر على **Save**.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
إن **OpenTelemetry Traces** و**OpenTelemetry Logs** تكاملان منفصلان يتشاركان نفس الحقول — اختر التكامل المطابق للإشارة التي تريد تصديرها.
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — مضيف OTLP لموقع Datadog الخاص بك فقط، دون بروتوكول أو مسار. يقوم CrewAI ببناء نقطة نهاية HTTPS OTLP الكاملة نيابةً عنك. استخدم المضيف المطابق لـ [موقع Datadog](https://docs.datadoghq.com/getting_started/site/) الخاص بك:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — مفتاح واجهة برمجة تطبيقات Datadog الخاص بك. راجع [كيفية إنشاء واحد](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
يصدّر تكامل Datadog **التتبعات**.
<Frame>![تهيئة مجمّع Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(اختياري)* انقر على **Test Connection** للتحقق من قدرة CrewAI على الوصول إلى نقطة النهاية باستخدام بيانات الاعتماد التي قدمتها.
6. انقر على **Save**.
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
يمكنك إضافة مجمّعات متعددة — على سبيل المثال، واحد للتتبعات وآخر للسجلات، أو الإرسال إلى واجهات خلفية مختلفة لأغراض مختلفة.

View File

@@ -161,18 +161,6 @@ crew = Crew(
)
```
<Note>
يُحتفظ بـ `agent.i18n` للتوافق مع الإصدارات السابقة فقط، وقد تم إهماله. لتخصيص المطالبات أثناء التشغيل، مرّر `prompt_file` إلى `Crew`. وللوصول البرمجي المباشر إلى شرائح المطالبات، استخدم أداة i18n مباشرة:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### الخيار 3: تعطيل مطالبات النظام لنماذج o1
```python
agent = Agent(
@@ -220,8 +208,6 @@ agent = Agent(
يدمج CrewAI بعد ذلك تخصيصاتك مع الإعدادات الافتراضية، فلا تحتاج لإعادة تعريف كل مطالبة. إليك الطريقة:
بالنسبة للكود الذي يحتاج إلى قراءة شرائح المطالبات مباشرة، استخدم `crewai.utilities.i18n.get_i18n()` مع ملف المطالبات نفسه بدلًا من قراءة `agent.i18n`.
### مثال: تخصيص أساسي للمطالبات
أنشئ ملف `custom_prompts.json` بالمطالبات التي تريد تعديلها. تأكد من إدراج جميع المطالبات عالية المستوى التي يجب أن يحتويها، وليس فقط تغييراتك:

View File

@@ -4,38 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 10, 2026">
## v1.14.7rc1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## What's Changed
### Features
- Add `reset_runtime_state` to release accumulated bus state
- Handle supporting both custom prompts
- Decouple conversation logic from runtime and add a `conversational_definition`
### Bug Fixes
- Fix scope of runtime state per run to bound growth and isolate concurrent runs
- Fix telemetry setup on `crewai-login`
- Fix respect for `suppress_flow_events` for method-execution events
### Documentation
- Update OpenTelemetry images
- Update documentation to reflect new state of OpenTelemetry collector
- Update changelog and version for v1.14.7a4
### Refactoring
- Simplify flow condition evaluation to be stateless per event
- Improve conversation routing cycle with one less route
## Contributors
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="Jun 09, 2026">
## v1.14.7a4

View File

@@ -0,0 +1,98 @@
---
title: Discovery
description: "Identify the highest-impact AI automation use cases for your business."
icon: "compass"
mode: "wide"
---
## Overview
Discovery is a new engine inside CrewAI AMP that helps companies identify the best automation use cases for their business.
The bottleneck in AI adoption is not building agents — it's knowing _what_ to build and _how_ to build it for production. Discovery closes that gap.
{/* TODO: Add screenshot of Discovery dashboard */}
Instead of weeks of stakeholder interviews, consultant engagements, and slide decks, Discovery leverages CrewAI's deep knowledge of agent patterns and what works in production to match your business context against proven approaches. Within minutes, you get actionable, evidence-based recommendations specific to your organization.
## How It Works
<Steps>
<Step title="Describe Your Business">
Tell Discovery about your organization — your processes, challenges, goals, and the teams involved. The more context you provide, the more precise the recommendations.
</Step>
<Step title="Multi-Signal Matching">
Discovery runs cohort analysis and structural pattern recognition using CrewAI's world model, matching your business context to automation patterns already running successfully at scale.
</Step>
<Step title="Review Use Cases">
Within minutes, you receive a set of use cases specific to your company — not generic templates. Each one shows what the automation does, expected impact, complexity, and how it would work in your organization.
{/* TODO: Add screenshot of use case recommendations */}
</Step>
<Step title="Build">
Select a use case and go directly into Crew Studio or export to code to start building.
</Step>
</Steps>
{/* TODO: Add screenshot of Discovery flow / results page */}
## Key Features
<CardGroup cols={2}>
<Card title="Business-Specific Recommendations" icon="bullseye">
Not generic templates. Real use cases matched to your organization based on CrewAI's knowledge of what works in production.
</Card>
<Card title="Impact & Complexity Scoring" icon="chart-mixed">
Each recommendation includes expected impact, implementation complexity, and how it fits your org — so you can prioritize with confidence.
</Card>
<Card title="Iterative Discovery" icon="arrows-rotate">
Run Discovery multiple times across different business units. It becomes part of how you plan and iterate on your AI roadmap.
</Card>
<Card title="Evidence-Based" icon="flask-vial">
Every recommendation is grounded in what CrewAI knows actually works in production — not guesswork or intuition.
</Card>
</CardGroup>
## From Discovery to Production
Discovery fits at the very beginning of the CrewAI workflow — it's the "what to build" step before the "how to build" step.
{/* TODO: Add diagram showing Discovery → Crew Studio → Automations flow */}
The end-to-end flow:
1. **Discovery** identifies the use case and provides the blueprint
2. **Crew Studio** or code lets you build the automation
3. **Automations** deploys it to production
This means you go from "we should use AI somewhere" to a running production automation with a clear, guided path — no guesswork at any stage.
## Use Cases
<CardGroup cols={2}>
<Card title="New to AI Agents" icon="seedling">
Don't know where to start? Discovery identifies the highest-impact opportunities specific to your business, so you begin with what matters most.
</Card>
<Card title="Scaling AI Programs" icon="rocket">
Already have some automations? Discovery finds the next wave of use cases across departments, helping you expand beyond initial pilots.
</Card>
<Card title="Cross-Department Rollout" icon="building">
Run Discovery for different business units to build a company-wide AI roadmap with use cases tailored to each team's needs.
</Card>
<Card title="ROI Prioritization" icon="chart-line">
Need to justify AI investment? Discovery provides evidence-based impact estimates grounded in real-world results.
</Card>
</CardGroup>
## Related
<CardGroup cols={3}>
<Card title="Crew Studio" href="/en/enterprise/features/crew-studio" icon="pencil">
Build automations with AI assistance and a visual editor.
</Card>
<Card title="Automations" href="/en/enterprise/features/automations" icon="bolt">
Deploy and manage your automations in production.
</Card>
<Card title="Marketplace" href="/en/enterprise/features/marketplace" icon="store">
Browse pre-built automations and components.
</Card>
</CardGroup>

View File

@@ -24,39 +24,15 @@ Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://op
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
2. Click **Add Collector**.
3. Select an integration:
- **OpenTelemetry Traces** and **OpenTelemetry Logs** — export to any OTLP-compatible collector or backend.
- **Datadog** — send traces straight to Datadog's OTLP intake, no separate collector or Datadog Agent required.
4. Configure the connection. The fields depend on the integration you selected:
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
4. Configure the connection:
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
5. Click **Save**.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** and **OpenTelemetry Logs** are separate integrations that share the same fields — pick the one matching the signal you want to export.
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
<Frame>![OpenTelemetry collector configuration](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Your Datadog site's OTLP host only, with no protocol or path. CrewAI builds the full HTTPS OTLP endpoint for you. Use the host that matches your [Datadog site](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Your Datadog API key. See [how to create one](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
The Datadog integration exports **traces**.
<Frame>![Datadog collector configuration](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(optional)* Click **Test Connection** to verify CrewAI can reach the endpoint with the credentials you provided.
6. Click **Save**.
<Frame>![OpenTelemetry Collector Configuration](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.

View File

@@ -161,18 +161,6 @@ crew = Crew(
)
```
<Note>
`agent.i18n` is maintained only for backward compatibility and is deprecated. For runtime prompt customization, pass `prompt_file` to `Crew`. For programmatic access to prompt slices, use the i18n utility directly:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Option 3: Disable System Prompts for o1 Models
```python
agent = Agent(
@@ -220,8 +208,6 @@ One straightforward approach is to create a JSON file for the prompts you want t
CrewAI then merges your customizations with the defaults, so you don't have to redefine every prompt. Here's how:
For code that needs to read prompt slices directly, use `crewai.utilities.i18n.get_i18n()` with the same prompt file instead of reading `agent.i18n`.
### Example: Basic Prompt Customization
Create a `custom_prompts.json` file with the prompts you want to modify. Ensure you list all top-level prompts it should contain, not just your changes:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 455 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 420 KiB

View File

@@ -4,38 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 10일">
## v1.14.7rc1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## 변경 사항
### 기능
- 누적된 버스 상태를 해제하기 위해 `reset_runtime_state` 추가
- 사용자 정의 프롬프트를 모두 지원하도록 처리
- 대화 논리를 런타임과 분리하고 `conversational_definition` 추가
### 버그 수정
- 실행당 런타임 상태의 범위를 수정하여 성장 제한 및 동시 실행 격리
- `crewai-login`에서 원격 측정 설정 수정
- 메서드 실행 이벤트에 대한 `suppress_flow_events` 존중 수정
### 문서
- OpenTelemetry 이미지 업데이트
- OpenTelemetry 수집기의 새로운 상태를 반영하도록 문서 업데이트
- v1.14.7a4에 대한 변경 로그 및 버전 업데이트
### 리팩토링
- 이벤트당 상태 비저장 방식으로 흐름 조건 평가 단순화
- 경로를 하나 줄여 대화 라우팅 사이클 개선
## 기여자
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="2026년 6월 9일">
## v1.14.7a4

View File

@@ -24,39 +24,15 @@ CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
2. **Add Collector**를 클릭합니다.
3. 통합을 선택합니다:
- **OpenTelemetry Traces** 및 **OpenTelemetry Logs** — OTLP 호환 수집기 또는 백엔드로 내보냅니다.
- **Datadog** — 별도의 수집기나 Datadog Agent 없이 트레이스를 Datadog의 OTLP 인테이크로 직접 전송합니다.
4. 연결을 구성합니다. 필드는 선택한 통합에 따라 달라집니다:
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
4. 연결을 구성합니다:
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
5. **Save**를 클릭합니다.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces**와 **OpenTelemetry Logs**는 동일한 필드를 공유하는 별개의 통합입니다 — 내보내려는 신호에 맞는 것을 선택하세요.
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Datadog 사이트의 OTLP 호스트만 입력합니다 (프로토콜이나 경로 제외). CrewAI가 전체 HTTPS OTLP 엔드포인트를 자동으로 구성합니다. [Datadog 사이트](https://docs.datadoghq.com/getting_started/site/)에 맞는 호스트를 사용하세요:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Datadog API 키입니다. [키 생성 방법](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys)을 참고하세요.
Datadog 통합은 **트레이스**를 내보냅니다.
<Frame>![Datadog 수집기 구성](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(선택 사항)* **Test Connection**을 클릭하여 제공한 자격 증명으로 CrewAI가 엔드포인트에 연결할 수 있는지 확인합니다.
6. **Save**를 클릭합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.

View File

@@ -161,18 +161,6 @@ crew = Crew(
)
```
<Note>
`agent.i18n`은 이전 버전과의 호환성을 위해서만 유지되며 사용이 중단될 예정입니다. 런타임 프롬프트 커스터마이징에는 `Crew`에 `prompt_file`을 전달하세요. 프롬프트 슬라이스를 코드에서 직접 읽어야 한다면 i18n 유틸리티를 직접 사용하세요:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### 옵션 3: o1 모델에 대한 시스템 프롬프트 비활성화
```python
agent = Agent(
@@ -220,8 +208,6 @@ agent = Agent(
그러면 CrewAI가 기본값과 사용자가 지정한 내용을 병합하므로, 모든 프롬프트를 다시 정의할 필요가 없습니다. 방법은 다음과 같습니다:
프롬프트 슬라이스를 코드에서 직접 읽어야 하는 경우에는 `agent.i18n`을 읽는 대신 동일한 프롬프트 파일로 `crewai.utilities.i18n.get_i18n()`을 사용하세요.
### 예시: 기본 프롬프트 커스터마이징
수정하고 싶은 프롬프트를 포함하는 `custom_prompts.json` 파일을 생성하세요. 변경 사항만이 아니라 포함해야 하는 모든 최상위 프롬프트를 반드시 나열해야 합니다:
@@ -328,4 +314,4 @@ CrewAI에서의 저수준 prompt 커스터마이제이션은 매우 맞춤화되
<Check>
이제 CrewAI에서 고급 prompt 커스터마이징을 위한 기초를 갖추었습니다. 모델별 구조나 도메인별 제약에 맞춰 적용하든, 이러한 저수준 접근 방식은 agent 상호작용을 매우 전문적으로 조정할 수 있게 해줍니다.
</Check>
</Check>

View File

@@ -4,38 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="10 jun 2026">
## v1.14.7rc1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## O que Mudou
### Recursos
- Adicionar `reset_runtime_state` para liberar o estado acumulado do barramento
- Lidar com suporte a ambos os prompts personalizados
- Desacoplar a lógica de conversa do tempo de execução e adicionar uma `conversational_definition`
### Correções de Bugs
- Corrigir o escopo do estado de tempo de execução por execução para limitar o crescimento e isolar execuções concorrentes
- Corrigir a configuração de telemetria em `crewai-login`
- Corrigir o respeito a `suppress_flow_events` para eventos de execução de método
### Documentação
- Atualizar imagens do OpenTelemetry
- Atualizar a documentação para refletir o novo estado do coletor OpenTelemetry
- Atualizar o changelog e a versão para v1.14.7a4
### Refatoração
- Simplificar a avaliação da condição de fluxo para ser sem estado por evento
- Melhorar o ciclo de roteamento de conversas com uma rota a menos
## Contribuidores
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="09 jun 2026">
## v1.14.7a4

View File

@@ -24,39 +24,15 @@ Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetr
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
2. Clique em **Add Collector**.
3. Selecione uma integração:
- **OpenTelemetry Traces** e **OpenTelemetry Logs** — exporte para qualquer coletor ou backend compatível com OTLP.
- **Datadog** — envie traces diretamente para a ingestão OTLP do Datadog, sem precisar de um coletor separado ou do Datadog Agent.
4. Configure a conexão. Os campos dependem da integração selecionada:
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
4. Configure a conexão:
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
5. Clique em **Save**.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** e **OpenTelemetry Logs** são integrações separadas que compartilham os mesmos campos — escolha a que corresponde ao sinal que você quer exportar.
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
<Frame>![Configuração do coletor OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Apenas o host OTLP do seu site Datadog, sem protocolo ou caminho. O CrewAI monta o endpoint HTTPS OTLP completo para você. Use o host correspondente ao seu [site Datadog](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Sua chave de API do Datadog. Veja [como criar uma](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
A integração com o Datadog exporta **traces**.
<Frame>![Configuração do coletor Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(opcional)* Clique em **Test Connection** para verificar se o CrewAI consegue acessar o endpoint com as credenciais fornecidas.
6. Clique em **Save**.
<Frame>![Configuração do Coletor OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.

View File

@@ -161,18 +161,6 @@ crew = Crew(
)
```
<Note>
`agent.i18n` é mantido apenas para compatibilidade retroativa e está obsoleto. Para customização de prompts em tempo de execução, passe `prompt_file` para `Crew`. Para acesso programático aos slices de prompt, use diretamente o utilitário de i18n:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Opção 3: Desativar Prompts de Sistema para Modelos o1
```python
agent = Agent(
@@ -220,8 +208,6 @@ Uma abordagem direta é criar um arquivo JSON para os prompts que deseja sobresc
O CrewAI então mescla suas customizações com os padrões, assim você não precisa redefinir todos os prompts. Veja como:
Para código que precisa ler slices de prompt diretamente, use `crewai.utilities.i18n.get_i18n()` com o mesmo arquivo de prompts em vez de ler `agent.i18n`.
### Exemplo: Customização Básica de Prompt
Crie um arquivo `custom_prompts.json` com os prompts que deseja modificar. Certifique-se de listar todos os prompts de nível superior que ele deve conter, não apenas suas alterações:

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7rc1",
"crewai-core==1.14.7a4",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7rc1"
__version__ = "1.14.7a4"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7rc1"
"crewai[tools]==1.14.7a4"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7rc1"
"crewai[tools]==1.14.7a4"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7rc1"
"crewai[tools]==1.14.7a4"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.7rc1"
__version__ = "1.14.7a4"

View File

@@ -17,7 +17,7 @@ import contextlib
import logging
import os
import threading
from typing import Any, ClassVar, Final
from typing import Any, Final
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -27,7 +27,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode
from typing_extensions import Self
@@ -72,8 +72,8 @@ class Telemetry:
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
"""
_instance: ClassVar[Self | None] = None
_lock: ClassVar[threading.Lock] = threading.Lock()
_instance = None
_lock = threading.Lock()
def __new__(cls) -> Self:
if cls._instance is None:
@@ -149,10 +149,6 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -14,7 +14,6 @@ from crewai_core import (
version,
)
import pytest
from opentelemetry.sdk.trace import TracerProvider
def test_version_returns_string() -> None:
@@ -95,36 +94,3 @@ def test_user_data_decline_blocks(
def test_unused_var_warning_silenced() -> None:
# Touch os to keep the import (used by env-var fixtures above)
assert os.environ is not None
def test_core_telemetry_skips_duplicate_tracer_provider(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from crewai_core.telemetry import Telemetry
Telemetry._instance = None
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)
monkeypatch.setattr(
"crewai_core.telemetry.trace.get_tracer_provider",
lambda: TracerProvider(),
)
called = False
def fail_if_called(provider: object) -> None:
nonlocal called
called = True
monkeypatch.setattr(
"crewai_core.telemetry.trace.set_tracer_provider",
fail_if_called,
)
telemetry = Telemetry()
telemetry.set_tracer()
assert called is False
assert telemetry.trace_set is True

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.7rc1"
__version__ = "1.14.7a4"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.7rc1",
"crewai==1.14.7a4",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.7rc1"
__version__ = "1.14.7a4"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7rc1",
"crewai-cli==1.14.7rc1",
"crewai-core==1.14.7a4",
"crewai-cli==1.14.7a4",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.7rc1",
"crewai-tools==1.14.7a4",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.7rc1"
__version__ = "1.14.7a4"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -46,7 +46,6 @@ from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.callback import SerializableCallable
from crewai.utilities.config import process_config
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.logger import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.string_utils import interpolate_only
@@ -187,7 +186,6 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
tools (list[Any] | None): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor: An instance of the CrewAgentExecutor class.
i18n (I18N): Internationalization settings.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
@@ -267,14 +265,6 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
_serialize_executor_ref, return_type=dict | None, when_used="json"
),
] = Field(default=None, description="An instance of the CrewAgentExecutor class.")
i18n: I18N = Field(
default_factory=get_i18n,
description="Internationalization settings.",
deprecated=(
"Agent.i18n is deprecated and will be removed in a future release. "
"Use crewai.utilities.i18n.get_i18n() or Crew(prompt_file=...) instead."
),
)
llm: Annotated[
str | BaseLLM | None,

View File

@@ -117,10 +117,8 @@ def capture_execution_context(
)
def apply_execution_context(ctx: ExecutionContext | dict[str, Any]) -> None:
def apply_execution_context(ctx: ExecutionContext) -> None:
"""Write an ExecutionContext back into the ContextVars."""
if isinstance(ctx, dict):
ctx = ExecutionContext.model_validate(ctx)
_current_task_id.set(ctx.current_task_id)
current_flow_request_id.set(ctx.flow_request_id)
current_flow_id.set(ctx.flow_id)

View File

@@ -1013,7 +1013,6 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1049,7 +1048,6 @@ class Crew(FlowTrackable, BaseModel):
self._memory.drain_writes()
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
@@ -1225,7 +1223,6 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1259,7 +1256,6 @@ class Crew(FlowTrackable, BaseModel):
finally:
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff_for_each(
self,

View File

@@ -80,17 +80,6 @@ def is_replaying() -> bool:
return _replaying.get()
_runtime_state_var: contextvars.ContextVar[RuntimeState | None] = (
contextvars.ContextVar("crewai_runtime_state", default=None)
)
_registered_entity_ids_var: contextvars.ContextVar[set[int] | None] = (
contextvars.ContextVar("crewai_registered_entity_ids", default=None)
)
_runtime_scope_depth: contextvars.ContextVar[int] = contextvars.ContextVar(
"crewai_runtime_scope_depth", default=0
)
class CrewAIEventsBus:
"""Singleton event bus for handling events in CrewAI.
@@ -127,6 +116,7 @@ class CrewAIEventsBus:
_futures_lock: threading.Lock
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: RuntimeState | None
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -161,6 +151,8 @@ class CrewAIEventsBus:
self._console = ConsoleFormatter()
self._executor_initialized = False
self._has_pending_events = False
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
def _ensure_executor_initialized(self) -> None:
"""Lazily initialize the thread pool executor and event loop.
@@ -289,51 +281,6 @@ class CrewAIEventsBus:
"""The RuntimeState currently attached to the bus, if any."""
return self._runtime_state
@property
def _runtime_state(self) -> RuntimeState | None:
return _runtime_state_var.get()
@_runtime_state.setter
def _runtime_state(self, value: RuntimeState | None) -> None:
_runtime_state_var.set(value)
@property
def _registered_entity_ids(self) -> set[int]:
ids = _registered_entity_ids_var.get()
if ids is None:
ids = set()
_registered_entity_ids_var.set(ids)
return ids
@_registered_entity_ids.setter
def _registered_entity_ids(self, value: set[int]) -> None:
_registered_entity_ids_var.set(value)
def reset_runtime_state(self) -> None:
"""Detach the RuntimeState and clear the entity registry."""
self._runtime_state = None
self._registered_entity_ids = set()
def _enter_runtime_scope(self) -> bool:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth + 1)
if depth != 0:
return False
if _runtime_state_var.get() is None:
from crewai import RuntimeState
if RuntimeState is not None:
_runtime_state_var.set(RuntimeState(root=[]))
_registered_entity_ids_var.set(set())
return True
def _exit_runtime_scope(self, outermost: bool) -> None:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth - 1 if depth > 0 else 0)
if outermost:
_runtime_state_var.set(None)
_registered_entity_ids_var.set(None)
def register_entity(self, entity: Any) -> None:
"""Add an entity to the RuntimeState, creating it if needed.
@@ -402,7 +349,6 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: SyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call provided synchronous handlers.
@@ -410,8 +356,8 @@ class CrewAIEventsBus:
source: The emitting object
event: The event instance
handlers: Frozenset of sync handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
errors: list[tuple[SyncHandler, Exception]] = [
(handler, error)
for handler in handlers
@@ -430,7 +376,6 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Asynchronously call provided async handlers.
@@ -438,8 +383,8 @@ class CrewAIEventsBus:
source: The object that emitted the event
event: The event instance
handlers: Frozenset of async handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
async def _call(handler: AsyncHandler) -> Any:
if _get_param_count(handler) >= 3:
@@ -454,9 +399,7 @@ class CrewAIEventsBus:
f"[CrewAIEventsBus] Async handler error in {getattr(handler, '__name__', handler)}: {result}"
)
async def _emit_with_dependencies(
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None:
"""Emit an event with dependency-aware handler execution.
Handlers are grouped into execution levels based on their dependencies.
@@ -507,18 +450,18 @@ class CrewAIEventsBus:
if level_sync:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, level_sync, state)
self._call_handlers(source, event, level_sync)
else:
ctx = contextvars.copy_context()
future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, level_sync, state
ctx.run, self._call_handlers, source, event, level_sync
)
await asyncio.get_running_loop().run_in_executor(
None, future.result
)
if level_async:
await self._acall_handlers(source, event, level_async, state)
await self._acall_handlers(source, event, level_async)
def _register_source(self, source: Any) -> None:
"""Register the source entity in RuntimeState if applicable."""
@@ -613,23 +556,21 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event, state),
self._emit_with_dependencies(source, event),
self._loop,
)
)
if sync_handlers:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, sync_handlers, state)
self._call_handlers(source, event, sync_handlers)
else:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers, state
ctx.run, self._call_handlers, source, event, sync_handlers
)
if not async_handlers:
return self._track_future(sync_future)
@@ -637,7 +578,7 @@ class CrewAIEventsBus:
if async_handlers:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers, state),
self._acall_handlers(source, event, async_handlers),
self._loop,
)
)
@@ -649,22 +590,21 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call async handlers with the replaying flag set on the loop thread."""
token = _replaying.set(True)
try:
await self._acall_handlers(source, event, handlers, state)
await self._acall_handlers(source, event, handlers)
finally:
_replaying.reset(token)
async def _emit_with_dependencies_replaying(
self, source: Any, event: BaseEvent, state: RuntimeState | None
self, source: Any, event: BaseEvent
) -> None:
"""Dependency-aware dispatch with the replaying flag set."""
token = _replaying.set(True)
try:
await self._emit_with_dependencies(source, event, state)
await self._emit_with_dependencies(source, event)
finally:
_replaying.reset(token)
@@ -698,13 +638,12 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
token = _replaying.set(True)
try:
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies_replaying(source, event, state),
self._emit_with_dependencies_replaying(source, event),
self._loop,
)
)
@@ -712,7 +651,7 @@ class CrewAIEventsBus:
if sync_handlers:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers, state
ctx.run, self._call_handlers, source, event, sync_handlers
)
self._track_future(sync_future)
if not async_handlers:
@@ -720,9 +659,7 @@ class CrewAIEventsBus:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers_replaying(
source, event, async_handlers, state
),
self._acall_handlers_replaying(source, event, async_handlers),
self._loop,
)
)
@@ -790,9 +727,7 @@ class CrewAIEventsBus:
async_handlers = self._async_handlers.get(event_type, frozenset())
if async_handlers:
await self._acall_handlers(
source, event, async_handlers, self._runtime_state
)
await self._acall_handlers(source, event, async_handlers)
def register_handler(
self,

View File

@@ -292,7 +292,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self._should_defer_session_finalization():
if self.batch_manager.defer_session_finalization:
return
if self._nested_in_flow_execution():
return
@@ -306,7 +306,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self._should_defer_session_finalization():
if self.batch_manager.defer_session_finalization:
return
if self._nested_in_flow_execution():
return
@@ -734,7 +734,7 @@ class TraceCollectionListener(BaseEventListener):
if not self.batch_manager.is_batch_initialized():
return
# Multi-turn flows defer batch finalization to finalize_session_traces().
if self._should_defer_session_finalization():
if self.batch_manager.defer_session_finalization:
return
self.batch_manager.finalize_batch()
@@ -745,15 +745,6 @@ class TraceCollectionListener(BaseEventListener):
return current_flow_id.get() is not None
def _should_defer_session_finalization(self) -> bool:
"""True when the active trace belongs to a deferred flow session."""
from crewai.flow.flow_context import current_flow_defer_trace_finalization
return (
self.batch_manager.defer_session_finalization
or current_flow_defer_trace_finalization.get()
)
def _flow_owns_trace_batch(self) -> bool:
"""True when an in-flight conversational flow already owns the trace batch."""
if self.batch_manager.batch_owner_type == "flow":
@@ -795,11 +786,7 @@ class TraceCollectionListener(BaseEventListener):
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
from crewai.flow.flow_context import current_flow_id, current_flow_name
flow_id = current_flow_id.get()
if flow_id is None:
@@ -815,8 +802,6 @@ class TraceCollectionListener(BaseEventListener):
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = flow_id
if current_flow_defer_trace_finalization.get():
self.batch_manager.defer_session_finalization = True
self._initialize_batch(user_context, execution_metadata)
return True

View File

@@ -1,6 +1,6 @@
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, field_serializer
from pydantic import BaseModel, ConfigDict
from crewai.events.base_events import BaseEvent
@@ -57,10 +57,6 @@ class MethodExecutionFailedEvent(FlowEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("error")
def _serialize_error(self, error: Exception) -> str:
return str(error)
class MethodExecutionPausedEvent(FlowEvent):
"""Event emitted when a flow method is paused waiting for human feedback.

View File

@@ -1,17 +1,15 @@
"""Conversational graph + helpers as an experimental Flow extension.
"""Conversational graph + helpers as a mixin for ``Flow`` (experimental).
The conversational chat surface remains experimental and may change before the
v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused
on the execution engine. ``crewai.flow.flow`` composes this mixin onto the
public ``Flow`` class for backwards compatibility.
The built-in conversational graph only registers for subclasses that opt in
with ``conversational = True``. Static conversational metadata is projected
into ``FlowDefinition.conversational`` via the Python DSL builder.
The experimental conversational chat surface lives here as a mixin so that
``crewai.flow.runtime`` stays focused on the execution engine. ``Flow``
inherits from ``_ConversationalMixin``; the methods only register on
subclasses that opt in via ``conversational = True`` (enforced by the
``_conversational_only`` marker + ``FlowMeta`` gating in
``crewai.flow.runtime``).
Import surface:
- :class:`_ConversationalMixin` — internal; the public ``Flow`` class
composes it in. Users don't import it directly.
- :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users
don't import it directly.
- The data types this mixin uses live in
:mod:`crewai.experimental.conversational`.
"""
@@ -22,7 +20,7 @@ from collections.abc import Callable, Mapping, Sequence
from enum import Enum
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast
from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
from pydantic import BaseModel, Field, create_model
@@ -46,69 +44,26 @@ from crewai.flow.conversation import (
get_conversation_messages,
receive_user_message as _receive_user_message,
)
from crewai.flow.dsl import listen, start
from crewai.flow.dsl._utils import _set_flow_method_definition
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.dsl import listen, router, start
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
from crewai.llms.base_llm import BaseLLM
logger = logging.getLogger(__name__)
def _iter_condition_labels(condition: Any) -> set[str]:
if isinstance(condition, str):
return {condition}
if isinstance(condition, dict):
labels: set[str] = set()
for value in condition.values():
if isinstance(value, list):
for item in value:
labels.update(_iter_condition_labels(item))
else:
labels.update(_iter_condition_labels(value))
return labels
return set()
def _conversation_start_router(func: Callable[..., Any]) -> Any:
wrapper = start()(func)
_set_flow_method_definition(
cast(Any, wrapper),
FlowMethodDefinition(start=True, router=True),
)
return wrapper
class _ConversationalMixin:
"""Experimental conversational graph for ``Flow``.
"""Built-in conversational graph for ``Flow`` (gated on ``conversational``).
This mixin owns chat behavior and runtime hooks. Non-chat flows see these
methods as inert attributes unless they opt in with ``conversational = True``.
Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused
on running graphs. The methods here only register on subclasses that set
``conversational = True``; non-chat flows see them as inert attributes.
"""
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a Flow subclass, this mixin's built-in
# graph registers and ``handle_turn`` / ``chat`` become chat entry points.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = ("answer_from_history",)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
# The metaclass + state attributes referenced below live on ``Flow`` —
# this mixin is never instantiated standalone. These type-only
# declarations exist so static analyzers don't flag attribute access.
@@ -116,15 +71,22 @@ class _ConversationalMixin:
# (otherwise mypy flags "Cannot override instance variable with class
# variable" when Flow declares them as ``ClassVar``).
if TYPE_CHECKING:
conversational: ClassVar[bool]
conversational_config: ClassVar[ConversationConfig | None]
builtin_routes: ClassVar[tuple[str, ...]]
internal_routes: ClassVar[tuple[str, ...]]
builtin_route_descriptions: ClassVar[dict[str, str]]
# Registry ClassVars populated by ``FlowMeta`` at class creation.
_listeners: ClassVar[dict[Any, Any]]
# Instance attrs from ``Flow``.
state: Any
name: str | None
_completed_methods: set[Any]
_method_outputs: list[Any]
_pending_events: dict[Any, Any]
_pending_and_listeners: dict[Any, Any]
_method_call_counts: dict[Any, int]
_is_execution_resuming: bool
_conversation_messages: list[LLMMessage]
_pending_user_message: str | dict[str, Any] | None
_pending_intents: Sequence[str] | None
_pending_intent_llm: str | BaseLLM | None
@@ -135,8 +97,8 @@ class _ConversationalMixin:
def _collapse_to_outcome(
self,
feedback: str,
outcomes: Sequence[str],
llm: str | BaseLLM,
outcomes: tuple[str, ...],
llm: str | BaseLLM | Any,
) -> str:
pass
@@ -146,24 +108,23 @@ class _ConversationalMixin:
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
pass
@start()
@_conversational_only
def conversation_start(self) -> str | None:
"""Return the current user message for conversational route selection.
"""Internal Flow entrypoint that hands the user message to the router.
This remains as a plain overridable helper for compatibility. It is not
registered as a Flow method; ``route_conversation`` is the synthetic
built-in start/router that begins a conversational turn.
In conversational mode, ``Flow.kickoff_async`` runs all ``@start``
methods sequentially and this one is registered last, so any user
``@start`` methods (e.g. permission loading) have already finished
before the returned value triggers ``route_conversation``.
"""
state = cast(ConversationState, self.state)
return state.current_user_message
@_conversation_start_router
@router(conversation_start)
@_conversational_only
def route_conversation(self) -> str:
"""Route the current turn to a listener label."""
if "conversation_start" not in {
str(method_name) for method_name in self._completed_methods
}:
self.conversation_start()
state = cast(ConversationState, self.state)
context = self.build_router_context()
previous_intent = state.last_intent
@@ -277,8 +238,8 @@ class _ConversationalMixin:
state = cast(ConversationState, self.state)
sid = session_id or state.id
# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.
# Stash the pending turn so ``_apply_pending_conversational_turn``
# picks it up AFTER persist restore.
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
@@ -325,7 +286,7 @@ class _ConversationalMixin:
callers can customize prompts or exercise the loop without patching
builtins.
"""
if not self._is_conversational_enabled():
if not getattr(type(self), "conversational", False):
raise ValueError("Flow.chat() is only available on conversational flows")
exit_set = {command.lower() for command in exit_commands}
@@ -530,14 +491,14 @@ class _ConversationalMixin:
**extra: Any,
) -> None:
"""Append a message to conversation history (legacy ChatState path)."""
_append_conversation_message(cast(Any, self), role, content, **extra)
_append_conversation_message(cast("Flow[Any]", self), role, content, **extra)
@property
def conversation_messages(self) -> list[LLMMessage]:
"""Message history from state, coerced to LLM-shaped dicts."""
return [
message_to_llm_dict(message)
for message in get_conversation_messages(cast(Any, self))
for message in get_conversation_messages(cast("Flow[Any]", self))
]
def receive_user_message(
@@ -553,7 +514,7 @@ class _ConversationalMixin:
``state.messages`` and preserve ``last_intent`` across turns.
Non-conversational flows fall through to the legacy helper.
"""
if self._is_conversational_enabled():
if self.conversational:
state = cast(ConversationState, self.state)
state.messages.append(ConversationMessage(role="user", content=text))
self._emit_conversation_message_added(
@@ -574,7 +535,9 @@ class _ConversationalMixin:
return intent
return text
return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm)
return _receive_user_message(
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
)
def classify_intent(
self,
@@ -598,104 +561,27 @@ class _ConversationalMixin:
def _conversation_config(self) -> ConversationConfig | None:
return getattr(type(self), "conversational_config", None)
@property
def _conversation_definition(self) -> Any | None:
return self._conversation_flow_definition().conversational
def _conversation_flow_definition(self) -> Any:
flow_definition = getattr(type(self), "flow_definition", None)
if not callable(flow_definition):
raise AttributeError(
f"{type(self).__name__} does not expose flow_definition()"
)
return flow_definition()
@classmethod
def _conversational_definition(cls) -> Any | None:
flow_definition = getattr(cls, "flow_definition", None)
if not callable(flow_definition):
return None
return flow_definition().conversational
@classmethod
def _is_conversational(cls) -> bool:
definition = cls._conversational_definition()
return bool(definition and definition.enabled)
def _is_conversational_enabled(self) -> bool:
definition = self._conversation_definition
return bool(definition and definition.enabled)
def _initialize_runtime_extension_attrs(self) -> None:
if not isinstance(getattr(self, "_conversation_messages", None), list):
object.__setattr__(self, "_conversation_messages", [])
if not hasattr(self, "_pending_user_message"):
object.__setattr__(self, "_pending_user_message", None)
if not hasattr(self, "_pending_intents"):
object.__setattr__(self, "_pending_intents", None)
if not hasattr(self, "_pending_intent_llm"):
object.__setattr__(self, "_pending_intent_llm", None)
def _create_default_extension_state(self) -> ConversationState | None:
initial_state_t = getattr(self, "_initial_state_t", None)
if type(self)._is_conversational() and (
not hasattr(self, "_initial_state_t")
or isinstance(initial_state_t, TypeVar)
):
return ConversationState()
return None
def _should_apply_pending_kickoff_context(self) -> bool:
return (
type(self)._is_conversational() and self._pending_user_message is not None
)
def _apply_pending_kickoff_context(self) -> None:
self._apply_pending_conversational_turn()
def _order_start_methods_for_kickoff(
self,
start_methods: list[Any],
) -> tuple[list[Any], bool]:
if not type(self)._is_conversational():
return start_methods, False
route_conversation = "route_conversation"
if route_conversation not in {str(method) for method in start_methods}:
return start_methods, False
ordered_starts = [
method for method in start_methods if str(method) != route_conversation
]
ordered_starts.append(
next(
method for method in start_methods if str(method) == route_conversation
)
)
return ordered_starts, True
def _should_defer_trace_finalization(self) -> bool:
"""Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped.
True when either:
- ``flow.defer_trace_finalization`` is set on the instance, OR
- the static conversational definition enables deferred finalization.
- the class-level ``ConversationConfig.defer_trace_finalization``
on a conversational subclass is True.
Either source enables the deferred-session pattern. The caller
eventually invokes ``finalize_session_traces()`` to close the batch.
"""
if getattr(self, "defer_trace_finalization", False):
return True
definition = self._conversation_definition
return bool(
definition and definition.enabled and definition.defer_trace_finalization
)
config = self._conversation_config
return bool(config and config.defer_trace_finalization)
def _reset_turn_execution_state(self) -> None:
"""Clear per-execution tracking so the next turn re-runs the graph."""
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_events.clear()
self._pending_and_listeners.clear()
self._method_call_counts.clear()
self._clear_or_listeners()
self._is_execution_resuming = False
@@ -847,12 +733,11 @@ class _ConversationalMixin:
router_config: RouterConfig | None,
) -> dict[str, str]:
label_to_method: dict[str, str] = {}
flow_definition = self._conversation_flow_definition()
for listener_name, method_definition in flow_definition.methods.items():
if method_definition.listen is None or method_definition.router:
continue
for trigger_label in _iter_condition_labels(method_definition.listen):
label_to_method.setdefault(trigger_label, listener_name)
for listener_name, condition in self._listeners.items():
if isinstance(condition, tuple):
_, trigger_labels = condition
for trigger_label in trigger_labels:
label_to_method.setdefault(str(trigger_label), str(listener_name))
routes = self._effective_routes(router_config)
overrides = (
@@ -903,31 +788,21 @@ class _ConversationalMixin:
def _valid_route_labels(self) -> set[str]:
labels: set[str] = set()
flow_definition = self._conversation_flow_definition()
for method_definition in flow_definition.methods.values():
if method_definition.listen is None or method_definition.router:
continue
labels.update(_iter_condition_labels(method_definition.listen))
for condition in self._listeners.values():
if isinstance(condition, tuple):
_, methods = condition
labels.update(str(method) for method in methods)
return labels
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
custom_routes = set(router_config.routes or ()) if router_config else set()
definition = self._conversation_definition
builtin_routes = (
tuple(definition.builtin_routes)
if definition is not None
else self.builtin_routes
)
internal_routes = (
tuple(definition.internal_routes)
if definition is not None
else self.internal_routes
)
if not custom_routes:
custom_routes = (
self._valid_route_labels() - set(builtin_routes) - set(internal_routes)
self._valid_route_labels()
- set(self.builtin_routes)
- set(self.internal_routes)
)
return custom_routes | set(builtin_routes)
return custom_routes | set(self.builtin_routes)
def _default_conversation_llm(self) -> Any | None:
config = self._conversation_config
@@ -1056,15 +931,12 @@ class _ConversationalMixin:
trace_listener = TraceCollectionListener()
batch_manager = trace_listener.batch_manager
try:
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
finally:
batch_manager.defer_session_finalization = False
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
__all__ = ["_ConversationalMixin"]

View File

@@ -1,48 +0,0 @@
"""Static conversational Flow definition models.
This module is part of the serializable Flow Definition contract. It should
only contain static data shapes. Experimental conversational runtime behavior
continues to live in ``crewai.experimental.conversational_mixin``.
"""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
class FlowConversationalRouterDefinition(BaseModel):
"""Static conversational router configuration."""
prompt: str | None = None
response_format: Any = None
llm: Any = None
routes: list[str] | None = None
route_descriptions: dict[str, str] | None = None
default_intent: str | None = "converse"
fallback_intent: str | None = "converse"
intent_field: str = "intent"
class FlowConversationalDefinition(BaseModel):
"""Static conversational Flow configuration."""
enabled: bool = False
system_prompt: str | None = None
llm: Any = None
router: FlowConversationalRouterDefinition | None = None
answer_from_history_prompt: str | None = None
default_intents: list[str] | None = None
intent_llm: Any = None
answer_from_history_llm: Any = None
visible_agent_outputs: list[str] | Literal["all"] | None = None
defer_trace_finalization: bool = True
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
internal_routes: list[str] = Field(default_factory=lambda: ["answer_from_history"])
__all__ = [
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
]

View File

@@ -9,8 +9,6 @@ from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowHumanFeedbackDefinition,
@@ -29,13 +27,6 @@ R = TypeVar("R")
logger = logging.getLogger(__name__)
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
_FLOW_METHOD_METADATA_ATTRS = [
"__conversational_only__",
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
"_human_feedback_llm",
]
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
@@ -51,39 +42,6 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
return True
def _is_conversational_flow(flow_class: type) -> bool:
return bool(getattr(flow_class, "conversational", False))
def _get_inherited_conversational_method(
flow_class: type,
attr_name: str,
) -> Any | None:
if not _is_conversational_flow(flow_class):
return None
for base in flow_class.__mro__[1:]:
inherited = base.__dict__.get(attr_name)
if inherited is None:
continue
if getattr(inherited, "__conversational_only__", False) and is_flow_method(
inherited
):
return inherited
return None
def _stamp_inherited_conversational_metadata(
method: Any,
inherited: Any,
) -> Any:
for attr in _FLOW_METHOD_METADATA_ATTRS:
if hasattr(inherited, attr):
setattr(method, attr, getattr(inherited, attr))
method.__is_flow_method__ = True
return method
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -177,8 +135,6 @@ def _build_state_definition(
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
if isinstance(state_value, TypeVar):
state_value = None
initial_state = getattr(flow_class, "initial_state", None)
if initial_state is not None:
state_value = initial_state
@@ -274,98 +230,6 @@ def _build_persistence_definition(
)
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
return None
routes = getattr(router_config, "routes", None)
return FlowConversationalRouterDefinition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
fallback_intent=getattr(router_config, "fallback_intent", "converse"),
intent_field=str(getattr(router_config, "intent_field", "intent")),
)
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
config = getattr(flow_class, "conversational_config", None)
builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end"))
internal_routes = getattr(
flow_class,
"internal_routes",
("answer_from_history",),
)
if config is None:
return FlowConversationalDefinition(
enabled=True,
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
default_intents = getattr(config, "default_intents", None)
visible_agent_outputs = getattr(config, "visible_agent_outputs", None)
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
default_intents=(
[str(intent) for intent in default_intents]
if default_intents is not None
else None
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
"all"
if visible_agent_outputs == "all"
else [str(output) for output in visible_agent_outputs]
if visible_agent_outputs is not None
else None
),
defer_trace_finalization=bool(
getattr(config, "defer_trace_finalization", True)
),
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
@@ -406,29 +270,6 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
flow_class, attr_value
):
methods[attr_name] = attr_value
continue
inherited = _get_inherited_conversational_method(flow_class, attr_name)
if inherited is not None and callable(attr_value):
methods[attr_name] = _stamp_inherited_conversational_metadata(
attr_value, inherited
)
if _is_conversational_flow(flow_class):
for base in reversed(flow_class.__mro__[1:]):
for attr_name, raw_value in base.__dict__.items():
if attr_name.startswith("_") or attr_name in methods:
continue
if not getattr(raw_value, "__conversational_only__", False):
continue
try:
attr_value = getattr(flow_class, attr_name)
except AttributeError:
continue
if is_flow_method(attr_value) and _should_include_flow_method(
flow_class, attr_value
):
methods[attr_name] = attr_value
# A wrapped method whose name collides with a base Flow model field
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
@@ -473,7 +314,6 @@ def _build_flow_definition_from_class(
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,
)

View File

@@ -6,22 +6,15 @@ The implementation now lives in three modules, split by concern:
``@router``, ``or_`` / ``and_``) and Python Flow class projection
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
- ``crewai.flow.runtime`` -- the Flow execution engine and state
- ``crewai.experimental.conversational_mixin`` -- experimental conversational
runtime extension composed onto the public ``Flow`` class
Prefer importing from those modules in new code; this module preserves the
historical ``crewai.flow.flow`` import path.
"""
from typing import Any, TypeVar
from pydantic import BaseModel
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl import and_, listen, or_, router, start
from crewai.flow.runtime import (
_INITIAL_STATE_CLASS_MARKER,
Flow as RuntimeFlow,
Flow,
FlowMeta,
FlowState,
LockedDictProxy,
@@ -30,13 +23,6 @@ from crewai.flow.runtime import (
)
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
class Flow(_ConversationalMixin, RuntimeFlow[T]):
"""Public Flow class with experimental conversational extension behavior."""
__all__ = [
"_INITIAL_STATE_CLASS_MARKER",
"Flow",

View File

@@ -15,10 +15,6 @@ current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)
current_flow_defer_trace_finalization: contextvars.ContextVar[bool] = (
contextvars.ContextVar("flow_defer_trace_finalization", default=False)
)
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
"flow_method_name", default="unknown"
)

View File

@@ -16,11 +16,6 @@ from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field
import yaml
from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
logger = logging.getLogger(__name__)
@@ -28,8 +23,6 @@ FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -122,7 +115,6 @@ class FlowDefinition(BaseModel):
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)

View File

@@ -84,13 +84,13 @@ from crewai.events.types.flow_events import (
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
current_flow_request_id,
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
@@ -139,6 +139,7 @@ from crewai.utilities.streaming import (
signal_end,
signal_error,
)
from crewai.utilities.types import LLMMessage
# Runtime alias so Pydantic can resolve the ``execution_context`` field's
@@ -153,42 +154,14 @@ ExecutionContext = Any # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
def _condition_branches(
condition: dict[str, Any],
) -> tuple[Literal["and", "or"], list[FlowDefinitionCondition]]:
if "and" in condition:
return "and", condition["and"]
return "or", condition["or"]
def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -> bool:
if isinstance(condition, str):
return condition in events
operator, branches = _condition_branches(condition)
combine = all if operator == "and" else any
return combine(_condition_satisfied(branch, events) for branch in branches)
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
_, branches = _condition_branches(condition)
for branch in branches:
yield from _iter_condition_events(branch)
def _or_alternative_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
operator, branches = _condition_branches(condition)
if operator != "or":
return
for branch in branches:
yield from _or_alternative_events(branch)
sub_conditions = condition["and"] if "and" in condition else condition["or"]
for sub_condition in sub_conditions:
yield from _iter_condition_events(sub_condition)
def _is_multi_event_or(
@@ -197,8 +170,7 @@ def _is_multi_event_or(
if isinstance(condition, str):
return False
operator, branches = _condition_branches(condition)
return operator == "or" and len(branches) > 1
return "or" in condition and len(condition["or"]) > 1
def _resolve_persistence(value: Any) -> Any:
@@ -644,7 +616,7 @@ class FlowMeta(ModelMetaclass):
return super().__new__(mcs, name, bases, namespace)
class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
"""Base class for all flows.
type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
@@ -658,33 +630,41 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_flow_definition: ClassVar[FlowDefinition | None] = None
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a subclass, the built-in conversational
# graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn``
# / ``end_conversation`` / ``answer_from_history_turn``) registers and
# ``handle_turn`` / ``chat`` become the chat entry points. When ``False``
# (default), the methods exist as inert attributes and never register or
# fire — non-chat flows pay no runtime cost.
#
# ⚠ EXPERIMENTAL FEATURE. The whole conversational surface
# (``conversational`` ClassVar, ``handle_turn``, ``chat``,
# ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the
# built-in graph + helpers) lives under ``crewai.experimental`` and may
# change shape before graduating. Pin your CrewAI version if you depend on
# specific behavior, and watch the changelog for breaking updates.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
entity_type: Literal["flow"] = "flow"
def _initialize_runtime_extension_attrs(self) -> None:
"""Initialize optional runtime-extension attributes."""
def _create_default_extension_state(self) -> Any | None:
"""Return a default state supplied by an optional runtime extension."""
return None
def _should_apply_pending_kickoff_context(self) -> bool:
"""Whether an optional runtime extension has pending kickoff context."""
return False
def _apply_pending_kickoff_context(self) -> None:
"""Apply optional runtime-extension kickoff context."""
def _order_start_methods_for_kickoff(
self,
start_methods: list[FlowMethodName],
) -> tuple[list[FlowMethodName], bool]:
"""Allow an optional runtime extension to order kickoff start methods."""
return start_methods, False
def _should_defer_trace_finalization(self) -> bool:
"""Whether this kickoff should defer final flow trace finalization."""
return bool(getattr(self, "defer_trace_finalization", False))
@classmethod
def flow_definition(cls) -> FlowDefinition:
"""Return the static Flow Definition built from this Flow class."""
@@ -884,7 +864,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
default_factory=dict
)
_pending_events: dict[PendingListenerKey, set[str]] = PrivateAttr(
_pending_and_listeners: dict[PendingListenerKey, set[int]] = PrivateAttr(
default_factory=dict
)
_fired_or_listeners: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -902,6 +882,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_conversation_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
_pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
@@ -927,7 +911,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if getattr(self, "_flow_post_init_done", False):
return
object.__setattr__(self, "_flow_post_init_done", True)
self._initialize_runtime_extension_attrs()
if self._state is None:
self._state = self._create_initial_state()
@@ -1044,8 +1027,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
condition = type(self)._start_condition(method_name)
if condition is None:
return False
return self._condition_met(
condition, trigger, PendingListenerKey(f"start:{method_name}")
return self._evaluate_condition(
condition,
trigger,
method_name,
pending_key_prefix=f"start:{method_name}",
)
def _rearm_or_listeners_for_trigger(
@@ -1085,9 +1071,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Only events that EXCLUSIVELY feed one OR listener race; an event that
# also feeds another listener (e.g. an AND) is left alone when a sibling
# wins. e.g. @listen(or_(a, b)) on handler -> {frozenset({a, b}): handler}.
# Events nested under an and_() branch (e.g. or_(and_(a, b), c)) are not
# alternatives and never race -- cancelling one would make the AND
# unsatisfiable.
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
listener_name: condition
@@ -1110,14 +1093,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
for listener_name, condition in listener_conditions.items():
if not isinstance(condition, dict):
continue
alternatives = set(_or_alternative_events(condition))
if len(alternatives) <= 1:
events = events_by_listener[listener_name]
if "or" not in condition or len(events) <= 1:
continue
exclusive_events = {
event
for event in alternatives
if listeners_by_event[event] == {listener_name}
for event in events
if listeners_by_event.get(event, set()) == {listener_name}
}
if len(exclusive_events) > 1:
# Racing only applies to method-completion events: each member is
@@ -1519,10 +1502,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
if (
not self.suppress_flow_events
and not self._should_defer_trace_finalization()
):
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
@@ -1539,12 +1519,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
trace_listener = TraceCollectionListener()
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
@@ -1565,15 +1540,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"""
init_state = self.initial_state
if init_state is None:
extension_state = self._create_default_extension_state()
if extension_state is not None:
return cast(T, extension_state)
# Conversational subclasses default to ``ConversationState`` if the
# user didn't supply an explicit type parameter (``Flow[...]``) or an
# ``initial_state``. This makes ``class MyChat(Flow): conversational
# = True`` work without forcing every user to import and parameterize
# ``ConversationState`` themselves.
if (
init_state is None
and getattr(type(self), "conversational", False)
and not hasattr(self, "_initial_state_t")
):
return cast(T, ConversationState())
if init_state is None and hasattr(self, "_initial_state_t"):
state_type = self._initial_state_t
if isinstance(state_type, TypeVar):
state_type = None
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
instance = state_type()
@@ -1935,17 +1915,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
restore_from_state_id=restore_from_state_id,
)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
try:
asyncio.get_running_loop()
ctx = contextvars.copy_context()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(ctx.run, asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
finally:
crewai_event_bus._exit_runtime_scope(runtime_scope)
asyncio.get_running_loop()
ctx = contextvars.copy_context()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(ctx.run, asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
async def kickoff_async(
self,
@@ -2037,23 +2013,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_token = attach(ctx)
flow_id_token = None
flow_name_token = None
flow_defer_trace_finalization_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
flow_name_token = current_flow_name.set(
self.name or self.__class__.__name__
)
flow_defer_trace_finalization_token = (
current_flow_defer_trace_finalization.set(
self._should_defer_trace_finalization()
)
)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = (
@@ -2063,7 +2028,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_events.clear()
self._pending_and_listeners.clear()
self._clear_or_listeners()
self._method_call_counts.clear()
else:
@@ -2145,10 +2110,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
should_emit_flow_started = not (
defer_trace_finalization and deferred_started_event_id
)
if current_flow_id.get() == self.flow_id:
TraceCollectionListener().batch_manager.defer_session_finalization = (
defer_trace_finalization
)
if (
defer_trace_finalization
@@ -2162,8 +2123,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if should_emit_flow_started:
# In normal flows, each kickoff owns its own flow lifecycle.
# Deferred sessions reuse the first flow scope until an
# explicit finalization call closes the batch.
# Deferred conversational sessions are different: the first
# turn opens the flow scope and later turns reuse it until
# ``finalize_session_traces()`` emits the single finish event.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
@@ -2193,8 +2155,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# with implicit "crew" execution_type.
get_env_context()
if self._should_apply_pending_kickoff_context():
self._apply_pending_kickoff_context()
# Conversational hook: apply the pending user message AFTER state
# restore and AFTER flow scope initialization, so transcript events
# are parented under the current conversation trace.
# ``handle_turn`` stashes the message on ``self._pending_user_message``
# before calling ``kickoff``; this drains it.
if (
getattr(type(self), "conversational", False)
and self._pending_user_message is not None
):
self._apply_pending_conversational_turn()
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
@@ -2217,18 +2187,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
starts_to_execute = (
unconditional_starts if unconditional_starts else start_methods
)
starts_to_execute, run_starts_sequentially = (
self._order_start_methods_for_kickoff(starts_to_execute)
)
if run_starts_sequentially:
for start_method in starts_to_execute:
await self._execute_start_method(start_method)
else:
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
from crewai.flow.async_feedback.types import HumanFeedbackPending
@@ -2300,9 +2263,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# When ``defer_trace_finalization`` is set, skip both per-turn
# ``FlowFinishedEvent`` AND trace-batch finalization. The caller
# invokes the matching finalization hook once at session end. The
# flag is read from either the instance attribute or an extension
# definition.
# invokes ``finalize_session_traces()`` once at session end to
# close out the whole conversation as one trace. The flag is
# read from EITHER the instance attribute (set by user code) OR
# the class-level ``ConversationConfig.defer_trace_finalization``.
if not self._should_defer_trace_finalization():
future = crewai_event_bus.emit(
self,
@@ -2322,12 +2286,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
trace_listener = TraceCollectionListener()
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
@@ -2341,16 +2300,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self.memory.drain_writes()
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_defer_trace_finalization_token is not None:
current_flow_defer_trace_finalization.reset(
flow_defer_trace_finalization_token
)
if flow_name_token is not None:
current_flow_name.reset(flow_name_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff(
self,
@@ -2773,18 +2725,63 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
else:
await self._execute_start_method(method_name)
def _condition_met(
def _evaluate_condition(
self,
condition: FlowDefinitionCondition,
trigger_method: FlowMethodName,
subscription_key: PendingListenerKey,
listener_name: FlowMethodName,
pending_key_prefix: str | None = None,
) -> bool:
seen = self._pending_events.setdefault(subscription_key, set())
seen.add(str(trigger_method))
if not _condition_satisfied(condition, seen):
return False
del self._pending_events[subscription_key]
return True
if isinstance(condition, str):
return condition == str(trigger_method)
def _sub_prefix(index: int) -> str | None:
if pending_key_prefix is None:
return None
return f"{pending_key_prefix}:{index}"
if "or" in condition:
# Evaluate every sub-condition (no short-circuit): a nested and_()
# branch needs the chance to clear its pending state in
# _pending_and_listeners even when an earlier branch already matched.
any_matched = False
for index, sub_condition in enumerate(condition["or"]):
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
any_matched = True
return any_matched
sub_conditions = condition["and"]
pending_key = PendingListenerKey(
pending_key_prefix
if pending_key_prefix is not None
else f"{listener_name}:{id(condition)}"
)
if pending_key not in self._pending_and_listeners:
self._pending_and_listeners[pending_key] = set(range(len(sub_conditions)))
pending_conditions = self._pending_and_listeners[pending_key]
for index, sub_condition in enumerate(sub_conditions):
if index not in pending_conditions:
continue
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
pending_conditions.discard(index)
if not pending_conditions:
self._pending_and_listeners.pop(pending_key, None)
return True
return False
def _find_triggered_methods(
self, trigger_method: FlowMethodName, router_only: bool
@@ -2802,8 +2799,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._condition_met(
condition, trigger_method, PendingListenerKey(str(listener_name))
if self._evaluate_condition(
condition,
trigger_method,
listener_name,
):
triggered.append(listener_name)
if should_check_fired:
@@ -2938,7 +2937,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return self.input_provider
if flow_config.input_provider is not None:
return flow_config.input_provider
return cast(InputProvider, ConsoleProvider())
return ConsoleProvider()
def _checkpoint_state_for_ask(self) -> None:
"""Auto-checkpoint flow state before waiting for user input.
@@ -3057,7 +3056,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
executor = ThreadPoolExecutor(max_workers=1)
ctx = contextvars.copy_context()
future = executor.submit(
ctx.run, provider.request_input, message, cast(Any, self), metadata
ctx.run, provider.request_input, message, self, metadata
)
try:
raw = future.result(timeout=timeout)
@@ -3070,9 +3069,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# cancel_futures=True cleans up any queued-but-not-started tasks.
executor.shutdown(wait=False, cancel_futures=True)
else:
raw = provider.request_input(
message, cast(Any, self), metadata=metadata
)
raw = provider.request_input(message, self, metadata=metadata)
except KeyboardInterrupt:
raise
except Exception:
@@ -3350,7 +3347,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_name=self.name or self.__class__.__name__,
),
)
structure = build_flow_structure(cast(Any, self))
structure = build_flow_structure(self)
return render_interactive(structure, filename=filename, show=show)
@staticmethod

View File

@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
FlowMethodName = NewType("FlowMethodName", str)
PendingListenerKey = NewType(
"PendingListenerKey",
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
)

View File

@@ -259,9 +259,8 @@ class RecallFlow(Flow[RecallState]):
candidates = []
if not candidates:
candidates = [scope_prefix]
selected_scopes = candidates[:20]
self.state.candidate_scopes = selected_scopes
return selected_scopes
self.state.candidate_scopes = candidates[:20]
return self.state.candidate_scopes
@listen(filter_and_chunk)
def search_chunks(self) -> list[Any]:
@@ -369,10 +368,9 @@ class RecallFlow(Flow[RecallState]):
)
)
matches.sort(key=lambda m: m.score, reverse=True)
final_results = matches[: self.state.limit]
self.state.final_results = final_results
self.state.final_results = matches[: self.state.limit]
if self.state.evidence_gaps and self.state.final_results:
self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps)
return final_results
return self.state.final_results

View File

@@ -30,7 +30,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import ProxyTracerProvider, Span
from opentelemetry.trace import Span
from typing_extensions import Self
from crewai.events.event_bus import crewai_event_bus
@@ -162,10 +162,6 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -4,7 +4,6 @@ import os
import threading
from unittest import mock
from unittest.mock import MagicMock, patch
import warnings
from crewai.agents.crew_agent_executor import AgentFinish, CrewAgentExecutor
from crewai.constants import DEFAULT_LLM_MODEL
@@ -78,51 +77,6 @@ def test_agent_creation():
assert agent.backstory == "test backstory"
def test_agent_exposes_i18n_for_backward_compatibility():
from crewai.utilities.i18n import I18N_DEFAULT
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
i18n = agent.i18n
assert i18n is I18N_DEFAULT
assert isinstance(i18n.slice("role_playing"), str)
def test_agent_accepts_custom_i18n():
from crewai.utilities.i18n import I18N
prompt_file = os.path.join(
os.path.dirname(__file__), "..", "utilities", "prompts.json"
)
i18n = I18N(prompt_file=prompt_file)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
i18n=i18n,
)
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
agent_i18n = agent.i18n
assert agent_i18n is i18n
assert agent_i18n.slice("role_playing") == "Lorem ipsum dolor sit amet"
def test_agent_copy_does_not_emit_i18n_deprecation_warning():
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter("always", DeprecationWarning)
agent.copy()
assert not any(
"Agent.i18n is deprecated" in str(w.message) for w in caught_warnings
)
def test_agent_with_only_system_template():
"""Test that an agent with only system_template works without errors."""
agent = Agent(

View File

@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
executor._method_outputs = []
executor._completed_methods = set()
executor._fired_or_listeners = set()
executor._pending_events = {}
executor._pending_and_listeners = {}
executor._method_execution_counts = {}
executor._method_call_counts = {}
executor._event_futures = []

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
import threading
from typing import Any
from unittest.mock import patch
@@ -110,79 +109,10 @@ class TestCheckpointListenerOptsOut:
assert do_cp.call_count == 0
class TestCheckpointResumeReplaysEvents:
"""A flow resumed from a checkpoint replays MethodExecution* events for
completed methods and executes the pending ones. The checkpoint persists
the event record, which is reloaded into the per-run runtime state.
class TestFlowResumeReplaysEvents:
"""End-to-end: a resumed flow emits MethodExecution* events for completed methods."""
``step_c`` is gated on a threading.Event so the flow is frozen with exactly
``step_a`` and ``step_b`` completed when the checkpoint is written — the
mid-run snapshot is deterministic rather than dependent on write timing.
"""
def test_resume_replays_completed_and_executes_pending(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.state.checkpoint_config import CheckpointConfig
at_step_c = threading.Event()
release = threading.Event()
captured: list[Any] = []
class ThreeStepFlow(Flow[dict]):
@start()
def step_a(self) -> str:
return "a"
@listen(step_a)
def step_b(self) -> str:
return "b"
@listen(step_b)
def step_c(self) -> str:
captured.append(crewai_event_bus.runtime_state)
at_step_c.set()
release.wait(timeout=10)
return "c"
runner = threading.Thread(target=ThreeStepFlow().kickoff)
runner.start()
try:
assert at_step_c.wait(timeout=10)
location = captured[0].checkpoint(str(tmp_path / "cp"))
finally:
release.set()
runner.join(timeout=10)
captured_started: list[str] = []
captured_finished: list[str] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
captured_started.append(event.method_name)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
captured_finished.append(event.method_name)
ThreeStepFlow().kickoff(
from_checkpoint=CheckpointConfig(restore_from=location)
)
assert captured_started == ["step_a", "step_b", "step_c"]
assert captured_finished == ["step_a", "step_b", "step_c"]
class TestPersistResumeDoesNotReplayCompletedEvents:
"""A @persist resume continues from pending methods only.
@persist stores flow state, not the event record, so completed-method
events have no persisted source to replay from. Runtime state is scoped
per run, so flow1's events are not visible to flow2.
"""
def test_persist_resume_executes_only_pending_methods(self, tmp_path) -> None:
def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -202,6 +132,9 @@ class TestPersistResumeDoesNotReplayCompletedEvents:
def step_c(self) -> str:
return "c"
if crewai_event_bus.runtime_state is not None:
crewai_event_bus.runtime_state.event_record.clear()
flow1 = ThreeStepFlow(persistence=persistence)
flow1.kickoff()
flow_id = flow1.state["id"]
@@ -224,5 +157,9 @@ class TestPersistResumeDoesNotReplayCompletedEvents:
flow2.kickoff(inputs={"id": flow_id})
assert captured_started == ["step_c"]
assert captured_finished == ["step_c"]
assert captured_started.count("step_a") == 1
assert captured_started.count("step_b") == 1
assert captured_started.count("step_c") == 1
assert captured_finished.count("step_a") == 1
assert captured_finished.count("step_b") == 1
assert captured_finished.count("step_c") == 1

View File

@@ -6,7 +6,6 @@ import pytest
from crewai import Agent, Crew, Task
from crewai.telemetry import Telemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
@pytest.fixture(autouse=True)
@@ -54,23 +53,6 @@ def test_telemetry_enabled_by_default():
assert telemetry.ready is True
def test_set_tracer_skips_when_provider_already_configured():
"""A second telemetry instance must not re-install the global provider."""
with (
patch.dict(os.environ, {}, clear=True),
patch(
"crewai.telemetry.telemetry.trace.get_tracer_provider",
return_value=TracerProvider(),
),
patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set,
):
telemetry = Telemetry()
telemetry.set_tracer()
mock_set.assert_not_called()
assert telemetry.trace_set is True
@patch("crewai.telemetry.telemetry.logger.error")
@patch(
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",

View File

@@ -409,31 +409,4 @@ class TestRuntimeStateIntegration:
old_json, context={"from_checkpoint": True}
)
assert len(restored.root) == 1
assert len(restored.event_record) == 0
def test_reset_runtime_state_clears_state_and_registry(self):
from crewai import Agent, Crew, RuntimeState
from crewai.events.event_bus import crewai_event_bus
if RuntimeState is None:
pytest.skip("RuntimeState unavailable (model_rebuild failed)")
agent = Agent(role="test", goal="test", backstory="test", llm="gpt-4o-mini")
crew = Crew(agents=[agent], tasks=[], verbose=False)
previous_state = crewai_event_bus._runtime_state
previous_ids = crewai_event_bus._registered_entity_ids
crewai_event_bus._runtime_state = None
crewai_event_bus._registered_entity_ids = set()
try:
crewai_event_bus.register_entity(crew)
assert crewai_event_bus.runtime_state is not None
assert crewai_event_bus._registered_entity_ids
crewai_event_bus.reset_runtime_state()
assert crewai_event_bus.runtime_state is None
assert crewai_event_bus._registered_entity_ids == set()
finally:
crewai_event_bus._runtime_state = previous_state
crewai_event_bus._registered_entity_ids = previous_ids
assert len(restored.event_record) == 0

View File

@@ -1542,63 +1542,40 @@ def test_deeply_nested_conditions():
def test_or_branch_does_not_leave_stale_and_state():
fired = []
"""or_() over nested and_() branches must not leave stale pending AND state.
Regression: evaluating an or_() condition stopped at the first branch that was
satisfied, so a later and_() branch that the *same* trigger would have completed
never cleared its pending state. On the next cycle that trigger alone then
spuriously re-satisfied the whole condition. Both branches share the final
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
completes branch ``(c AND x)`` and both must be cleared together.
"""
class StaleStateFlow(Flow):
@start()
def begin(self):
pass
@listen(begin)
def a(self):
pass
@listen(begin)
def c(self):
pass
@listen(and_(a, c))
def x(self):
pass
@listen(or_(and_("a", "x"), and_("c", "y")))
@listen(or_(and_("a", "x"), and_("c", "x")))
def joined(self):
fired.append("joined")
pass
@router(joined)
def emit_y(self):
return "y"
flow = StaleStateFlow()
condition = type(flow)._listen_condition("joined")
StaleStateFlow().kickoff()
def fires(trigger):
return flow._evaluate_condition(condition, trigger, "joined")
assert fired == ["joined"]
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
assert fires("a") is False
assert fires("c") is False
assert fires("x") is True
def test_and_branch_inside_or_does_not_race():
execution_order = []
class DiamondWithFallbackFlow(Flow):
@start()
def go(self):
execution_order.append("go")
@listen(go)
def a(self):
execution_order.append("a")
@listen(go)
def b(self):
execution_order.append("b")
@listen(or_(and_(a, b), "fallback"))
def done(self):
execution_order.append("done")
DiamondWithFallbackFlow().kickoff()
assert "done" in execution_order
assert execution_order.index("done") > execution_order.index("a")
assert execution_order.index("done") > execution_order.index("b")
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
# previous cycle was consumed when "joined" fired, so neither branch is half
# complete and "x" by itself is insufficient.
assert fires("x") is False
def test_mixed_sync_async_execution_order():

View File

@@ -26,11 +26,7 @@ from crewai.experimental import (
RouterConfig,
)
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.conversation import (
append_message,
get_conversation_messages,
@@ -173,6 +169,9 @@ class TestConversationalFlow:
)
@pytest.mark.skip(
reason="Experimental conversational registry behavior is out of scope for the definition-first start migration."
)
def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None:
@ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini")
class ResearchFlow(ConversationalFlow):
@@ -596,15 +595,18 @@ class TestConversationalFlow:
assert result == "legacy-searched"
assert flow.state.last_intent == "search"
@pytest.mark.skip(
reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration."
)
def test_user_start_methods_run_sequentially_before_router_in_conversational_mode(
self,
) -> None:
"""Conversational flows: user ``@start`` methods finish before router fires.
Non-chat flows run ``@start`` methods in parallel via ``asyncio.gather``,
which would race with ``route_conversation`` and let the router fire
which would race with ``conversation_start`` and let the router fire
before user setup finished. In conversational mode the framework runs
them sequentially, with ``route_conversation`` last.
them sequentially, with ``conversation_start`` last.
"""
order: list[str] = []
@@ -647,10 +649,18 @@ class TestConversationalFlow:
assert "attach_bus" in order # still fires every turn
assert "route_turn" in order
def test_subclass_can_override_conversation_start_helper(
@pytest.mark.skip(
reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration."
)
def test_subclass_can_override_conversation_start_without_redecorating(
self,
) -> None:
"""The compatibility helper remains overridable without adding a Flow node."""
"""Overriding an inherited ``@start`` method must not unregister it.
Before the metaclass fix, subclasses had to re-apply ``@start()`` on
every override or the parent's ``conversation_start`` would silently
drop out of the start registry — leaving the flow with nothing to fire.
"""
bootstrap_calls: list[str] = []
@@ -671,38 +681,6 @@ class TestConversationalFlow:
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert "conversation_start" not in BootstrapFlow.flow_definition().methods
route_definition = BootstrapFlow.flow_definition().methods["route_conversation"]
assert route_definition.start is True
assert route_definition.router is True
assert flow.state.messages[-1].content == "worked"
def test_legacy_decorated_conversation_start_runs_once_per_turn(
self,
) -> None:
"""Legacy ``@start`` overrides are not invoked again by the router."""
bootstrap_calls: list[str] = []
@ConversationConfig()
class BootstrapFlow(ConversationalFlow):
@start()
def conversation_start(self) -> str | None:
bootstrap_calls.append("ran")
return super().conversation_start()
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert flow.state.messages[-1].content == "worked"
@@ -1201,40 +1179,6 @@ class TestConversationalFlow:
"finalize_session_traces must finalize the trace batch once"
)
def test_deferred_resume_skips_per_resume_flow_finished_event(self) -> None:
"""Deferred sessions do not emit terminal events while resuming."""
from crewai.events.types.flow_events import FlowFinishedEvent
from crewai.flow.async_feedback.types import PendingFeedbackContext
class DeferredResumeFlow(Flow[ChatState]):
defer_trace_finalization = True
@start()
def begin(self) -> str:
return "started"
flow = DeferredResumeFlow()
flow._pending_feedback_context = PendingFeedbackContext(
flow_id=flow.flow_id,
flow_class="DeferredResumeFlow",
method_name="begin",
method_output="started",
message="Review",
)
finished_events: list[FlowFinishedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowFinishedEvent)
def capture(_: Any, event: FlowFinishedEvent) -> None:
finished_events.append(event)
flow.resume("approved")
crewai_event_bus.flush()
assert finished_events == []
def test_finalize_session_traces_restores_event_scope(self, capsys) -> None:
"""No ``empty scope stack`` warning when deferred ``flow_finished`` fires.
@@ -1398,12 +1342,6 @@ class TestFlowTracingWhenSuppressed:
class TestDeferTraceFinalization:
def test_bare_conversational_flow_defers_by_default(self) -> None:
class BareChat(ConversationalFlow):
pass
assert BareChat()._should_defer_trace_finalization() is True
def test_conversation_config_drives_defer_flag(self) -> None:
"""``ConversationConfig(defer_trace_finalization=...)`` controls whether
a conversational subclass defers per-turn trace finalization."""
@@ -1536,44 +1474,6 @@ class TestDeferredFlowLifecycleEvents:
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_deferred_flow_kickoff_marks_trace_manager_session_deferred(
self,
) -> None:
class DeferredTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = False
flow = DeferredTraceFlow()
flow.defer_trace_finalization = True
with patch.object(listener.batch_manager, "finalize_batch"):
flow.kickoff()
assert listener.batch_manager.defer_session_finalization is True
flow.finalize_session_traces()
assert listener.batch_manager.defer_session_finalization is False
def test_non_deferred_flow_kickoff_clears_stale_trace_manager_flag(
self,
) -> None:
class PlainTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = True
PlainTraceFlow().kickoff()
assert listener.batch_manager.defer_session_finalization is False
class TestNestedCrewTracing:
def test_is_inside_active_flow_context_when_kickoff_running(self) -> None:
@@ -1627,130 +1527,3 @@ class TestNestedCrewTracing:
elif listener.batch_manager.batch_owner_type == "crew":
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_lazy_flow_batch_from_context_preserves_deferred_parent(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
flow_id_token = current_flow_id.set("parent-flow-id")
flow_name_token = current_flow_name.set("ParentChatFlow")
defer_token = current_flow_defer_trace_finalization.set(True)
try:
initialized = listener._try_initialize_flow_batch_from_context(
type("Event", (), {"timestamp": None})()
)
assert initialized is True
assert listener.batch_manager.batch_owner_type == "flow"
assert listener.batch_manager.batch_owner_id == "parent-flow-id"
assert listener.batch_manager.defer_session_finalization is True
assert listener.batch_manager.current_batch is not None
assert (
listener.batch_manager.current_batch.execution_metadata[
"execution_type"
]
== "flow"
)
assert (
listener.batch_manager.current_batch.execution_metadata["flow_name"]
== "ParentChatFlow"
)
finally:
current_flow_defer_trace_finalization.reset(defer_token)
current_flow_name.reset(flow_name_token)
current_flow_id.reset(flow_id_token)
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
def test_nested_agent_executor_flow_does_not_finalize_parent_batch(
self,
) -> None:
from crewai import Agent, Crew, Task
from crewai.llms.base_llm import BaseLLM
class StaticLLM(BaseLLM):
def __init__(self) -> None:
super().__init__(model="debug-static-llm", provider="debug")
def call(
self,
messages: Any,
tools: Any = None,
callbacks: Any = None,
available_functions: Any = None,
from_task: Any = None,
from_agent: Any = None,
response_model: Any = None,
) -> str:
return (
"Thought: I can answer directly.\n"
"Final Answer: nested crew result"
)
class NestedCrewFlow(Flow[ChatState]):
defer_trace_finalization = True
tracing = True
@start()
def begin(self) -> str:
return "run_nested_crew"
@listen(begin)
def run_nested_crew(self, _: str) -> str:
agent = Agent(
role="Debug Agent",
goal="Return a short deterministic result",
backstory="Used only for trace finalization debugging.",
llm=StaticLLM(),
verbose=False,
)
task = Task(
description="Return the deterministic nested crew result.",
expected_output="nested crew result",
agent=agent,
)
return Crew(agents=[agent], tasks=[task], verbose=False).kickoff().raw
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
listener.first_time_handler.is_first_time = False
def initialize_backend_batch(*_: Any, **__: Any) -> None:
listener.batch_manager.trace_batch_id = "debug-trace-batch"
flow = NestedCrewFlow()
with (
patch.object(
listener.batch_manager,
"_initialize_backend_batch",
side_effect=initialize_backend_batch,
),
patch.object(listener.batch_manager, "finalize_batch") as mock_finalize,
):
flow.kickoff()
crewai_event_bus.flush()
flow.kickoff()
crewai_event_bus.flush()
assert mock_finalize.call_count == 0, (
"nested AgentExecutor flows inside a deferred parent Flow must "
"not finalize the parent trace batch"
)

View File

@@ -13,7 +13,6 @@ from pydantic import BaseModel
import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.experimental import ConversationConfig, RouterConfig
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
@@ -37,8 +36,6 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -172,7 +169,6 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert definition.state.ref and "ContractState" in definition.state.ref
assert definition.config.stream is True
assert definition.config.max_method_calls == 7
assert definition.conversational is None
assert definition.methods["begin"].start is True
assert definition.methods["process"].listen == "begin"
@@ -205,75 +201,25 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
methods = RegularFlow.flow_definition().methods
assert RegularFlow.flow_definition().conversational is None
assert set(methods) == {"begin"}
assert "conversation_start" not in methods
assert "route_conversation" not in methods
assert "converse_turn" not in methods
@pytest.mark.skip(
reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration."
)
def test_flow_definition_includes_conversational_builtins_when_enabled():
class ChatFlow(Flow):
conversational = True
definition = ChatFlow.flow_definition()
methods = definition.methods
assert definition.conversational is not None
assert definition.conversational.enabled is True
assert definition.conversational.defer_trace_finalization is True
assert definition.conversational.builtin_routes == ["converse", "end"]
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert "converse_turn" in methods
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_conversational_config():
@ConversationConfig(
system_prompt="Be concise.",
llm="gpt-4o-mini",
router=RouterConfig(
prompt="Pick a route.",
routes=["research"],
default_intent="converse",
fallback_intent="end",
),
default_intents=["research"],
visible_agent_outputs=["researcher"],
defer_trace_finalization=False,
)
class ChatFlow(Flow):
conversational = True
conversational = ChatFlow.flow_definition().conversational
assert conversational is not None
assert conversational.system_prompt == "Be concise."
assert conversational.llm == "gpt-4o-mini"
assert conversational.default_intents == ["research"]
assert conversational.visible_agent_outputs == ["researcher"]
assert conversational.defer_trace_finalization is False
assert conversational.router is not None
assert conversational.router.prompt == "Pick a route."
assert conversational.router.routes == ["research"]
assert conversational.router.fallback_intent == "end"
def test_flow_definition_uses_collapsed_conversational_router_start():
class ChatFlow(Flow):
conversational = True
def conversation_start(self) -> str | None:
return "custom"
methods = ChatFlow.flow_definition().methods
assert "conversation_start" not in methods
assert "conversation_start" in methods
assert "route_conversation" in methods
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
assert "converse_turn" in methods
assert methods["conversation_start"].start is True
def test_flow_definition_serializes_human_feedback_metadata():

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.7rc1"
__version__ = "1.14.7a4"