Add conversational Flow chat helper

This commit is contained in:
lorenzejay
2026-06-03 16:43:04 -07:00
parent 051fa0c1cb
commit bd026965e6
4 changed files with 165 additions and 21 deletions

View File

@@ -7,7 +7,7 @@ mode: "wide"
## Overview
Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, and UI bridges — without a separate `chat()` API on `Flow`.
Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, UI bridges, and a local `flow.chat()` REPL for conversational flows.
| Concept | Implementation |
|---------|----------------|
@@ -16,13 +16,15 @@ Conversational apps treat each user line as a **new flow run** with the **same s
| Turn complete | `FlowFinished` for **this run** only; chat continues on the next `kickoff` |
| Full-session trace | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## One entry point: `kickoff`
## Turn APIs
Use **`flow.kickoff(user_message=..., session_id=...)`** for every user message (REST, WebSocket, CLI). Do not add a custom `chat()` wrapper on `Flow`.
Use **`flow.kickoff(user_message=..., session_id=...)`** or **`flow.handle_turn(...)`** for every user message from REST, WebSocket, tests, and custom UIs. Use **`flow.chat()`** when you want a local terminal chat loop for a conversational `Flow`.
| API | Use for |
|-----|---------|
| `kickoff(user_message=..., session_id=...)` | Each user message |
| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` |
| `chat()` | Local terminal REPL for conversational `Flow` |
| `kickoff_async(...)` | Same parameters; native async entry |
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
| `@human_feedback` | Approve/reject **a step output** — not the next chat line |
@@ -293,6 +295,15 @@ finally:
flow.finalize_session_traces()
```
For a local terminal chat, use `chat()`:
```python
def kickoff() -> None:
SupportFlow().chat()
```
`chat()` wraps `handle_turn()` in a REPL, exits on `exit` / `quit`, skips blank lines by default, and calls `finalize_session_traces()` when the session ends.
### `ConversationConfig`
Class decorator that attaches per-class chat defaults.
@@ -376,6 +387,36 @@ You can override any of these by defining a same-named handler in your subclass.
You can also call `flow.kickoff(user_message=..., session_id=...)` directly — the same reset/run logic fires. `handle_turn` is the ergonomic wrapper.
### `chat()` for local REPLs
`flow.chat()` is the batteries-included terminal wrapper around `handle_turn()`:
```python
flow = SupportFlow()
flow.chat()
```
It handles the common local loop:
1. Prompts for a user message.
2. Stops on `exit` / `quit`, `EOFError`, or `KeyboardInterrupt`.
3. Calls `handle_turn(message, session_id=...)`.
4. Prints the assistant result.
5. Finalizes deferred session traces in a `finally` block.
Customize the terminal behavior with injectable I/O:
```python
flow.chat(
session_id="demo-session",
prompt="You: ",
assistant_prefix="Assistant: ",
exit_commands=("exit", "quit", "bye"),
)
```
For web apps, background workers, tests, and custom transports, keep using `handle_turn()` directly.
### Custom router behavior
To run side effects (event bus setup, telemetry) on every routing decision, override `route_turn`:
@@ -410,17 +451,12 @@ With `defer_trace_finalization=True` (default in `ConversationalConfig`):
- **Nested work** (`Agent.kickoff()`, crews, Exa tools) appends to the **parent** batch; inner `AgentExecutor` flows do not close the session batch early.
```python
try:
while True:
line = input("You: ").strip()
if not line:
break
flow.kickoff(user_message=line, session_id=session_id)
finally:
flow.finalize_session_traces()
flow.chat(session_id=session_id)
```
`ChatSession.close()` calls `finalize_session_traces()` when deferral is enabled.
`flow.chat()` calls `finalize_session_traces()` for you. When you own the loop
with `handle_turn()` or `kickoff(...)`, call `finalize_session_traces()` when
the session ends.
`suppress_flow_events=True` only hides Rich console panels; trace and method events still emit for observability.

View File

