From bd026965e676d40b7aa6a5d0a3c4865b80eee78d Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 3 Jun 2026 16:43:04 -0700 Subject: [PATCH] Add conversational Flow chat helper --- docs/en/guides/flows/conversational-flows.mdx | 60 +++++++++++++++---- .../experimental/conversational_mixin.py | 55 ++++++++++++++++- lib/crewai/src/crewai/flow/runtime.py | 16 ++--- lib/crewai/tests/test_flow_conversation.py | 55 +++++++++++++++++ 4 files changed, 165 insertions(+), 21 deletions(-) diff --git a/docs/en/guides/flows/conversational-flows.mdx b/docs/en/guides/flows/conversational-flows.mdx index 832574095..00084cae7 100644 --- a/docs/en/guides/flows/conversational-flows.mdx +++ b/docs/en/guides/flows/conversational-flows.mdx @@ -7,7 +7,7 @@ mode: "wide" ## Overview -Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, and UI bridges — without a separate `chat()` API on `Flow`. +Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, UI bridges, and a local `flow.chat()` REPL for conversational flows. | Concept | Implementation | |---------|----------------| @@ -16,13 +16,15 @@ Conversational apps treat each user line as a **new flow run** with the **same s | Turn complete | `FlowFinished` for **this run** only; chat continues on the next `kickoff` | | Full-session trace | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` | -## One entry point: `kickoff` +## Turn APIs -Use **`flow.kickoff(user_message=..., session_id=...)`** for every user message (REST, WebSocket, CLI). Do not add a custom `chat()` wrapper on `Flow`. +Use **`flow.kickoff(user_message=..., session_id=...)`** or **`flow.handle_turn(...)`** for every user message from REST, WebSocket, tests, and custom UIs. Use **`flow.chat()`** when you want a local terminal chat loop for a conversational `Flow`. | API | Use for | |-----|---------| | `kickoff(user_message=..., session_id=...)` | Each user message | +| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` | +| `chat()` | Local terminal REPL for conversational `Flow` | | `kickoff_async(...)` | Same parameters; native async entry | | `ask()` | Blocking prompt **inside** one step (wizard, clarification) | | `@human_feedback` | Approve/reject **a step output** — not the next chat line | @@ -293,6 +295,15 @@ finally: flow.finalize_session_traces() ``` +For a local terminal chat, use `chat()`: + +```python +def kickoff() -> None: + SupportFlow().chat() +``` + +`chat()` wraps `handle_turn()` in a REPL, exits on `exit` / `quit`, skips blank lines by default, and calls `finalize_session_traces()` when the session ends. + ### `ConversationConfig` Class decorator that attaches per-class chat defaults. @@ -376,6 +387,36 @@ You can override any of these by defining a same-named handler in your subclass. You can also call `flow.kickoff(user_message=..., session_id=...)` directly — the same reset/run logic fires. `handle_turn` is the ergonomic wrapper. +### `chat()` for local REPLs + +`flow.chat()` is the batteries-included terminal wrapper around `handle_turn()`: + +```python +flow = SupportFlow() +flow.chat() +``` + +It handles the common local loop: + +1. Prompts for a user message. +2. Stops on `exit` / `quit`, `EOFError`, or `KeyboardInterrupt`. +3. Calls `handle_turn(message, session_id=...)`. +4. Prints the assistant result. +5. Finalizes deferred session traces in a `finally` block. + +Customize the terminal behavior with injectable I/O: + +```python +flow.chat( + session_id="demo-session", + prompt="You: ", + assistant_prefix="Assistant: ", + exit_commands=("exit", "quit", "bye"), +) +``` + +For web apps, background workers, tests, and custom transports, keep using `handle_turn()` directly. + ### Custom router behavior To run side effects (event bus setup, telemetry) on every routing decision, override `route_turn`: @@ -410,17 +451,12 @@ With `defer_trace_finalization=True` (default in `ConversationalConfig`): - **Nested work** (`Agent.kickoff()`, crews, Exa tools) appends to the **parent** batch; inner `AgentExecutor` flows do not close the session batch early. ```python -try: - while True: - line = input("You: ").strip() - if not line: - break - flow.kickoff(user_message=line, session_id=session_id) -finally: - flow.finalize_session_traces() +flow.chat(session_id=session_id) ``` -`ChatSession.close()` calls `finalize_session_traces()` when deferral is enabled. +`flow.chat()` calls `finalize_session_traces()` for you. When you own the loop +with `handle_turn()` or `kickoff(...)`, call `finalize_session_traces()` when +the session ends. `suppress_flow_events=True` only hides Rich console panels; trace and method events still emit for observability. diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index a66c5bc68..ab38201f3 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -16,7 +16,7 @@ Import surface: from __future__ import annotations -from collections.abc import Mapping, Sequence +from collections.abc import Callable, Mapping, Sequence from enum import Enum import json import logging @@ -243,6 +243,59 @@ class _ConversationalMixin: self.append_assistant_message(self._stringify_result(result)) return result + def chat( + self, + *, + session_id: str | None = None, + prompt: str = "\nYou: ", + assistant_prefix: str = "\nAssistant: ", + exit_commands: Sequence[str] = ("exit", "quit"), + input_fn: Callable[[str], str] = input, + output_fn: Callable[[str], None] = print, + skip_empty: bool = True, + defer_trace_finalization: bool = True, + **handle_turn_kwargs: Any, + ) -> None: + """Run an interactive terminal chat loop for a conversational Flow. + + ``chat()`` is a convenience wrapper around ``handle_turn()`` for local + REPLs. For web apps, tests, and custom transports, call + ``handle_turn()`` directly. The input/output callables are injectable so + callers can customize prompts or exercise the loop without patching + builtins. + """ + if not getattr(type(self), "conversational", False): + raise ValueError("Flow.chat() is only available on conversational flows") + + exit_set = {command.lower() for command in exit_commands} + previous_defer = getattr(self, "defer_trace_finalization", False) + if defer_trace_finalization: + self.defer_trace_finalization = True + + try: + while True: + try: + message = input_fn(prompt).strip() + except (EOFError, KeyboardInterrupt): + output_fn("") + break + + if message.lower() in exit_set: + break + if skip_empty and not message: + continue + + result = self.handle_turn( + message, + session_id=session_id, + **handle_turn_kwargs, + ) + output_fn(f"{assistant_prefix}{result}") + finally: + self.finalize_session_traces() + if defer_trace_finalization: + self.defer_trace_finalization = previous_defer + def build_router_context(self) -> dict[str, Any]: """Build context used by the routing policy for the current turn.""" state = cast(ConversationState, self.state) diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 33bfbacea..1693864da 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -704,16 +704,16 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): # When ``conversational = True`` on a subclass, the built-in conversational # graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn`` # / ``end_conversation`` / ``answer_from_history_turn``) registers and - # ``handle_turn`` becomes the chat entry point. When ``False`` (default), - # the methods exist as inert attributes and never register or fire — - # non-chat flows pay no runtime cost. + # ``handle_turn`` / ``chat`` become the chat entry points. When ``False`` + # (default), the methods exist as inert attributes and never register or + # fire — non-chat flows pay no runtime cost. # # ⚠ EXPERIMENTAL FEATURE. The whole conversational surface - # (``conversational`` ClassVar, ``handle_turn``, ``ConversationConfig``, - # ``RouterConfig``, ``ConversationState``, the built-in graph + helpers) - # lives under ``crewai.experimental`` and may change shape before - # graduating. Pin your CrewAI version if you depend on specific - # behavior, and watch the changelog for breaking updates. + # (``conversational`` ClassVar, ``handle_turn``, ``chat``, + # ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the + # built-in graph + helpers) lives under ``crewai.experimental`` and may + # change shape before graduating. Pin your CrewAI version if you depend on + # specific behavior, and watch the changelog for breaking updates. conversational: ClassVar[bool] = False conversational_config: ClassVar[ConversationConfig | None] = None builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end") diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 77567fe5d..0a651a5c8 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -858,6 +858,61 @@ class TestConversationalFlow: flow.handle_turn("anything") assert flow.state.messages[-1].content == "worked" + def test_chat_runs_repl_over_handle_turn_and_finalizes(self) -> None: + @ConversationConfig(defer_trace_finalization=False) + class MyChat(ConversationalFlow): + turns: int = 0 + + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.turns += 1 + reply = f"worked: {self.state.current_user_message}" + self.append_assistant_message(reply) + return reply + + flow = MyChat() + inputs = iter(["first", "", "second", "quit"]) + prompts: list[str] = [] + outputs: list[str] = [] + + def input_fn(prompt: str) -> str: + prompts.append(prompt) + return next(inputs) + + with patch.object(flow, "finalize_session_traces") as mock_finalize: + flow.chat( + session_id="session-1", + input_fn=input_fn, + output_fn=outputs.append, + ) + + assert flow.turns == 2 + assert prompts == ["\nYou: ", "\nYou: ", "\nYou: ", "\nYou: "] + assert outputs == [ + "\nAssistant: worked: first", + "\nAssistant: worked: second", + ] + mock_finalize.assert_called_once_with() + assert flow.defer_trace_finalization is False + + def test_chat_rejects_non_conversational_flows(self) -> None: + class PlainFlow(Flow): + @start() + def begin(self) -> str: + return "done" + + flow = PlainFlow() + + try: + flow.chat(input_fn=lambda _: "quit") + except ValueError as exc: + assert "conversational flows" in str(exc) + else: + raise AssertionError("Flow.chat() should reject regular flows") + def test_defer_trace_finalization_skips_per_turn_finalize(self) -> None: """``defer_trace_finalization = True`` suppresses per-turn ``finalize_batch``.