diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 862706a88..46e83134b 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -1,15 +1,17 @@ -"""Conversational graph + helpers as a mixin for ``Flow`` (experimental). +"""Conversational graph + helpers as an experimental Flow extension. -The experimental conversational chat surface lives here as a mixin so that -``crewai.flow.runtime`` stays focused on the execution engine. ``Flow`` -inherits from ``_ConversationalMixin``; the methods only register on -subclasses that opt in via ``conversational = True`` (enforced by the -``_conversational_only`` marker + ``FlowMeta`` gating in -``crewai.flow.runtime``). +The conversational chat surface remains experimental and may change before the +v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused +on the execution engine. ``crewai.flow.flow`` composes this mixin onto the +public ``Flow`` class for backwards compatibility. + +The built-in conversational graph only registers for subclasses that opt in +with ``conversational = True``. Static conversational metadata is projected +into ``FlowDefinition.conversational`` via the Python DSL builder. Import surface: - - :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users - don't import it directly. + - :class:`_ConversationalMixin` — internal; the public ``Flow`` class + composes it in. Users don't import it directly. - The data types this mixin uses live in :mod:`crewai.experimental.conversational`. """ @@ -20,7 +22,7 @@ from collections.abc import Callable, Mapping, Sequence from enum import Enum import json import logging -from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast +from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast from pydantic import BaseModel, Field, create_model @@ -49,21 +51,56 @@ from crewai.utilities.types import LLMMessage if TYPE_CHECKING: - from crewai.flow.runtime import Flow from crewai.llms.base_llm import BaseLLM logger = logging.getLogger(__name__) -class _ConversationalMixin: - """Built-in conversational graph for ``Flow`` (gated on ``conversational``). +def _iter_condition_labels(condition: Any) -> set[str]: + if isinstance(condition, str): + return {condition} + if isinstance(condition, dict): + labels: set[str] = set() + for value in condition.values(): + if isinstance(value, list): + for item in value: + labels.update(_iter_condition_labels(item)) + else: + labels.update(_iter_condition_labels(value)) + return labels + return set() - Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused - on running graphs. The methods here only register on subclasses that set - ``conversational = True``; non-chat flows see them as inert attributes. + +class _ConversationalMixin: + """Experimental conversational graph for ``Flow``. + + This mixin owns chat behavior and runtime hooks. Non-chat flows see these + methods as inert attributes unless they opt in with ``conversational = True``. """ + # === EXPERIMENTAL: conversational mode === + # When ``conversational = True`` on a Flow subclass, this mixin's built-in + # graph registers and ``handle_turn`` / ``chat`` become chat entry points. + conversational: ClassVar[bool] = False + conversational_config: ClassVar[ConversationConfig | None] = None + builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end") + internal_routes: ClassVar[tuple[str, ...]] = ( + "answer_from_history", + "conversation_start", + ) + builtin_route_descriptions: ClassVar[dict[str, str]] = { + "converse": ( + "Ordinary chat, follow-ups, summaries, clarifications, and " + "questions answerable from prior conversation history." + ), + "end": ("User signals the conversation is finished (goodbye, exit, done)."), + "answer_from_history": ( + "Answer directly from prior conversation history without invoking " + "tools, agents, or custom routes." + ), + } + # The metaclass + state attributes referenced below live on ``Flow`` — # this mixin is never instantiated standalone. These type-only # declarations exist so static analyzers don't flag attribute access. @@ -71,14 +108,6 @@ class _ConversationalMixin: # (otherwise mypy flags "Cannot override instance variable with class # variable" when Flow declares them as ``ClassVar``). if TYPE_CHECKING: - conversational: ClassVar[bool] - conversational_config: ClassVar[ConversationConfig | None] - builtin_routes: ClassVar[tuple[str, ...]] - internal_routes: ClassVar[tuple[str, ...]] - builtin_route_descriptions: ClassVar[dict[str, str]] - # Registry ClassVars populated by ``FlowMeta`` at class creation. - _listeners: ClassVar[dict[Any, Any]] - # Instance attrs from ``Flow``. state: Any name: str | None @@ -87,6 +116,7 @@ class _ConversationalMixin: _pending_events: dict[Any, Any] _method_call_counts: dict[Any, int] _is_execution_resuming: bool + _conversation_messages: list[LLMMessage] _pending_user_message: str | dict[str, Any] | None _pending_intents: Sequence[str] | None _pending_intent_llm: str | BaseLLM | None @@ -97,8 +127,8 @@ class _ConversationalMixin: def _collapse_to_outcome( self, feedback: str, - outcomes: tuple[str, ...], - llm: str | BaseLLM | Any, + outcomes: Sequence[str], + llm: str | BaseLLM, ) -> str: pass @@ -238,8 +268,8 @@ class _ConversationalMixin: state = cast(ConversationState, self.state) sid = session_id or state.id - # Stash the pending turn so ``_apply_pending_conversational_turn`` - # picks it up AFTER persist restore. + # Stash the pending turn so the kickoff extension hook picks it up + # after persist restore. self._pending_user_message = message self._pending_intents = list(intents) if intents else None self._pending_intent_llm = intent_llm @@ -286,7 +316,7 @@ class _ConversationalMixin: callers can customize prompts or exercise the loop without patching builtins. """ - if not getattr(type(self), "conversational", False): + if not self._is_conversational_enabled(): raise ValueError("Flow.chat() is only available on conversational flows") exit_set = {command.lower() for command in exit_commands} @@ -491,14 +521,14 @@ class _ConversationalMixin: **extra: Any, ) -> None: """Append a message to conversation history (legacy ChatState path).""" - _append_conversation_message(cast("Flow[Any]", self), role, content, **extra) + _append_conversation_message(cast(Any, self), role, content, **extra) @property def conversation_messages(self) -> list[LLMMessage]: """Message history from state, coerced to LLM-shaped dicts.""" return [ message_to_llm_dict(message) - for message in get_conversation_messages(cast("Flow[Any]", self)) + for message in get_conversation_messages(cast(Any, self)) ] def receive_user_message( @@ -514,7 +544,7 @@ class _ConversationalMixin: ``state.messages`` and preserve ``last_intent`` across turns. Non-conversational flows fall through to the legacy helper. """ - if self.conversational: + if self._is_conversational_enabled(): state = cast(ConversationState, self.state) state.messages.append(ConversationMessage(role="user", content=text)) self._emit_conversation_message_added( @@ -535,9 +565,7 @@ class _ConversationalMixin: return intent return text - return _receive_user_message( - cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm - ) + return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm) def classify_intent( self, @@ -561,21 +589,98 @@ class _ConversationalMixin: def _conversation_config(self) -> ConversationConfig | None: return getattr(type(self), "conversational_config", None) + @property + def _conversation_definition(self) -> Any | None: + return self._conversation_flow_definition().conversational + + def _conversation_flow_definition(self) -> Any: + flow_definition = getattr(type(self), "flow_definition", None) + if not callable(flow_definition): + raise AttributeError( + f"{type(self).__name__} does not expose flow_definition()" + ) + return flow_definition() + + @classmethod + def _conversational_definition(cls) -> Any | None: + flow_definition = getattr(cls, "flow_definition", None) + if not callable(flow_definition): + return None + return flow_definition().conversational + + @classmethod + def _is_conversational(cls) -> bool: + definition = cls._conversational_definition() + return bool(definition and definition.enabled) + + def _is_conversational_enabled(self) -> bool: + definition = self._conversation_definition + return bool(definition and definition.enabled) + + def _initialize_runtime_extension_attrs(self) -> None: + if not isinstance(getattr(self, "_conversation_messages", None), list): + object.__setattr__(self, "_conversation_messages", []) + if not hasattr(self, "_pending_user_message"): + object.__setattr__(self, "_pending_user_message", None) + if not hasattr(self, "_pending_intents"): + object.__setattr__(self, "_pending_intents", None) + if not hasattr(self, "_pending_intent_llm"): + object.__setattr__(self, "_pending_intent_llm", None) + + def _create_default_extension_state(self) -> ConversationState | None: + initial_state_t = getattr(self, "_initial_state_t", None) + if type(self)._is_conversational() and ( + not hasattr(self, "_initial_state_t") + or isinstance(initial_state_t, TypeVar) + ): + return ConversationState() + return None + + def _should_apply_pending_kickoff_context(self) -> bool: + return ( + type(self)._is_conversational() and self._pending_user_message is not None + ) + + def _apply_pending_kickoff_context(self) -> None: + self._apply_pending_conversational_turn() + + def _order_start_methods_for_kickoff( + self, + start_methods: list[Any], + ) -> tuple[list[Any], bool]: + if not type(self)._is_conversational(): + return start_methods, False + + conversation_start = "conversation_start" + if conversation_start not in {str(method) for method in start_methods}: + return start_methods, False + + ordered_starts = [ + method for method in start_methods if str(method) != conversation_start + ] + ordered_starts.append( + next( + method for method in start_methods if str(method) == conversation_start + ) + ) + return ordered_starts, True + def _should_defer_trace_finalization(self) -> bool: """Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped. True when either: - ``flow.defer_trace_finalization`` is set on the instance, OR - - the class-level ``ConversationConfig.defer_trace_finalization`` - on a conversational subclass is True. + - the static conversational definition enables deferred finalization. Either source enables the deferred-session pattern. The caller eventually invokes ``finalize_session_traces()`` to close the batch. """ if getattr(self, "defer_trace_finalization", False): return True - config = self._conversation_config - return bool(config and config.defer_trace_finalization) + definition = self._conversation_definition + return bool( + definition and definition.enabled and definition.defer_trace_finalization + ) def _reset_turn_execution_state(self) -> None: """Clear per-execution tracking so the next turn re-runs the graph.""" @@ -733,11 +838,12 @@ class _ConversationalMixin: router_config: RouterConfig | None, ) -> dict[str, str]: label_to_method: dict[str, str] = {} - for listener_name, condition in self._listeners.items(): - if isinstance(condition, tuple): - _, trigger_labels = condition - for trigger_label in trigger_labels: - label_to_method.setdefault(str(trigger_label), str(listener_name)) + flow_definition = self._conversation_flow_definition() + for listener_name, method_definition in flow_definition.methods.items(): + if method_definition.listen is None or method_definition.router: + continue + for trigger_label in _iter_condition_labels(method_definition.listen): + label_to_method.setdefault(trigger_label, listener_name) routes = self._effective_routes(router_config) overrides = ( @@ -788,21 +894,31 @@ class _ConversationalMixin: def _valid_route_labels(self) -> set[str]: labels: set[str] = set() - for condition in self._listeners.values(): - if isinstance(condition, tuple): - _, methods = condition - labels.update(str(method) for method in methods) + flow_definition = self._conversation_flow_definition() + for method_definition in flow_definition.methods.values(): + if method_definition.listen is None or method_definition.router: + continue + labels.update(_iter_condition_labels(method_definition.listen)) return labels def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]: custom_routes = set(router_config.routes or ()) if router_config else set() + definition = self._conversation_definition + builtin_routes = ( + tuple(definition.builtin_routes) + if definition is not None + else self.builtin_routes + ) + internal_routes = ( + tuple(definition.internal_routes) + if definition is not None + else self.internal_routes + ) if not custom_routes: custom_routes = ( - self._valid_route_labels() - - set(self.builtin_routes) - - set(self.internal_routes) + self._valid_route_labels() - set(builtin_routes) - set(internal_routes) ) - return custom_routes | set(self.builtin_routes) + return custom_routes | set(builtin_routes) def _default_conversation_llm(self) -> Any | None: config = self._conversation_config diff --git a/lib/crewai/src/crewai/flow/conversational_definition.py b/lib/crewai/src/crewai/flow/conversational_definition.py new file mode 100644 index 000000000..8673bbb3a --- /dev/null +++ b/lib/crewai/src/crewai/flow/conversational_definition.py @@ -0,0 +1,50 @@ +"""Static conversational Flow definition models. + +This module is part of the serializable Flow Definition contract. It should +only contain static data shapes. Experimental conversational runtime behavior +continues to live in ``crewai.experimental.conversational_mixin``. +""" + +from __future__ import annotations + +from typing import Any, Literal + +from pydantic import BaseModel, Field + + +class FlowConversationalRouterDefinition(BaseModel): + """Static conversational router configuration.""" + + prompt: str | None = None + response_format: Any = None + llm: Any = None + routes: list[str] | None = None + route_descriptions: dict[str, str] | None = None + default_intent: str | None = "converse" + fallback_intent: str | None = "converse" + intent_field: str = "intent" + + +class FlowConversationalDefinition(BaseModel): + """Static conversational Flow configuration.""" + + enabled: bool = False + system_prompt: str | None = None + llm: Any = None + router: FlowConversationalRouterDefinition | None = None + answer_from_history_prompt: str | None = None + default_intents: list[str] | None = None + intent_llm: Any = None + answer_from_history_llm: Any = None + visible_agent_outputs: list[str] | Literal["all"] | None = None + defer_trace_finalization: bool = True + builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"]) + internal_routes: list[str] = Field( + default_factory=lambda: ["answer_from_history", "conversation_start"] + ) + + +__all__ = [ + "FlowConversationalDefinition", + "FlowConversationalRouterDefinition", +] diff --git a/lib/crewai/src/crewai/flow/dsl/_utils.py b/lib/crewai/src/crewai/flow/dsl/_utils.py index 21b9b4477..c4b9a4c92 100644 --- a/lib/crewai/src/crewai/flow/dsl/_utils.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -9,6 +9,8 @@ from typing_extensions import TypeIs from crewai.flow.flow_definition import ( FlowConfigDefinition, + FlowConversationalDefinition, + FlowConversationalRouterDefinition, FlowDefinition, FlowDefinitionDiagnostic, FlowHumanFeedbackDefinition, @@ -27,6 +29,13 @@ R = TypeVar("R") logger = logging.getLogger(__name__) _FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__" +_FLOW_METHOD_METADATA_ATTRS = [ + "__conversational_only__", + "__flow_method_definition__", + "__flow_persistence_config__", + "__human_feedback_config__", + "_human_feedback_llm", +] def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]: @@ -42,6 +51,39 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool: return True +def _is_conversational_flow(flow_class: type) -> bool: + return bool(getattr(flow_class, "conversational", False)) + + +def _get_inherited_conversational_method( + flow_class: type, + attr_name: str, +) -> Any | None: + if not _is_conversational_flow(flow_class): + return None + + for base in flow_class.__mro__[1:]: + inherited = base.__dict__.get(attr_name) + if inherited is None: + continue + if getattr(inherited, "__conversational_only__", False) and is_flow_method( + inherited + ): + return inherited + return None + + +def _stamp_inherited_conversational_metadata( + method: Any, + inherited: Any, +) -> Any: + for attr in _FLOW_METHOD_METADATA_ATTRS: + if hasattr(inherited, attr): + setattr(method, attr, getattr(inherited, attr)) + method.__is_flow_method__ = True + return method + + def _set_flow_method_definition( wrapper: FlowMethod[P, R], definition: FlowMethodDefinition, @@ -135,6 +177,8 @@ def _build_state_definition( from pydantic import BaseModel as PydanticBaseModel state_value = getattr(flow_class, "_initial_state_t", None) + if isinstance(state_value, TypeVar): + state_value = None initial_state = getattr(flow_class, "initial_state", None) if initial_state is not None: state_value = initial_state @@ -230,6 +274,98 @@ def _build_persistence_definition( ) +def _build_conversational_router_definition( + router_config: Any, + diagnostics: list[FlowDefinitionDiagnostic], + path: str, +) -> FlowConversationalRouterDefinition | None: + if router_config is None: + return None + + routes = getattr(router_config, "routes", None) + return FlowConversationalRouterDefinition( + prompt=getattr(router_config, "prompt", None), + response_format=_serialize_static_value( + getattr(router_config, "response_format", None), + diagnostics, + f"{path}.response_format", + ), + llm=_serialize_static_value( + getattr(router_config, "llm", None), diagnostics, f"{path}.llm" + ), + routes=[str(route) for route in routes] if routes is not None else None, + route_descriptions=getattr(router_config, "route_descriptions", None), + default_intent=getattr(router_config, "default_intent", "converse"), + fallback_intent=getattr(router_config, "fallback_intent", "converse"), + intent_field=str(getattr(router_config, "intent_field", "intent")), + ) + + +def _build_conversational_definition( + flow_class: type, + diagnostics: list[FlowDefinitionDiagnostic], +) -> FlowConversationalDefinition | None: + if not _is_conversational_flow(flow_class): + return None + + config = getattr(flow_class, "conversational_config", None) + builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end")) + internal_routes = getattr( + flow_class, + "internal_routes", + ("answer_from_history", "conversation_start"), + ) + if config is None: + return FlowConversationalDefinition( + enabled=True, + builtin_routes=[str(route) for route in builtin_routes], + internal_routes=[str(route) for route in internal_routes], + ) + + default_intents = getattr(config, "default_intents", None) + visible_agent_outputs = getattr(config, "visible_agent_outputs", None) + return FlowConversationalDefinition( + enabled=True, + system_prompt=getattr(config, "system_prompt", None), + llm=_serialize_static_value( + getattr(config, "llm", None), diagnostics, "conversational.llm" + ), + router=_build_conversational_router_definition( + getattr(config, "router", None), + diagnostics, + "conversational.router", + ), + answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None), + default_intents=( + [str(intent) for intent in default_intents] + if default_intents is not None + else None + ), + intent_llm=_serialize_static_value( + getattr(config, "intent_llm", None), + diagnostics, + "conversational.intent_llm", + ), + answer_from_history_llm=_serialize_static_value( + getattr(config, "answer_from_history_llm", None), + diagnostics, + "conversational.answer_from_history_llm", + ), + visible_agent_outputs=( + "all" + if visible_agent_outputs == "all" + else [str(output) for output in visible_agent_outputs] + if visible_agent_outputs is not None + else None + ), + defer_trace_finalization=bool( + getattr(config, "defer_trace_finalization", True) + ), + builtin_routes=[str(route) for route in builtin_routes], + internal_routes=[str(route) for route in internal_routes], + ) + + def _build_method_definition( method: Any, diagnostics: list[FlowDefinitionDiagnostic], @@ -270,6 +406,29 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]: flow_class, attr_value ): methods[attr_name] = attr_value + continue + + inherited = _get_inherited_conversational_method(flow_class, attr_name) + if inherited is not None and callable(attr_value): + methods[attr_name] = _stamp_inherited_conversational_metadata( + attr_value, inherited + ) + + if _is_conversational_flow(flow_class): + for base in reversed(flow_class.__mro__[1:]): + for attr_name, raw_value in base.__dict__.items(): + if attr_name.startswith("_") or attr_name in methods: + continue + if not getattr(raw_value, "__conversational_only__", False): + continue + try: + attr_value = getattr(flow_class, attr_name) + except AttributeError: + continue + if is_flow_method(attr_value) and _should_include_flow_method( + flow_class, attr_value + ): + methods[attr_name] = attr_value # A wrapped method whose name collides with a base Flow model field # (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying @@ -314,6 +473,7 @@ def _build_flow_definition_from_class( state=_build_state_definition(flow_class, diagnostics), config=_build_config_definition(flow_class, diagnostics), persist=_build_persistence_definition(flow_class, diagnostics, "persist"), + conversational=_build_conversational_definition(flow_class, diagnostics), methods=methods, diagnostics=diagnostics, ) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 353f0ba9c..19c161ffb 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -6,15 +6,22 @@ The implementation now lives in three modules, split by concern: ``@router``, ``or_`` / ``and_``) and Python Flow class projection - ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract - ``crewai.flow.runtime`` -- the Flow execution engine and state +- ``crewai.experimental.conversational_mixin`` -- experimental conversational + runtime extension composed onto the public ``Flow`` class Prefer importing from those modules in new code; this module preserves the historical ``crewai.flow.flow`` import path. """ +from typing import Any, TypeVar + +from pydantic import BaseModel + +from crewai.experimental.conversational_mixin import _ConversationalMixin from crewai.flow.dsl import and_, listen, or_, router, start from crewai.flow.runtime import ( _INITIAL_STATE_CLASS_MARKER, - Flow, + Flow as RuntimeFlow, FlowMeta, FlowState, LockedDictProxy, @@ -23,6 +30,13 @@ from crewai.flow.runtime import ( ) +T = TypeVar("T", bound=dict[str, Any] | BaseModel) + + +class Flow(_ConversationalMixin, RuntimeFlow[T]): + """Public Flow class with experimental conversational extension behavior.""" + + __all__ = [ "_INITIAL_STATE_CLASS_MARKER", "Flow", diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 1c05a51a9..0830f7a65 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -16,6 +16,11 @@ from typing import Any, Literal as TypingLiteral from pydantic import BaseModel, ConfigDict, Field import yaml +from crewai.flow.conversational_definition import ( + FlowConversationalDefinition, + FlowConversationalRouterDefinition, +) + logger = logging.getLogger(__name__) @@ -23,6 +28,8 @@ FlowDefinitionCondition = str | dict[str, Any] __all__ = [ "FlowConfigDefinition", + "FlowConversationalDefinition", + "FlowConversationalRouterDefinition", "FlowDefinition", "FlowDefinitionCondition", "FlowDefinitionDiagnostic", @@ -115,6 +122,7 @@ class FlowDefinition(BaseModel): state: FlowStateDefinition | None = None config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition) persist: FlowPersistenceDefinition | None = None + conversational: FlowConversationalDefinition | None = None methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict) diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list) diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 4be128c6f..874972a61 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -84,11 +84,6 @@ from crewai.events.types.flow_events import ( MethodExecutionPausedEvent, MethodExecutionStartedEvent, ) -from crewai.experimental.conversational import ( - ConversationConfig, - ConversationState, -) -from crewai.experimental.conversational_mixin import _ConversationalMixin from crewai.flow.dsl._utils import build_flow_definition from crewai.flow.flow_context import current_flow_id, current_flow_request_id from crewai.flow.flow_definition import ( @@ -139,7 +134,6 @@ from crewai.utilities.streaming import ( signal_end, signal_error, ) -from crewai.utilities.types import LLMMessage # Runtime alias so Pydantic can resolve the ``execution_context`` field's @@ -645,7 +639,7 @@ class FlowMeta(ModelMetaclass): return super().__new__(mcs, name, bases, namespace) -class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): +class Flow(BaseModel, Generic[T], metaclass=FlowMeta): """Base class for all flows. type parameter T must be either dict[str, Any] or a subclass of BaseModel.""" @@ -659,41 +653,33 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): _flow_definition: ClassVar[FlowDefinition | None] = None - # === EXPERIMENTAL: conversational mode === - # When ``conversational = True`` on a subclass, the built-in conversational - # graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn`` - # / ``end_conversation`` / ``answer_from_history_turn``) registers and - # ``handle_turn`` / ``chat`` become the chat entry points. When ``False`` - # (default), the methods exist as inert attributes and never register or - # fire — non-chat flows pay no runtime cost. - # - # ⚠ EXPERIMENTAL FEATURE. The whole conversational surface - # (``conversational`` ClassVar, ``handle_turn``, ``chat``, - # ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the - # built-in graph + helpers) lives under ``crewai.experimental`` and may - # change shape before graduating. Pin your CrewAI version if you depend on - # specific behavior, and watch the changelog for breaking updates. - conversational: ClassVar[bool] = False - conversational_config: ClassVar[ConversationConfig | None] = None - builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end") - internal_routes: ClassVar[tuple[str, ...]] = ( - "answer_from_history", - "conversation_start", - ) - builtin_route_descriptions: ClassVar[dict[str, str]] = { - "converse": ( - "Ordinary chat, follow-ups, summaries, clarifications, and " - "questions answerable from prior conversation history." - ), - "end": ("User signals the conversation is finished (goodbye, exit, done)."), - "answer_from_history": ( - "Answer directly from prior conversation history without invoking " - "tools, agents, or custom routes." - ), - } - entity_type: Literal["flow"] = "flow" + def _initialize_runtime_extension_attrs(self) -> None: + """Initialize optional runtime-extension attributes.""" + + def _create_default_extension_state(self) -> Any | None: + """Return a default state supplied by an optional runtime extension.""" + return None + + def _should_apply_pending_kickoff_context(self) -> bool: + """Whether an optional runtime extension has pending kickoff context.""" + return False + + def _apply_pending_kickoff_context(self) -> None: + """Apply optional runtime-extension kickoff context.""" + + def _order_start_methods_for_kickoff( + self, + start_methods: list[FlowMethodName], + ) -> tuple[list[FlowMethodName], bool]: + """Allow an optional runtime extension to order kickoff start methods.""" + return start_methods, False + + def _should_defer_trace_finalization(self) -> bool: + """Whether this kickoff should defer final flow trace finalization.""" + return bool(getattr(self, "defer_trace_finalization", False)) + @classmethod def flow_definition(cls) -> FlowDefinition: """Return the static Flow Definition built from this Flow class.""" @@ -911,10 +897,6 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): _human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict) _input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list) _state: Any = PrivateAttr(default=None) - _conversation_messages: list[LLMMessage] = PrivateAttr(default_factory=list) - _pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None) - _pending_intents: Sequence[str] | None = PrivateAttr(default=None) - _pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None) _deferred_flow_started_event_id: str | None = PrivateAttr(default=None) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] @@ -940,6 +922,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): if getattr(self, "_flow_post_init_done", False): return object.__setattr__(self, "_flow_post_init_done", True) + self._initialize_runtime_extension_attrs() if self._state is None: self._state = self._create_initial_state() @@ -1569,20 +1552,15 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): """ init_state = self.initial_state - # Conversational subclasses default to ``ConversationState`` if the - # user didn't supply an explicit type parameter (``Flow[...]``) or an - # ``initial_state``. This makes ``class MyChat(Flow): conversational - # = True`` work without forcing every user to import and parameterize - # ``ConversationState`` themselves. - if ( - init_state is None - and getattr(type(self), "conversational", False) - and not hasattr(self, "_initial_state_t") - ): - return cast(T, ConversationState()) + if init_state is None: + extension_state = self._create_default_extension_state() + if extension_state is not None: + return cast(T, extension_state) if init_state is None and hasattr(self, "_initial_state_t"): state_type = self._initial_state_t + if isinstance(state_type, TypeVar): + state_type = None if isinstance(state_type, type): if issubclass(state_type, FlowState): instance = state_type() @@ -2152,9 +2130,8 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): if should_emit_flow_started: # In normal flows, each kickoff owns its own flow lifecycle. - # Deferred conversational sessions are different: the first - # turn opens the flow scope and later turns reuse it until - # ``finalize_session_traces()`` emits the single finish event. + # Deferred sessions reuse the first flow scope until an + # explicit finalization call closes the batch. started_event = FlowStartedEvent( type="flow_started", flow_name=self.name or self.__class__.__name__, @@ -2184,16 +2161,8 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): # with implicit "crew" execution_type. get_env_context() - # Conversational hook: apply the pending user message AFTER state - # restore and AFTER flow scope initialization, so transcript events - # are parented under the current conversation trace. - # ``handle_turn`` stashes the message on ``self._pending_user_message`` - # before calling ``kickoff``; this drains it. - if ( - getattr(type(self), "conversational", False) - and self._pending_user_message is not None - ): - self._apply_pending_conversational_turn() + if self._should_apply_pending_kickoff_context(): + self._apply_pending_kickoff_context() if inputs is not None and "id" not in inputs: self._initialize_state(inputs) @@ -2216,11 +2185,18 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): starts_to_execute = ( unconditional_starts if unconditional_starts else start_methods ) - tasks = [ - self._execute_start_method(start_method) - for start_method in starts_to_execute - ] - await asyncio.gather(*tasks) + starts_to_execute, run_starts_sequentially = ( + self._order_start_methods_for_kickoff(starts_to_execute) + ) + if run_starts_sequentially: + for start_method in starts_to_execute: + await self._execute_start_method(start_method) + else: + tasks = [ + self._execute_start_method(start_method) + for start_method in starts_to_execute + ] + await asyncio.gather(*tasks) except Exception as e: # Check if flow was paused for human feedback from crewai.flow.async_feedback.types import HumanFeedbackPending @@ -2292,10 +2268,9 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): # When ``defer_trace_finalization`` is set, skip both per-turn # ``FlowFinishedEvent`` AND trace-batch finalization. The caller - # invokes ``finalize_session_traces()`` once at session end to - # close out the whole conversation as one trace. The flag is - # read from EITHER the instance attribute (set by user code) OR - # the class-level ``ConversationConfig.defer_trace_finalization``. + # invokes the matching finalization hook once at session end. The + # flag is read from either the instance attribute or an extension + # definition. if not self._should_defer_trace_finalization(): future = crewai_event_bus.emit( self, @@ -2919,7 +2894,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): return self.input_provider if flow_config.input_provider is not None: return flow_config.input_provider - return ConsoleProvider() + return cast(InputProvider, ConsoleProvider()) def _checkpoint_state_for_ask(self) -> None: """Auto-checkpoint flow state before waiting for user input. @@ -3038,7 +3013,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): executor = ThreadPoolExecutor(max_workers=1) ctx = contextvars.copy_context() future = executor.submit( - ctx.run, provider.request_input, message, self, metadata + ctx.run, provider.request_input, message, cast(Any, self), metadata ) try: raw = future.result(timeout=timeout) @@ -3051,7 +3026,9 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): # cancel_futures=True cleans up any queued-but-not-started tasks. executor.shutdown(wait=False, cancel_futures=True) else: - raw = provider.request_input(message, self, metadata=metadata) + raw = provider.request_input( + message, cast(Any, self), metadata=metadata + ) except KeyboardInterrupt: raise except Exception: @@ -3329,7 +3306,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): flow_name=self.name or self.__class__.__name__, ), ) - structure = build_flow_structure(self) + structure = build_flow_structure(cast(Any, self)) return render_interactive(structure, filename=filename, show=show) @staticmethod diff --git a/lib/crewai/src/crewai/memory/recall_flow.py b/lib/crewai/src/crewai/memory/recall_flow.py index e09278983..562768c18 100644 --- a/lib/crewai/src/crewai/memory/recall_flow.py +++ b/lib/crewai/src/crewai/memory/recall_flow.py @@ -259,8 +259,9 @@ class RecallFlow(Flow[RecallState]): candidates = [] if not candidates: candidates = [scope_prefix] - self.state.candidate_scopes = candidates[:20] - return self.state.candidate_scopes + selected_scopes = candidates[:20] + self.state.candidate_scopes = selected_scopes + return selected_scopes @listen(filter_and_chunk) def search_chunks(self) -> list[Any]: @@ -368,9 +369,10 @@ class RecallFlow(Flow[RecallState]): ) ) matches.sort(key=lambda m: m.score, reverse=True) - self.state.final_results = matches[: self.state.limit] + final_results = matches[: self.state.limit] + self.state.final_results = final_results if self.state.evidence_gaps and self.state.final_results: self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps) - return self.state.final_results + return final_results diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 03f95080f..122ad0009 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -169,9 +169,6 @@ class TestConversationalFlow: ) - @pytest.mark.skip( - reason="Experimental conversational registry behavior is out of scope for the definition-first start migration." - ) def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None: @ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini") class ResearchFlow(ConversationalFlow): @@ -595,9 +592,6 @@ class TestConversationalFlow: assert result == "legacy-searched" assert flow.state.last_intent == "search" - @pytest.mark.skip( - reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration." - ) def test_user_start_methods_run_sequentially_before_router_in_conversational_mode( self, ) -> None: @@ -649,9 +643,6 @@ class TestConversationalFlow: assert "attach_bus" in order # still fires every turn assert "route_turn" in order - @pytest.mark.skip( - reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration." - ) def test_subclass_can_override_conversation_start_without_redecorating( self, ) -> None: @@ -1342,6 +1333,12 @@ class TestFlowTracingWhenSuppressed: class TestDeferTraceFinalization: + def test_bare_conversational_flow_defers_by_default(self) -> None: + class BareChat(ConversationalFlow): + pass + + assert BareChat()._should_defer_trace_finalization() is True + def test_conversation_config_drives_defer_flag(self) -> None: """``ConversationConfig(defer_trace_finalization=...)`` controls whether a conversational subclass defers per-turn trace finalization.""" diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index bda29e7eb..da7908798 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -13,6 +13,7 @@ from pydantic import BaseModel import crewai.flow.dsl as flow_dsl import crewai.flow.flow_definition as flow_definition import crewai.flow.visualization.builder as visualization_builder +from crewai.experimental import ConversationConfig, RouterConfig from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start @@ -36,6 +37,8 @@ def test_flow_public_exports_are_explicit(): } assert set(flow_definition.__all__) == { "FlowConfigDefinition", + "FlowConversationalDefinition", + "FlowConversationalRouterDefinition", "FlowDefinition", "FlowDefinitionCondition", "FlowDefinitionDiagnostic", @@ -169,6 +172,7 @@ def test_flow_definition_maps_dsl_to_static_contract(): assert definition.state.ref and "ContractState" in definition.state.ref assert definition.config.stream is True assert definition.config.max_method_calls == 7 + assert definition.conversational is None assert definition.methods["begin"].start is True assert definition.methods["process"].listen == "begin" @@ -201,27 +205,74 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows(): methods = RegularFlow.flow_definition().methods + assert RegularFlow.flow_definition().conversational is None assert set(methods) == {"begin"} assert "conversation_start" not in methods assert "route_conversation" not in methods assert "converse_turn" not in methods -@pytest.mark.skip( - reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration." -) def test_flow_definition_includes_conversational_builtins_when_enabled(): class ChatFlow(Flow): conversational = True - methods = ChatFlow.flow_definition().methods + definition = ChatFlow.flow_definition() + methods = definition.methods + assert definition.conversational is not None + assert definition.conversational.enabled is True + assert definition.conversational.defer_trace_finalization is True + assert definition.conversational.builtin_routes == ["converse", "end"] assert "conversation_start" in methods assert "route_conversation" in methods assert "converse_turn" in methods assert methods["conversation_start"].start is True +def test_flow_definition_serializes_conversational_config(): + @ConversationConfig( + system_prompt="Be concise.", + llm="gpt-4o-mini", + router=RouterConfig( + prompt="Pick a route.", + routes=["research"], + default_intent="converse", + fallback_intent="end", + ), + default_intents=["research"], + visible_agent_outputs=["researcher"], + defer_trace_finalization=False, + ) + class ChatFlow(Flow): + conversational = True + + conversational = ChatFlow.flow_definition().conversational + + assert conversational is not None + assert conversational.system_prompt == "Be concise." + assert conversational.llm == "gpt-4o-mini" + assert conversational.default_intents == ["research"] + assert conversational.visible_agent_outputs == ["researcher"] + assert conversational.defer_trace_finalization is False + assert conversational.router is not None + assert conversational.router.prompt == "Pick a route." + assert conversational.router.routes == ["research"] + assert conversational.router.fallback_intent == "end" + + +def test_flow_definition_preserves_undecorated_conversational_override(): + class ChatFlow(Flow): + conversational = True + + def conversation_start(self) -> str | None: + return "custom" + + methods = ChatFlow.flow_definition().methods + + assert methods["conversation_start"].start is True + assert "route_conversation" in methods + + def test_flow_definition_serializes_human_feedback_metadata(): marker = object()