Compare commits

..

2 Commits

Author SHA1 Message Date
Iris Clawd
d883c7999c docs: address review feedback on Discovery page
- Remove Connect Data Sources / Data Source Integration (doesn't exist)
- Replace 'production data' references with 'knowledge' / 'world model'
- Merge Getting Started into How It Works (were redundant)
- Remove Best Practices and FAQ sections
2026-06-10 18:01:43 +00:00
Iris Clawd
e5d37196c7 docs: add Discovery section to documentation
Add comprehensive documentation for the Discovery feature under
enterprise/features/. Covers overview, how it works, key features,
getting started guide, use cases, best practices, and FAQ.

Screenshot placeholders are included for future visual updates.

Closes DIS-73
2026-06-10 16:59:08 +00:00
21 changed files with 442 additions and 1490 deletions

View File

@@ -24,39 +24,15 @@ mode: "wide"
1. في CrewAI AMP، انتقل إلى **Settings** > **OpenTelemetry Collectors**.
2. انقر على **Add Collector**.
3. اختر تكاملاً:
- **OpenTelemetry Traces** و**OpenTelemetry Logs** — صدّر إلى أي مجمّع أو واجهة خلفية متوافقة مع OTLP.
- **Datadog** — أرسل التتبعات مباشرة إلى استقبال OTLP الخاص بـ Datadog، دون الحاجة إلى مجمّع منفصل أو Datadog Agent.
4. هيّئ الاتصال. تعتمد الحقول على التكامل الذي اخترته:
3. اختر نوع التكامل — **OpenTelemetry Traces** أو **OpenTelemetry Logs**.
4. هيّئ الاتصال:
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
5. انقر على **Save**.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
إن **OpenTelemetry Traces** و**OpenTelemetry Logs** تكاملان منفصلان يتشاركان نفس الحقول — اختر التكامل المطابق للإشارة التي تريد تصديرها.
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — مضيف OTLP لموقع Datadog الخاص بك فقط، دون بروتوكول أو مسار. يقوم CrewAI ببناء نقطة نهاية HTTPS OTLP الكاملة نيابةً عنك. استخدم المضيف المطابق لـ [موقع Datadog](https://docs.datadoghq.com/getting_started/site/) الخاص بك:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — مفتاح واجهة برمجة تطبيقات Datadog الخاص بك. راجع [كيفية إنشاء واحد](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
يصدّر تكامل Datadog **التتبعات**.
<Frame>![تهيئة مجمّع Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(اختياري)* انقر على **Test Connection** للتحقق من قدرة CrewAI على الوصول إلى نقطة النهاية باستخدام بيانات الاعتماد التي قدمتها.
6. انقر على **Save**.
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
يمكنك إضافة مجمّعات متعددة — على سبيل المثال، واحد للتتبعات وآخر للسجلات، أو الإرسال إلى واجهات خلفية مختلفة لأغراض مختلفة.

View File

@@ -0,0 +1,98 @@
---
title: Discovery
description: "Identify the highest-impact AI automation use cases for your business."
icon: "compass"
mode: "wide"
---
## Overview
Discovery is a new engine inside CrewAI AMP that helps companies identify the best automation use cases for their business.
The bottleneck in AI adoption is not building agents — it's knowing _what_ to build and _how_ to build it for production. Discovery closes that gap.
{/* TODO: Add screenshot of Discovery dashboard */}
Instead of weeks of stakeholder interviews, consultant engagements, and slide decks, Discovery leverages CrewAI's deep knowledge of agent patterns and what works in production to match your business context against proven approaches. Within minutes, you get actionable, evidence-based recommendations specific to your organization.
## How It Works
<Steps>
<Step title="Describe Your Business">
Tell Discovery about your organization — your processes, challenges, goals, and the teams involved. The more context you provide, the more precise the recommendations.
</Step>
<Step title="Multi-Signal Matching">
Discovery runs cohort analysis and structural pattern recognition using CrewAI's world model, matching your business context to automation patterns already running successfully at scale.
</Step>
<Step title="Review Use Cases">
Within minutes, you receive a set of use cases specific to your company — not generic templates. Each one shows what the automation does, expected impact, complexity, and how it would work in your organization.
{/* TODO: Add screenshot of use case recommendations */}
</Step>
<Step title="Build">
Select a use case and go directly into Crew Studio or export to code to start building.
</Step>
</Steps>
{/* TODO: Add screenshot of Discovery flow / results page */}
## Key Features
<CardGroup cols={2}>
<Card title="Business-Specific Recommendations" icon="bullseye">
Not generic templates. Real use cases matched to your organization based on CrewAI's knowledge of what works in production.
</Card>
<Card title="Impact & Complexity Scoring" icon="chart-mixed">
Each recommendation includes expected impact, implementation complexity, and how it fits your org — so you can prioritize with confidence.
</Card>
<Card title="Iterative Discovery" icon="arrows-rotate">
Run Discovery multiple times across different business units. It becomes part of how you plan and iterate on your AI roadmap.
</Card>
<Card title="Evidence-Based" icon="flask-vial">
Every recommendation is grounded in what CrewAI knows actually works in production — not guesswork or intuition.
</Card>
</CardGroup>
## From Discovery to Production
Discovery fits at the very beginning of the CrewAI workflow — it's the "what to build" step before the "how to build" step.
{/* TODO: Add diagram showing Discovery → Crew Studio → Automations flow */}
The end-to-end flow:
1. **Discovery** identifies the use case and provides the blueprint
2. **Crew Studio** or code lets you build the automation
3. **Automations** deploys it to production
This means you go from "we should use AI somewhere" to a running production automation with a clear, guided path — no guesswork at any stage.
## Use Cases
<CardGroup cols={2}>
<Card title="New to AI Agents" icon="seedling">
Don't know where to start? Discovery identifies the highest-impact opportunities specific to your business, so you begin with what matters most.
</Card>
<Card title="Scaling AI Programs" icon="rocket">
Already have some automations? Discovery finds the next wave of use cases across departments, helping you expand beyond initial pilots.
</Card>
<Card title="Cross-Department Rollout" icon="building">
Run Discovery for different business units to build a company-wide AI roadmap with use cases tailored to each team's needs.
</Card>
<Card title="ROI Prioritization" icon="chart-line">
Need to justify AI investment? Discovery provides evidence-based impact estimates grounded in real-world results.
</Card>
</CardGroup>
## Related
<CardGroup cols={3}>
<Card title="Crew Studio" href="/en/enterprise/features/crew-studio" icon="pencil">
Build automations with AI assistance and a visual editor.
</Card>
<Card title="Automations" href="/en/enterprise/features/automations" icon="bolt">
Deploy and manage your automations in production.
</Card>
<Card title="Marketplace" href="/en/enterprise/features/marketplace" icon="store">
Browse pre-built automations and components.
</Card>
</CardGroup>

View File

@@ -24,39 +24,15 @@ Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://op
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
2. Click **Add Collector**.
3. Select an integration:
- **OpenTelemetry Traces** and **OpenTelemetry Logs** — export to any OTLP-compatible collector or backend.
- **Datadog** — send traces straight to Datadog's OTLP intake, no separate collector or Datadog Agent required.
4. Configure the connection. The fields depend on the integration you selected:
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
4. Configure the connection:
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
5. Click **Save**.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** and **OpenTelemetry Logs** are separate integrations that share the same fields — pick the one matching the signal you want to export.
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
<Frame>![OpenTelemetry collector configuration](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Your Datadog site's OTLP host only, with no protocol or path. CrewAI builds the full HTTPS OTLP endpoint for you. Use the host that matches your [Datadog site](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Your Datadog API key. See [how to create one](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
The Datadog integration exports **traces**.
<Frame>![Datadog collector configuration](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(optional)* Click **Test Connection** to verify CrewAI can reach the endpoint with the credentials you provided.
6. Click **Save**.
<Frame>![OpenTelemetry Collector Configuration](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 455 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 420 KiB

View File

@@ -24,39 +24,15 @@ CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
2. **Add Collector**를 클릭합니다.
3. 통합을 선택합니다:
- **OpenTelemetry Traces** 및 **OpenTelemetry Logs** — OTLP 호환 수집기 또는 백엔드로 내보냅니다.
- **Datadog** — 별도의 수집기나 Datadog Agent 없이 트레이스를 Datadog의 OTLP 인테이크로 직접 전송합니다.
4. 연결을 구성합니다. 필드는 선택한 통합에 따라 달라집니다:
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
4. 연결을 구성합니다:
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
5. **Save**를 클릭합니다.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces**와 **OpenTelemetry Logs**는 동일한 필드를 공유하는 별개의 통합입니다 — 내보내려는 신호에 맞는 것을 선택하세요.
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Datadog 사이트의 OTLP 호스트만 입력합니다 (프로토콜이나 경로 제외). CrewAI가 전체 HTTPS OTLP 엔드포인트를 자동으로 구성합니다. [Datadog 사이트](https://docs.datadoghq.com/getting_started/site/)에 맞는 호스트를 사용하세요:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Datadog API 키입니다. [키 생성 방법](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys)을 참고하세요.
Datadog 통합은 **트레이스**를 내보냅니다.
<Frame>![Datadog 수집기 구성](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(선택 사항)* **Test Connection**을 클릭하여 제공한 자격 증명으로 CrewAI가 엔드포인트에 연결할 수 있는지 확인합니다.
6. **Save**를 클릭합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.

View File

@@ -24,39 +24,15 @@ Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetr
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
2. Clique em **Add Collector**.
3. Selecione uma integração:
- **OpenTelemetry Traces** e **OpenTelemetry Logs** — exporte para qualquer coletor ou backend compatível com OTLP.
- **Datadog** — envie traces diretamente para a ingestão OTLP do Datadog, sem precisar de um coletor separado ou do Datadog Agent.
4. Configure a conexão. Os campos dependem da integração selecionada:
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
4. Configure a conexão:
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
5. Clique em **Save**.
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** e **OpenTelemetry Logs** são integrações separadas que compartilham os mesmos campos — escolha a que corresponde ao sinal que você quer exportar.
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
<Frame>![Configuração do coletor OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Apenas o host OTLP do seu site Datadog, sem protocolo ou caminho. O CrewAI monta o endpoint HTTPS OTLP completo para você. Use o host correspondente ao seu [site Datadog](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Sua chave de API do Datadog. Veja [como criar uma](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
A integração com o Datadog exporta **traces**.
<Frame>![Configuração do coletor Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(opcional)* Clique em **Test Connection** para verificar se o CrewAI consegue acessar o endpoint com as credenciais fornecidas.
6. Clique em **Save**.
<Frame>![Configuração do Coletor OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.

View File

@@ -1,17 +1,15 @@
"""Conversational graph + helpers as an experimental Flow extension.
"""Conversational graph + helpers as a mixin for ``Flow`` (experimental).
The conversational chat surface remains experimental and may change before the
v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused
on the execution engine. ``crewai.flow.flow`` composes this mixin onto the
public ``Flow`` class for backwards compatibility.
The built-in conversational graph only registers for subclasses that opt in
with ``conversational = True``. Static conversational metadata is projected
into ``FlowDefinition.conversational`` via the Python DSL builder.
The experimental conversational chat surface lives here as a mixin so that
``crewai.flow.runtime`` stays focused on the execution engine. ``Flow``
inherits from ``_ConversationalMixin``; the methods only register on
subclasses that opt in via ``conversational = True`` (enforced by the
``_conversational_only`` marker + ``FlowMeta`` gating in
``crewai.flow.runtime``).
Import surface:
- :class:`_ConversationalMixin` — internal; the public ``Flow`` class
composes it in. Users don't import it directly.
- :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users
don't import it directly.
- The data types this mixin uses live in
:mod:`crewai.experimental.conversational`.
"""
@@ -22,7 +20,7 @@ from collections.abc import Callable, Mapping, Sequence
from enum import Enum
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast
from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
from pydantic import BaseModel, Field, create_model
@@ -51,56 +49,21 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
from crewai.llms.base_llm import BaseLLM
logger = logging.getLogger(__name__)
def _iter_condition_labels(condition: Any) -> set[str]:
if isinstance(condition, str):
return {condition}
if isinstance(condition, dict):
labels: set[str] = set()
for value in condition.values():
if isinstance(value, list):
for item in value:
labels.update(_iter_condition_labels(item))
else:
labels.update(_iter_condition_labels(value))
return labels
return set()
class _ConversationalMixin:
"""Experimental conversational graph for ``Flow``.
"""Built-in conversational graph for ``Flow`` (gated on ``conversational``).
This mixin owns chat behavior and runtime hooks. Non-chat flows see these
methods as inert attributes unless they opt in with ``conversational = True``.
Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused
on running graphs. The methods here only register on subclasses that set
``conversational = True``; non-chat flows see them as inert attributes.
"""
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a Flow subclass, this mixin's built-in
# graph registers and ``handle_turn`` / ``chat`` become chat entry points.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
# The metaclass + state attributes referenced below live on ``Flow`` —
# this mixin is never instantiated standalone. These type-only
# declarations exist so static analyzers don't flag attribute access.
@@ -108,15 +71,22 @@ class _ConversationalMixin:
# (otherwise mypy flags "Cannot override instance variable with class
# variable" when Flow declares them as ``ClassVar``).
if TYPE_CHECKING:
conversational: ClassVar[bool]
conversational_config: ClassVar[ConversationConfig | None]
builtin_routes: ClassVar[tuple[str, ...]]
internal_routes: ClassVar[tuple[str, ...]]
builtin_route_descriptions: ClassVar[dict[str, str]]
# Registry ClassVars populated by ``FlowMeta`` at class creation.
_listeners: ClassVar[dict[Any, Any]]
# Instance attrs from ``Flow``.
state: Any
name: str | None
_completed_methods: set[Any]
_method_outputs: list[Any]
_pending_events: dict[Any, Any]
_pending_and_listeners: dict[Any, Any]
_method_call_counts: dict[Any, int]
_is_execution_resuming: bool
_conversation_messages: list[LLMMessage]
_pending_user_message: str | dict[str, Any] | None
_pending_intents: Sequence[str] | None
_pending_intent_llm: str | BaseLLM | None
@@ -127,8 +97,8 @@ class _ConversationalMixin:
def _collapse_to_outcome(
self,
feedback: str,
outcomes: Sequence[str],
llm: str | BaseLLM,
outcomes: tuple[str, ...],
llm: str | BaseLLM | Any,
) -> str:
pass
@@ -268,8 +238,8 @@ class _ConversationalMixin:
state = cast(ConversationState, self.state)
sid = session_id or state.id
# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.
# Stash the pending turn so ``_apply_pending_conversational_turn``
# picks it up AFTER persist restore.
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
@@ -316,7 +286,7 @@ class _ConversationalMixin:
callers can customize prompts or exercise the loop without patching
builtins.
"""
if not self._is_conversational_enabled():
if not getattr(type(self), "conversational", False):
raise ValueError("Flow.chat() is only available on conversational flows")
exit_set = {command.lower() for command in exit_commands}
@@ -521,14 +491,14 @@ class _ConversationalMixin:
**extra: Any,
) -> None:
"""Append a message to conversation history (legacy ChatState path)."""
_append_conversation_message(cast(Any, self), role, content, **extra)
_append_conversation_message(cast("Flow[Any]", self), role, content, **extra)
@property
def conversation_messages(self) -> list[LLMMessage]:
"""Message history from state, coerced to LLM-shaped dicts."""
return [
message_to_llm_dict(message)
for message in get_conversation_messages(cast(Any, self))
for message in get_conversation_messages(cast("Flow[Any]", self))
]
def receive_user_message(
@@ -544,7 +514,7 @@ class _ConversationalMixin:
``state.messages`` and preserve ``last_intent`` across turns.
Non-conversational flows fall through to the legacy helper.
"""
if self._is_conversational_enabled():
if self.conversational:
state = cast(ConversationState, self.state)
state.messages.append(ConversationMessage(role="user", content=text))
self._emit_conversation_message_added(
@@ -565,7 +535,9 @@ class _ConversationalMixin:
return intent
return text
return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm)
return _receive_user_message(
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
)
def classify_intent(
self,
@@ -589,104 +561,27 @@ class _ConversationalMixin:
def _conversation_config(self) -> ConversationConfig | None:
return getattr(type(self), "conversational_config", None)
@property
def _conversation_definition(self) -> Any | None:
return self._conversation_flow_definition().conversational
def _conversation_flow_definition(self) -> Any:
flow_definition = getattr(type(self), "flow_definition", None)
if not callable(flow_definition):
raise AttributeError(
f"{type(self).__name__} does not expose flow_definition()"
)
return flow_definition()
@classmethod
def _conversational_definition(cls) -> Any | None:
flow_definition = getattr(cls, "flow_definition", None)
if not callable(flow_definition):
return None
return flow_definition().conversational
@classmethod
def _is_conversational(cls) -> bool:
definition = cls._conversational_definition()
return bool(definition and definition.enabled)
def _is_conversational_enabled(self) -> bool:
definition = self._conversation_definition
return bool(definition and definition.enabled)
def _initialize_runtime_extension_attrs(self) -> None:
if not isinstance(getattr(self, "_conversation_messages", None), list):
object.__setattr__(self, "_conversation_messages", [])
if not hasattr(self, "_pending_user_message"):
object.__setattr__(self, "_pending_user_message", None)
if not hasattr(self, "_pending_intents"):
object.__setattr__(self, "_pending_intents", None)
if not hasattr(self, "_pending_intent_llm"):
object.__setattr__(self, "_pending_intent_llm", None)
def _create_default_extension_state(self) -> ConversationState | None:
initial_state_t = getattr(self, "_initial_state_t", None)
if type(self)._is_conversational() and (
not hasattr(self, "_initial_state_t")
or isinstance(initial_state_t, TypeVar)
):
return ConversationState()
return None
def _should_apply_pending_kickoff_context(self) -> bool:
return (
type(self)._is_conversational() and self._pending_user_message is not None
)
def _apply_pending_kickoff_context(self) -> None:
self._apply_pending_conversational_turn()
def _order_start_methods_for_kickoff(
self,
start_methods: list[Any],
) -> tuple[list[Any], bool]:
if not type(self)._is_conversational():
return start_methods, False
conversation_start = "conversation_start"
if conversation_start not in {str(method) for method in start_methods}:
return start_methods, False
ordered_starts = [
method for method in start_methods if str(method) != conversation_start
]
ordered_starts.append(
next(
method for method in start_methods if str(method) == conversation_start
)
)
return ordered_starts, True
def _should_defer_trace_finalization(self) -> bool:
"""Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped.
True when either:
- ``flow.defer_trace_finalization`` is set on the instance, OR
- the static conversational definition enables deferred finalization.
- the class-level ``ConversationConfig.defer_trace_finalization``
on a conversational subclass is True.
Either source enables the deferred-session pattern. The caller
eventually invokes ``finalize_session_traces()`` to close the batch.
"""
if getattr(self, "defer_trace_finalization", False):
return True
definition = self._conversation_definition
return bool(
definition and definition.enabled and definition.defer_trace_finalization
)
config = self._conversation_config
return bool(config and config.defer_trace_finalization)
def _reset_turn_execution_state(self) -> None:
"""Clear per-execution tracking so the next turn re-runs the graph."""
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_events.clear()
self._pending_and_listeners.clear()
self._method_call_counts.clear()
self._clear_or_listeners()
self._is_execution_resuming = False
@@ -838,12 +733,11 @@ class _ConversationalMixin:
router_config: RouterConfig | None,
) -> dict[str, str]:
label_to_method: dict[str, str] = {}
flow_definition = self._conversation_flow_definition()
for listener_name, method_definition in flow_definition.methods.items():
if method_definition.listen is None or method_definition.router:
continue
for trigger_label in _iter_condition_labels(method_definition.listen):
label_to_method.setdefault(trigger_label, listener_name)
for listener_name, condition in self._listeners.items():
if isinstance(condition, tuple):
_, trigger_labels = condition
for trigger_label in trigger_labels:
label_to_method.setdefault(str(trigger_label), str(listener_name))
routes = self._effective_routes(router_config)
overrides = (
@@ -894,31 +788,21 @@ class _ConversationalMixin:
def _valid_route_labels(self) -> set[str]:
labels: set[str] = set()
flow_definition = self._conversation_flow_definition()
for method_definition in flow_definition.methods.values():
if method_definition.listen is None or method_definition.router:
continue
labels.update(_iter_condition_labels(method_definition.listen))
for condition in self._listeners.values():
if isinstance(condition, tuple):
_, methods = condition
labels.update(str(method) for method in methods)
return labels
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
custom_routes = set(router_config.routes or ()) if router_config else set()
definition = self._conversation_definition
builtin_routes = (
tuple(definition.builtin_routes)
if definition is not None
else self.builtin_routes
)
internal_routes = (
tuple(definition.internal_routes)
if definition is not None
else self.internal_routes
)
if not custom_routes:
custom_routes = (
self._valid_route_labels() - set(builtin_routes) - set(internal_routes)
self._valid_route_labels()
- set(self.builtin_routes)
- set(self.internal_routes)
)
return custom_routes | set(builtin_routes)
return custom_routes | set(self.builtin_routes)
def _default_conversation_llm(self) -> Any | None:
config = self._conversation_config

View File

@@ -1,50 +0,0 @@
"""Static conversational Flow definition models.
This module is part of the serializable Flow Definition contract. It should
only contain static data shapes. Experimental conversational runtime behavior
continues to live in ``crewai.experimental.conversational_mixin``.
"""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
class FlowConversationalRouterDefinition(BaseModel):
"""Static conversational router configuration."""
prompt: str | None = None
response_format: Any = None
llm: Any = None
routes: list[str] | None = None
route_descriptions: dict[str, str] | None = None
default_intent: str | None = "converse"
fallback_intent: str | None = "converse"
intent_field: str = "intent"
class FlowConversationalDefinition(BaseModel):
"""Static conversational Flow configuration."""
enabled: bool = False
system_prompt: str | None = None
llm: Any = None
router: FlowConversationalRouterDefinition | None = None
answer_from_history_prompt: str | None = None
default_intents: list[str] | None = None
intent_llm: Any = None
answer_from_history_llm: Any = None
visible_agent_outputs: list[str] | Literal["all"] | None = None
defer_trace_finalization: bool = True
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
internal_routes: list[str] = Field(
default_factory=lambda: ["answer_from_history", "conversation_start"]
)
__all__ = [
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
]

View File

@@ -9,8 +9,6 @@ from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowHumanFeedbackDefinition,
@@ -29,13 +27,6 @@ R = TypeVar("R")
logger = logging.getLogger(__name__)
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
_FLOW_METHOD_METADATA_ATTRS = [
"__conversational_only__",
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
"_human_feedback_llm",
]
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
@@ -51,39 +42,6 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
return True
def _is_conversational_flow(flow_class: type) -> bool:
return bool(getattr(flow_class, "conversational", False))
def _get_inherited_conversational_method(
flow_class: type,
attr_name: str,
) -> Any | None:
if not _is_conversational_flow(flow_class):
return None
for base in flow_class.__mro__[1:]:
inherited = base.__dict__.get(attr_name)
if inherited is None:
continue
if getattr(inherited, "__conversational_only__", False) and is_flow_method(
inherited
):
return inherited
return None
def _stamp_inherited_conversational_metadata(
method: Any,
inherited: Any,
) -> Any:
for attr in _FLOW_METHOD_METADATA_ATTRS:
if hasattr(inherited, attr):
setattr(method, attr, getattr(inherited, attr))
method.__is_flow_method__ = True
return method
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -177,8 +135,6 @@ def _build_state_definition(
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
if isinstance(state_value, TypeVar):
state_value = None
initial_state = getattr(flow_class, "initial_state", None)
if initial_state is not None:
state_value = initial_state
@@ -274,98 +230,6 @@ def _build_persistence_definition(
)
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
return None
routes = getattr(router_config, "routes", None)
return FlowConversationalRouterDefinition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
fallback_intent=getattr(router_config, "fallback_intent", "converse"),
intent_field=str(getattr(router_config, "intent_field", "intent")),
)
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
config = getattr(flow_class, "conversational_config", None)
builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end"))
internal_routes = getattr(
flow_class,
"internal_routes",
("answer_from_history", "conversation_start"),
)
if config is None:
return FlowConversationalDefinition(
enabled=True,
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
default_intents = getattr(config, "default_intents", None)
visible_agent_outputs = getattr(config, "visible_agent_outputs", None)
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
default_intents=(
[str(intent) for intent in default_intents]
if default_intents is not None
else None
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
"all"
if visible_agent_outputs == "all"
else [str(output) for output in visible_agent_outputs]
if visible_agent_outputs is not None
else None
),
defer_trace_finalization=bool(
getattr(config, "defer_trace_finalization", True)
),
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
@@ -377,11 +241,6 @@ def _build_method_definition(
else:
method_definition = fragment.model_copy(deep=True)
# Skip <locals>/<lambda> qualnames: they can never be re-imported, so a
# missing handler is more honest than a dead reference.
if "<" not in method.__qualname__:
method_definition.handler = f"{method.__module__}:{method.__qualname__}"
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
)
@@ -411,29 +270,6 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
flow_class, attr_value
):
methods[attr_name] = attr_value
continue
inherited = _get_inherited_conversational_method(flow_class, attr_name)
if inherited is not None and callable(attr_value):
methods[attr_name] = _stamp_inherited_conversational_metadata(
attr_value, inherited
)
if _is_conversational_flow(flow_class):
for base in reversed(flow_class.__mro__[1:]):
for attr_name, raw_value in base.__dict__.items():
if attr_name.startswith("_") or attr_name in methods:
continue
if not getattr(raw_value, "__conversational_only__", False):
continue
try:
attr_value = getattr(flow_class, attr_name)
except AttributeError:
continue
if is_flow_method(attr_value) and _should_include_flow_method(
flow_class, attr_value
):
methods[attr_name] = attr_value
# A wrapped method whose name collides with a base Flow model field
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
@@ -478,7 +314,6 @@ def _build_flow_definition_from_class(
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,
)

View File

@@ -6,22 +6,15 @@ The implementation now lives in three modules, split by concern:
``@router``, ``or_`` / ``and_``) and Python Flow class projection
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
- ``crewai.flow.runtime`` -- the Flow execution engine and state
- ``crewai.experimental.conversational_mixin`` -- experimental conversational
runtime extension composed onto the public ``Flow`` class
Prefer importing from those modules in new code; this module preserves the
historical ``crewai.flow.flow`` import path.
"""
from typing import Any, TypeVar
from pydantic import BaseModel
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl import and_, listen, or_, router, start
from crewai.flow.runtime import (
_INITIAL_STATE_CLASS_MARKER,
Flow as RuntimeFlow,
Flow,
FlowMeta,
FlowState,
LockedDictProxy,
@@ -30,13 +23,6 @@ from crewai.flow.runtime import (
)
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
class Flow(_ConversationalMixin, RuntimeFlow[T]):
"""Public Flow class with experimental conversational extension behavior."""
__all__ = [
"_INITIAL_STATE_CLASS_MARKER",
"Flow",

View File

@@ -16,11 +16,6 @@ from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field
import yaml
from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
logger = logging.getLogger(__name__)
@@ -28,8 +23,6 @@ FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -52,9 +45,8 @@ class FlowDefinitionDiagnostic(BaseModel):
class FlowStateDefinition(BaseModel):
"""Static description of a Flow state contract."""
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
ref: str | None = None
json_schema: dict[str, Any] | None = None
default: Any = None
@@ -94,7 +86,6 @@ class FlowHumanFeedbackDefinition(BaseModel):
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
handler: str | None = None
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
@@ -124,7 +115,6 @@ class FlowDefinition(BaseModel):
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)

View File

@@ -22,7 +22,6 @@ from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
import enum
import importlib
import inspect
import logging
import threading
@@ -85,13 +84,17 @@ from crewai.events.types.flow_events import (
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
FlowMethodDefinition,
FlowStateDefinition,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -136,6 +139,7 @@ from crewai.utilities.streaming import (
signal_end,
signal_error,
)
from crewai.utilities.types import LLMMessage
# Runtime alias so Pydantic can resolve the ``execution_context`` field's
@@ -150,108 +154,14 @@ ExecutionContext = Any # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
def _condition_branches(
condition: dict[str, Any],
) -> tuple[Literal["and", "or"], list[FlowDefinitionCondition]]:
if "and" in condition:
return "and", condition["and"]
return "or", condition["or"]
def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -> bool:
if isinstance(condition, str):
return condition in events
operator, branches = _condition_branches(condition)
combine = all if operator == "and" else any
return combine(_condition_satisfied(branch, events) for branch in branches)
def _resolve_handler(ref: str) -> Callable[..., Any]:
module_name, separator, qualname = ref.partition(":")
if not separator or not module_name or not qualname:
raise ValueError(
f"invalid handler reference {ref!r}; expected 'module:qualname'"
)
module = importlib.import_module(module_name)
target: Any = module
for part in qualname.split("."):
target = getattr(target, part)
if not callable(target):
raise TypeError(
f"handler reference {ref!r} resolved to a non-callable "
f"{type(target).__name__}"
)
return cast(Callable[..., Any], target)
def _build_definition_state_model(
state_definition: FlowStateDefinition,
) -> BaseModel | None:
kwargs = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
model_class: type[BaseModel] | None = None
if state_definition.ref:
try:
resolved = _resolve_handler(state_definition.ref)
except Exception:
logger.warning(
"Could not import state ref %r", state_definition.ref, exc_info=True
)
else:
if isinstance(resolved, type) and issubclass(resolved, BaseModel):
model_class = resolved
else:
logger.warning(
"State ref %r is not a pydantic model", state_definition.ref
)
if model_class is None and state_definition.json_schema:
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
try:
model_class = create_model_from_schema(state_definition.json_schema)
except Exception:
logger.warning(
"Could not build a state model from the declared json_schema",
exc_info=True,
)
if model_class is None:
return None
if not issubclass(model_class, FlowState):
class StateWithId(FlowState, model_class): # type: ignore[misc, valid-type]
pass
model_class = StateWithId
return model_class(**kwargs)
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
_, branches = _condition_branches(condition)
for branch in branches:
yield from _iter_condition_events(branch)
def _or_alternative_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
operator, branches = _condition_branches(condition)
if operator != "or":
return
for branch in branches:
yield from _or_alternative_events(branch)
sub_conditions = condition["and"] if "and" in condition else condition["or"]
for sub_condition in sub_conditions:
yield from _iter_condition_events(sub_condition)
def _is_multi_event_or(
@@ -260,8 +170,7 @@ def _is_multi_event_or(
if isinstance(condition, str):
return False
operator, branches = _condition_branches(condition)
return operator == "or" and len(branches) > 1
return "or" in condition and len(condition["or"]) > 1
def _resolve_persistence(value: Any) -> Any:
@@ -707,7 +616,7 @@ class FlowMeta(ModelMetaclass):
return super().__new__(mcs, name, bases, namespace)
class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
"""Base class for all flows.
type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
@@ -721,33 +630,41 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_flow_definition: ClassVar[FlowDefinition | None] = None
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a subclass, the built-in conversational
# graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn``
# / ``end_conversation`` / ``answer_from_history_turn``) registers and
# ``handle_turn`` / ``chat`` become the chat entry points. When ``False``
# (default), the methods exist as inert attributes and never register or
# fire — non-chat flows pay no runtime cost.
#
# ⚠ EXPERIMENTAL FEATURE. The whole conversational surface
# (``conversational`` ClassVar, ``handle_turn``, ``chat``,
# ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the
# built-in graph + helpers) lives under ``crewai.experimental`` and may
# change shape before graduating. Pin your CrewAI version if you depend on
# specific behavior, and watch the changelog for breaking updates.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
entity_type: Literal["flow"] = "flow"
def _initialize_runtime_extension_attrs(self) -> None:
"""Initialize optional runtime-extension attributes."""
def _create_default_extension_state(self) -> Any | None:
"""Return a default state supplied by an optional runtime extension."""
return None
def _should_apply_pending_kickoff_context(self) -> bool:
"""Whether an optional runtime extension has pending kickoff context."""
return False
def _apply_pending_kickoff_context(self) -> None:
"""Apply optional runtime-extension kickoff context."""
def _order_start_methods_for_kickoff(
self,
start_methods: list[FlowMethodName],
) -> tuple[list[FlowMethodName], bool]:
"""Allow an optional runtime extension to order kickoff start methods."""
return start_methods, False
def _should_defer_trace_finalization(self) -> bool:
"""Whether this kickoff should defer final flow trace finalization."""
return bool(getattr(self, "defer_trace_finalization", False))
@classmethod
def flow_definition(cls) -> FlowDefinition:
"""Return the static Flow Definition built from this Flow class."""
@@ -758,24 +675,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return flow_definition
@classmethod
def from_definition(cls, definition: FlowDefinition) -> Flow[Any]:
"""Build a runnable Flow directly from a definition; no subclass required."""
return cls.model_validate({}, context={"flow_definition": definition})
def _start_method_names(self) -> list[FlowMethodName]:
def _start_method_names(cls) -> list[FlowMethodName]:
return [
FlowMethodName(method_name)
for method_name, method_definition in self._definition.methods.items()
for method_name, method_definition in cls.flow_definition().methods.items()
if method_definition.is_start
]
@classmethod
def _listener_methods(
self,
cls,
) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]:
# (name, definition, condition) for every non-start method that listens.
# Routers are included (they listen too); callers wanting only plain
# listeners filter on definition.router.
for method_name, method_definition in self._definition.methods.items():
for method_name, method_definition in cls.flow_definition().methods.items():
if method_definition.listen is not None and not method_definition.is_start:
yield (
FlowMethodName(method_name),
@@ -783,22 +697,25 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method_definition.listen,
)
@classmethod
def _start_condition(
self, method_name: FlowMethodName
cls, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
method_definition = self._definition.methods[str(method_name)]
method_definition = cls.flow_definition().methods[str(method_name)]
start = method_definition.start
if isinstance(start, (str, dict)):
return start
return None
@classmethod
def _listen_condition(
self, method_name: FlowMethodName
cls, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
return self._definition.methods[str(method_name)].listen
return cls.flow_definition().methods[str(method_name)].listen
def _is_router(self, method_name: FlowMethodName) -> bool:
return self._definition.methods[str(method_name)].router
@classmethod
def _is_router(cls, method_name: FlowMethodName) -> bool:
return cls.flow_definition().methods[str(method_name)].router
initial_state: Annotated[ # type: ignore[type-arg]
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
@@ -941,13 +858,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
restore_event_scope(())
reset_last_event_id()
_methods: dict[FlowMethodName, Callable[..., Any]] = PrivateAttr(
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
default_factory=dict
)
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
default_factory=dict
)
_pending_events: dict[PendingListenerKey, set[str]] = PrivateAttr(
_pending_and_listeners: dict[PendingListenerKey, set[int]] = PrivateAttr(
default_factory=dict
)
_fired_or_listeners: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -955,7 +872,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
PrivateAttr(default=None)
)
_method_outputs: list[Any] = PrivateAttr(default_factory=list)
_definition: FlowDefinition = PrivateAttr()
_state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -966,6 +882,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_conversation_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
_pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
@@ -984,26 +904,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
object.__setattr__(self, name, value)
def model_post_init(self, __context: Any) -> None:
definition = (
__context.get("flow_definition") if isinstance(__context, dict) else None
)
self._flow_post_init(definition)
self._flow_post_init()
def _flow_post_init(self, definition: FlowDefinition | None = None) -> None:
def _flow_post_init(self) -> None:
"""Heavy initialization: state creation, events, memory, method registration."""
if getattr(self, "_flow_post_init_done", False):
return
object.__setattr__(self, "_flow_post_init_done", True)
self._initialize_runtime_extension_attrs()
self._definition = definition or type(self).flow_definition()
if self.name and self.name != self._definition.name:
self._definition = self._definition.model_copy(update={"name": self.name})
methods = (
self._handler_bound_methods()
if definition is not None
else self._class_bound_methods()
)
if self._state is None:
self._state = self._create_initial_state()
@@ -1019,7 +926,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
),
)
@@ -1029,44 +936,17 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self._definition.name)
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
self._methods.update(methods)
def _handler_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
methods: dict[FlowMethodName, Callable[..., Any]] = {}
unresolved: list[str] = []
for method_name, method_definition in self._definition.methods.items():
if method_definition.handler is None:
unresolved.append(f"{method_name}: no handler")
continue
try:
handler = _resolve_handler(method_definition.handler)
except Exception as e:
unresolved.append(f"{method_name}: {e}")
continue
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(self, type(self))
methods[FlowMethodName(method_name)] = handler
if unresolved:
raise ValueError(
f"Cannot build flow {self._definition.name!r} from its definition; "
"methods with missing or unresolvable handlers: "
+ "; ".join(unresolved)
)
return methods
def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
methods: dict[FlowMethodName, Callable[..., Any]] = {}
for method_name in self._definition.methods:
# Build the runtime method lookup from the static FlowDefinition.
for method_name in type(self).flow_definition().methods:
method = getattr(self, method_name, None)
if method is None:
continue
if not hasattr(method, "__self__"):
method = method.__get__(self, type(self))
methods[FlowMethodName(method_name)] = method
return methods
method = method.__get__(self, self.__class__)
self._methods[FlowMethodName(method_name)] = method
def recall(self, query: str, **kwargs: Any) -> Any:
"""Recall relevant memories. Delegates to this flow's memory.
@@ -1144,11 +1024,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _start_condition_triggered_by(
self, method_name: FlowMethodName, trigger: FlowMethodName
) -> bool:
condition = self._start_condition(method_name)
condition = type(self)._start_condition(method_name)
if condition is None:
return False
return self._condition_met(
condition, trigger, PendingListenerKey(f"start:{method_name}")
return self._evaluate_condition(
condition,
trigger,
method_name,
pending_key_prefix=f"start:{method_name}",
)
def _rearm_or_listeners_for_trigger(
@@ -1172,7 +1055,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
trigger_str = str(trigger)
to_discard: list[FlowMethodName] = []
for listener_name in candidates:
condition = self._listen_condition(listener_name)
condition = type(self)._listen_condition(listener_name)
if condition is None:
continue
if trigger_str in _iter_condition_events(condition):
@@ -1188,13 +1071,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Only events that EXCLUSIVELY feed one OR listener race; an event that
# also feeds another listener (e.g. an AND) is left alone when a sibling
# wins. e.g. @listen(or_(a, b)) on handler -> {frozenset({a, b}): handler}.
# Events nested under an and_() branch (e.g. or_(and_(a, b), c)) are not
# alternatives and never race -- cancelling one would make the AND
# unsatisfiable.
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
listener_name: condition
for listener_name, method_definition, condition in self._listener_methods()
for listener_name, method_definition, condition in type(
self
)._listener_methods()
if not method_definition.router
}
@@ -1211,14 +1093,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
for listener_name, condition in listener_conditions.items():
if not isinstance(condition, dict):
continue
alternatives = set(_or_alternative_events(condition))
if len(alternatives) <= 1:
events = events_by_listener[listener_name]
if "or" not in condition or len(events) <= 1:
continue
exclusive_events = {
event
for event in alternatives
if listeners_by_event[event] == {listener_name}
for event in events
if listeners_by_event.get(event, set()) == {listener_name}
}
if len(exclusive_events) > 1:
# Racing only applies to method-completion events: each member is
@@ -1467,7 +1349,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
inputs=None,
),
)
@@ -1543,7 +1425,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
@@ -1597,7 +1479,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -1625,7 +1507,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._copy_and_serialize_state(),
),
@@ -1658,15 +1540,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"""
init_state = self.initial_state
if init_state is None:
extension_state = self._create_default_extension_state()
if extension_state is not None:
return cast(T, extension_state)
# Conversational subclasses default to ``ConversationState`` if the
# user didn't supply an explicit type parameter (``Flow[...]``) or an
# ``initial_state``. This makes ``class MyChat(Flow): conversational
# = True`` work without forcing every user to import and parameterize
# ``ConversationState`` themselves.
if (
init_state is None
and getattr(type(self), "conversational", False)
and not hasattr(self, "_initial_state_t")
):
return cast(T, ConversationState())
if init_state is None and hasattr(self, "_initial_state_t"):
state_type = self._initial_state_t
if isinstance(state_type, TypeVar):
state_type = None
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
instance = state_type()
@@ -1686,7 +1573,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return cast(T, {"id": str(uuid4())})
if init_state is None:
return cast(T, self._create_definition_state())
return cast(T, {"id": str(uuid4())})
if isinstance(init_state, type):
state_class = init_state
@@ -1728,34 +1615,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
)
def _create_definition_state(self) -> dict[str, Any] | BaseModel:
state_definition = self._definition.state
if state_definition is None:
return {"id": str(uuid4())}
if state_definition.type in ("pydantic", "json_schema"):
state = _build_definition_state_model(state_definition)
if state is not None:
return state
logger.error(
"Flow %r declares %s state but neither ref nor json_schema "
"produced a model; falling back to dict state",
self._definition.name,
state_definition.type,
)
elif state_definition.type == "unknown":
logger.warning(
"Flow %r declares state of unknown type; falling back to dict state",
self._definition.name,
)
dict_state: dict[str, Any] = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
if "id" not in dict_state:
dict_state["id"] = str(uuid4())
return dict_state
def _copy_state(self) -> T:
"""Create a copy of the current state.
@@ -2169,7 +2028,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_events.clear()
self._pending_and_listeners.clear()
self._clear_or_listeners()
self._method_call_counts.clear()
else:
@@ -2264,11 +2123,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if should_emit_flow_started:
# In normal flows, each kickoff owns its own flow lifecycle.
# Deferred sessions reuse the first flow scope until an
# explicit finalization call closes the batch.
# Deferred conversational sessions are different: the first
# turn opens the flow scope and later turns reuse it until
# ``finalize_session_traces()`` emits the single finish event.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
@@ -2295,8 +2155,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# with implicit "crew" execution_type.
get_env_context()
if self._should_apply_pending_kickoff_context():
self._apply_pending_kickoff_context()
# Conversational hook: apply the pending user message AFTER state
# restore and AFTER flow scope initialization, so transcript events
# are parented under the current conversation trace.
# ``handle_turn`` stashes the message on ``self._pending_user_message``
# before calling ``kickoff``; this drains it.
if (
getattr(type(self), "conversational", False)
and self._pending_user_message is not None
):
self._apply_pending_conversational_turn()
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
@@ -2308,29 +2176,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Determine which start methods to execute at kickoff
# Conditional start methods are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
start_methods = self._start_method_names()
start_methods = type(self)._start_method_names()
unconditional_starts = [
start_method
for start_method in start_methods
if self._start_condition(start_method) is None
if type(self)._start_condition(start_method) is None
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts if unconditional_starts else start_methods
)
starts_to_execute, run_starts_sequentially = (
self._order_start_methods_for_kickoff(starts_to_execute)
)
if run_starts_sequentially:
for start_method in starts_to_execute:
await self._execute_start_method(start_method)
else:
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
from crewai.flow.async_feedback.types import HumanFeedbackPending
@@ -2360,7 +2221,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -2402,15 +2263,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# When ``defer_trace_finalization`` is set, skip both per-turn
# ``FlowFinishedEvent`` AND trace-batch finalization. The caller
# invokes the matching finalization hook once at session end. The
# flag is read from either the instance attribute or an extension
# definition.
# invokes ``finalize_session_traces()`` once at session end to
# close out the whole conversation as one trace. The flag is
# read from EITHER the instance attribute (set by user code) OR
# the class-level ``ConversationConfig.defer_trace_finalization``.
if not self._should_defer_trace_finalization():
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
result=final_output,
state=self._copy_and_serialize_state(),
),
@@ -2484,7 +2346,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
flow_name = self._definition.name
flow_name = self.name or self.__class__.__name__
nodes = sorted(
(
n
@@ -2543,7 +2405,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
# If start method is a router, use its result as an additional trigger
if self._is_router(start_method_name) and result is not None:
if type(self)._is_router(start_method_name) and result is not None:
# Execute listeners for the start method name first
await self._execute_listeners(start_method_name, result, finished_event_id)
# Then execute listeners for the router result (e.g., "approved")
@@ -2563,16 +2425,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _inject_trigger_payload_for_start_method(
self, original_method: Callable[..., Any]
) -> Callable[..., Any]:
accepts_trigger_payload = (
"crewai_trigger_payload" in inspect.signature(original_method).parameters
)
def prepare_kwargs(
*args: Any, **kwargs: Any
) -> tuple[tuple[Any, ...], dict[str, Any]]:
inputs = cast(dict[str, Any], baggage.get_baggage("flow_inputs") or {})
trigger_payload = inputs.get("crewai_trigger_payload")
sig = inspect.signature(original_method)
accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters
if trigger_payload is not None and accepts_trigger_payload:
kwargs["crewai_trigger_payload"] = trigger_payload
elif trigger_payload is not None:
@@ -2622,7 +2483,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
@@ -2674,7 +2535,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
@@ -2703,7 +2564,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
@@ -2719,7 +2580,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
@@ -2851,7 +2712,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
if current_trigger in router_results:
for method_name in self._start_method_names():
for method_name in type(self)._start_method_names():
if self._start_condition_triggered_by(
method_name, current_trigger
):
@@ -2864,25 +2725,72 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
else:
await self._execute_start_method(method_name)
def _condition_met(
def _evaluate_condition(
self,
condition: FlowDefinitionCondition,
trigger_method: FlowMethodName,
subscription_key: PendingListenerKey,
listener_name: FlowMethodName,
pending_key_prefix: str | None = None,
) -> bool:
seen = self._pending_events.setdefault(subscription_key, set())
seen.add(str(trigger_method))
if not _condition_satisfied(condition, seen):
return False
del self._pending_events[subscription_key]
return True
if isinstance(condition, str):
return condition == str(trigger_method)
def _sub_prefix(index: int) -> str | None:
if pending_key_prefix is None:
return None
return f"{pending_key_prefix}:{index}"
if "or" in condition:
# Evaluate every sub-condition (no short-circuit): a nested and_()
# branch needs the chance to clear its pending state in
# _pending_and_listeners even when an earlier branch already matched.
any_matched = False
for index, sub_condition in enumerate(condition["or"]):
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
any_matched = True
return any_matched
sub_conditions = condition["and"]
pending_key = PendingListenerKey(
pending_key_prefix
if pending_key_prefix is not None
else f"{listener_name}:{id(condition)}"
)
if pending_key not in self._pending_and_listeners:
self._pending_and_listeners[pending_key] = set(range(len(sub_conditions)))
pending_conditions = self._pending_and_listeners[pending_key]
for index, sub_condition in enumerate(sub_conditions):
if index not in pending_conditions:
continue
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
pending_conditions.discard(index)
if not pending_conditions:
self._pending_and_listeners.pop(pending_key, None)
return True
return False
def _find_triggered_methods(
self, trigger_method: FlowMethodName, router_only: bool
) -> list[FlowMethodName]:
triggered: list[FlowMethodName] = []
for listener_name, method_definition, condition in self._listener_methods():
for listener_name, method_definition, condition in type(
self
)._listener_methods():
is_router = method_definition.router
if router_only != is_router:
continue
@@ -2891,8 +2799,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._condition_met(
condition, trigger_method, PendingListenerKey(str(listener_name))
if self._evaluate_condition(
condition,
trigger_method,
listener_name,
):
triggered.append(listener_name)
if should_check_fired:
@@ -2948,10 +2858,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if self._is_router(listener_name):
for start_method_name in self._start_method_names():
if type(self)._is_router(listener_name):
for start_method_name in type(self)._start_method_names():
if (
self._start_condition(start_method_name) is not None
type(self)._start_condition(start_method_name) is not None
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
@@ -2970,7 +2880,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method = self._methods[listener_name]
sig = inspect.signature(method)
method_params = [p for p in sig.parameters.values() if p.name != "self"]
params = list(sig.parameters.values())
method_params = [p for p in params if p.name != "self"]
if triggering_event_id:
with triggered_by_scope(triggering_event_id):
@@ -3026,7 +2937,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return self.input_provider
if flow_config.input_provider is not None:
return flow_config.input_provider
return cast(InputProvider, ConsoleProvider())
return ConsoleProvider()
def _checkpoint_state_for_ask(self) -> None:
"""Auto-checkpoint flow state before waiting for user input.
@@ -3126,7 +3037,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputRequestedEvent(
type="flow_input_requested",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
method_name=method_name,
message=message,
metadata=metadata,
@@ -3145,7 +3056,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
executor = ThreadPoolExecutor(max_workers=1)
ctx = contextvars.copy_context()
future = executor.submit(
ctx.run, provider.request_input, message, cast(Any, self), metadata
ctx.run, provider.request_input, message, self, metadata
)
try:
raw = future.result(timeout=timeout)
@@ -3158,9 +3069,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# cancel_futures=True cleans up any queued-but-not-started tasks.
executor.shutdown(wait=False, cancel_futures=True)
else:
raw = provider.request_input(
message, cast(Any, self), metadata=metadata
)
raw = provider.request_input(message, self, metadata=metadata)
except KeyboardInterrupt:
raise
except Exception:
@@ -3193,7 +3102,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputReceivedEvent(
type="flow_input_received",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
method_name=method_name,
message=message,
response=response,
@@ -3231,7 +3140,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackRequestedEvent(
type="human_feedback_requested",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
method_name="", # Will be set by decorator if needed
output=output,
message=message,
@@ -3260,7 +3169,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackReceivedEvent(
type="human_feedback_received",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
method_name="", # Will be set by decorator if needed
feedback=feedback,
outcome=None, # Will be determined after collapsing
@@ -3435,10 +3344,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self._definition.name,
flow_name=self.name or self.__class__.__name__,
),
)
structure = build_flow_structure(cast(Any, self))
structure = build_flow_structure(self)
return render_interactive(structure, filename=filename, show=show)
@staticmethod

View File

@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
FlowMethodName = NewType("FlowMethodName", str)
PendingListenerKey = NewType(
"PendingListenerKey",
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
)

View File

@@ -259,9 +259,8 @@ class RecallFlow(Flow[RecallState]):
candidates = []
if not candidates:
candidates = [scope_prefix]
selected_scopes = candidates[:20]
self.state.candidate_scopes = selected_scopes
return selected_scopes
self.state.candidate_scopes = candidates[:20]
return self.state.candidate_scopes
@listen(filter_and_chunk)
def search_chunks(self) -> list[Any]:
@@ -369,10 +368,9 @@ class RecallFlow(Flow[RecallState]):
)
)
matches.sort(key=lambda m: m.score, reverse=True)
final_results = matches[: self.state.limit]
self.state.final_results = final_results
self.state.final_results = matches[: self.state.limit]
if self.state.evidence_gaps and self.state.final_results:
self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps)
return final_results
return self.state.final_results

View File

@@ -999,11 +999,7 @@ def _json_schema_to_pydantic_field(
if examples:
schema_extra["examples"] = examples
default = (
json_schema["default"]
if "default" in json_schema
else (... if is_required else None)
)
default = ... if is_required else None
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:

View File

@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
executor._method_outputs = []
executor._completed_methods = set()
executor._fired_or_listeners = set()
executor._pending_events = {}
executor._pending_and_listeners = {}
executor._method_execution_counts = {}
executor._method_call_counts = {}
executor._event_futures = []

View File

@@ -1157,25 +1157,6 @@ def test_flow_name():
assert flow.name == "MyFlow"
def test_flow_custom_name_overrides_class_name_in_events():
class InternalFlowClass(Flow):
name = "PublicName"
@start()
def begin(self):
return "done"
received = []
@crewai_event_bus.on(FlowStartedEvent)
def handle(source, event):
received.append(event)
InternalFlowClass().kickoff()
assert received[0].flow_name == "PublicName"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
@@ -1561,63 +1542,40 @@ def test_deeply_nested_conditions():
def test_or_branch_does_not_leave_stale_and_state():
fired = []
"""or_() over nested and_() branches must not leave stale pending AND state.
Regression: evaluating an or_() condition stopped at the first branch that was
satisfied, so a later and_() branch that the *same* trigger would have completed
never cleared its pending state. On the next cycle that trigger alone then
spuriously re-satisfied the whole condition. Both branches share the final
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
completes branch ``(c AND x)`` and both must be cleared together.
"""
class StaleStateFlow(Flow):
@start()
def begin(self):
pass
@listen(begin)
def a(self):
pass
@listen(begin)
def c(self):
pass
@listen(and_(a, c))
def x(self):
pass
@listen(or_(and_("a", "x"), and_("c", "y")))
@listen(or_(and_("a", "x"), and_("c", "x")))
def joined(self):
fired.append("joined")
pass
@router(joined)
def emit_y(self):
return "y"
flow = StaleStateFlow()
condition = type(flow)._listen_condition("joined")
StaleStateFlow().kickoff()
def fires(trigger):
return flow._evaluate_condition(condition, trigger, "joined")
assert fired == ["joined"]
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
assert fires("a") is False
assert fires("c") is False
assert fires("x") is True
def test_and_branch_inside_or_does_not_race():
execution_order = []
class DiamondWithFallbackFlow(Flow):
@start()
def go(self):
execution_order.append("go")
@listen(go)
def a(self):
execution_order.append("a")
@listen(go)
def b(self):
execution_order.append("b")
@listen(or_(and_(a, b), "fallback"))
def done(self):
execution_order.append("done")
DiamondWithFallbackFlow().kickoff()
assert "done" in execution_order
assert execution_order.index("done") > execution_order.index("a")
assert execution_order.index("done") > execution_order.index("b")
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
# previous cycle was consumed when "joined" fired, so neither branch is half
# complete and "x" by itself is insufficient.
assert fires("x") is False
def test_mixed_sync_async_execution_order():

View File

@@ -169,6 +169,9 @@ class TestConversationalFlow:
)
@pytest.mark.skip(
reason="Experimental conversational registry behavior is out of scope for the definition-first start migration."
)
def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None:
@ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini")
class ResearchFlow(ConversationalFlow):
@@ -592,6 +595,9 @@ class TestConversationalFlow:
assert result == "legacy-searched"
assert flow.state.last_intent == "search"
@pytest.mark.skip(
reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration."
)
def test_user_start_methods_run_sequentially_before_router_in_conversational_mode(
self,
) -> None:
@@ -643,6 +649,9 @@ class TestConversationalFlow:
assert "attach_bus" in order # still fires every turn
assert "route_turn" in order
@pytest.mark.skip(
reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration."
)
def test_subclass_can_override_conversation_start_without_redecorating(
self,
) -> None:
@@ -1333,12 +1342,6 @@ class TestFlowTracingWhenSuppressed:
class TestDeferTraceFinalization:
def test_bare_conversational_flow_defers_by_default(self) -> None:
class BareChat(ConversationalFlow):
pass
assert BareChat()._should_defer_trace_finalization() is True
def test_conversation_config_drives_defer_flag(self) -> None:
"""``ConversationConfig(defer_trace_finalization=...)`` controls whether
a conversational subclass defers per-turn trace finalization."""

View File

@@ -13,7 +13,6 @@ from pydantic import BaseModel
import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.experimental import ConversationConfig, RouterConfig
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
@@ -37,8 +36,6 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -172,7 +169,6 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert definition.state.ref and "ContractState" in definition.state.ref
assert definition.config.stream is True
assert definition.config.max_method_calls == 7
assert definition.conversational is None
assert definition.methods["begin"].start is True
assert definition.methods["process"].listen == "begin"
@@ -205,74 +201,27 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
methods = RegularFlow.flow_definition().methods
assert RegularFlow.flow_definition().conversational is None
assert set(methods) == {"begin"}
assert "conversation_start" not in methods
assert "route_conversation" not in methods
assert "converse_turn" not in methods
@pytest.mark.skip(
reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration."
)
def test_flow_definition_includes_conversational_builtins_when_enabled():
class ChatFlow(Flow):
conversational = True
definition = ChatFlow.flow_definition()
methods = definition.methods
methods = ChatFlow.flow_definition().methods
assert definition.conversational is not None
assert definition.conversational.enabled is True
assert definition.conversational.defer_trace_finalization is True
assert definition.conversational.builtin_routes == ["converse", "end"]
assert "conversation_start" in methods
assert "route_conversation" in methods
assert "converse_turn" in methods
assert methods["conversation_start"].start is True
def test_flow_definition_serializes_conversational_config():
@ConversationConfig(
system_prompt="Be concise.",
llm="gpt-4o-mini",
router=RouterConfig(
prompt="Pick a route.",
routes=["research"],
default_intent="converse",
fallback_intent="end",
),
default_intents=["research"],
visible_agent_outputs=["researcher"],
defer_trace_finalization=False,
)
class ChatFlow(Flow):
conversational = True
conversational = ChatFlow.flow_definition().conversational
assert conversational is not None
assert conversational.system_prompt == "Be concise."
assert conversational.llm == "gpt-4o-mini"
assert conversational.default_intents == ["research"]
assert conversational.visible_agent_outputs == ["researcher"]
assert conversational.defer_trace_finalization is False
assert conversational.router is not None
assert conversational.router.prompt == "Pick a route."
assert conversational.router.routes == ["research"]
assert conversational.router.fallback_intent == "end"
def test_flow_definition_preserves_undecorated_conversational_override():
class ChatFlow(Flow):
conversational = True
def conversation_start(self) -> str | None:
return "custom"
methods = ChatFlow.flow_definition().methods
assert methods["conversation_start"].start is True
assert "route_conversation" in methods
def test_flow_definition_serializes_human_feedback_metadata():
marker = object()

View File

@@ -1,508 +0,0 @@
from __future__ import annotations
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow import FlowState
from crewai.flow.flow_definition import FlowDefinition
class ChainFlow(Flow):
@start()
def begin(self):
self.state["begin_ran"] = True
return "hello"
@listen(begin)
def shout(self, result):
return result.upper()
@listen(shout)
def confirm(self):
self.state["confirmed"] = True
return f"confirmed:{self.state['confirmed']}"
CHAIN_YAML = f"""
schema: crewai.flow/v1
name: ChainFlow
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
shout:
handler: {__name__}:ChainFlow.shout
listen: begin
confirm:
handler: {__name__}:ChainFlow.confirm
listen: shout
"""
class MergeFlow(Flow):
@start()
def begin(self):
return "go"
@listen(begin)
def left(self):
return "left"
@listen(begin)
def right(self):
return "right"
@listen(or_(left, right))
def either(self):
self.state["either_ran"] = True
return "either"
@listen(and_(left, right, either))
def join(self):
self.state["joined"] = True
return "joined"
MERGE_YAML = f"""
schema: crewai.flow/v1
name: MergeFlow
methods:
begin:
handler: {__name__}:MergeFlow.begin
start: true
left:
handler: {__name__}:MergeFlow.left
listen: begin
right:
handler: {__name__}:MergeFlow.right
listen: begin
either:
handler: {__name__}:MergeFlow.either
listen:
or: [left, right]
join:
handler: {__name__}:MergeFlow.join
listen:
and: [left, right, either]
"""
class RouteFlow(Flow):
@start()
def begin(self):
return "go"
@router(begin)
def decide(self):
return "left" if self.state.get("direction") == "left" else "right"
@listen("left")
def take_left(self):
return "took-left"
@listen("right")
def take_right(self):
return "took-right"
ROUTE_YAML = f"""
schema: crewai.flow/v1
name: RouteFlow
methods:
begin:
handler: {__name__}:RouteFlow.begin
start: true
decide:
handler: {__name__}:RouteFlow.decide
listen: begin
router: true
take_left:
handler: {__name__}:RouteFlow.take_left
listen: left
take_right:
handler: {__name__}:RouteFlow.take_right
listen: right
"""
class LoopFlow(Flow):
@start("retry")
def step(self):
self.state["count"] = self.state.get("count", 0) + 1
return self.state["count"]
@router(step)
def decide(self):
if self.state["count"] < 3:
return "retry"
return "done"
@listen("done")
def finish(self):
return f"finished:{self.state['count']}"
LOOP_YAML = f"""
schema: crewai.flow/v1
name: LoopFlow
methods:
step:
handler: {__name__}:LoopFlow.step
start: retry
decide:
handler: {__name__}:LoopFlow.decide
listen: step
router: true
finish:
handler: {__name__}:LoopFlow.finish
listen: done
"""
class CounterState(FlowState):
count: int = 0
label: str = "none"
class PydanticStateFlow(Flow[CounterState]):
@start()
def begin(self):
self.state.count += 1
return self.state.count
@listen(begin)
def finish(self):
self.state.label = f"count={self.state.count}"
return self.state.label
PYDANTIC_STATE_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_STATE_OVERLAY_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
default:
count: 5
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
JSON_SCHEMA_STATE_YAML = f"""
schema: crewai.flow/v1
name: JsonSchemaStateFlow
state:
type: json_schema
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f"""
schema: crewai.flow/v1
name: SchemaFallbackFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
handler: {__name__}:PydanticStateFlow.begin
start: true
finish:
handler: {__name__}:PydanticStateFlow.finish
listen: begin
"""
UNRESOLVABLE_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnresolvableStateFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
"""
DICT_STATE_YAML = f"""
schema: crewai.flow/v1
name: DictStateFlow
state:
type: dict
default:
count: 5
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
"""
UNKNOWN_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnknownStateFlow
state:
type: unknown
ref: somewhere:Something
methods:
begin:
handler: {__name__}:ChainFlow.begin
start: true
"""
def _run_with_events(flow, inputs=None):
events = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_finished(source, event):
events.append(event)
result = flow.kickoff(inputs=inputs)
events.sort(key=lambda e: e.timestamp)
return result, [
(type(e).__name__, str(e.method_name), e.flow_name) for e in events
]
def _state_without_id(flow):
snapshot = dict(flow.state.model_dump())
snapshot.pop("id", None)
return snapshot
def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
class_flow = flow_cls()
class_result, class_events = _run_with_events(class_flow, inputs)
definition = FlowDefinition.from_yaml(yaml_str)
definition_flow = Flow.from_definition(definition)
definition_result, definition_events = _run_with_events(definition_flow, inputs)
assert definition_result == class_result
assert _state_without_id(definition_flow) == _state_without_id(class_flow)
if ordered:
assert definition_flow.method_outputs == class_flow.method_outputs
assert definition_events == class_events
else:
assert sorted(map(repr, definition_flow.method_outputs)) == sorted(
map(repr, class_flow.method_outputs)
)
assert sorted(definition_events) == sorted(class_events)
return definition_flow, definition_result
def test_simple_chain_parity():
flow, result = assert_parity(ChainFlow, CHAIN_YAML)
assert result == "confirmed:True"
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_and_or_merge_parity():
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
assert flow.state["joined"] is True
assert flow.state["either_ran"] is True
def test_router_label_parity_for_each_branch():
left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"})
assert "took-left" in left_flow.method_outputs
assert "took-right" not in left_flow.method_outputs
right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"})
assert "took-right" in right_flow.method_outputs
def test_cyclic_flow_parity():
flow, result = assert_parity(LoopFlow, LOOP_YAML)
assert result == "finished:3"
assert flow.state["count"] == 3
def test_definition_flow_events_use_definition_name():
definition = FlowDefinition.from_yaml(CHAIN_YAML)
flow = Flow.from_definition(definition)
_, events = _run_with_events(flow)
assert events
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
def test_from_definition_missing_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "NoHandlers",
"methods": {"begin": {"start": True}},
}
)
with pytest.raises(ValueError, match="begin: no handler"):
Flow.from_definition(definition)
def test_from_definition_unresolvable_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "BadHandlers",
"methods": {
"begin": {
"start": True,
"handler": "definitely_not_a_module_xyz:nope",
}
},
}
)
with pytest.raises(ValueError, match="missing or unresolvable handlers.*begin"):
Flow.from_definition(definition)
def test_from_definition_malformed_handler_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "MalformedHandlers",
"methods": {"begin": {"start": True, "handler": "no-colon-here"}},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_flow_definition_stamps_handler_refs():
definition = ChainFlow.flow_definition()
assert definition.methods["begin"].handler == f"{__name__}:ChainFlow.begin"
assert definition.methods["shout"].handler == f"{__name__}:ChainFlow.shout"
def test_pydantic_state_from_ref_parity():
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_pydantic_state_default_overlay():
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
result = flow.kickoff()
assert result == "count=6"
assert flow.state.count == 6
def test_json_schema_state():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_json_schema_state_validates_inputs():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
with pytest.raises(ValueError, match="Invalid inputs"):
flow.kickoff(inputs={"count": "not-a-number"})
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
flow = Flow.from_definition(
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
)
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
with caplog.at_level("ERROR"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True
assert flow.state["id"]
def test_dict_state_is_a_copy_of_default_plus_id():
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
flow = Flow.from_definition(definition)
assert flow.state["count"] == 5
assert flow.state["id"]
flow.kickoff()
assert flow.state["begin_ran"] is True
second = Flow.from_definition(definition)
assert second.state["count"] == 5
assert "begin_ran" not in second.state.model_dump()
assert second.state["id"] != flow.state["id"]
assert definition.state.default == {"count": 5}
def test_unknown_state_type_falls_back_to_dict(caplog):
with caplog.at_level("WARNING"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True