mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-11 03:08:19 +00:00
Compare commits
2 Commits
flow-defin
...
docs/add-d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d883c7999c | ||
|
|
e5d37196c7 |
@@ -24,39 +24,15 @@ mode: "wide"
|
||||
|
||||
1. في CrewAI AMP، انتقل إلى **Settings** > **OpenTelemetry Collectors**.
|
||||
2. انقر على **Add Collector**.
|
||||
3. اختر تكاملاً:
|
||||
- **OpenTelemetry Traces** و**OpenTelemetry Logs** — صدّر إلى أي مجمّع أو واجهة خلفية متوافقة مع OTLP.
|
||||
- **Datadog** — أرسل التتبعات مباشرة إلى استقبال OTLP الخاص بـ Datadog، دون الحاجة إلى مجمّع منفصل أو Datadog Agent.
|
||||
4. هيّئ الاتصال. تعتمد الحقول على التكامل الذي اخترته:
|
||||
3. اختر نوع التكامل — **OpenTelemetry Traces** أو **OpenTelemetry Logs**.
|
||||
4. هيّئ الاتصال:
|
||||
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
|
||||
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
|
||||
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
|
||||
5. انقر على **Save**.
|
||||
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
إن **OpenTelemetry Traces** و**OpenTelemetry Logs** تكاملان منفصلان يتشاركان نفس الحقول — اختر التكامل المطابق للإشارة التي تريد تصديرها.
|
||||
|
||||
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
|
||||
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
|
||||
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — مضيف OTLP لموقع Datadog الخاص بك فقط، دون بروتوكول أو مسار. يقوم CrewAI ببناء نقطة نهاية HTTPS OTLP الكاملة نيابةً عنك. استخدم المضيف المطابق لـ [موقع Datadog](https://docs.datadoghq.com/getting_started/site/) الخاص بك:
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — مفتاح واجهة برمجة تطبيقات Datadog الخاص بك. راجع [كيفية إنشاء واحد](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
|
||||
|
||||
يصدّر تكامل Datadog **التتبعات**.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(اختياري)* انقر على **Test Connection** للتحقق من قدرة CrewAI على الوصول إلى نقطة النهاية باستخدام بيانات الاعتماد التي قدمتها.
|
||||
6. انقر على **Save**.
|
||||
<Frame></Frame>
|
||||
|
||||
<Tip>
|
||||
يمكنك إضافة مجمّعات متعددة — على سبيل المثال، واحد للتتبعات وآخر للسجلات، أو الإرسال إلى واجهات خلفية مختلفة لأغراض مختلفة.
|
||||
|
||||
98
docs/en/enterprise/features/discovery.mdx
Normal file
98
docs/en/enterprise/features/discovery.mdx
Normal file
@@ -0,0 +1,98 @@
|
||||
---
|
||||
title: Discovery
|
||||
description: "Identify the highest-impact AI automation use cases for your business."
|
||||
icon: "compass"
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
Discovery is a new engine inside CrewAI AMP that helps companies identify the best automation use cases for their business.
|
||||
|
||||
The bottleneck in AI adoption is not building agents — it's knowing _what_ to build and _how_ to build it for production. Discovery closes that gap.
|
||||
|
||||
{/* TODO: Add screenshot of Discovery dashboard */}
|
||||
|
||||
Instead of weeks of stakeholder interviews, consultant engagements, and slide decks, Discovery leverages CrewAI's deep knowledge of agent patterns and what works in production to match your business context against proven approaches. Within minutes, you get actionable, evidence-based recommendations specific to your organization.
|
||||
|
||||
## How It Works
|
||||
|
||||
<Steps>
|
||||
<Step title="Describe Your Business">
|
||||
Tell Discovery about your organization — your processes, challenges, goals, and the teams involved. The more context you provide, the more precise the recommendations.
|
||||
</Step>
|
||||
<Step title="Multi-Signal Matching">
|
||||
Discovery runs cohort analysis and structural pattern recognition using CrewAI's world model, matching your business context to automation patterns already running successfully at scale.
|
||||
</Step>
|
||||
<Step title="Review Use Cases">
|
||||
Within minutes, you receive a set of use cases specific to your company — not generic templates. Each one shows what the automation does, expected impact, complexity, and how it would work in your organization.
|
||||
{/* TODO: Add screenshot of use case recommendations */}
|
||||
</Step>
|
||||
<Step title="Build">
|
||||
Select a use case and go directly into Crew Studio or export to code to start building.
|
||||
</Step>
|
||||
</Steps>
|
||||
|
||||
{/* TODO: Add screenshot of Discovery flow / results page */}
|
||||
|
||||
## Key Features
|
||||
|
||||
<CardGroup cols={2}>
|
||||
<Card title="Business-Specific Recommendations" icon="bullseye">
|
||||
Not generic templates. Real use cases matched to your organization based on CrewAI's knowledge of what works in production.
|
||||
</Card>
|
||||
<Card title="Impact & Complexity Scoring" icon="chart-mixed">
|
||||
Each recommendation includes expected impact, implementation complexity, and how it fits your org — so you can prioritize with confidence.
|
||||
</Card>
|
||||
<Card title="Iterative Discovery" icon="arrows-rotate">
|
||||
Run Discovery multiple times across different business units. It becomes part of how you plan and iterate on your AI roadmap.
|
||||
</Card>
|
||||
<Card title="Evidence-Based" icon="flask-vial">
|
||||
Every recommendation is grounded in what CrewAI knows actually works in production — not guesswork or intuition.
|
||||
</Card>
|
||||
</CardGroup>
|
||||
|
||||
## From Discovery to Production
|
||||
|
||||
Discovery fits at the very beginning of the CrewAI workflow — it's the "what to build" step before the "how to build" step.
|
||||
|
||||
{/* TODO: Add diagram showing Discovery → Crew Studio → Automations flow */}
|
||||
|
||||
The end-to-end flow:
|
||||
|
||||
1. **Discovery** identifies the use case and provides the blueprint
|
||||
2. **Crew Studio** or code lets you build the automation
|
||||
3. **Automations** deploys it to production
|
||||
|
||||
This means you go from "we should use AI somewhere" to a running production automation with a clear, guided path — no guesswork at any stage.
|
||||
|
||||
## Use Cases
|
||||
|
||||
<CardGroup cols={2}>
|
||||
<Card title="New to AI Agents" icon="seedling">
|
||||
Don't know where to start? Discovery identifies the highest-impact opportunities specific to your business, so you begin with what matters most.
|
||||
</Card>
|
||||
<Card title="Scaling AI Programs" icon="rocket">
|
||||
Already have some automations? Discovery finds the next wave of use cases across departments, helping you expand beyond initial pilots.
|
||||
</Card>
|
||||
<Card title="Cross-Department Rollout" icon="building">
|
||||
Run Discovery for different business units to build a company-wide AI roadmap with use cases tailored to each team's needs.
|
||||
</Card>
|
||||
<Card title="ROI Prioritization" icon="chart-line">
|
||||
Need to justify AI investment? Discovery provides evidence-based impact estimates grounded in real-world results.
|
||||
</Card>
|
||||
</CardGroup>
|
||||
|
||||
## Related
|
||||
|
||||
<CardGroup cols={3}>
|
||||
<Card title="Crew Studio" href="/en/enterprise/features/crew-studio" icon="pencil">
|
||||
Build automations with AI assistance and a visual editor.
|
||||
</Card>
|
||||
<Card title="Automations" href="/en/enterprise/features/automations" icon="bolt">
|
||||
Deploy and manage your automations in production.
|
||||
</Card>
|
||||
<Card title="Marketplace" href="/en/enterprise/features/marketplace" icon="store">
|
||||
Browse pre-built automations and components.
|
||||
</Card>
|
||||
</CardGroup>
|
||||
@@ -24,39 +24,15 @@ Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://op
|
||||
|
||||
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
|
||||
2. Click **Add Collector**.
|
||||
3. Select an integration:
|
||||
- **OpenTelemetry Traces** and **OpenTelemetry Logs** — export to any OTLP-compatible collector or backend.
|
||||
- **Datadog** — send traces straight to Datadog's OTLP intake, no separate collector or Datadog Agent required.
|
||||
4. Configure the connection. The fields depend on the integration you selected:
|
||||
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
|
||||
4. Configure the connection:
|
||||
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — A name to identify this service in your observability platform.
|
||||
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
|
||||
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
|
||||
5. Click **Save**.
|
||||
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
**OpenTelemetry Traces** and **OpenTelemetry Logs** are separate integrations that share the same fields — pick the one matching the signal you want to export.
|
||||
|
||||
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — A name to identify this service in your observability platform.
|
||||
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
|
||||
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — Your Datadog site's OTLP host only, with no protocol or path. CrewAI builds the full HTTPS OTLP endpoint for you. Use the host that matches your [Datadog site](https://docs.datadoghq.com/getting_started/site/):
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — Your Datadog API key. See [how to create one](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
|
||||
|
||||
The Datadog integration exports **traces**.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(optional)* Click **Test Connection** to verify CrewAI can reach the endpoint with the credentials you provided.
|
||||
6. Click **Save**.
|
||||
<Frame></Frame>
|
||||
|
||||
<Tip>
|
||||
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 455 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 420 KiB |
@@ -24,39 +24,15 @@ CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자
|
||||
|
||||
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
|
||||
2. **Add Collector**를 클릭합니다.
|
||||
3. 통합을 선택합니다:
|
||||
- **OpenTelemetry Traces** 및 **OpenTelemetry Logs** — OTLP 호환 수집기 또는 백엔드로 내보냅니다.
|
||||
- **Datadog** — 별도의 수집기나 Datadog Agent 없이 트레이스를 Datadog의 OTLP 인테이크로 직접 전송합니다.
|
||||
4. 연결을 구성합니다. 필드는 선택한 통합에 따라 달라집니다:
|
||||
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
|
||||
4. 연결을 구성합니다:
|
||||
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
|
||||
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
|
||||
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
|
||||
5. **Save**를 클릭합니다.
|
||||
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
**OpenTelemetry Traces**와 **OpenTelemetry Logs**는 동일한 필드를 공유하는 별개의 통합입니다 — 내보내려는 신호에 맞는 것을 선택하세요.
|
||||
|
||||
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
|
||||
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
|
||||
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — Datadog 사이트의 OTLP 호스트만 입력합니다 (프로토콜이나 경로 제외). CrewAI가 전체 HTTPS OTLP 엔드포인트를 자동으로 구성합니다. [Datadog 사이트](https://docs.datadoghq.com/getting_started/site/)에 맞는 호스트를 사용하세요:
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — Datadog API 키입니다. [키 생성 방법](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys)을 참고하세요.
|
||||
|
||||
Datadog 통합은 **트레이스**를 내보냅니다.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(선택 사항)* **Test Connection**을 클릭하여 제공한 자격 증명으로 CrewAI가 엔드포인트에 연결할 수 있는지 확인합니다.
|
||||
6. **Save**를 클릭합니다.
|
||||
<Frame></Frame>
|
||||
|
||||
<Tip>
|
||||
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.
|
||||
|
||||
@@ -24,39 +24,15 @@ Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetr
|
||||
|
||||
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
|
||||
2. Clique em **Add Collector**.
|
||||
3. Selecione uma integração:
|
||||
- **OpenTelemetry Traces** e **OpenTelemetry Logs** — exporte para qualquer coletor ou backend compatível com OTLP.
|
||||
- **Datadog** — envie traces diretamente para a ingestão OTLP do Datadog, sem precisar de um coletor separado ou do Datadog Agent.
|
||||
4. Configure a conexão. Os campos dependem da integração selecionada:
|
||||
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
|
||||
4. Configure a conexão:
|
||||
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
|
||||
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
|
||||
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
|
||||
5. Clique em **Save**.
|
||||
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
**OpenTelemetry Traces** e **OpenTelemetry Logs** são integrações separadas que compartilham os mesmos campos — escolha a que corresponde ao sinal que você quer exportar.
|
||||
|
||||
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
|
||||
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
|
||||
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — Apenas o host OTLP do seu site Datadog, sem protocolo ou caminho. O CrewAI monta o endpoint HTTPS OTLP completo para você. Use o host correspondente ao seu [site Datadog](https://docs.datadoghq.com/getting_started/site/):
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — Sua chave de API do Datadog. Veja [como criar uma](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
|
||||
|
||||
A integração com o Datadog exporta **traces**.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(opcional)* Clique em **Test Connection** para verificar se o CrewAI consegue acessar o endpoint com as credenciais fornecidas.
|
||||
6. Clique em **Save**.
|
||||
<Frame></Frame>
|
||||
|
||||
<Tip>
|
||||
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
"""Conversational graph + helpers as an experimental Flow extension.
|
||||
"""Conversational graph + helpers as a mixin for ``Flow`` (experimental).
|
||||
|
||||
The conversational chat surface remains experimental and may change before the
|
||||
v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused
|
||||
on the execution engine. ``crewai.flow.flow`` composes this mixin onto the
|
||||
public ``Flow`` class for backwards compatibility.
|
||||
|
||||
The built-in conversational graph only registers for subclasses that opt in
|
||||
with ``conversational = True``. Static conversational metadata is projected
|
||||
into ``FlowDefinition.conversational`` via the Python DSL builder.
|
||||
The experimental conversational chat surface lives here as a mixin so that
|
||||
``crewai.flow.runtime`` stays focused on the execution engine. ``Flow``
|
||||
inherits from ``_ConversationalMixin``; the methods only register on
|
||||
subclasses that opt in via ``conversational = True`` (enforced by the
|
||||
``_conversational_only`` marker + ``FlowMeta`` gating in
|
||||
``crewai.flow.runtime``).
|
||||
|
||||
Import surface:
|
||||
- :class:`_ConversationalMixin` — internal; the public ``Flow`` class
|
||||
composes it in. Users don't import it directly.
|
||||
- :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users
|
||||
don't import it directly.
|
||||
- The data types this mixin uses live in
|
||||
:mod:`crewai.experimental.conversational`.
|
||||
"""
|
||||
@@ -22,7 +20,7 @@ from collections.abc import Callable, Mapping, Sequence
|
||||
from enum import Enum
|
||||
import json
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
|
||||
|
||||
from pydantic import BaseModel, Field, create_model
|
||||
|
||||
@@ -51,56 +49,21 @@ from crewai.utilities.types import LLMMessage
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _iter_condition_labels(condition: Any) -> set[str]:
|
||||
if isinstance(condition, str):
|
||||
return {condition}
|
||||
if isinstance(condition, dict):
|
||||
labels: set[str] = set()
|
||||
for value in condition.values():
|
||||
if isinstance(value, list):
|
||||
for item in value:
|
||||
labels.update(_iter_condition_labels(item))
|
||||
else:
|
||||
labels.update(_iter_condition_labels(value))
|
||||
return labels
|
||||
return set()
|
||||
|
||||
|
||||
class _ConversationalMixin:
|
||||
"""Experimental conversational graph for ``Flow``.
|
||||
"""Built-in conversational graph for ``Flow`` (gated on ``conversational``).
|
||||
|
||||
This mixin owns chat behavior and runtime hooks. Non-chat flows see these
|
||||
methods as inert attributes unless they opt in with ``conversational = True``.
|
||||
Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused
|
||||
on running graphs. The methods here only register on subclasses that set
|
||||
``conversational = True``; non-chat flows see them as inert attributes.
|
||||
"""
|
||||
|
||||
# === EXPERIMENTAL: conversational mode ===
|
||||
# When ``conversational = True`` on a Flow subclass, this mixin's built-in
|
||||
# graph registers and ``handle_turn`` / ``chat`` become chat entry points.
|
||||
conversational: ClassVar[bool] = False
|
||||
conversational_config: ClassVar[ConversationConfig | None] = None
|
||||
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
|
||||
internal_routes: ClassVar[tuple[str, ...]] = (
|
||||
"answer_from_history",
|
||||
"conversation_start",
|
||||
)
|
||||
builtin_route_descriptions: ClassVar[dict[str, str]] = {
|
||||
"converse": (
|
||||
"Ordinary chat, follow-ups, summaries, clarifications, and "
|
||||
"questions answerable from prior conversation history."
|
||||
),
|
||||
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
|
||||
"answer_from_history": (
|
||||
"Answer directly from prior conversation history without invoking "
|
||||
"tools, agents, or custom routes."
|
||||
),
|
||||
}
|
||||
|
||||
# The metaclass + state attributes referenced below live on ``Flow`` —
|
||||
# this mixin is never instantiated standalone. These type-only
|
||||
# declarations exist so static analyzers don't flag attribute access.
|
||||
@@ -108,15 +71,22 @@ class _ConversationalMixin:
|
||||
# (otherwise mypy flags "Cannot override instance variable with class
|
||||
# variable" when Flow declares them as ``ClassVar``).
|
||||
if TYPE_CHECKING:
|
||||
conversational: ClassVar[bool]
|
||||
conversational_config: ClassVar[ConversationConfig | None]
|
||||
builtin_routes: ClassVar[tuple[str, ...]]
|
||||
internal_routes: ClassVar[tuple[str, ...]]
|
||||
builtin_route_descriptions: ClassVar[dict[str, str]]
|
||||
# Registry ClassVars populated by ``FlowMeta`` at class creation.
|
||||
_listeners: ClassVar[dict[Any, Any]]
|
||||
|
||||
# Instance attrs from ``Flow``.
|
||||
state: Any
|
||||
name: str | None
|
||||
_completed_methods: set[Any]
|
||||
_method_outputs: list[Any]
|
||||
_pending_events: dict[Any, Any]
|
||||
_pending_and_listeners: dict[Any, Any]
|
||||
_method_call_counts: dict[Any, int]
|
||||
_is_execution_resuming: bool
|
||||
_conversation_messages: list[LLMMessage]
|
||||
_pending_user_message: str | dict[str, Any] | None
|
||||
_pending_intents: Sequence[str] | None
|
||||
_pending_intent_llm: str | BaseLLM | None
|
||||
@@ -127,8 +97,8 @@ class _ConversationalMixin:
|
||||
def _collapse_to_outcome(
|
||||
self,
|
||||
feedback: str,
|
||||
outcomes: Sequence[str],
|
||||
llm: str | BaseLLM,
|
||||
outcomes: tuple[str, ...],
|
||||
llm: str | BaseLLM | Any,
|
||||
) -> str:
|
||||
pass
|
||||
|
||||
@@ -268,8 +238,8 @@ class _ConversationalMixin:
|
||||
state = cast(ConversationState, self.state)
|
||||
sid = session_id or state.id
|
||||
|
||||
# Stash the pending turn so the kickoff extension hook picks it up
|
||||
# after persist restore.
|
||||
# Stash the pending turn so ``_apply_pending_conversational_turn``
|
||||
# picks it up AFTER persist restore.
|
||||
self._pending_user_message = message
|
||||
self._pending_intents = list(intents) if intents else None
|
||||
self._pending_intent_llm = intent_llm
|
||||
@@ -316,7 +286,7 @@ class _ConversationalMixin:
|
||||
callers can customize prompts or exercise the loop without patching
|
||||
builtins.
|
||||
"""
|
||||
if not self._is_conversational_enabled():
|
||||
if not getattr(type(self), "conversational", False):
|
||||
raise ValueError("Flow.chat() is only available on conversational flows")
|
||||
|
||||
exit_set = {command.lower() for command in exit_commands}
|
||||
@@ -521,14 +491,14 @@ class _ConversationalMixin:
|
||||
**extra: Any,
|
||||
) -> None:
|
||||
"""Append a message to conversation history (legacy ChatState path)."""
|
||||
_append_conversation_message(cast(Any, self), role, content, **extra)
|
||||
_append_conversation_message(cast("Flow[Any]", self), role, content, **extra)
|
||||
|
||||
@property
|
||||
def conversation_messages(self) -> list[LLMMessage]:
|
||||
"""Message history from state, coerced to LLM-shaped dicts."""
|
||||
return [
|
||||
message_to_llm_dict(message)
|
||||
for message in get_conversation_messages(cast(Any, self))
|
||||
for message in get_conversation_messages(cast("Flow[Any]", self))
|
||||
]
|
||||
|
||||
def receive_user_message(
|
||||
@@ -544,7 +514,7 @@ class _ConversationalMixin:
|
||||
``state.messages`` and preserve ``last_intent`` across turns.
|
||||
Non-conversational flows fall through to the legacy helper.
|
||||
"""
|
||||
if self._is_conversational_enabled():
|
||||
if self.conversational:
|
||||
state = cast(ConversationState, self.state)
|
||||
state.messages.append(ConversationMessage(role="user", content=text))
|
||||
self._emit_conversation_message_added(
|
||||
@@ -565,7 +535,9 @@ class _ConversationalMixin:
|
||||
return intent
|
||||
return text
|
||||
|
||||
return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm)
|
||||
return _receive_user_message(
|
||||
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
|
||||
)
|
||||
|
||||
def classify_intent(
|
||||
self,
|
||||
@@ -589,104 +561,27 @@ class _ConversationalMixin:
|
||||
def _conversation_config(self) -> ConversationConfig | None:
|
||||
return getattr(type(self), "conversational_config", None)
|
||||
|
||||
@property
|
||||
def _conversation_definition(self) -> Any | None:
|
||||
return self._conversation_flow_definition().conversational
|
||||
|
||||
def _conversation_flow_definition(self) -> Any:
|
||||
flow_definition = getattr(type(self), "flow_definition", None)
|
||||
if not callable(flow_definition):
|
||||
raise AttributeError(
|
||||
f"{type(self).__name__} does not expose flow_definition()"
|
||||
)
|
||||
return flow_definition()
|
||||
|
||||
@classmethod
|
||||
def _conversational_definition(cls) -> Any | None:
|
||||
flow_definition = getattr(cls, "flow_definition", None)
|
||||
if not callable(flow_definition):
|
||||
return None
|
||||
return flow_definition().conversational
|
||||
|
||||
@classmethod
|
||||
def _is_conversational(cls) -> bool:
|
||||
definition = cls._conversational_definition()
|
||||
return bool(definition and definition.enabled)
|
||||
|
||||
def _is_conversational_enabled(self) -> bool:
|
||||
definition = self._conversation_definition
|
||||
return bool(definition and definition.enabled)
|
||||
|
||||
def _initialize_runtime_extension_attrs(self) -> None:
|
||||
if not isinstance(getattr(self, "_conversation_messages", None), list):
|
||||
object.__setattr__(self, "_conversation_messages", [])
|
||||
if not hasattr(self, "_pending_user_message"):
|
||||
object.__setattr__(self, "_pending_user_message", None)
|
||||
if not hasattr(self, "_pending_intents"):
|
||||
object.__setattr__(self, "_pending_intents", None)
|
||||
if not hasattr(self, "_pending_intent_llm"):
|
||||
object.__setattr__(self, "_pending_intent_llm", None)
|
||||
|
||||
def _create_default_extension_state(self) -> ConversationState | None:
|
||||
initial_state_t = getattr(self, "_initial_state_t", None)
|
||||
if type(self)._is_conversational() and (
|
||||
not hasattr(self, "_initial_state_t")
|
||||
or isinstance(initial_state_t, TypeVar)
|
||||
):
|
||||
return ConversationState()
|
||||
return None
|
||||
|
||||
def _should_apply_pending_kickoff_context(self) -> bool:
|
||||
return (
|
||||
type(self)._is_conversational() and self._pending_user_message is not None
|
||||
)
|
||||
|
||||
def _apply_pending_kickoff_context(self) -> None:
|
||||
self._apply_pending_conversational_turn()
|
||||
|
||||
def _order_start_methods_for_kickoff(
|
||||
self,
|
||||
start_methods: list[Any],
|
||||
) -> tuple[list[Any], bool]:
|
||||
if not type(self)._is_conversational():
|
||||
return start_methods, False
|
||||
|
||||
conversation_start = "conversation_start"
|
||||
if conversation_start not in {str(method) for method in start_methods}:
|
||||
return start_methods, False
|
||||
|
||||
ordered_starts = [
|
||||
method for method in start_methods if str(method) != conversation_start
|
||||
]
|
||||
ordered_starts.append(
|
||||
next(
|
||||
method for method in start_methods if str(method) == conversation_start
|
||||
)
|
||||
)
|
||||
return ordered_starts, True
|
||||
|
||||
def _should_defer_trace_finalization(self) -> bool:
|
||||
"""Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped.
|
||||
|
||||
True when either:
|
||||
- ``flow.defer_trace_finalization`` is set on the instance, OR
|
||||
- the static conversational definition enables deferred finalization.
|
||||
- the class-level ``ConversationConfig.defer_trace_finalization``
|
||||
on a conversational subclass is True.
|
||||
|
||||
Either source enables the deferred-session pattern. The caller
|
||||
eventually invokes ``finalize_session_traces()`` to close the batch.
|
||||
"""
|
||||
if getattr(self, "defer_trace_finalization", False):
|
||||
return True
|
||||
definition = self._conversation_definition
|
||||
return bool(
|
||||
definition and definition.enabled and definition.defer_trace_finalization
|
||||
)
|
||||
config = self._conversation_config
|
||||
return bool(config and config.defer_trace_finalization)
|
||||
|
||||
def _reset_turn_execution_state(self) -> None:
|
||||
"""Clear per-execution tracking so the next turn re-runs the graph."""
|
||||
self._completed_methods.clear()
|
||||
self._method_outputs.clear()
|
||||
self._pending_events.clear()
|
||||
self._pending_and_listeners.clear()
|
||||
self._method_call_counts.clear()
|
||||
self._clear_or_listeners()
|
||||
self._is_execution_resuming = False
|
||||
@@ -838,12 +733,11 @@ class _ConversationalMixin:
|
||||
router_config: RouterConfig | None,
|
||||
) -> dict[str, str]:
|
||||
label_to_method: dict[str, str] = {}
|
||||
flow_definition = self._conversation_flow_definition()
|
||||
for listener_name, method_definition in flow_definition.methods.items():
|
||||
if method_definition.listen is None or method_definition.router:
|
||||
continue
|
||||
for trigger_label in _iter_condition_labels(method_definition.listen):
|
||||
label_to_method.setdefault(trigger_label, listener_name)
|
||||
for listener_name, condition in self._listeners.items():
|
||||
if isinstance(condition, tuple):
|
||||
_, trigger_labels = condition
|
||||
for trigger_label in trigger_labels:
|
||||
label_to_method.setdefault(str(trigger_label), str(listener_name))
|
||||
|
||||
routes = self._effective_routes(router_config)
|
||||
overrides = (
|
||||
@@ -894,31 +788,21 @@ class _ConversationalMixin:
|
||||
|
||||
def _valid_route_labels(self) -> set[str]:
|
||||
labels: set[str] = set()
|
||||
flow_definition = self._conversation_flow_definition()
|
||||
for method_definition in flow_definition.methods.values():
|
||||
if method_definition.listen is None or method_definition.router:
|
||||
continue
|
||||
labels.update(_iter_condition_labels(method_definition.listen))
|
||||
for condition in self._listeners.values():
|
||||
if isinstance(condition, tuple):
|
||||
_, methods = condition
|
||||
labels.update(str(method) for method in methods)
|
||||
return labels
|
||||
|
||||
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
|
||||
custom_routes = set(router_config.routes or ()) if router_config else set()
|
||||
definition = self._conversation_definition
|
||||
builtin_routes = (
|
||||
tuple(definition.builtin_routes)
|
||||
if definition is not None
|
||||
else self.builtin_routes
|
||||
)
|
||||
internal_routes = (
|
||||
tuple(definition.internal_routes)
|
||||
if definition is not None
|
||||
else self.internal_routes
|
||||
)
|
||||
if not custom_routes:
|
||||
custom_routes = (
|
||||
self._valid_route_labels() - set(builtin_routes) - set(internal_routes)
|
||||
self._valid_route_labels()
|
||||
- set(self.builtin_routes)
|
||||
- set(self.internal_routes)
|
||||
)
|
||||
return custom_routes | set(builtin_routes)
|
||||
return custom_routes | set(self.builtin_routes)
|
||||
|
||||
def _default_conversation_llm(self) -> Any | None:
|
||||
config = self._conversation_config
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
"""Static conversational Flow definition models.
|
||||
|
||||
This module is part of the serializable Flow Definition contract. It should
|
||||
only contain static data shapes. Experimental conversational runtime behavior
|
||||
continues to live in ``crewai.experimental.conversational_mixin``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class FlowConversationalRouterDefinition(BaseModel):
|
||||
"""Static conversational router configuration."""
|
||||
|
||||
prompt: str | None = None
|
||||
response_format: Any = None
|
||||
llm: Any = None
|
||||
routes: list[str] | None = None
|
||||
route_descriptions: dict[str, str] | None = None
|
||||
default_intent: str | None = "converse"
|
||||
fallback_intent: str | None = "converse"
|
||||
intent_field: str = "intent"
|
||||
|
||||
|
||||
class FlowConversationalDefinition(BaseModel):
|
||||
"""Static conversational Flow configuration."""
|
||||
|
||||
enabled: bool = False
|
||||
system_prompt: str | None = None
|
||||
llm: Any = None
|
||||
router: FlowConversationalRouterDefinition | None = None
|
||||
answer_from_history_prompt: str | None = None
|
||||
default_intents: list[str] | None = None
|
||||
intent_llm: Any = None
|
||||
answer_from_history_llm: Any = None
|
||||
visible_agent_outputs: list[str] | Literal["all"] | None = None
|
||||
defer_trace_finalization: bool = True
|
||||
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
|
||||
internal_routes: list[str] = Field(
|
||||
default_factory=lambda: ["answer_from_history", "conversation_start"]
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
]
|
||||
@@ -9,8 +9,6 @@ from typing_extensions import TypeIs
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowConfigDefinition,
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
FlowDefinition,
|
||||
FlowDefinitionDiagnostic,
|
||||
FlowHumanFeedbackDefinition,
|
||||
@@ -29,13 +27,6 @@ R = TypeVar("R")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
|
||||
_FLOW_METHOD_METADATA_ATTRS = [
|
||||
"__conversational_only__",
|
||||
"__flow_method_definition__",
|
||||
"__flow_persistence_config__",
|
||||
"__human_feedback_config__",
|
||||
"_human_feedback_llm",
|
||||
]
|
||||
|
||||
|
||||
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
|
||||
@@ -51,39 +42,6 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def _is_conversational_flow(flow_class: type) -> bool:
|
||||
return bool(getattr(flow_class, "conversational", False))
|
||||
|
||||
|
||||
def _get_inherited_conversational_method(
|
||||
flow_class: type,
|
||||
attr_name: str,
|
||||
) -> Any | None:
|
||||
if not _is_conversational_flow(flow_class):
|
||||
return None
|
||||
|
||||
for base in flow_class.__mro__[1:]:
|
||||
inherited = base.__dict__.get(attr_name)
|
||||
if inherited is None:
|
||||
continue
|
||||
if getattr(inherited, "__conversational_only__", False) and is_flow_method(
|
||||
inherited
|
||||
):
|
||||
return inherited
|
||||
return None
|
||||
|
||||
|
||||
def _stamp_inherited_conversational_metadata(
|
||||
method: Any,
|
||||
inherited: Any,
|
||||
) -> Any:
|
||||
for attr in _FLOW_METHOD_METADATA_ATTRS:
|
||||
if hasattr(inherited, attr):
|
||||
setattr(method, attr, getattr(inherited, attr))
|
||||
method.__is_flow_method__ = True
|
||||
return method
|
||||
|
||||
|
||||
def _set_flow_method_definition(
|
||||
wrapper: FlowMethod[P, R],
|
||||
definition: FlowMethodDefinition,
|
||||
@@ -177,8 +135,6 @@ def _build_state_definition(
|
||||
from pydantic import BaseModel as PydanticBaseModel
|
||||
|
||||
state_value = getattr(flow_class, "_initial_state_t", None)
|
||||
if isinstance(state_value, TypeVar):
|
||||
state_value = None
|
||||
initial_state = getattr(flow_class, "initial_state", None)
|
||||
if initial_state is not None:
|
||||
state_value = initial_state
|
||||
@@ -274,98 +230,6 @@ def _build_persistence_definition(
|
||||
)
|
||||
|
||||
|
||||
def _build_conversational_router_definition(
|
||||
router_config: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowConversationalRouterDefinition | None:
|
||||
if router_config is None:
|
||||
return None
|
||||
|
||||
routes = getattr(router_config, "routes", None)
|
||||
return FlowConversationalRouterDefinition(
|
||||
prompt=getattr(router_config, "prompt", None),
|
||||
response_format=_serialize_static_value(
|
||||
getattr(router_config, "response_format", None),
|
||||
diagnostics,
|
||||
f"{path}.response_format",
|
||||
),
|
||||
llm=_serialize_static_value(
|
||||
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
|
||||
),
|
||||
routes=[str(route) for route in routes] if routes is not None else None,
|
||||
route_descriptions=getattr(router_config, "route_descriptions", None),
|
||||
default_intent=getattr(router_config, "default_intent", "converse"),
|
||||
fallback_intent=getattr(router_config, "fallback_intent", "converse"),
|
||||
intent_field=str(getattr(router_config, "intent_field", "intent")),
|
||||
)
|
||||
|
||||
|
||||
def _build_conversational_definition(
|
||||
flow_class: type,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
) -> FlowConversationalDefinition | None:
|
||||
if not _is_conversational_flow(flow_class):
|
||||
return None
|
||||
|
||||
config = getattr(flow_class, "conversational_config", None)
|
||||
builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end"))
|
||||
internal_routes = getattr(
|
||||
flow_class,
|
||||
"internal_routes",
|
||||
("answer_from_history", "conversation_start"),
|
||||
)
|
||||
if config is None:
|
||||
return FlowConversationalDefinition(
|
||||
enabled=True,
|
||||
builtin_routes=[str(route) for route in builtin_routes],
|
||||
internal_routes=[str(route) for route in internal_routes],
|
||||
)
|
||||
|
||||
default_intents = getattr(config, "default_intents", None)
|
||||
visible_agent_outputs = getattr(config, "visible_agent_outputs", None)
|
||||
return FlowConversationalDefinition(
|
||||
enabled=True,
|
||||
system_prompt=getattr(config, "system_prompt", None),
|
||||
llm=_serialize_static_value(
|
||||
getattr(config, "llm", None), diagnostics, "conversational.llm"
|
||||
),
|
||||
router=_build_conversational_router_definition(
|
||||
getattr(config, "router", None),
|
||||
diagnostics,
|
||||
"conversational.router",
|
||||
),
|
||||
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
|
||||
default_intents=(
|
||||
[str(intent) for intent in default_intents]
|
||||
if default_intents is not None
|
||||
else None
|
||||
),
|
||||
intent_llm=_serialize_static_value(
|
||||
getattr(config, "intent_llm", None),
|
||||
diagnostics,
|
||||
"conversational.intent_llm",
|
||||
),
|
||||
answer_from_history_llm=_serialize_static_value(
|
||||
getattr(config, "answer_from_history_llm", None),
|
||||
diagnostics,
|
||||
"conversational.answer_from_history_llm",
|
||||
),
|
||||
visible_agent_outputs=(
|
||||
"all"
|
||||
if visible_agent_outputs == "all"
|
||||
else [str(output) for output in visible_agent_outputs]
|
||||
if visible_agent_outputs is not None
|
||||
else None
|
||||
),
|
||||
defer_trace_finalization=bool(
|
||||
getattr(config, "defer_trace_finalization", True)
|
||||
),
|
||||
builtin_routes=[str(route) for route in builtin_routes],
|
||||
internal_routes=[str(route) for route in internal_routes],
|
||||
)
|
||||
|
||||
|
||||
def _build_method_definition(
|
||||
method: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
@@ -377,11 +241,6 @@ def _build_method_definition(
|
||||
else:
|
||||
method_definition = fragment.model_copy(deep=True)
|
||||
|
||||
# Skip <locals>/<lambda> qualnames: they can never be re-imported, so a
|
||||
# missing handler is more honest than a dead reference.
|
||||
if "<" not in method.__qualname__:
|
||||
method_definition.handler = f"{method.__module__}:{method.__qualname__}"
|
||||
|
||||
human_feedback = _build_human_feedback_definition(
|
||||
method, diagnostics, f"{path}.human_feedback"
|
||||
)
|
||||
@@ -411,29 +270,6 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
|
||||
flow_class, attr_value
|
||||
):
|
||||
methods[attr_name] = attr_value
|
||||
continue
|
||||
|
||||
inherited = _get_inherited_conversational_method(flow_class, attr_name)
|
||||
if inherited is not None and callable(attr_value):
|
||||
methods[attr_name] = _stamp_inherited_conversational_metadata(
|
||||
attr_value, inherited
|
||||
)
|
||||
|
||||
if _is_conversational_flow(flow_class):
|
||||
for base in reversed(flow_class.__mro__[1:]):
|
||||
for attr_name, raw_value in base.__dict__.items():
|
||||
if attr_name.startswith("_") or attr_name in methods:
|
||||
continue
|
||||
if not getattr(raw_value, "__conversational_only__", False):
|
||||
continue
|
||||
try:
|
||||
attr_value = getattr(flow_class, attr_name)
|
||||
except AttributeError:
|
||||
continue
|
||||
if is_flow_method(attr_value) and _should_include_flow_method(
|
||||
flow_class, attr_value
|
||||
):
|
||||
methods[attr_name] = attr_value
|
||||
|
||||
# A wrapped method whose name collides with a base Flow model field
|
||||
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
|
||||
@@ -478,7 +314,6 @@ def _build_flow_definition_from_class(
|
||||
state=_build_state_definition(flow_class, diagnostics),
|
||||
config=_build_config_definition(flow_class, diagnostics),
|
||||
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
|
||||
conversational=_build_conversational_definition(flow_class, diagnostics),
|
||||
methods=methods,
|
||||
diagnostics=diagnostics,
|
||||
)
|
||||
|
||||
@@ -6,22 +6,15 @@ The implementation now lives in three modules, split by concern:
|
||||
``@router``, ``or_`` / ``and_``) and Python Flow class projection
|
||||
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
|
||||
- ``crewai.flow.runtime`` -- the Flow execution engine and state
|
||||
- ``crewai.experimental.conversational_mixin`` -- experimental conversational
|
||||
runtime extension composed onto the public ``Flow`` class
|
||||
|
||||
Prefer importing from those modules in new code; this module preserves the
|
||||
historical ``crewai.flow.flow`` import path.
|
||||
"""
|
||||
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.experimental.conversational_mixin import _ConversationalMixin
|
||||
from crewai.flow.dsl import and_, listen, or_, router, start
|
||||
from crewai.flow.runtime import (
|
||||
_INITIAL_STATE_CLASS_MARKER,
|
||||
Flow as RuntimeFlow,
|
||||
Flow,
|
||||
FlowMeta,
|
||||
FlowState,
|
||||
LockedDictProxy,
|
||||
@@ -30,13 +23,6 @@ from crewai.flow.runtime import (
|
||||
)
|
||||
|
||||
|
||||
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
|
||||
|
||||
|
||||
class Flow(_ConversationalMixin, RuntimeFlow[T]):
|
||||
"""Public Flow class with experimental conversational extension behavior."""
|
||||
|
||||
|
||||
__all__ = [
|
||||
"_INITIAL_STATE_CLASS_MARKER",
|
||||
"Flow",
|
||||
|
||||
@@ -16,11 +16,6 @@ from typing import Any, Literal as TypingLiteral
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
import yaml
|
||||
|
||||
from crewai.flow.conversational_definition import (
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -28,8 +23,6 @@ FlowDefinitionCondition = str | dict[str, Any]
|
||||
|
||||
__all__ = [
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
@@ -52,9 +45,8 @@ class FlowDefinitionDiagnostic(BaseModel):
|
||||
class FlowStateDefinition(BaseModel):
|
||||
"""Static description of a Flow state contract."""
|
||||
|
||||
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
|
||||
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
|
||||
ref: str | None = None
|
||||
json_schema: dict[str, Any] | None = None
|
||||
default: Any = None
|
||||
|
||||
|
||||
@@ -94,7 +86,6 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
"""Static definition of one Flow method and its execution roles."""
|
||||
|
||||
handler: str | None = None
|
||||
start: bool | FlowDefinitionCondition | None = None
|
||||
listen: FlowDefinitionCondition | None = None
|
||||
router: bool = False
|
||||
@@ -124,7 +115,6 @@ class FlowDefinition(BaseModel):
|
||||
state: FlowStateDefinition | None = None
|
||||
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
conversational: FlowConversationalDefinition | None = None
|
||||
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ from concurrent.futures import Future, ThreadPoolExecutor
|
||||
import contextvars
|
||||
import copy
|
||||
import enum
|
||||
import importlib
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
@@ -85,13 +84,17 @@ from crewai.events.types.flow_events import (
|
||||
MethodExecutionPausedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.experimental.conversational import (
|
||||
ConversationConfig,
|
||||
ConversationState,
|
||||
)
|
||||
from crewai.experimental.conversational_mixin import _ConversationalMixin
|
||||
from crewai.flow.dsl._utils import build_flow_definition
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowDefinition,
|
||||
FlowDefinitionCondition,
|
||||
FlowMethodDefinition,
|
||||
FlowStateDefinition,
|
||||
)
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowMethod,
|
||||
@@ -136,6 +139,7 @@ from crewai.utilities.streaming import (
|
||||
signal_end,
|
||||
signal_error,
|
||||
)
|
||||
from crewai.utilities.types import LLMMessage
|
||||
|
||||
|
||||
# Runtime alias so Pydantic can resolve the ``execution_context`` field's
|
||||
@@ -150,108 +154,14 @@ ExecutionContext = Any # type: ignore[assignment,misc]
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _condition_branches(
|
||||
condition: dict[str, Any],
|
||||
) -> tuple[Literal["and", "or"], list[FlowDefinitionCondition]]:
|
||||
if "and" in condition:
|
||||
return "and", condition["and"]
|
||||
return "or", condition["or"]
|
||||
|
||||
|
||||
def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -> bool:
|
||||
if isinstance(condition, str):
|
||||
return condition in events
|
||||
operator, branches = _condition_branches(condition)
|
||||
combine = all if operator == "and" else any
|
||||
return combine(_condition_satisfied(branch, events) for branch in branches)
|
||||
|
||||
|
||||
def _resolve_handler(ref: str) -> Callable[..., Any]:
|
||||
module_name, separator, qualname = ref.partition(":")
|
||||
if not separator or not module_name or not qualname:
|
||||
raise ValueError(
|
||||
f"invalid handler reference {ref!r}; expected 'module:qualname'"
|
||||
)
|
||||
module = importlib.import_module(module_name)
|
||||
target: Any = module
|
||||
for part in qualname.split("."):
|
||||
target = getattr(target, part)
|
||||
if not callable(target):
|
||||
raise TypeError(
|
||||
f"handler reference {ref!r} resolved to a non-callable "
|
||||
f"{type(target).__name__}"
|
||||
)
|
||||
return cast(Callable[..., Any], target)
|
||||
|
||||
|
||||
def _build_definition_state_model(
|
||||
state_definition: FlowStateDefinition,
|
||||
) -> BaseModel | None:
|
||||
kwargs = (
|
||||
dict(state_definition.default)
|
||||
if isinstance(state_definition.default, dict)
|
||||
else {}
|
||||
)
|
||||
|
||||
model_class: type[BaseModel] | None = None
|
||||
if state_definition.ref:
|
||||
try:
|
||||
resolved = _resolve_handler(state_definition.ref)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Could not import state ref %r", state_definition.ref, exc_info=True
|
||||
)
|
||||
else:
|
||||
if isinstance(resolved, type) and issubclass(resolved, BaseModel):
|
||||
model_class = resolved
|
||||
else:
|
||||
logger.warning(
|
||||
"State ref %r is not a pydantic model", state_definition.ref
|
||||
)
|
||||
|
||||
if model_class is None and state_definition.json_schema:
|
||||
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
|
||||
|
||||
try:
|
||||
model_class = create_model_from_schema(state_definition.json_schema)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Could not build a state model from the declared json_schema",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
if model_class is None:
|
||||
return None
|
||||
|
||||
if not issubclass(model_class, FlowState):
|
||||
|
||||
class StateWithId(FlowState, model_class): # type: ignore[misc, valid-type]
|
||||
pass
|
||||
|
||||
model_class = StateWithId
|
||||
return model_class(**kwargs)
|
||||
|
||||
|
||||
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
|
||||
if isinstance(condition, str):
|
||||
yield condition
|
||||
return
|
||||
|
||||
_, branches = _condition_branches(condition)
|
||||
for branch in branches:
|
||||
yield from _iter_condition_events(branch)
|
||||
|
||||
|
||||
def _or_alternative_events(condition: FlowDefinitionCondition) -> Iterator[str]:
|
||||
if isinstance(condition, str):
|
||||
yield condition
|
||||
return
|
||||
|
||||
operator, branches = _condition_branches(condition)
|
||||
if operator != "or":
|
||||
return
|
||||
for branch in branches:
|
||||
yield from _or_alternative_events(branch)
|
||||
sub_conditions = condition["and"] if "and" in condition else condition["or"]
|
||||
for sub_condition in sub_conditions:
|
||||
yield from _iter_condition_events(sub_condition)
|
||||
|
||||
|
||||
def _is_multi_event_or(
|
||||
@@ -260,8 +170,7 @@ def _is_multi_event_or(
|
||||
if isinstance(condition, str):
|
||||
return False
|
||||
|
||||
operator, branches = _condition_branches(condition)
|
||||
return operator == "or" and len(branches) > 1
|
||||
return "or" in condition and len(condition["or"]) > 1
|
||||
|
||||
|
||||
def _resolve_persistence(value: Any) -> Any:
|
||||
@@ -707,7 +616,7 @@ class FlowMeta(ModelMetaclass):
|
||||
return super().__new__(mcs, name, bases, namespace)
|
||||
|
||||
|
||||
class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
"""Base class for all flows.
|
||||
|
||||
type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
|
||||
@@ -721,33 +630,41 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
|
||||
_flow_definition: ClassVar[FlowDefinition | None] = None
|
||||
|
||||
# === EXPERIMENTAL: conversational mode ===
|
||||
# When ``conversational = True`` on a subclass, the built-in conversational
|
||||
# graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn``
|
||||
# / ``end_conversation`` / ``answer_from_history_turn``) registers and
|
||||
# ``handle_turn`` / ``chat`` become the chat entry points. When ``False``
|
||||
# (default), the methods exist as inert attributes and never register or
|
||||
# fire — non-chat flows pay no runtime cost.
|
||||
#
|
||||
# ⚠ EXPERIMENTAL FEATURE. The whole conversational surface
|
||||
# (``conversational`` ClassVar, ``handle_turn``, ``chat``,
|
||||
# ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the
|
||||
# built-in graph + helpers) lives under ``crewai.experimental`` and may
|
||||
# change shape before graduating. Pin your CrewAI version if you depend on
|
||||
# specific behavior, and watch the changelog for breaking updates.
|
||||
conversational: ClassVar[bool] = False
|
||||
conversational_config: ClassVar[ConversationConfig | None] = None
|
||||
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
|
||||
internal_routes: ClassVar[tuple[str, ...]] = (
|
||||
"answer_from_history",
|
||||
"conversation_start",
|
||||
)
|
||||
builtin_route_descriptions: ClassVar[dict[str, str]] = {
|
||||
"converse": (
|
||||
"Ordinary chat, follow-ups, summaries, clarifications, and "
|
||||
"questions answerable from prior conversation history."
|
||||
),
|
||||
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
|
||||
"answer_from_history": (
|
||||
"Answer directly from prior conversation history without invoking "
|
||||
"tools, agents, or custom routes."
|
||||
),
|
||||
}
|
||||
|
||||
entity_type: Literal["flow"] = "flow"
|
||||
|
||||
def _initialize_runtime_extension_attrs(self) -> None:
|
||||
"""Initialize optional runtime-extension attributes."""
|
||||
|
||||
def _create_default_extension_state(self) -> Any | None:
|
||||
"""Return a default state supplied by an optional runtime extension."""
|
||||
return None
|
||||
|
||||
def _should_apply_pending_kickoff_context(self) -> bool:
|
||||
"""Whether an optional runtime extension has pending kickoff context."""
|
||||
return False
|
||||
|
||||
def _apply_pending_kickoff_context(self) -> None:
|
||||
"""Apply optional runtime-extension kickoff context."""
|
||||
|
||||
def _order_start_methods_for_kickoff(
|
||||
self,
|
||||
start_methods: list[FlowMethodName],
|
||||
) -> tuple[list[FlowMethodName], bool]:
|
||||
"""Allow an optional runtime extension to order kickoff start methods."""
|
||||
return start_methods, False
|
||||
|
||||
def _should_defer_trace_finalization(self) -> bool:
|
||||
"""Whether this kickoff should defer final flow trace finalization."""
|
||||
return bool(getattr(self, "defer_trace_finalization", False))
|
||||
|
||||
@classmethod
|
||||
def flow_definition(cls) -> FlowDefinition:
|
||||
"""Return the static Flow Definition built from this Flow class."""
|
||||
@@ -758,24 +675,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
return flow_definition
|
||||
|
||||
@classmethod
|
||||
def from_definition(cls, definition: FlowDefinition) -> Flow[Any]:
|
||||
"""Build a runnable Flow directly from a definition; no subclass required."""
|
||||
return cls.model_validate({}, context={"flow_definition": definition})
|
||||
|
||||
def _start_method_names(self) -> list[FlowMethodName]:
|
||||
def _start_method_names(cls) -> list[FlowMethodName]:
|
||||
return [
|
||||
FlowMethodName(method_name)
|
||||
for method_name, method_definition in self._definition.methods.items()
|
||||
for method_name, method_definition in cls.flow_definition().methods.items()
|
||||
if method_definition.is_start
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def _listener_methods(
|
||||
self,
|
||||
cls,
|
||||
) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]:
|
||||
# (name, definition, condition) for every non-start method that listens.
|
||||
# Routers are included (they listen too); callers wanting only plain
|
||||
# listeners filter on definition.router.
|
||||
for method_name, method_definition in self._definition.methods.items():
|
||||
for method_name, method_definition in cls.flow_definition().methods.items():
|
||||
if method_definition.listen is not None and not method_definition.is_start:
|
||||
yield (
|
||||
FlowMethodName(method_name),
|
||||
@@ -783,22 +697,25 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
method_definition.listen,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _start_condition(
|
||||
self, method_name: FlowMethodName
|
||||
cls, method_name: FlowMethodName
|
||||
) -> FlowDefinitionCondition | None:
|
||||
method_definition = self._definition.methods[str(method_name)]
|
||||
method_definition = cls.flow_definition().methods[str(method_name)]
|
||||
start = method_definition.start
|
||||
if isinstance(start, (str, dict)):
|
||||
return start
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def _listen_condition(
|
||||
self, method_name: FlowMethodName
|
||||
cls, method_name: FlowMethodName
|
||||
) -> FlowDefinitionCondition | None:
|
||||
return self._definition.methods[str(method_name)].listen
|
||||
return cls.flow_definition().methods[str(method_name)].listen
|
||||
|
||||
def _is_router(self, method_name: FlowMethodName) -> bool:
|
||||
return self._definition.methods[str(method_name)].router
|
||||
@classmethod
|
||||
def _is_router(cls, method_name: FlowMethodName) -> bool:
|
||||
return cls.flow_definition().methods[str(method_name)].router
|
||||
|
||||
initial_state: Annotated[ # type: ignore[type-arg]
|
||||
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
|
||||
@@ -941,13 +858,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
restore_event_scope(())
|
||||
reset_last_event_id()
|
||||
|
||||
_methods: dict[FlowMethodName, Callable[..., Any]] = PrivateAttr(
|
||||
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
|
||||
default_factory=dict
|
||||
)
|
||||
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
|
||||
default_factory=dict
|
||||
)
|
||||
_pending_events: dict[PendingListenerKey, set[str]] = PrivateAttr(
|
||||
_pending_and_listeners: dict[PendingListenerKey, set[int]] = PrivateAttr(
|
||||
default_factory=dict
|
||||
)
|
||||
_fired_or_listeners: set[FlowMethodName] = PrivateAttr(default_factory=set)
|
||||
@@ -955,7 +872,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
PrivateAttr(default=None)
|
||||
)
|
||||
_method_outputs: list[Any] = PrivateAttr(default_factory=list)
|
||||
_definition: FlowDefinition = PrivateAttr()
|
||||
_state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
|
||||
_or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
|
||||
_completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set)
|
||||
@@ -966,6 +882,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
|
||||
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
|
||||
_state: Any = PrivateAttr(default=None)
|
||||
_conversation_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
|
||||
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
|
||||
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
|
||||
_pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None)
|
||||
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
|
||||
|
||||
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
|
||||
@@ -984,26 +904,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
object.__setattr__(self, name, value)
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
definition = (
|
||||
__context.get("flow_definition") if isinstance(__context, dict) else None
|
||||
)
|
||||
self._flow_post_init(definition)
|
||||
self._flow_post_init()
|
||||
|
||||
def _flow_post_init(self, definition: FlowDefinition | None = None) -> None:
|
||||
def _flow_post_init(self) -> None:
|
||||
"""Heavy initialization: state creation, events, memory, method registration."""
|
||||
if getattr(self, "_flow_post_init_done", False):
|
||||
return
|
||||
object.__setattr__(self, "_flow_post_init_done", True)
|
||||
self._initialize_runtime_extension_attrs()
|
||||
|
||||
self._definition = definition or type(self).flow_definition()
|
||||
if self.name and self.name != self._definition.name:
|
||||
self._definition = self._definition.model_copy(update={"name": self.name})
|
||||
methods = (
|
||||
self._handler_bound_methods()
|
||||
if definition is not None
|
||||
else self._class_bound_methods()
|
||||
)
|
||||
|
||||
if self._state is None:
|
||||
self._state = self._create_initial_state()
|
||||
@@ -1019,7 +926,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowCreatedEvent(
|
||||
type="flow_created",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1029,44 +936,17 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
|
||||
from crewai.memory.utils import sanitize_scope_name
|
||||
|
||||
flow_name = sanitize_scope_name(self._definition.name)
|
||||
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
|
||||
self.memory = Memory(root_scope=f"/flow/{flow_name}")
|
||||
|
||||
self._methods.update(methods)
|
||||
|
||||
def _handler_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
|
||||
methods: dict[FlowMethodName, Callable[..., Any]] = {}
|
||||
unresolved: list[str] = []
|
||||
for method_name, method_definition in self._definition.methods.items():
|
||||
if method_definition.handler is None:
|
||||
unresolved.append(f"{method_name}: no handler")
|
||||
continue
|
||||
try:
|
||||
handler = _resolve_handler(method_definition.handler)
|
||||
except Exception as e:
|
||||
unresolved.append(f"{method_name}: {e}")
|
||||
continue
|
||||
if getattr(handler, "__self__", None) is None:
|
||||
handler = handler.__get__(self, type(self))
|
||||
methods[FlowMethodName(method_name)] = handler
|
||||
if unresolved:
|
||||
raise ValueError(
|
||||
f"Cannot build flow {self._definition.name!r} from its definition; "
|
||||
"methods with missing or unresolvable handlers: "
|
||||
+ "; ".join(unresolved)
|
||||
)
|
||||
return methods
|
||||
|
||||
def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
|
||||
methods: dict[FlowMethodName, Callable[..., Any]] = {}
|
||||
for method_name in self._definition.methods:
|
||||
# Build the runtime method lookup from the static FlowDefinition.
|
||||
for method_name in type(self).flow_definition().methods:
|
||||
method = getattr(self, method_name, None)
|
||||
if method is None:
|
||||
continue
|
||||
if not hasattr(method, "__self__"):
|
||||
method = method.__get__(self, type(self))
|
||||
methods[FlowMethodName(method_name)] = method
|
||||
return methods
|
||||
method = method.__get__(self, self.__class__)
|
||||
self._methods[FlowMethodName(method_name)] = method
|
||||
|
||||
def recall(self, query: str, **kwargs: Any) -> Any:
|
||||
"""Recall relevant memories. Delegates to this flow's memory.
|
||||
@@ -1144,11 +1024,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def _start_condition_triggered_by(
|
||||
self, method_name: FlowMethodName, trigger: FlowMethodName
|
||||
) -> bool:
|
||||
condition = self._start_condition(method_name)
|
||||
condition = type(self)._start_condition(method_name)
|
||||
if condition is None:
|
||||
return False
|
||||
return self._condition_met(
|
||||
condition, trigger, PendingListenerKey(f"start:{method_name}")
|
||||
return self._evaluate_condition(
|
||||
condition,
|
||||
trigger,
|
||||
method_name,
|
||||
pending_key_prefix=f"start:{method_name}",
|
||||
)
|
||||
|
||||
def _rearm_or_listeners_for_trigger(
|
||||
@@ -1172,7 +1055,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
trigger_str = str(trigger)
|
||||
to_discard: list[FlowMethodName] = []
|
||||
for listener_name in candidates:
|
||||
condition = self._listen_condition(listener_name)
|
||||
condition = type(self)._listen_condition(listener_name)
|
||||
if condition is None:
|
||||
continue
|
||||
if trigger_str in _iter_condition_events(condition):
|
||||
@@ -1188,13 +1071,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Only events that EXCLUSIVELY feed one OR listener race; an event that
|
||||
# also feeds another listener (e.g. an AND) is left alone when a sibling
|
||||
# wins. e.g. @listen(or_(a, b)) on handler -> {frozenset({a, b}): handler}.
|
||||
# Events nested under an and_() branch (e.g. or_(and_(a, b), c)) are not
|
||||
# alternatives and never race -- cancelling one would make the AND
|
||||
# unsatisfiable.
|
||||
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
|
||||
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
|
||||
listener_name: condition
|
||||
for listener_name, method_definition, condition in self._listener_methods()
|
||||
for listener_name, method_definition, condition in type(
|
||||
self
|
||||
)._listener_methods()
|
||||
if not method_definition.router
|
||||
}
|
||||
|
||||
@@ -1211,14 +1093,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
for listener_name, condition in listener_conditions.items():
|
||||
if not isinstance(condition, dict):
|
||||
continue
|
||||
alternatives = set(_or_alternative_events(condition))
|
||||
if len(alternatives) <= 1:
|
||||
events = events_by_listener[listener_name]
|
||||
if "or" not in condition or len(events) <= 1:
|
||||
continue
|
||||
|
||||
exclusive_events = {
|
||||
event
|
||||
for event in alternatives
|
||||
if listeners_by_event[event] == {listener_name}
|
||||
for event in events
|
||||
if listeners_by_event.get(event, set()) == {listener_name}
|
||||
}
|
||||
if len(exclusive_events) > 1:
|
||||
# Racing only applies to method-completion events: each member is
|
||||
@@ -1467,7 +1349,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowStartedEvent(
|
||||
type="flow_started",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
inputs=None,
|
||||
),
|
||||
)
|
||||
@@ -1543,7 +1425,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
MethodExecutionFinishedEvent(
|
||||
type="method_execution_finished",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
method_name=context.method_name,
|
||||
result=collapsed_outcome if emit else result,
|
||||
state=self._state,
|
||||
@@ -1597,7 +1479,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPausedEvent(
|
||||
type="flow_paused",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_id=e.context.flow_id,
|
||||
method_name=e.context.method_name,
|
||||
state=self._copy_and_serialize_state(),
|
||||
@@ -1625,7 +1507,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowFinishedEvent(
|
||||
type="flow_finished",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
result=final_result,
|
||||
state=self._copy_and_serialize_state(),
|
||||
),
|
||||
@@ -1658,15 +1540,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
"""
|
||||
init_state = self.initial_state
|
||||
|
||||
if init_state is None:
|
||||
extension_state = self._create_default_extension_state()
|
||||
if extension_state is not None:
|
||||
return cast(T, extension_state)
|
||||
# Conversational subclasses default to ``ConversationState`` if the
|
||||
# user didn't supply an explicit type parameter (``Flow[...]``) or an
|
||||
# ``initial_state``. This makes ``class MyChat(Flow): conversational
|
||||
# = True`` work without forcing every user to import and parameterize
|
||||
# ``ConversationState`` themselves.
|
||||
if (
|
||||
init_state is None
|
||||
and getattr(type(self), "conversational", False)
|
||||
and not hasattr(self, "_initial_state_t")
|
||||
):
|
||||
return cast(T, ConversationState())
|
||||
|
||||
if init_state is None and hasattr(self, "_initial_state_t"):
|
||||
state_type = self._initial_state_t
|
||||
if isinstance(state_type, TypeVar):
|
||||
state_type = None
|
||||
if isinstance(state_type, type):
|
||||
if issubclass(state_type, FlowState):
|
||||
instance = state_type()
|
||||
@@ -1686,7 +1573,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
return cast(T, {"id": str(uuid4())})
|
||||
|
||||
if init_state is None:
|
||||
return cast(T, self._create_definition_state())
|
||||
return cast(T, {"id": str(uuid4())})
|
||||
|
||||
if isinstance(init_state, type):
|
||||
state_class = init_state
|
||||
@@ -1728,34 +1615,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
|
||||
)
|
||||
|
||||
def _create_definition_state(self) -> dict[str, Any] | BaseModel:
|
||||
state_definition = self._definition.state
|
||||
if state_definition is None:
|
||||
return {"id": str(uuid4())}
|
||||
if state_definition.type in ("pydantic", "json_schema"):
|
||||
state = _build_definition_state_model(state_definition)
|
||||
if state is not None:
|
||||
return state
|
||||
logger.error(
|
||||
"Flow %r declares %s state but neither ref nor json_schema "
|
||||
"produced a model; falling back to dict state",
|
||||
self._definition.name,
|
||||
state_definition.type,
|
||||
)
|
||||
elif state_definition.type == "unknown":
|
||||
logger.warning(
|
||||
"Flow %r declares state of unknown type; falling back to dict state",
|
||||
self._definition.name,
|
||||
)
|
||||
dict_state: dict[str, Any] = (
|
||||
dict(state_definition.default)
|
||||
if isinstance(state_definition.default, dict)
|
||||
else {}
|
||||
)
|
||||
if "id" not in dict_state:
|
||||
dict_state["id"] = str(uuid4())
|
||||
return dict_state
|
||||
|
||||
def _copy_state(self) -> T:
|
||||
"""Create a copy of the current state.
|
||||
|
||||
@@ -2169,7 +2028,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Clear completed methods and outputs for a fresh start
|
||||
self._completed_methods.clear()
|
||||
self._method_outputs.clear()
|
||||
self._pending_events.clear()
|
||||
self._pending_and_listeners.clear()
|
||||
self._clear_or_listeners()
|
||||
self._method_call_counts.clear()
|
||||
else:
|
||||
@@ -2264,11 +2123,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
|
||||
if should_emit_flow_started:
|
||||
# In normal flows, each kickoff owns its own flow lifecycle.
|
||||
# Deferred sessions reuse the first flow scope until an
|
||||
# explicit finalization call closes the batch.
|
||||
# Deferred conversational sessions are different: the first
|
||||
# turn opens the flow scope and later turns reuse it until
|
||||
# ``finalize_session_traces()`` emits the single finish event.
|
||||
started_event = FlowStartedEvent(
|
||||
type="flow_started",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
inputs=inputs,
|
||||
)
|
||||
future = crewai_event_bus.emit(self, started_event)
|
||||
@@ -2295,8 +2155,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# with implicit "crew" execution_type.
|
||||
get_env_context()
|
||||
|
||||
if self._should_apply_pending_kickoff_context():
|
||||
self._apply_pending_kickoff_context()
|
||||
# Conversational hook: apply the pending user message AFTER state
|
||||
# restore and AFTER flow scope initialization, so transcript events
|
||||
# are parented under the current conversation trace.
|
||||
# ``handle_turn`` stashes the message on ``self._pending_user_message``
|
||||
# before calling ``kickoff``; this drains it.
|
||||
if (
|
||||
getattr(type(self), "conversational", False)
|
||||
and self._pending_user_message is not None
|
||||
):
|
||||
self._apply_pending_conversational_turn()
|
||||
|
||||
if inputs is not None and "id" not in inputs:
|
||||
self._initialize_state(inputs)
|
||||
@@ -2308,29 +2176,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Determine which start methods to execute at kickoff
|
||||
# Conditional start methods are only triggered by their conditions
|
||||
# UNLESS there are no unconditional starts (then all starts run as entry points)
|
||||
start_methods = self._start_method_names()
|
||||
start_methods = type(self)._start_method_names()
|
||||
unconditional_starts = [
|
||||
start_method
|
||||
for start_method in start_methods
|
||||
if self._start_condition(start_method) is None
|
||||
if type(self)._start_condition(start_method) is None
|
||||
]
|
||||
# If there are unconditional starts, only run those at kickoff
|
||||
# If there are NO unconditional starts, run all starts (including conditional ones)
|
||||
starts_to_execute = (
|
||||
unconditional_starts if unconditional_starts else start_methods
|
||||
)
|
||||
starts_to_execute, run_starts_sequentially = (
|
||||
self._order_start_methods_for_kickoff(starts_to_execute)
|
||||
)
|
||||
if run_starts_sequentially:
|
||||
for start_method in starts_to_execute:
|
||||
await self._execute_start_method(start_method)
|
||||
else:
|
||||
tasks = [
|
||||
self._execute_start_method(start_method)
|
||||
for start_method in starts_to_execute
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
tasks = [
|
||||
self._execute_start_method(start_method)
|
||||
for start_method in starts_to_execute
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
except Exception as e:
|
||||
# Check if flow was paused for human feedback
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackPending
|
||||
@@ -2360,7 +2221,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPausedEvent(
|
||||
type="flow_paused",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
flow_id=e.context.flow_id,
|
||||
method_name=e.context.method_name,
|
||||
state=self._copy_and_serialize_state(),
|
||||
@@ -2402,15 +2263,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
|
||||
# When ``defer_trace_finalization`` is set, skip both per-turn
|
||||
# ``FlowFinishedEvent`` AND trace-batch finalization. The caller
|
||||
# invokes the matching finalization hook once at session end. The
|
||||
# flag is read from either the instance attribute or an extension
|
||||
# definition.
|
||||
# invokes ``finalize_session_traces()`` once at session end to
|
||||
# close out the whole conversation as one trace. The flag is
|
||||
# read from EITHER the instance attribute (set by user code) OR
|
||||
# the class-level ``ConversationConfig.defer_trace_finalization``.
|
||||
if not self._should_defer_trace_finalization():
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
FlowFinishedEvent(
|
||||
type="flow_finished",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
result=final_output,
|
||||
state=self._copy_and_serialize_state(),
|
||||
),
|
||||
@@ -2484,7 +2346,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionFailedEvent,
|
||||
)
|
||||
flow_name = self._definition.name
|
||||
flow_name = self.name or self.__class__.__name__
|
||||
nodes = sorted(
|
||||
(
|
||||
n
|
||||
@@ -2543,7 +2405,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
)
|
||||
|
||||
# If start method is a router, use its result as an additional trigger
|
||||
if self._is_router(start_method_name) and result is not None:
|
||||
if type(self)._is_router(start_method_name) and result is not None:
|
||||
# Execute listeners for the start method name first
|
||||
await self._execute_listeners(start_method_name, result, finished_event_id)
|
||||
# Then execute listeners for the router result (e.g., "approved")
|
||||
@@ -2563,16 +2425,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def _inject_trigger_payload_for_start_method(
|
||||
self, original_method: Callable[..., Any]
|
||||
) -> Callable[..., Any]:
|
||||
accepts_trigger_payload = (
|
||||
"crewai_trigger_payload" in inspect.signature(original_method).parameters
|
||||
)
|
||||
|
||||
def prepare_kwargs(
|
||||
*args: Any, **kwargs: Any
|
||||
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
||||
inputs = cast(dict[str, Any], baggage.get_baggage("flow_inputs") or {})
|
||||
trigger_payload = inputs.get("crewai_trigger_payload")
|
||||
|
||||
sig = inspect.signature(original_method)
|
||||
accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters
|
||||
|
||||
if trigger_payload is not None and accepts_trigger_payload:
|
||||
kwargs["crewai_trigger_payload"] = trigger_payload
|
||||
elif trigger_payload is not None:
|
||||
@@ -2622,7 +2483,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionStartedEvent(
|
||||
type="method_execution_started",
|
||||
method_name=method_name,
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
params=dumped_params,
|
||||
state=self._copy_and_serialize_state(),
|
||||
),
|
||||
@@ -2674,7 +2535,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
finished_event = MethodExecutionFinishedEvent(
|
||||
type="method_execution_finished",
|
||||
method_name=method_name,
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
state=self._copy_and_serialize_state(),
|
||||
result=result,
|
||||
)
|
||||
@@ -2703,7 +2564,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionPausedEvent(
|
||||
type="method_execution_paused",
|
||||
method_name=method_name,
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
state=self._copy_and_serialize_state(),
|
||||
flow_id=e.context.flow_id,
|
||||
message=e.context.message,
|
||||
@@ -2719,7 +2580,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionFailedEvent(
|
||||
type="method_execution_failed",
|
||||
method_name=method_name,
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
error=e,
|
||||
),
|
||||
)
|
||||
@@ -2851,7 +2712,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
if current_trigger in router_results:
|
||||
for method_name in self._start_method_names():
|
||||
for method_name in type(self)._start_method_names():
|
||||
if self._start_condition_triggered_by(
|
||||
method_name, current_trigger
|
||||
):
|
||||
@@ -2864,25 +2725,72 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
else:
|
||||
await self._execute_start_method(method_name)
|
||||
|
||||
def _condition_met(
|
||||
def _evaluate_condition(
|
||||
self,
|
||||
condition: FlowDefinitionCondition,
|
||||
trigger_method: FlowMethodName,
|
||||
subscription_key: PendingListenerKey,
|
||||
listener_name: FlowMethodName,
|
||||
pending_key_prefix: str | None = None,
|
||||
) -> bool:
|
||||
seen = self._pending_events.setdefault(subscription_key, set())
|
||||
seen.add(str(trigger_method))
|
||||
if not _condition_satisfied(condition, seen):
|
||||
return False
|
||||
del self._pending_events[subscription_key]
|
||||
return True
|
||||
if isinstance(condition, str):
|
||||
return condition == str(trigger_method)
|
||||
|
||||
def _sub_prefix(index: int) -> str | None:
|
||||
if pending_key_prefix is None:
|
||||
return None
|
||||
return f"{pending_key_prefix}:{index}"
|
||||
|
||||
if "or" in condition:
|
||||
# Evaluate every sub-condition (no short-circuit): a nested and_()
|
||||
# branch needs the chance to clear its pending state in
|
||||
# _pending_and_listeners even when an earlier branch already matched.
|
||||
any_matched = False
|
||||
for index, sub_condition in enumerate(condition["or"]):
|
||||
if self._evaluate_condition(
|
||||
sub_condition,
|
||||
trigger_method,
|
||||
listener_name,
|
||||
pending_key_prefix=_sub_prefix(index),
|
||||
):
|
||||
any_matched = True
|
||||
return any_matched
|
||||
|
||||
sub_conditions = condition["and"]
|
||||
pending_key = PendingListenerKey(
|
||||
pending_key_prefix
|
||||
if pending_key_prefix is not None
|
||||
else f"{listener_name}:{id(condition)}"
|
||||
)
|
||||
|
||||
if pending_key not in self._pending_and_listeners:
|
||||
self._pending_and_listeners[pending_key] = set(range(len(sub_conditions)))
|
||||
|
||||
pending_conditions = self._pending_and_listeners[pending_key]
|
||||
for index, sub_condition in enumerate(sub_conditions):
|
||||
if index not in pending_conditions:
|
||||
continue
|
||||
if self._evaluate_condition(
|
||||
sub_condition,
|
||||
trigger_method,
|
||||
listener_name,
|
||||
pending_key_prefix=_sub_prefix(index),
|
||||
):
|
||||
pending_conditions.discard(index)
|
||||
|
||||
if not pending_conditions:
|
||||
self._pending_and_listeners.pop(pending_key, None)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _find_triggered_methods(
|
||||
self, trigger_method: FlowMethodName, router_only: bool
|
||||
) -> list[FlowMethodName]:
|
||||
triggered: list[FlowMethodName] = []
|
||||
|
||||
for listener_name, method_definition, condition in self._listener_methods():
|
||||
for listener_name, method_definition, condition in type(
|
||||
self
|
||||
)._listener_methods():
|
||||
is_router = method_definition.router
|
||||
if router_only != is_router:
|
||||
continue
|
||||
@@ -2891,8 +2799,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if should_check_fired and listener_name in self._fired_or_listeners:
|
||||
continue
|
||||
|
||||
if self._condition_met(
|
||||
condition, trigger_method, PendingListenerKey(str(listener_name))
|
||||
if self._evaluate_condition(
|
||||
condition,
|
||||
trigger_method,
|
||||
listener_name,
|
||||
):
|
||||
triggered.append(listener_name)
|
||||
if should_check_fired:
|
||||
@@ -2948,10 +2858,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
|
||||
# For routers, also check if any conditional starts they triggered are completed
|
||||
# If so, continue their chains
|
||||
if self._is_router(listener_name):
|
||||
for start_method_name in self._start_method_names():
|
||||
if type(self)._is_router(listener_name):
|
||||
for start_method_name in type(self)._start_method_names():
|
||||
if (
|
||||
self._start_condition(start_method_name) is not None
|
||||
type(self)._start_condition(start_method_name) is not None
|
||||
and start_method_name in self._completed_methods
|
||||
):
|
||||
# This conditional start was executed, continue its chain
|
||||
@@ -2970,7 +2880,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
method = self._methods[listener_name]
|
||||
|
||||
sig = inspect.signature(method)
|
||||
method_params = [p for p in sig.parameters.values() if p.name != "self"]
|
||||
params = list(sig.parameters.values())
|
||||
method_params = [p for p in params if p.name != "self"]
|
||||
|
||||
if triggering_event_id:
|
||||
with triggered_by_scope(triggering_event_id):
|
||||
@@ -3026,7 +2937,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
return self.input_provider
|
||||
if flow_config.input_provider is not None:
|
||||
return flow_config.input_provider
|
||||
return cast(InputProvider, ConsoleProvider())
|
||||
return ConsoleProvider()
|
||||
|
||||
def _checkpoint_state_for_ask(self) -> None:
|
||||
"""Auto-checkpoint flow state before waiting for user input.
|
||||
@@ -3126,7 +3037,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowInputRequestedEvent(
|
||||
type="flow_input_requested",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
method_name=method_name,
|
||||
message=message,
|
||||
metadata=metadata,
|
||||
@@ -3145,7 +3056,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
executor = ThreadPoolExecutor(max_workers=1)
|
||||
ctx = contextvars.copy_context()
|
||||
future = executor.submit(
|
||||
ctx.run, provider.request_input, message, cast(Any, self), metadata
|
||||
ctx.run, provider.request_input, message, self, metadata
|
||||
)
|
||||
try:
|
||||
raw = future.result(timeout=timeout)
|
||||
@@ -3158,9 +3069,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# cancel_futures=True cleans up any queued-but-not-started tasks.
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
else:
|
||||
raw = provider.request_input(
|
||||
message, cast(Any, self), metadata=metadata
|
||||
)
|
||||
raw = provider.request_input(message, self, metadata=metadata)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Exception:
|
||||
@@ -3193,7 +3102,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowInputReceivedEvent(
|
||||
type="flow_input_received",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
method_name=method_name,
|
||||
message=message,
|
||||
response=response,
|
||||
@@ -3231,7 +3140,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
HumanFeedbackRequestedEvent(
|
||||
type="human_feedback_requested",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
method_name="", # Will be set by decorator if needed
|
||||
output=output,
|
||||
message=message,
|
||||
@@ -3260,7 +3169,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
HumanFeedbackReceivedEvent(
|
||||
type="human_feedback_received",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
method_name="", # Will be set by decorator if needed
|
||||
feedback=feedback,
|
||||
outcome=None, # Will be determined after collapsing
|
||||
@@ -3435,10 +3344,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPlotEvent(
|
||||
type="flow_plot",
|
||||
flow_name=self._definition.name,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
),
|
||||
)
|
||||
structure = build_flow_structure(cast(Any, self))
|
||||
structure = build_flow_structure(self)
|
||||
return render_interactive(structure, filename=filename, show=show)
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
|
||||
FlowMethodName = NewType("FlowMethodName", str)
|
||||
PendingListenerKey = NewType(
|
||||
"PendingListenerKey",
|
||||
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
|
||||
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -259,9 +259,8 @@ class RecallFlow(Flow[RecallState]):
|
||||
candidates = []
|
||||
if not candidates:
|
||||
candidates = [scope_prefix]
|
||||
selected_scopes = candidates[:20]
|
||||
self.state.candidate_scopes = selected_scopes
|
||||
return selected_scopes
|
||||
self.state.candidate_scopes = candidates[:20]
|
||||
return self.state.candidate_scopes
|
||||
|
||||
@listen(filter_and_chunk)
|
||||
def search_chunks(self) -> list[Any]:
|
||||
@@ -369,10 +368,9 @@ class RecallFlow(Flow[RecallState]):
|
||||
)
|
||||
)
|
||||
matches.sort(key=lambda m: m.score, reverse=True)
|
||||
final_results = matches[: self.state.limit]
|
||||
self.state.final_results = final_results
|
||||
self.state.final_results = matches[: self.state.limit]
|
||||
|
||||
if self.state.evidence_gaps and self.state.final_results:
|
||||
self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps)
|
||||
|
||||
return final_results
|
||||
return self.state.final_results
|
||||
|
||||
@@ -999,11 +999,7 @@ def _json_schema_to_pydantic_field(
|
||||
if examples:
|
||||
schema_extra["examples"] = examples
|
||||
|
||||
default = (
|
||||
json_schema["default"]
|
||||
if "default" in json_schema
|
||||
else (... if is_required else None)
|
||||
)
|
||||
default = ... if is_required else None
|
||||
|
||||
if isinstance(type_, type) and issubclass(type_, (int, float)):
|
||||
if "minimum" in json_schema:
|
||||
|
||||
@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
|
||||
executor._method_outputs = []
|
||||
executor._completed_methods = set()
|
||||
executor._fired_or_listeners = set()
|
||||
executor._pending_events = {}
|
||||
executor._pending_and_listeners = {}
|
||||
executor._method_execution_counts = {}
|
||||
executor._method_call_counts = {}
|
||||
executor._event_futures = []
|
||||
|
||||
@@ -1157,25 +1157,6 @@ def test_flow_name():
|
||||
assert flow.name == "MyFlow"
|
||||
|
||||
|
||||
def test_flow_custom_name_overrides_class_name_in_events():
|
||||
class InternalFlowClass(Flow):
|
||||
name = "PublicName"
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "done"
|
||||
|
||||
received = []
|
||||
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle(source, event):
|
||||
received.append(event)
|
||||
|
||||
InternalFlowClass().kickoff()
|
||||
|
||||
assert received[0].flow_name == "PublicName"
|
||||
|
||||
|
||||
def test_nested_and_or_conditions():
|
||||
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
|
||||
|
||||
@@ -1561,63 +1542,40 @@ def test_deeply_nested_conditions():
|
||||
|
||||
|
||||
def test_or_branch_does_not_leave_stale_and_state():
|
||||
fired = []
|
||||
"""or_() over nested and_() branches must not leave stale pending AND state.
|
||||
|
||||
Regression: evaluating an or_() condition stopped at the first branch that was
|
||||
satisfied, so a later and_() branch that the *same* trigger would have completed
|
||||
never cleared its pending state. On the next cycle that trigger alone then
|
||||
spuriously re-satisfied the whole condition. Both branches share the final
|
||||
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
|
||||
completes branch ``(c AND x)`` and both must be cleared together.
|
||||
"""
|
||||
|
||||
class StaleStateFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
pass
|
||||
|
||||
@listen(begin)
|
||||
def a(self):
|
||||
pass
|
||||
|
||||
@listen(begin)
|
||||
def c(self):
|
||||
pass
|
||||
|
||||
@listen(and_(a, c))
|
||||
def x(self):
|
||||
pass
|
||||
|
||||
@listen(or_(and_("a", "x"), and_("c", "y")))
|
||||
@listen(or_(and_("a", "x"), and_("c", "x")))
|
||||
def joined(self):
|
||||
fired.append("joined")
|
||||
pass
|
||||
|
||||
@router(joined)
|
||||
def emit_y(self):
|
||||
return "y"
|
||||
flow = StaleStateFlow()
|
||||
condition = type(flow)._listen_condition("joined")
|
||||
|
||||
StaleStateFlow().kickoff()
|
||||
def fires(trigger):
|
||||
return flow._evaluate_condition(condition, trigger, "joined")
|
||||
|
||||
assert fired == ["joined"]
|
||||
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
|
||||
assert fires("a") is False
|
||||
assert fires("c") is False
|
||||
assert fires("x") is True
|
||||
|
||||
|
||||
def test_and_branch_inside_or_does_not_race():
|
||||
execution_order = []
|
||||
|
||||
class DiamondWithFallbackFlow(Flow):
|
||||
@start()
|
||||
def go(self):
|
||||
execution_order.append("go")
|
||||
|
||||
@listen(go)
|
||||
def a(self):
|
||||
execution_order.append("a")
|
||||
|
||||
@listen(go)
|
||||
def b(self):
|
||||
execution_order.append("b")
|
||||
|
||||
@listen(or_(and_(a, b), "fallback"))
|
||||
def done(self):
|
||||
execution_order.append("done")
|
||||
|
||||
DiamondWithFallbackFlow().kickoff()
|
||||
|
||||
assert "done" in execution_order
|
||||
assert execution_order.index("done") > execution_order.index("a")
|
||||
assert execution_order.index("done") > execution_order.index("b")
|
||||
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
|
||||
# previous cycle was consumed when "joined" fired, so neither branch is half
|
||||
# complete and "x" by itself is insufficient.
|
||||
assert fires("x") is False
|
||||
|
||||
|
||||
def test_mixed_sync_async_execution_order():
|
||||
|
||||
@@ -169,6 +169,9 @@ class TestConversationalFlow:
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental conversational registry behavior is out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None:
|
||||
@ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini")
|
||||
class ResearchFlow(ConversationalFlow):
|
||||
@@ -592,6 +595,9 @@ class TestConversationalFlow:
|
||||
assert result == "legacy-searched"
|
||||
assert flow.state.last_intent == "search"
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_user_start_methods_run_sequentially_before_router_in_conversational_mode(
|
||||
self,
|
||||
) -> None:
|
||||
@@ -643,6 +649,9 @@ class TestConversationalFlow:
|
||||
assert "attach_bus" in order # still fires every turn
|
||||
assert "route_turn" in order
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_subclass_can_override_conversation_start_without_redecorating(
|
||||
self,
|
||||
) -> None:
|
||||
@@ -1333,12 +1342,6 @@ class TestFlowTracingWhenSuppressed:
|
||||
|
||||
|
||||
class TestDeferTraceFinalization:
|
||||
def test_bare_conversational_flow_defers_by_default(self) -> None:
|
||||
class BareChat(ConversationalFlow):
|
||||
pass
|
||||
|
||||
assert BareChat()._should_defer_trace_finalization() is True
|
||||
|
||||
def test_conversation_config_drives_defer_flag(self) -> None:
|
||||
"""``ConversationConfig(defer_trace_finalization=...)`` controls whether
|
||||
a conversational subclass defers per-turn trace finalization."""
|
||||
|
||||
@@ -13,7 +13,6 @@ from pydantic import BaseModel
|
||||
import crewai.flow.dsl as flow_dsl
|
||||
import crewai.flow.flow_definition as flow_definition
|
||||
import crewai.flow.visualization.builder as visualization_builder
|
||||
from crewai.experimental import ConversationConfig, RouterConfig
|
||||
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
|
||||
|
||||
|
||||
@@ -37,8 +36,6 @@ def test_flow_public_exports_are_explicit():
|
||||
}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
@@ -172,7 +169,6 @@ def test_flow_definition_maps_dsl_to_static_contract():
|
||||
assert definition.state.ref and "ContractState" in definition.state.ref
|
||||
assert definition.config.stream is True
|
||||
assert definition.config.max_method_calls == 7
|
||||
assert definition.conversational is None
|
||||
|
||||
assert definition.methods["begin"].start is True
|
||||
assert definition.methods["process"].listen == "begin"
|
||||
@@ -205,74 +201,27 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
|
||||
|
||||
methods = RegularFlow.flow_definition().methods
|
||||
|
||||
assert RegularFlow.flow_definition().conversational is None
|
||||
assert set(methods) == {"begin"}
|
||||
assert "conversation_start" not in methods
|
||||
assert "route_conversation" not in methods
|
||||
assert "converse_turn" not in methods
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_flow_definition_includes_conversational_builtins_when_enabled():
|
||||
class ChatFlow(Flow):
|
||||
conversational = True
|
||||
|
||||
definition = ChatFlow.flow_definition()
|
||||
methods = definition.methods
|
||||
methods = ChatFlow.flow_definition().methods
|
||||
|
||||
assert definition.conversational is not None
|
||||
assert definition.conversational.enabled is True
|
||||
assert definition.conversational.defer_trace_finalization is True
|
||||
assert definition.conversational.builtin_routes == ["converse", "end"]
|
||||
assert "conversation_start" in methods
|
||||
assert "route_conversation" in methods
|
||||
assert "converse_turn" in methods
|
||||
assert methods["conversation_start"].start is True
|
||||
|
||||
|
||||
def test_flow_definition_serializes_conversational_config():
|
||||
@ConversationConfig(
|
||||
system_prompt="Be concise.",
|
||||
llm="gpt-4o-mini",
|
||||
router=RouterConfig(
|
||||
prompt="Pick a route.",
|
||||
routes=["research"],
|
||||
default_intent="converse",
|
||||
fallback_intent="end",
|
||||
),
|
||||
default_intents=["research"],
|
||||
visible_agent_outputs=["researcher"],
|
||||
defer_trace_finalization=False,
|
||||
)
|
||||
class ChatFlow(Flow):
|
||||
conversational = True
|
||||
|
||||
conversational = ChatFlow.flow_definition().conversational
|
||||
|
||||
assert conversational is not None
|
||||
assert conversational.system_prompt == "Be concise."
|
||||
assert conversational.llm == "gpt-4o-mini"
|
||||
assert conversational.default_intents == ["research"]
|
||||
assert conversational.visible_agent_outputs == ["researcher"]
|
||||
assert conversational.defer_trace_finalization is False
|
||||
assert conversational.router is not None
|
||||
assert conversational.router.prompt == "Pick a route."
|
||||
assert conversational.router.routes == ["research"]
|
||||
assert conversational.router.fallback_intent == "end"
|
||||
|
||||
|
||||
def test_flow_definition_preserves_undecorated_conversational_override():
|
||||
class ChatFlow(Flow):
|
||||
conversational = True
|
||||
|
||||
def conversation_start(self) -> str | None:
|
||||
return "custom"
|
||||
|
||||
methods = ChatFlow.flow_definition().methods
|
||||
|
||||
assert methods["conversation_start"].start is True
|
||||
assert "route_conversation" in methods
|
||||
|
||||
|
||||
def test_flow_definition_serializes_human_feedback_metadata():
|
||||
marker = object()
|
||||
|
||||
|
||||
@@ -1,508 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.flow_events import (
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.flow.flow import FlowState
|
||||
from crewai.flow.flow_definition import FlowDefinition
|
||||
|
||||
|
||||
class ChainFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
self.state["begin_ran"] = True
|
||||
return "hello"
|
||||
|
||||
@listen(begin)
|
||||
def shout(self, result):
|
||||
return result.upper()
|
||||
|
||||
@listen(shout)
|
||||
def confirm(self):
|
||||
self.state["confirmed"] = True
|
||||
return f"confirmed:{self.state['confirmed']}"
|
||||
|
||||
|
||||
CHAIN_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ChainFlow
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
shout:
|
||||
handler: {__name__}:ChainFlow.shout
|
||||
listen: begin
|
||||
confirm:
|
||||
handler: {__name__}:ChainFlow.confirm
|
||||
listen: shout
|
||||
"""
|
||||
|
||||
|
||||
class MergeFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "go"
|
||||
|
||||
@listen(begin)
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
@listen(begin)
|
||||
def right(self):
|
||||
return "right"
|
||||
|
||||
@listen(or_(left, right))
|
||||
def either(self):
|
||||
self.state["either_ran"] = True
|
||||
return "either"
|
||||
|
||||
@listen(and_(left, right, either))
|
||||
def join(self):
|
||||
self.state["joined"] = True
|
||||
return "joined"
|
||||
|
||||
|
||||
MERGE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: MergeFlow
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:MergeFlow.begin
|
||||
start: true
|
||||
left:
|
||||
handler: {__name__}:MergeFlow.left
|
||||
listen: begin
|
||||
right:
|
||||
handler: {__name__}:MergeFlow.right
|
||||
listen: begin
|
||||
either:
|
||||
handler: {__name__}:MergeFlow.either
|
||||
listen:
|
||||
or: [left, right]
|
||||
join:
|
||||
handler: {__name__}:MergeFlow.join
|
||||
listen:
|
||||
and: [left, right, either]
|
||||
"""
|
||||
|
||||
|
||||
class RouteFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "go"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return "left" if self.state.get("direction") == "left" else "right"
|
||||
|
||||
@listen("left")
|
||||
def take_left(self):
|
||||
return "took-left"
|
||||
|
||||
@listen("right")
|
||||
def take_right(self):
|
||||
return "took-right"
|
||||
|
||||
|
||||
ROUTE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: RouteFlow
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:RouteFlow.begin
|
||||
start: true
|
||||
decide:
|
||||
handler: {__name__}:RouteFlow.decide
|
||||
listen: begin
|
||||
router: true
|
||||
take_left:
|
||||
handler: {__name__}:RouteFlow.take_left
|
||||
listen: left
|
||||
take_right:
|
||||
handler: {__name__}:RouteFlow.take_right
|
||||
listen: right
|
||||
"""
|
||||
|
||||
|
||||
class LoopFlow(Flow):
|
||||
@start("retry")
|
||||
def step(self):
|
||||
self.state["count"] = self.state.get("count", 0) + 1
|
||||
return self.state["count"]
|
||||
|
||||
@router(step)
|
||||
def decide(self):
|
||||
if self.state["count"] < 3:
|
||||
return "retry"
|
||||
return "done"
|
||||
|
||||
@listen("done")
|
||||
def finish(self):
|
||||
return f"finished:{self.state['count']}"
|
||||
|
||||
|
||||
LOOP_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: LoopFlow
|
||||
methods:
|
||||
step:
|
||||
handler: {__name__}:LoopFlow.step
|
||||
start: retry
|
||||
decide:
|
||||
handler: {__name__}:LoopFlow.decide
|
||||
listen: step
|
||||
router: true
|
||||
finish:
|
||||
handler: {__name__}:LoopFlow.finish
|
||||
listen: done
|
||||
"""
|
||||
|
||||
|
||||
class CounterState(FlowState):
|
||||
count: int = 0
|
||||
label: str = "none"
|
||||
|
||||
|
||||
class PydanticStateFlow(Flow[CounterState]):
|
||||
@start()
|
||||
def begin(self):
|
||||
self.state.count += 1
|
||||
return self.state.count
|
||||
|
||||
@listen(begin)
|
||||
def finish(self):
|
||||
self.state.label = f"count={self.state.count}"
|
||||
return self.state.label
|
||||
|
||||
|
||||
PYDANTIC_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: PydanticStateFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: {__name__}:CounterState
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
handler: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
PYDANTIC_STATE_OVERLAY_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: PydanticStateFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: {__name__}:CounterState
|
||||
default:
|
||||
count: 5
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
handler: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
JSON_SCHEMA_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: JsonSchemaStateFlow
|
||||
state:
|
||||
type: json_schema
|
||||
json_schema:
|
||||
title: CounterState
|
||||
type: object
|
||||
properties:
|
||||
count:
|
||||
type: integer
|
||||
default: 0
|
||||
label:
|
||||
type: string
|
||||
default: none
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
handler: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: SchemaFallbackFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: definitely_not_a_module_xyz:MissingState
|
||||
json_schema:
|
||||
title: CounterState
|
||||
type: object
|
||||
properties:
|
||||
count:
|
||||
type: integer
|
||||
default: 0
|
||||
label:
|
||||
type: string
|
||||
default: none
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:PydanticStateFlow.begin
|
||||
start: true
|
||||
finish:
|
||||
handler: {__name__}:PydanticStateFlow.finish
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
UNRESOLVABLE_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: UnresolvableStateFlow
|
||||
state:
|
||||
type: pydantic
|
||||
ref: definitely_not_a_module_xyz:MissingState
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
"""
|
||||
|
||||
DICT_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: DictStateFlow
|
||||
state:
|
||||
type: dict
|
||||
default:
|
||||
count: 5
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
"""
|
||||
|
||||
UNKNOWN_STATE_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: UnknownStateFlow
|
||||
state:
|
||||
type: unknown
|
||||
ref: somewhere:Something
|
||||
methods:
|
||||
begin:
|
||||
handler: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
"""
|
||||
|
||||
|
||||
def _run_with_events(flow, inputs=None):
|
||||
events = []
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def on_started(source, event):
|
||||
events.append(event)
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||
def on_finished(source, event):
|
||||
events.append(event)
|
||||
|
||||
result = flow.kickoff(inputs=inputs)
|
||||
events.sort(key=lambda e: e.timestamp)
|
||||
return result, [
|
||||
(type(e).__name__, str(e.method_name), e.flow_name) for e in events
|
||||
]
|
||||
|
||||
|
||||
def _state_without_id(flow):
|
||||
snapshot = dict(flow.state.model_dump())
|
||||
snapshot.pop("id", None)
|
||||
return snapshot
|
||||
|
||||
|
||||
def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
|
||||
class_flow = flow_cls()
|
||||
class_result, class_events = _run_with_events(class_flow, inputs)
|
||||
|
||||
definition = FlowDefinition.from_yaml(yaml_str)
|
||||
definition_flow = Flow.from_definition(definition)
|
||||
definition_result, definition_events = _run_with_events(definition_flow, inputs)
|
||||
|
||||
assert definition_result == class_result
|
||||
assert _state_without_id(definition_flow) == _state_without_id(class_flow)
|
||||
if ordered:
|
||||
assert definition_flow.method_outputs == class_flow.method_outputs
|
||||
assert definition_events == class_events
|
||||
else:
|
||||
assert sorted(map(repr, definition_flow.method_outputs)) == sorted(
|
||||
map(repr, class_flow.method_outputs)
|
||||
)
|
||||
assert sorted(definition_events) == sorted(class_events)
|
||||
return definition_flow, definition_result
|
||||
|
||||
|
||||
def test_simple_chain_parity():
|
||||
flow, result = assert_parity(ChainFlow, CHAIN_YAML)
|
||||
assert result == "confirmed:True"
|
||||
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
|
||||
|
||||
|
||||
def test_and_or_merge_parity():
|
||||
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
|
||||
assert flow.state["joined"] is True
|
||||
assert flow.state["either_ran"] is True
|
||||
|
||||
|
||||
def test_router_label_parity_for_each_branch():
|
||||
left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"})
|
||||
assert "took-left" in left_flow.method_outputs
|
||||
assert "took-right" not in left_flow.method_outputs
|
||||
|
||||
right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"})
|
||||
assert "took-right" in right_flow.method_outputs
|
||||
|
||||
|
||||
def test_cyclic_flow_parity():
|
||||
flow, result = assert_parity(LoopFlow, LOOP_YAML)
|
||||
assert result == "finished:3"
|
||||
assert flow.state["count"] == 3
|
||||
|
||||
|
||||
def test_definition_flow_events_use_definition_name():
|
||||
definition = FlowDefinition.from_yaml(CHAIN_YAML)
|
||||
flow = Flow.from_definition(definition)
|
||||
_, events = _run_with_events(flow)
|
||||
assert events
|
||||
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
|
||||
|
||||
|
||||
def test_from_definition_missing_handler_raises():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "NoHandlers",
|
||||
"methods": {"begin": {"start": True}},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="begin: no handler"):
|
||||
Flow.from_definition(definition)
|
||||
|
||||
|
||||
def test_from_definition_unresolvable_handler_raises():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "BadHandlers",
|
||||
"methods": {
|
||||
"begin": {
|
||||
"start": True,
|
||||
"handler": "definitely_not_a_module_xyz:nope",
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="missing or unresolvable handlers.*begin"):
|
||||
Flow.from_definition(definition)
|
||||
|
||||
|
||||
def test_from_definition_malformed_handler_raises():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "MalformedHandlers",
|
||||
"methods": {"begin": {"start": True, "handler": "no-colon-here"}},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="expected 'module:qualname'"):
|
||||
Flow.from_definition(definition)
|
||||
|
||||
|
||||
def test_flow_definition_stamps_handler_refs():
|
||||
definition = ChainFlow.flow_definition()
|
||||
|
||||
assert definition.methods["begin"].handler == f"{__name__}:ChainFlow.begin"
|
||||
assert definition.methods["shout"].handler == f"{__name__}:ChainFlow.shout"
|
||||
|
||||
|
||||
def test_pydantic_state_from_ref_parity():
|
||||
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
|
||||
assert result == "count=1"
|
||||
assert flow.state.count == 1
|
||||
assert flow.state.label == "count=1"
|
||||
assert flow.state.id
|
||||
|
||||
|
||||
def test_pydantic_state_default_overlay():
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
|
||||
result = flow.kickoff()
|
||||
assert result == "count=6"
|
||||
assert flow.state.count == 6
|
||||
|
||||
|
||||
def test_json_schema_state():
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
|
||||
result = flow.kickoff()
|
||||
assert result == "count=1"
|
||||
assert flow.state.count == 1
|
||||
assert flow.state.label == "count=1"
|
||||
assert flow.state.id
|
||||
|
||||
|
||||
def test_json_schema_state_validates_inputs():
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
|
||||
with pytest.raises(ValueError, match="Invalid inputs"):
|
||||
flow.kickoff(inputs={"count": "not-a-number"})
|
||||
|
||||
|
||||
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
|
||||
flow = Flow.from_definition(
|
||||
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
|
||||
)
|
||||
result = flow.kickoff()
|
||||
assert result == "count=1"
|
||||
assert flow.state.count == 1
|
||||
|
||||
|
||||
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
|
||||
with caplog.at_level("ERROR"):
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
|
||||
assert "falling back to dict state" in caplog.text
|
||||
|
||||
result = flow.kickoff()
|
||||
assert result == "hello"
|
||||
assert flow.state["begin_ran"] is True
|
||||
assert flow.state["id"]
|
||||
|
||||
|
||||
def test_dict_state_is_a_copy_of_default_plus_id():
|
||||
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
|
||||
|
||||
flow = Flow.from_definition(definition)
|
||||
assert flow.state["count"] == 5
|
||||
assert flow.state["id"]
|
||||
flow.kickoff()
|
||||
assert flow.state["begin_ran"] is True
|
||||
|
||||
second = Flow.from_definition(definition)
|
||||
assert second.state["count"] == 5
|
||||
assert "begin_ran" not in second.state.model_dump()
|
||||
assert second.state["id"] != flow.state["id"]
|
||||
assert definition.state.default == {"count": 5}
|
||||
|
||||
|
||||
def test_unknown_state_type_falls_back_to_dict(caplog):
|
||||
with caplog.at_level("WARNING"):
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
|
||||
assert "falling back to dict state" in caplog.text
|
||||
|
||||
result = flow.kickoff()
|
||||
assert result == "hello"
|
||||
assert flow.state["begin_ran"] is True
|
||||
Reference in New Issue
Block a user