Update conversational flow docs to use handle_turn (#6053)

This commit is contained in:
Lorenze Jay
2026-06-05 11:04:28 -07:00
committed by GitHub
parent cab3319af9
commit 3723f0db76
4 changed files with 220 additions and 247 deletions

View File

@@ -11,95 +11,83 @@ mode: "wide"
| المفهوم | التنفيذ |
|---------|---------|
| معرّف الجلسة | `kickoff(session_id=...)` → `inputs["id"]` → `state.id` |
| سطر المستخدم | `kickoff(user_message=...)` يُضاف إلى `state.messages` قبل تشغيل الرسم |
| اكتمال الجولة | `FlowFinished` لهذا **التشغيل** فقط؛ تستمر المحادثة في `kickoff` التالي |
| تتبع الجلسة | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
| معرّف الجلسة | `handle_turn(..., session_id=...)` → `kickoff(inputs={"id": ...})` → `state.id` |
| سطر المستخدم | `handle_turn(message)` يضيف الرسالة إلى `state.messages` قبل تشغيل الرسم |
| اكتمال الجولة | `FlowFinished` لهذا **التشغيل** فقط؛ تستمر المحادثة في `handle_turn` التالي |
| تتبع الجلسة | `ConversationConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## واجهات الجولات
استخدم **`flow.kickoff(user_message=..., session_id=...)`** أو **`flow.handle_turn(...)`** لكل رسالة مستخدم من REST أو WebSocket أو الاختبارات أو الواجهات المخصصة. استخدم **`flow.chat()`** عندما تريد حلقة دردشة محلية في الطرفية لـ `Flow` محادثي.
استخدم **`flow.handle_turn(message, session_id=...)`** لكل رسالة مستخدم من REST أو WebSocket أو الاختبارات أو الواجهات المخصصة. استخدم **`flow.chat()`** عندما تريد حلقة دردشة محلية في الطرفية لـ `Flow` محادثي.
لا يقبل `Flow.kickoff()` الوسيطين `user_message=` أو `session_id=`. في التدفقات المحادثية، يخزن `handle_turn()` الرسالة المعلقة ويستدعي داخلياً `kickoff(inputs={"id": session_id})`.
| API | الاستخدام |
|-----|-----------|
| `kickoff(user_message=..., session_id=...)` | كل رسالة مستخدم |
| `handle_turn(message, session_id=...)` | غلاف مريح لجولة واحدة في `Flow` محادثي |
| `chat()` | REPL محلي في الطرفية لـ `Flow` محادثي |
| `kickoff_async(...)` | نفس المعاملات؛ دخول async أصلي |
| `kickoff(inputs={...})` | تشغيل متقدم للـ flow بدون معالجة جولة محادثية |
| `ask()` | مطالبة حاجزة **داخل** خطوة واحدة |
| `@human_feedback` | الموافقة/الرفض على **مخرجات خطوة** — وليس السطر التالي |
| `ChatSession.handle_turn(...)` | طبقة نقل فوق `kickoff` |
| `ChatSession.handle_turn(...)` | طبقة نقل فوق `handle_turn` |
## بداية سريعة
```python
from uuid import uuid4
from crewai.flow import (
ChatState,
ConversationalConfig,
Flow,
listen,
or_,
persist,
router,
start,
from crewai import Flow
from crewai.flow import listen
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.flow.persistence import SQLiteFlowPersistence
class SupportFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
default_intents=["order", "help", "goodbye"],
intent_llm="gpt-4o-mini",
defer_trace_finalization=True,
)
@ConversationConfig(defer_trace_finalization=True)
class SupportFlow(Flow[ConversationState]):
conversational = True
@start()
def bootstrap(self):
if not self.state.session_ready:
self.state.session_ready = True
return "ready"
@router(bootstrap)
def route(self):
return self.state.last_intent or "help"
def route_turn(self, context):
message = (self.state.current_user_message or "").lower()
if "طلب" in message or "order" in message:
return "order"
if "وداع" in message or "goodbye" in message:
return "goodbye"
return "help"
@listen("order")
def handle_order(self):
reply = "طلبك في الطريق."
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("help")
def handle_help(self):
reply = "كيف يمكنني المساعدة؟"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("goodbye")
def handle_goodbye(self):
reply = "وداعاً!"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@persist(SQLiteFlowPersistence("support.db"))
@listen(or_(handle_order, handle_help, handle_goodbye))
def finalize(self):
return self.state.model_dump()
session_id = str(uuid4())
flow = SupportFlow()
flow.kickoff(user_message="أين طلبي؟", session_id=session_id)
flow.kickoff(user_message="وماذا عن الإرجاع؟", session_id=session_id)
flow.finalize_session_traces()
try:
flow.handle_turn("أين طلبي؟", session_id=session_id)
flow.handle_turn("وماذا عن الإرجاع؟", session_id=session_id)
finally:
flow.finalize_session_traces()
```
## دورة حياة الجولة
كل `kickoff` مع `user_message` يشغّل:
كل `handle_turn` يشغّل:
1. **`_configure_conversational_kickoff`** — دمج `session_id` / `user_message` في `inputs` وتطبيق `ConversationalConfig`.
2. **استعادة الحالة** — عند وجود `inputs["id"]` و`@persist`.
@@ -108,7 +96,7 @@ flow.finalize_session_traces()
5. **تنفيذ الرسم** — `@start` → `@router` → معالجات `@listen`.
6. **نهاية التشغيل** — يُتخطى `flow_finished` والتتبع لكل جولة عند التأجيل؛ `Agent.kickoff()` / crews لا تغلق دفعة الأب.
استدعِ **`append_message("assistant", reply)`** في المعالجات. سطر المستخدم محفوظ عند kickoff — لا تُضفه مرة أخرى.
استدعِ **`append_assistant_message(reply)`** في المعالجات. سطر المستخدم محفوظ عبر `handle_turn` — لا تُضفه مرة أخرى.
## `ConversationalConfig` (افتراضيات على مستوى الصنف)
@@ -382,7 +370,7 @@ Routes:
4. يخزّن الموجّه قراره في `state.last_intent` (يكون مرئياً لسياق التوجيه في الجولة التالية).
5. إذا أعاد معالجك سلسلة نصية ولم يستدعِ `append_assistant_message`، فإن `handle_turn` يُلحقها نيابةً عنك.
يمكنك أيضاً استدعاء `flow.kickoff(user_message=..., session_id=...)` مباشرةً — نفس منطق الإعادة والتشغيل يعمل. `handle_turn` هو الغلاف المريح.
استدعِ `handle_turn()` لرسائل الدردشة. استدعاء `kickoff(inputs={"id": ...})` مباشرةً يشغل الرسم بدون غلاف الجولة المحادثية.
### `chat()` للـ REPL المحلي

View File

@@ -1,132 +1,121 @@
---
title: Conversational Flows
description: Build multi-turn chat apps with kickoff per turn, message history, intent routing, tracing, and WebSocket bridges.
description: Build multi-turn chat apps with handle_turn per turn, message history, intent routing, tracing, and WebSocket bridges.
icon: comments
mode: "wide"
---
## Overview
Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, UI bridges, and a local `flow.chat()` REPL for conversational flows.
Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent routing, deferred tracing, UI bridges, and a local `flow.chat()` REPL for conversational flows.
| Concept | Implementation |
|---------|----------------|
| Session id | `kickoff(session_id=...)` → `inputs["id"]` → `state.id` |
| User line | `kickoff(user_message=...)` appends to `state.messages` before the graph runs |
| Turn complete | `FlowFinished` for **this run** only; chat continues on the next `kickoff` |
| Full-session trace | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
| Session id | `handle_turn(..., session_id=...)` → `kickoff(inputs={"id": ...})` → `state.id` |
| User line | `handle_turn(message)` appends to `state.messages` before the graph runs |
| Turn complete | `FlowFinished` for **this run** only; chat continues on the next `handle_turn` |
| Full-session trace | `ConversationConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## Turn APIs
Use **`flow.kickoff(user_message=..., session_id=...)`** or **`flow.handle_turn(...)`** for every user message from REST, WebSocket, tests, and custom UIs. Use **`flow.chat()`** when you want a local terminal chat loop for a conversational `Flow`.
Use **`flow.handle_turn(message, session_id=...)`** for every user message from REST, WebSocket, tests, and custom UIs. Use **`flow.chat()`** when you want a local terminal chat loop for a conversational `Flow`.
`Flow.kickoff()` does **not** accept `user_message=` or `session_id=` keyword arguments. For conversational flows, `handle_turn()` stores the pending message and calls `kickoff(inputs={"id": session_id})` internally after resetting per-turn execution state.
| API | Use for |
|-----|---------|
| `kickoff(user_message=..., session_id=...)` | Each user message |
| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` |
| `chat()` | Local terminal REPL for conversational `Flow` |
| `kickoff_async(...)` | Same parameters; native async entry |
| `kickoff(inputs={...})` | Advanced flow execution without conversational turn handling |
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
| `@human_feedback` | Approve/reject **a step output** — not the next chat line |
| `ChatSession.handle_turn(...)` | Transport layer over `kickoff` (SSE / WebSocket) |
| `ChatSession.handle_turn(...)` | Transport layer over `handle_turn` (SSE / WebSocket) |
## Quick start
```python
from uuid import uuid4
from crewai.flow import (
ChatState,
ConversationalConfig,
Flow,
listen,
or_,
persist,
router,
start,
from crewai import Flow
from crewai.flow import listen
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.flow.persistence import SQLiteFlowPersistence
class SupportFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
default_intents=["order", "help", "goodbye"],
intent_llm="gpt-4o-mini",
defer_trace_finalization=True,
)
@ConversationConfig(defer_trace_finalization=True)
class SupportFlow(Flow[ConversationState]):
conversational = True
@start()
def bootstrap(self):
if not self.state.session_ready:
self.state.session_ready = True
return "ready"
@router(bootstrap)
def route(self):
# last_intent set in prepare_conversational_turn when default_intents is set
return self.state.last_intent or "help"
def route_turn(self, context):
message = (self.state.current_user_message or "").lower()
if "order" in message:
return "order"
if "bye" in message or "goodbye" in message:
return "goodbye"
return "help"
@listen("order")
def handle_order(self):
reply = "Your order is on the way."
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("help")
def handle_help(self):
reply = "How can I help?"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("goodbye")
def handle_goodbye(self):
reply = "Goodbye!"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@persist(SQLiteFlowPersistence("support.db"))
@listen(or_(handle_order, handle_help, handle_goodbye))
def finalize(self):
return self.state.model_dump()
session_id = str(uuid4())
flow = SupportFlow()
flow.kickoff(user_message="Where is my order?", session_id=session_id)
flow.kickoff(user_message="What about returns?", session_id=session_id)
flow.finalize_session_traces() # one trace link for the whole chat
try:
flow.handle_turn("Where is my order?", session_id=session_id)
flow.handle_turn("What about returns?", session_id=session_id)
finally:
flow.finalize_session_traces() # one trace link for the whole chat
```
## Turn lifecycle
Each `kickoff` with `user_message` runs this pipeline:
Each `handle_turn` runs this pipeline:
1. **`_configure_conversational_kickoff`** — merges `session_id` / `user_message` into `inputs`, applies `ConversationalConfig`, enables deferred tracing when configured.
1. **Turn setup** — stores the pending user message, resolves the session id, resets per-turn execution tracking, and calls `kickoff(inputs={"id": session_id})`.
2. **State restore** — if `inputs["id"]` exists and `@persist` is configured, loads the latest snapshot.
3. **`FlowStarted`** — emitted on the first deferred session turn only.
4. **`prepare_conversational_turn`** — appends the user message to `state.messages`, sets `last_user_message`, clears `last_intent`, optionally classifies when `intents` / `default_intents` + `intent_llm` are set.
5. **Graph execution** — `@start` → `@router` → `@listen` handlers.
4. **Pending turn hydration** — appends the user message to `state.messages`, sets `current_user_message` / `last_user_message`, and optionally classifies when `intents` / `default_intents` + `intent_llm` are set.
5. **Graph execution** — `conversation_start` → `route_conversation` → the selected `@listen` handler.
6. **End of run** — per-turn `flow_finished` and trace finalization are **skipped** when deferral is enabled; nested `Agent.kickoff()` / crews do not close the parent batch either.
Handlers should call **`append_message("assistant", reply)`** so the next turns `conversation_messages` includes assistant text. The user line is already stored at kickoff — do not append it again in handlers.
Handlers should call **`append_assistant_message(reply)`** so the next turns `conversation_messages` includes assistant text. The user line is already stored by `handle_turn` — do not append it again in handlers.
## `ConversationalConfig` (class-level defaults)
## `ConversationConfig` (class-level defaults)
Set on your `Flow` subclass as `conversational_config: ClassVar[ConversationalConfig | None]`.
Decorate your conversational `Flow` subclass with `ConversationConfig`.
| Field | Default | Purpose |
|-------|---------|---------|
| `default_intents` | `None` | Outcome labels for automatic pre-kickoff classification |
| `intent_llm` | `None` | Model for classification (required when intents are used) |
| `interactive_prompt` | `"You: "` | Prompt for `kickoff(interactive=True)` |
| `interactive_timeout` | `None` | Per-line timeout in interactive mode |
| `exit_commands` | `exit`, `quit` | Words that end interactive mode |
| `defer_trace_finalization` | `True` | Keep one trace batch open across turns |
| `system_prompt` | Framework default | System message used by the built-in `converse_turn`. |
| `llm` | `None` | Conversation LLM used by `converse_turn` and as router fallback. |
| `router` | `None` | `RouterConfig` for LLM-driven routing. |
| `intent_llm` | `None` | LLM for `intents=` / `default_intents` pre-classification. |
| `default_intents` | `None` | Outcome labels for pre-classification. |
| `defer_trace_finalization` | `True` | Keep one trace batch open across `handle_turn()` calls. |
Override per kickoff with `intents=` and `intent_llm=` keyword arguments.
Override pre-classification per turn with `handle_turn(..., intents=..., intent_llm=...)`.
## `ChatState` (recommended persisted shape)
## Lower-level `ChatState` helpers
`ChatState`, `ConversationalConfig`, and `crewai.flow.conversation` helpers are still importable for advanced orchestration, tests, or custom wrappers. They do not add `user_message=` or `session_id=` keyword arguments to `Flow.kickoff()`.
```python
from crewai.flow import ChatState
@@ -140,7 +129,7 @@ class MyChatState(ChatState):
| Field | Role |
|-------|------|
| `id` | Session UUID (same as `session_id` / `inputs["id"]`) |
| `id` | Session UUID (same as `inputs["id"]`) |
| `messages` | `list` of `{role, content}` for LLM history |
| `last_user_message` | Latest user line for this turn |
| `last_intent` | Route label after classification (if used) |
@@ -150,27 +139,26 @@ class MyChatState(ChatState):
## `Flow` conversational API
### `kickoff` / `kickoff_async` parameters
### `handle_turn` parameters
| Parameter | Purpose |
|-----------|---------|
| `user_message` | This turns text (or `{"role": "user", "content": "..."}`) |
| `message` | This turns text |
| `session_id` | Conversation UUID → `inputs["id"]` / `state.id` |
| `intents` | Outcome labels for pre-kickoff `classify_intent` |
| `intent_llm` | LLM for classification (required with `intents`) |
| `interactive` | CLI loop via `ask()` (local demos only) |
| `interactive_prompt` | Override prompt in interactive mode |
| `interactive_timeout` | Per-line `ask()` timeout |
| `exit_commands` | Words that end interactive mode |
| `inputs` | Additional state fields (merged with conversational keys) |
| `restore_from_state_id` | Fork hydration from another persisted flow |
| `**kickoff_kwargs` | Forwarded to `kickoff()` for options like `input_files`, `from_checkpoint`, and `restore_from_state_id` |
### `kickoff` parameters
`Flow.kickoff()` accepts `inputs`, `input_files`, `from_checkpoint`, and `restore_from_state_id`. Pass `inputs={"id": session_id}` when you need raw flow execution, but use `handle_turn()` when the call represents a chat message.
### Instance attributes
| Attribute | Purpose |
|-----------|---------|
| `conversational_config` | Class-level `ConversationalConfig` defaults |
| `defer_trace_finalization` | Instance flag; set automatically from config on kickoff |
| `conversational` | Set to `True` to enable the conversational graph and `handle_turn()` |
| `defer_trace_finalization` | Instance flag; set automatically from config on `handle_turn()` |
| `suppress_flow_events` | Hides console flow panels; **tracing still records** method/flow events |
| `stream` | Enable streaming; use with `ChatSession.handle_turn(..., stream=True)` |
@@ -178,7 +166,8 @@ class MyChatState(ChatState):
| Name | Description |
|------|-------------|
| `append_message(role, content, **extra)` | Append to `state.messages` (roles: `user`, `assistant`, `system`, `tool`) |
| `append_assistant_message(content)` | Append a user-visible assistant reply to `state.messages` |
| `append_message(role, content, **extra)` | Lower-level append to `state.messages` |
| `conversation_messages` | Read-only history for LLM calls |
| `classify_intent(text, outcomes, *, llm, context=None)` | Map text to one outcome (same collapse logic as `@human_feedback`) |
| `receive_user_message(text, *, outcomes=None, llm=None)` | Append user message; optionally set `last_intent` |
@@ -195,7 +184,7 @@ Importable for tests or custom orchestration:
| `normalize_kickoff_inputs(inputs, user_message=..., session_id=...)` | Merge conversational kwargs into `inputs` |
| `get_conversation_messages(flow)` | Read messages from state or internal buffer |
| `append_message(flow, role, content, **extra)` | Same as instance method |
| `prepare_conversational_turn(flow, user_message=..., intents=..., intent_llm=..., config=...)` | Turn hydration (usually called by kickoff) |
| `prepare_conversational_turn(flow, user_message=..., intents=..., intent_llm=..., config=...)` | Lower-level turn hydration for custom wrappers |
| `receive_user_message(flow, text, ...)` | Same as instance method |
| `set_state_field(flow, name, value)` | Set a field on dict or Pydantic state |
| `get_conversational_config(flow)` | Read class `conversational_config` |
@@ -203,21 +192,20 @@ Importable for tests or custom orchestration:
## Intent routing patterns
### A. Pre-classify via `ConversationalConfig` (simplest)
### A. Pre-classify via `ConversationConfig` (simplest)
Set `default_intents` and `intent_llm`. Each kickoff runs classification before your `@router`; read `self.state.last_intent` in `route()`.
Set `default_intents` and `intent_llm`. Each `handle_turn()` runs classification before routing; read `self.state.last_intent` in `route_turn()`.
### B. Classify inside `@router` (richer prompts)
### B. Classify inside `route_turn` (richer prompts)
Set `default_intents=None` so kickoff only appends the user message. In `route()`, call `classify_intent` with a custom prompt or descriptions:
Set `default_intents=None` so `handle_turn()` only appends the user message. In `route_turn()`, call `classify_intent` with a custom prompt or descriptions:
```python
@router(bootstrap)
def route(self):
def route_turn(self, context):
intent = self.classify_intent(
self._routing_prompt(self.state.last_user_message),
self._routing_prompt(self.state.current_user_message),
("GREETING", "ORDER", "RESEARCH", "GOODBYE"),
llm=self.conversational_config.intent_llm or "gpt-4o-mini",
llm="gpt-4o-mini",
)
self.state.last_intent = intent
return intent
@@ -227,7 +215,7 @@ Use **`@listen("RESEARCH")`** (or similar) for steps that run `Agent.kickoff()`
## When the flow finishes but the user keeps chatting
`FlowFinished` means **this graph run** completed. The conversation continues with another `kickoff` and the same `session_id`. `@persist` restores `messages`, flags, and context.
`FlowFinished` means **this graph run** completed. The conversation continues with another `handle_turn()` and the same `session_id`. `@persist` restores `messages`, flags, and context.
**Persist pattern:** prefer `@persist` on a **single terminal step** (for example `finalize`) rather than on the whole `Flow` class. Class-level persist saves after every method; `load_state` uses the latest row, which may be a mid-run snapshot (for example right after `bootstrap`) and miss handler updates from the same turn.
@@ -244,53 +232,53 @@ Do **not** use `@human_feedback` for follow-up chat lines unless a human must ap
changelog for breaking updates. Open issues / feedback welcome.
</Warning>
Opt into the conversational chat graph by setting `conversational = True` on a `Flow` subclass. The base `Flow` then ships a built-in `@start` / `@router` / `converse_turn` / `end_conversation` graph, manages `state.messages`, drives the router LLM, and keeps the trace batch open across turns. You write the **custom routes**; the framework owns the rest.
Opt into the conversational chat graph by setting `conversational = True` on a `Flow` subclass. The base `Flow` then ships a built-in `@start` / `@router` / `converse_turn` / `end_conversation` graph, manages `state.messages`, can drive a router LLM, and keeps the trace batch open across turns. You write the **custom routes**; the framework owns the rest.
Use this when you want a multi-turn chat with an LLM-driven router and per-route handlers without wiring the lifecycle yourself. Use `Flow[ChatState]` (the lower-level pattern above) when you need full control.
Use this when you want a multi-turn chat with a router and per-route handlers without wiring the lifecycle yourself. Use `Flow[ChatState]` (the lower-level pattern above) when you need full control.
### Quick example
```python
from crewai import LLM, Flow
from crewai import Flow
from crewai.flow import listen
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
RouterConfig,
)
ROUTER_LLM = LLM(model="gpt-4o-mini")
@ConversationConfig(
system_prompt="A multi-agent assistant for ordinary chat and tool-backed tasks.",
llm=ROUTER_LLM,
router=RouterConfig(), # routes + descriptions auto-discovered from @listen handlers
)
@ConversationConfig(defer_trace_finalization=True)
class SupportFlow(Flow[ConversationState]):
conversational = True
def route_turn(self, context: dict) -> str | None:
message = (self.state.current_user_message or "").lower()
if "search" in message or "news" in message:
return "INTERNET_SEARCH"
if "docs" in message or "crewai" in message:
return "CREWAI_DOCS"
return "converse"
@listen("INTERNET_SEARCH")
def handle_internet_search(self) -> str:
"""Fresh web research, current news, real-time lookups."""
...
reply = "I would run the web research route here."
self.append_assistant_message(reply)
return reply
@listen("CREWAI_DOCS")
def handle_crewai_docs(self) -> str:
"""Look up the CrewAI documentation for framework/API questions."""
...
reply = "I would look up the CrewAI docs here."
self.append_assistant_message(reply)
return reply
flow = SupportFlow()
try:
flow.handle_turn("What can you do?") # routes to converse (built-in)
flow.handle_turn("What can you do?") # routes to converse
flow.handle_turn("Search the web for AI news.") # routes to INTERNET_SEARCH
flow.handle_turn("Summarize the first result.") # routes back to converse
flow.handle_turn("Check the CrewAI docs.") # routes to CREWAI_DOCS
finally:
flow.finalize_session_traces()
```
@@ -323,7 +311,21 @@ Class decorator that attaches per-class chat defaults.
### `RouterConfig` and the auto-built route catalog
```python
RouterConfig(
from typing import Literal
from pydantic import BaseModel
from crewai import LLM
from crewai.experimental.conversational import RouterConfig
class MyRoute(BaseModel):
intent: Literal["INTERNET_SEARCH", "CREWAI_DOCS", "converse"]
ROUTER_LLM = LLM(model="gpt-4o-mini")
router_config = RouterConfig(
prompt="Optional domain framing (policy, voice, persona).",
response_format=MyRoute, # optional; auto-generated otherwise
llm=ROUTER_LLM, # falls back to ConversationConfig.llm
@@ -347,6 +349,9 @@ The router prompt that gets sent to the LLM is built automatically. For each rou
So in practice, **adding a new route is `@listen("X")` + a one-line docstring**:
```python
from crewai.flow import listen
@listen("INTERNET_SEARCH")
def handle_internet_search(self) -> str:
"""Fresh web research, current news, real-time lookups."""
@@ -385,7 +390,7 @@ You can override any of these by defining a same-named handler in your subclass.
4. The router stores its decision in `state.last_intent` (visible to the next turn's router context).
5. If your handler returned a string and didn't already call `append_assistant_message`, `handle_turn` appends it for you.
You can also call `flow.kickoff(user_message=..., session_id=...)` directly the same reset/run logic fires. `handle_turn` is the ergonomic wrapper.
Call `handle_turn()` for chat messages. Calling `kickoff(inputs={"id": ...})` directly runs the flow graph without applying the conversational turn wrapper.
### `chat()` for local REPLs
@@ -422,6 +427,12 @@ For web apps, background workers, tests, and custom transports, keep using `hand
To run side effects (event bus setup, telemetry) on every routing decision, override `route_turn`:
```python
from typing import Any
from crewai import Flow
from crewai.experimental.conversational import ConversationState
class SupportFlow(Flow[ConversationState]):
conversational = True
@@ -443,7 +454,7 @@ Inside a `@listen(label)` handler, choose:
## Tracing across turns
With `defer_trace_finalization=True` (default in `ConversationalConfig`):
With `defer_trace_finalization=True` (default in `ConversationConfig`):
- **One trace batch** for the whole chat session.
- **`flow_started`** on the first turn only; **`flow_finished`** once in `finalize_session_traces()`.
@@ -455,7 +466,7 @@ flow.chat(session_id=session_id)
```
`flow.chat()` calls `finalize_session_traces()` for you. When you own the loop
with `handle_turn()` or `kickoff(...)`, call `finalize_session_traces()` when
with `handle_turn()`, call `finalize_session_traces()` when
the session ends.
`suppress_flow_events=True` only hides Rich console panels; trace and method events still emit for observability.

View File

@@ -11,96 +11,83 @@ mode: "wide"
| 개념 | 구현 |
|------|------|
| 세션 id | `kickoff(session_id=...)` → `inputs["id"]` → `state.id` |
| 사용자 입력 | `kickoff(user_message=...)`가 그래프 실행 전 `state.messages`에 추가 |
| 턴 완료 | `FlowFinished`는 **이번 실행**만 의미; 다음 `kickoff`로 대화 계속 |
| 세션 전체 트레이스 | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
| 세션 id | `handle_turn(..., session_id=...)` → `kickoff(inputs={"id": ...})` → `state.id` |
| 사용자 입력 | `handle_turn(message)`가 그래프 실행 전 `state.messages`에 추가 |
| 턴 완료 | `FlowFinished`는 **이번 실행**만 의미; 다음 `handle_turn`로 대화 계속 |
| 세션 전체 트레이스 | `ConversationConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## 턴 API
REST, WebSocket, 테스트, 커스텀 UI에서 오는 모든 사용자 메시지에는 **`flow.kickoff(user_message=..., session_id=...)`** 또는 **`flow.handle_turn(...)`**를 사용하세요. 대화형 `Flow`를 로컬 터미널 채팅 루프로 실행하고 싶을 때는 **`flow.chat()`**을 사용하세요.
REST, WebSocket, 테스트, 커스텀 UI에서 오는 모든 사용자 메시지에는 **`flow.handle_turn(message, session_id=...)`**를 사용하세요. 대화형 `Flow`를 로컬 터미널 채팅 루프로 실행하고 싶을 때는 **`flow.chat()`**을 사용하세요.
`Flow.kickoff()`는 `user_message=` 또는 `session_id=` 키워드 인자를 받지 않습니다. 대화형 flow에서는 `handle_turn()`이 보류 중인 메시지를 저장하고 내부적으로 `kickoff(inputs={"id": session_id})`를 호출합니다.
| API | 용도 |
|-----|------|
| `kickoff(user_message=..., session_id=...)` | 각 사용자 메시지 |
| `handle_turn(message, session_id=...)` | 대화형 `Flow`용 한 턴 편의 래퍼 |
| `chat()` | 대화형 `Flow`용 로컬 터미널 REPL |
| `kickoff_async(...)` | 동일 파라미터; 네이티브 async 진입 |
| `kickoff(inputs={...})` | 대화형 턴 처리 없이 flow를 직접 실행 |
| `ask()` | 한 스텝 **내부** 블로킹 프롬프트 (마법사, 확인) |
| `@human_feedback` | **스텝 출력** 승인/거부 — 다음 채팅 줄이 아님 |
| `ChatSession.handle_turn(...)` | `kickoff` 위의 전송 계층 (SSE / WebSocket) |
| `ChatSession.handle_turn(...)` | `handle_turn` 위의 전송 계층 (SSE / WebSocket) |
## 빠른 시작
```python
from uuid import uuid4
from crewai.flow import (
ChatState,
ConversationalConfig,
Flow,
listen,
or_,
persist,
router,
start,
from crewai import Flow
from crewai.flow import listen
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.flow.persistence import SQLiteFlowPersistence
class SupportFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
default_intents=["order", "help", "goodbye"],
intent_llm="gpt-4o-mini",
defer_trace_finalization=True,
)
@ConversationConfig(defer_trace_finalization=True)
class SupportFlow(Flow[ConversationState]):
conversational = True
@start()
def bootstrap(self):
if not self.state.session_ready:
self.state.session_ready = True
return "ready"
@router(bootstrap)
def route(self):
# default_intents 설정 시 prepare_conversational_turn에서 last_intent 설정
return self.state.last_intent or "help"
def route_turn(self, context):
message = self.state.current_user_message or ""
if "주문" in message or "order" in message.lower():
return "order"
if "안녕" in message or "goodbye" in message.lower():
return "goodbye"
return "help"
@listen("order")
def handle_order(self):
reply = "주문이 배송 중입니다."
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("help")
def handle_help(self):
reply = "무엇을 도와드릴까요?"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("goodbye")
def handle_goodbye(self):
reply = "안녕히 가세요!"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@persist(SQLiteFlowPersistence("support.db"))
@listen(or_(handle_order, handle_help, handle_goodbye))
def finalize(self):
return self.state.model_dump()
session_id = str(uuid4())
flow = SupportFlow()
flow.kickoff(user_message="주문 어디까지 왔나요?", session_id=session_id)
flow.kickoff(user_message="반품은 어떻게 하나요?", session_id=session_id)
flow.finalize_session_traces() # 전체 대화에 대한 단일 trace 링크
try:
flow.handle_turn("주문 어디까지 왔나요?", session_id=session_id)
flow.handle_turn("반품은 어떻게 하나요?", session_id=session_id)
finally:
flow.finalize_session_traces() # 전체 대화에 대한 단일 trace 링크
```
## 턴 생명주기
`user_message`가 있는 각 `kickoff`는 다음 파이프라인을 실행합니다:
각 `handle_turn`은 다음 파이프라인을 실행합니다:
1. **`_configure_conversational_kickoff`** — `session_id` / `user_message`를 `inputs`에 병합, `ConversationalConfig` 적용, 설정 시 지연 트레이싱 활성화.
2. **상태 복원** — `inputs["id"]`가 있고 `@persist`가 설정되면 최신 스냅샷 로드.
@@ -109,7 +96,7 @@ flow.finalize_session_traces() # 전체 대화에 대한 단일 trace 링크
5. **그래프 실행** — `@start` → `@router` → `@listen` 핸들러.
6. **실행 종료** — 지연 활성화 시 턴별 `flow_finished` 및 trace 종료 **건너뜀**; 중첩 `Agent.kickoff()` / crew도 부모 batch를 닫지 않음.
핸들러는 **`append_message("assistant", reply)`**를 호출해 다음 턴의 `conversation_messages`에 어시스턴트 응답이 포함되게 하세요. 사용자 입력은 kickoff 시 이미 저장니다 — 핸들러에서 다시 추가하지 마세요.
핸들러는 **`append_assistant_message(reply)`**를 호출해 다음 턴의 `conversation_messages`에 어시스턴트 응답이 포함되게 하세요. 사용자 입력은 `handle_turn`이 이미 저장니다 — 핸들러에서 다시 추가하지 마세요.
## `ConversationalConfig` (클래스 수준 기본값)
@@ -384,7 +371,7 @@ Routes:
4. router는 결정을 `state.last_intent`에 저장합니다 (다음 턴의 router 컨텍스트에서 보입니다).
5. 핸들러가 문자열을 반환했지만 `append_assistant_message`를 직접 호출하지 않았다면, `handle_turn`이 대신 추가해 줍니다.
`flow.kickoff(user_message=..., session_id=...)`를 직접 호출해도 동일한 reset/run 로직이 동작합니다. `handle_turn`은 그 위에 얹은 편의 래퍼입니다.
채팅 메시지에는 `handle_turn()`을 호출하세요. `kickoff(inputs={"id": ...})`를 직접 호출하면 대화형 턴 래퍼 없이 flow 그래프가 실행됩니다.
### 로컬 REPL용 `chat()`

View File

@@ -11,96 +11,83 @@ Apps conversacionais tratam cada linha do usuário como uma **nova execução do
| Conceito | Implementação |
|---------|----------------|
| Id de sessão | `kickoff(session_id=...)` → `inputs["id"]` → `state.id` |
| Linha do usuário | `kickoff(user_message=...)` acrescenta em `state.messages` antes do grafo rodar |
| Fim do turno | `FlowFinished` só para **esta execução**; o chat segue no próximo `kickoff` |
| Trace da sessão | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
| Id de sessão | `handle_turn(..., session_id=...)` → `kickoff(inputs={"id": ...})` → `state.id` |
| Linha do usuário | `handle_turn(message)` acrescenta em `state.messages` antes do grafo rodar |
| Fim do turno | `FlowFinished` só para **esta execução**; o chat segue no próximo `handle_turn` |
| Trace da sessão | `ConversationConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## APIs de turno
Use **`flow.kickoff(user_message=..., session_id=...)`** ou **`flow.handle_turn(...)`** para cada mensagem de usuário em REST, WebSocket, testes e UIs customizadas. Use **`flow.chat()`** quando quiser um loop de chat local no terminal para um `Flow` conversacional.
Use **`flow.handle_turn(message, session_id=...)`** para cada mensagem de usuário em REST, WebSocket, testes e UIs customizadas. Use **`flow.chat()`** quando quiser um loop de chat local no terminal para um `Flow` conversacional.
`Flow.kickoff()` não aceita os argumentos nomeados `user_message=` ou `session_id=`. Para flows conversacionais, `handle_turn()` guarda a mensagem pendente e chama `kickoff(inputs={"id": session_id})` internamente.
| API | Uso |
|-----|-----|
| `kickoff(user_message=..., session_id=...)` | Cada mensagem do usuário |
| `handle_turn(message, session_id=...)` | Wrapper ergonômico de um turno para `Flow` conversacional |
| `chat()` | REPL local no terminal para `Flow` conversacional |
| `kickoff_async(...)` | Mesmos parâmetros; entrada async nativa |
| `kickoff(inputs={...})` | Execução avançada do flow sem tratamento de turno conversacional |
| `ask()` | Prompt bloqueante **dentro** de um passo (wizard, esclarecimento) |
| `@human_feedback` | Aprovar/rejeitar **saída de um passo** — não a próxima linha do chat |
| `ChatSession.handle_turn(...)` | Camada de transporte sobre `kickoff` (SSE / WebSocket) |
| `ChatSession.handle_turn(...)` | Camada de transporte sobre `handle_turn` (SSE / WebSocket) |
## Início rápido
```python
from uuid import uuid4
from crewai.flow import (
ChatState,
ConversationalConfig,
Flow,
listen,
or_,
persist,
router,
start,
from crewai import Flow
from crewai.flow import listen
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
)
from crewai.flow.persistence import SQLiteFlowPersistence
class SupportFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
default_intents=["order", "help", "goodbye"],
intent_llm="gpt-4o-mini",
defer_trace_finalization=True,
)
@ConversationConfig(defer_trace_finalization=True)
class SupportFlow(Flow[ConversationState]):
conversational = True
@start()
def bootstrap(self):
if not self.state.session_ready:
self.state.session_ready = True
return "ready"
@router(bootstrap)
def route(self):
# last_intent definido em prepare_conversational_turn quando default_intents está setado
return self.state.last_intent or "help"
def route_turn(self, context):
message = (self.state.current_user_message or "").lower()
if "pedido" in message or "order" in message:
return "order"
if "tchau" in message or "goodbye" in message:
return "goodbye"
return "help"
@listen("order")
def handle_order(self):
reply = "Seu pedido está a caminho."
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("help")
def handle_help(self):
reply = "Como posso ajudar?"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@listen("goodbye")
def handle_goodbye(self):
reply = "Até logo!"
self.append_message("assistant", reply)
self.append_assistant_message(reply)
return reply
@persist(SQLiteFlowPersistence("support.db"))
@listen(or_(handle_order, handle_help, handle_goodbye))
def finalize(self):
return self.state.model_dump()
session_id = str(uuid4())
flow = SupportFlow()
flow.kickoff(user_message="Onde está meu pedido?", session_id=session_id)
flow.kickoff(user_message="E as devoluções?", session_id=session_id)
flow.finalize_session_traces() # um link de trace para o chat inteiro
try:
flow.handle_turn("Onde está meu pedido?", session_id=session_id)
flow.handle_turn("E as devoluções?", session_id=session_id)
finally:
flow.finalize_session_traces() # um link de trace para o chat inteiro
```
## Ciclo de vida do turno
Cada `kickoff` com `user_message` executa este pipeline:
Cada `handle_turn` executa este pipeline:
1. **`_configure_conversational_kickoff`** — mescla `session_id` / `user_message` em `inputs`, aplica `ConversationalConfig`, habilita tracing adiado quando configurado.
2. **Restauração de estado** — se `inputs["id"]` existe e `@persist` está configurado, carrega o snapshot mais recente.
@@ -109,7 +96,7 @@ Cada `kickoff` com `user_message` executa este pipeline:
5. **Execução do grafo** — `@start` → `@router` → handlers `@listen`.
6. **Fim da execução** — `flow_finished` por turno e finalização de trace são **ignorados** com adiamento; `Agent.kickoff()` / crews aninhados também não fecham o batch pai.
Os handlers devem chamar **`append_message("assistant", reply)`** para que o próximo turno inclua a resposta do assistente. A linha do usuário já é salva no kickoff — não acrescente de novo nos handlers.
Os handlers devem chamar **`append_assistant_message(reply)`** para que o próximo turno inclua a resposta do assistente. A linha do usuário já é salva por `handle_turn` — não acrescente de novo nos handlers.
## `ConversationalConfig` (padrões em nível de classe)
@@ -385,7 +372,7 @@ Você pode sobrescrever qualquer uma definindo um handler com o mesmo nome na su
4. O router grava sua decisão em `state.last_intent` (visível para o contexto de routing do próximo turno).
5. Se seu handler retornou uma string e ainda não chamou `append_assistant_message`, `handle_turn` anexa para você.
Você também pode chamar `flow.kickoff(user_message=..., session_id=...)` diretamente — a mesma lógica de reset/run é acionada. `handle_turn` é o wrapper ergonômico.
Chame `handle_turn()` para mensagens de chat. Chamar `kickoff(inputs={"id": ...})` diretamente executa o grafo sem aplicar o wrapper de turno conversacional.
### `chat()` para REPLs locais