@@ -16,7 +16,7 @@ Import surface:
from __future__ import annotations
from collections.abc import Mapping, Sequence
from collections.abc import Callable, Mapping, Sequence
from enum import Enum
import json
import logging
@@ -243,6 +243,59 @@ class _ConversationalMixin:
self.append_assistant_message(self._stringify_result(result))
return result
def chat(
self,
*,
session_id: str | None = None,
prompt: str = "\nYou: ",
assistant_prefix: str = "\nAssistant: ",
exit_commands: Sequence[str] = ("exit", "quit"),
input_fn: Callable[[str], str] = input,
output_fn: Callable[[str], None] = print,
skip_empty: bool = True,
defer_trace_finalization: bool = True,
**handle_turn_kwargs: Any,
) -> None:
"""Run an interactive terminal chat loop for a conversational Flow.
``chat()`` is a convenience wrapper around ``handle_turn()`` for local
REPLs. For web apps, tests, and custom transports, call
``handle_turn()`` directly. The input/output callables are injectable so
callers can customize prompts or exercise the loop without patching
builtins.
"""
if not getattr(type(self), "conversational", False):
raise ValueError("Flow.chat() is only available on conversational flows")
exit_set = {command.lower() for command in exit_commands}
previous_defer = getattr(self, "defer_trace_finalization", False)
if defer_trace_finalization:
self.defer_trace_finalization = True
try:
while True:
try:
message = input_fn(prompt).strip()
except (EOFError, KeyboardInterrupt):
output_fn("")
break
if message.lower() in exit_set:
break
if skip_empty and not message:
continue
result = self.handle_turn(
message,
session_id=session_id,
**handle_turn_kwargs,
)
output_fn(f"{assistant_prefix}{result}")
finally:
self.finalize_session_traces()
if defer_trace_finalization:
self.defer_trace_finalization = previous_defer
def build_router_context(self) -> dict[str, Any]:
"""Build context used by the routing policy for the current turn."""
state = cast(ConversationState, self.state)

View File

@@ -704,16 +704,16 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# When ``conversational = True`` on a subclass, the built-in conversational
# graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn``
# / ``end_conversation`` / ``answer_from_history_turn``) registers and
# ``handle_turn`` becomes the chat entry point. When ``False`` (default),
# the methods exist as inert attributes and never register or fire —
# non-chat flows pay no runtime cost.
# ``handle_turn`` / ``chat`` become the chat entry points. When ``False``
# (default), the methods exist as inert attributes and never register or
# fire — non-chat flows pay no runtime cost.
#
# ⚠ EXPERIMENTAL FEATURE. The whole conversational surface
# (``conversational`` ClassVar, ``handle_turn``, ``ConversationConfig``,
# ``RouterConfig``, ``ConversationState``, the built-in graph + helpers)
# lives under ``crewai.experimental`` and may change shape before
# graduating. Pin your CrewAI version if you depend on specific
# behavior, and watch the changelog for breaking updates.
# (``conversational`` ClassVar, ``handle_turn``, ``chat``,
# ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the
# built-in graph + helpers) lives under ``crewai.experimental`` and may
# change shape before graduating. Pin your CrewAI version if you depend on
# specific behavior, and watch the changelog for breaking updates.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")

View File

@@ -858,6 +858,61 @@ class TestConversationalFlow:
flow.handle_turn("anything")
assert flow.state.messages[-1].content == "worked"
def test_chat_runs_repl_over_handle_turn_and_finalizes(self) -> None:
@ConversationConfig(defer_trace_finalization=False)
class MyChat(ConversationalFlow):
turns: int = 0
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.turns += 1
reply = f"worked: {self.state.current_user_message}"
self.append_assistant_message(reply)
return reply
flow = MyChat()
inputs = iter(["first", "", "second", "quit"])
prompts: list[str] = []
outputs: list[str] = []
def input_fn(prompt: str) -> str:
prompts.append(prompt)
return next(inputs)
with patch.object(flow, "finalize_session_traces") as mock_finalize:
flow.chat(
session_id="session-1",
input_fn=input_fn,
output_fn=outputs.append,
)
assert flow.turns == 2
assert prompts == ["\nYou: ", "\nYou: ", "\nYou: ", "\nYou: "]
assert outputs == [
"\nAssistant: worked: first",
"\nAssistant: worked: second",
]
mock_finalize.assert_called_once_with()
assert flow.defer_trace_finalization is False
def test_chat_rejects_non_conversational_flows(self) -> None:
class PlainFlow(Flow):
@start()
def begin(self) -> str:
return "done"
flow = PlainFlow()
try:
flow.chat(input_fn=lambda _: "quit")
except ValueError as exc:
assert "conversational flows" in str(exc)
else:
raise AssertionError("Flow.chat() should reject regular flows")
def test_defer_trace_finalization_skips_per_turn_finalize(self) -> None:
"""``defer_trace_finalization = True`` suppresses per-turn ``finalize_batch``.