diff --git a/lib/crewai/src/crewai/flow/dsl/_human_feedback.py b/lib/crewai/src/crewai/flow/dsl/_human_feedback.py index a1917a7b5..e1317b23d 100644 --- a/lib/crewai/src/crewai/flow/dsl/_human_feedback.py +++ b/lib/crewai/src/crewai/flow/dsl/_human_feedback.py @@ -3,11 +3,10 @@ from __future__ import annotations from collections.abc import Callable, Sequence from typing import TYPE_CHECKING, Any, TypeVar -from crewai.flow.flow_definition import FlowMethodDefinition from crewai.flow.human_feedback import ( HumanFeedbackConfig, HumanFeedbackResult, - _build_human_feedback_runtime_decorator, + _validate_human_feedback_options, ) @@ -21,32 +20,6 @@ F = TypeVar("F", bound=Callable[..., Any]) __all__ = ["HumanFeedbackResult", "human_feedback"] -def _stamp_human_feedback_metadata( - wrapper: Any, - func: Callable[..., Any], - config: HumanFeedbackConfig, -) -> None: - for attr in [ - "__is_flow_method__", - "__flow_persistence_config__", - "__flow_method_definition__", - ]: - if hasattr(func, attr): - setattr(wrapper, attr, getattr(func, attr)) - - wrapper.__human_feedback_config__ = config - wrapper.__is_flow_method__ = True - - if config.emit: - fragment = getattr(wrapper, "__flow_method_definition__", None) - if isinstance(fragment, FlowMethodDefinition): - wrapper.__flow_method_definition__ = fragment.model_copy( - update={"router": True, "emit": list(config.emit)} - ) - - wrapper._human_feedback_llm = config.llm - - def human_feedback( message: str, emit: Sequence[str] | None = None, @@ -58,21 +31,18 @@ def human_feedback( learn_source: str = "hitl", learn_strict: bool = False, ) -> Callable[[F], F]: - """Decorator for Flow methods that require human feedback.""" - runtime_decorator = _build_human_feedback_runtime_decorator( - message=message, - emit=emit, - llm=llm, - default_outcome=default_outcome, - metadata=metadata, - provider=provider, - learn=learn, - learn_source=learn_source, - learn_strict=learn_strict, + """Decorator for Flow methods that require human feedback. + + The decorator is a pure metadata stamper: it records the feedback + configuration on the method, and the Flow engine collects and routes + feedback after the method completes, driven by the flow's definition. + """ + _validate_human_feedback_options( + emit=emit, llm=llm, default_outcome=default_outcome ) config = HumanFeedbackConfig( message=message, - emit=emit, + emit=list(emit) if emit is not None else None, llm=llm, default_outcome=default_outcome, metadata=metadata, @@ -83,8 +53,7 @@ def human_feedback( ) def decorator(func: F) -> F: - wrapper = runtime_decorator(func) - _stamp_human_feedback_metadata(wrapper, func, config) - return wrapper + func.__human_feedback_config__ = config # type: ignore[attr-defined] + return func return decorator diff --git a/lib/crewai/src/crewai/flow/dsl/_utils.py b/lib/crewai/src/crewai/flow/dsl/_utils.py index ee8202272..cdd7bdbca 100644 --- a/lib/crewai/src/crewai/flow/dsl/_utils.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -18,6 +18,7 @@ from crewai.flow.flow_definition import ( FlowMethodDefinition, FlowPersistenceDefinition, FlowStateDefinition, + _object_ref, ) from crewai.flow.flow_wrappers import ( FlowMethod, @@ -35,15 +36,12 @@ _FLOW_METHOD_METADATA_ATTRS = [ "__flow_method_definition__", "__flow_persistence_config__", "__human_feedback_config__", - "_human_feedback_llm", ] def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]: """Check if the object carries Flow method wrapper metadata.""" - return hasattr(obj, "__is_flow_method__") or hasattr( - obj, _FLOW_METHOD_DEFINITION_ATTR - ) + return hasattr(obj, _FLOW_METHOD_DEFINITION_ATTR) def _should_include_flow_method(flow_class: type, method: Any) -> bool: @@ -81,7 +79,6 @@ def _stamp_inherited_conversational_metadata( for attr in _FLOW_METHOD_METADATA_ATTRS: if hasattr(inherited, attr): setattr(method, attr, getattr(inherited, attr)) - method.__is_flow_method__ = True return method @@ -105,13 +102,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None: return None -def _object_ref(value: Any) -> str: - target = value if isinstance(value, type) else type(value) - module = getattr(target, "__module__", "") - qualname = getattr(target, "__qualname__", getattr(target, "__name__", "")) - return f"{module}:{qualname}" if module and qualname else repr(value) - - def _is_json_serializable(value: Any) -> bool: try: json.dumps(value) @@ -227,7 +217,10 @@ def _build_config_definition( for field_name, default in field_defaults.items(): value = getattr(flow_class, field_name, default) if field_name == "input_provider": - values[field_name] = None if value is None else _object_ref(value) + # A string value is already a ref; only live objects degrade. + values[field_name] = ( + value if value is None or isinstance(value, str) else _object_ref(value) + ) else: values[field_name] = _serialize_static_value( value, diagnostics, f"config.{field_name}" @@ -247,38 +240,31 @@ def _build_human_feedback_definition( return FlowHumanFeedbackDefinition( message=str(config.message), emit=[str(value) for value in emit] if emit is not None else None, - llm=_serialize_static_value( - getattr(config, "llm", None), diagnostics, f"{path}.llm" - ), + # llm and provider stay live: the engine consumes them in-process and + # the contract degrades them to serializable forms at JSON dump time. + llm=getattr(config, "llm", None), default_outcome=getattr(config, "default_outcome", None), metadata=_serialize_static_value( getattr(config, "metadata", None), diagnostics, f"{path}.metadata" ), - provider=_serialize_static_value( - getattr(config, "provider", None), diagnostics, f"{path}.provider" - ), + provider=getattr(config, "provider", None), learn=bool(getattr(config, "learn", False)), learn_source=str(getattr(config, "learn_source", "hitl")), learn_strict=bool(getattr(config, "learn_strict", False)), ) -def _build_persistence_definition( - value: Any, - diagnostics: list[FlowDefinitionDiagnostic], - path: str, -) -> FlowPersistenceDefinition | None: +def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | None: config = getattr(value, "__flow_persistence_config__", None) if config is None: return None - persistence = getattr(config, "persistence", None) - verbose = bool(getattr(config, "verbose", False)) return FlowPersistenceDefinition( enabled=True, - verbose=verbose, - persistence=_serialize_static_value( - persistence, diagnostics, f"{path}.persistence" - ), + verbose=bool(getattr(config, "verbose", False)), + # The backend stays live: the engine persists through the exact + # instance the user configured; the contract degrades it to a + # serialized config at JSON dump time. + persistence=getattr(config, "persistence", None), ) @@ -396,9 +382,7 @@ def _build_method_definition( method_definition.router = True method_definition.emit = None - method_definition.persist = _build_persistence_definition( - method, diagnostics, f"{path}.persist" - ) + method_definition.persist = _build_persistence_definition(method) return method_definition @@ -482,7 +466,7 @@ def _build_flow_definition_from_class( description=description, state=_build_state_definition(flow_class, diagnostics), config=_build_config_definition(flow_class, diagnostics), - persist=_build_persistence_definition(flow_class, diagnostics, "persist"), + persist=_build_persistence_definition(flow_class), conversational=_build_conversational_definition(flow_class, diagnostics), methods=methods, diagnostics=diagnostics, diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 5de3ae2e6..e55bb2bf7 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -13,7 +13,7 @@ import json import logging from typing import Any, Literal as TypingLiteral -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator import yaml from crewai.flow.conversational_definition import ( @@ -41,6 +41,14 @@ __all__ = [ ] +def _object_ref(value: Any) -> str: + """Format a class or instance as the canonical ``module:qualname`` ref.""" + target = value if isinstance(value, type) else type(value) + module = getattr(target, "__module__", "") + qualname = getattr(target, "__qualname__", getattr(target, "__name__", "")) + return f"{module}:{qualname}" if module and qualname else repr(value) + + class FlowDefinitionDiagnostic(BaseModel): """A non-fatal Flow Definition build or validation diagnostic.""" @@ -56,7 +64,7 @@ class FlowStateDefinition(BaseModel): type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict" ref: str | None = None json_schema: dict[str, Any] | None = None - default: Any = None + default: dict[str, Any] | None = None class FlowConfigDefinition(BaseModel): @@ -73,15 +81,41 @@ class FlowConfigDefinition(BaseModel): class FlowPersistenceDefinition(BaseModel): - """Static persistence configuration.""" + """Static persistence configuration. + + ``persistence`` may hold a live backend when the definition is built from + a decorated class — the engine then persists through the exact instance + the user configured; the JSON/YAML projection degrades it to its + serialized config. + """ enabled: bool = False verbose: bool = False - persistence: dict[str, Any] | None = None + persistence: Any = None + + @field_serializer("persistence", when_used="json") + def _serialize_persistence(self, value: Any) -> Any: + if value is None or isinstance(value, dict): + return value + if isinstance(value, BaseModel): + try: + return value.model_dump(mode="json") + except Exception: + logger.warning( + "Persistence backend %s is not fully serializable; " + "preserved import reference only.", + _object_ref(value), + ) + return {"ref": _object_ref(value)} class FlowHumanFeedbackDefinition(BaseModel): - """Static human feedback configuration.""" + """Static human feedback configuration. + + ``llm`` and ``provider`` may hold live Python objects when the definition + is built from a decorated class; the JSON/YAML projection degrades them to + a serialized config (``llm``) or a ``module:qualname`` ref (``provider``). + """ message: str emit: list[str] | None = None @@ -93,6 +127,20 @@ class FlowHumanFeedbackDefinition(BaseModel): learn_source: str = "hitl" learn_strict: bool = False + @field_serializer("llm", when_used="json") + def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None: + if value is None or isinstance(value, (str, dict)): + return value + from crewai.flow.human_feedback import _serialize_llm_for_context + + return _serialize_llm_for_context(value) + + @field_serializer("provider", when_used="json") + def _serialize_provider(self, value: Any) -> str | None: + if value is None or isinstance(value, str): + return value + return _object_ref(value) + class FlowActionDefinition(BaseModel): """What a Flow method node executes, independent of when it fires.""" @@ -112,6 +160,16 @@ class FlowMethodDefinition(BaseModel): human_feedback: FlowHumanFeedbackDefinition | None = None persist: FlowPersistenceDefinition | None = None + @model_validator(mode="after") + def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition: + # Canonical shape: a method whose human_feedback declares emit + # outcomes routes like a router, regardless of how the definition + # was authored. + if self.human_feedback is not None and self.human_feedback.emit: + self.router = True + self.emit = None + return self + @property def is_start(self) -> bool: """Whether this method is a start method. diff --git a/lib/crewai/src/crewai/flow/flow_wrappers.py b/lib/crewai/src/crewai/flow/flow_wrappers.py index d02d3bc4d..1304c8e99 100644 --- a/lib/crewai/src/crewai/flow/flow_wrappers.py +++ b/lib/crewai/src/crewai/flow/flow_wrappers.py @@ -83,7 +83,6 @@ class FlowMethod(Generic[P, R]): "__conversational_only__", # gates registration on Flow.conversational "__flow_persistence_config__", "__flow_method_definition__", - "_human_feedback_llm", # Live LLM object for HITL resume ]: if hasattr(meth, attr): setattr(self, attr, getattr(meth, attr)) diff --git a/lib/crewai/src/crewai/flow/human_feedback.py b/lib/crewai/src/crewai/flow/human_feedback.py index 7f2442def..c3a4a203f 100644 --- a/lib/crewai/src/crewai/flow/human_feedback.py +++ b/lib/crewai/src/crewai/flow/human_feedback.py @@ -1,8 +1,11 @@ -"""Human feedback decorator for Flow methods. +"""Human feedback support for Flow methods. -This module provides the @human_feedback decorator that enables human-in-the-loop -workflows within CrewAI Flows. It allows collecting human feedback on method outputs -and optionally routing to different listeners based on the feedback. +This module backs the @human_feedback decorator that enables human-in-the-loop +workflows within CrewAI Flows. The decorator is a pure metadata stamper: it +records a :class:`HumanFeedbackConfig` on the method, the Flow definition +builder lifts it into ``FlowHumanFeedbackDefinition``, and the Flow engine +collects feedback after each decorated method completes, driven by the flow's +definition. Supports both synchronous (blocking) and asynchronous (non-blocking) feedback collection through the provider parameter. @@ -55,22 +58,18 @@ Example (asynchronous with custom provider): from __future__ import annotations -import asyncio from collections.abc import Callable, Sequence from dataclasses import dataclass, field from datetime import datetime -from functools import wraps import logging from typing import TYPE_CHECKING, Any, TypeVar from pydantic import BaseModel, Field -from crewai.flow.flow_wrappers import FlowMethod - if TYPE_CHECKING: from crewai.flow.async_feedback.types import HumanFeedbackProvider - from crewai.flow.flow import Flow + from crewai.flow.runtime import Flow from crewai.llms.base_llm import BaseLLM @@ -160,8 +159,8 @@ class HumanFeedbackResult: class HumanFeedbackConfig: """Configuration for the @human_feedback decorator. - Stores the parameters passed to the decorator for later use during - method execution and for introspection by visualization tools. + Stores the parameters passed to the decorator for later use by the + Flow definition builder and for introspection by visualization tools. Attributes: message: The message shown to the human when requesting feedback. @@ -183,19 +182,6 @@ class HumanFeedbackConfig: learn_strict: bool = False -class HumanFeedbackMethod(FlowMethod[Any, Any]): - """Wrapper for methods decorated with @human_feedback. - - This wrapper extends FlowMethod to add human feedback specific attributes - used by the FlowDefinition builder and runtime feedback handling. - - Attributes: - __human_feedback_config__: The HumanFeedbackConfig for this method. - """ - - __human_feedback_config__: HumanFeedbackConfig | None = None - - class PreReviewResult(BaseModel): """Structured output from the HITL pre-review LLM call.""" @@ -217,17 +203,11 @@ class DistilledLessons(BaseModel): ) -def _build_human_feedback_runtime_decorator( - message: str, - emit: Sequence[str] | None = None, - llm: str | BaseLLM | None = "gpt-4o-mini", - default_outcome: str | None = None, - metadata: dict[str, Any] | None = None, - provider: HumanFeedbackProvider | None = None, - learn: bool = False, - learn_source: str = "hitl", - learn_strict: bool = False, -) -> Callable[[F], F]: +def _validate_human_feedback_options( + emit: Sequence[str] | None, + llm: Any, + default_outcome: str | None, +) -> None: if emit is not None: if not llm: raise ValueError( @@ -244,295 +224,139 @@ def _build_human_feedback_runtime_decorator( elif default_outcome is not None: raise ValueError("default_outcome requires emit to be specified.") - def decorator(func: F) -> F: - def _get_hitl_prompt(key: str) -> str: - from crewai.utilities.i18n import I18N_DEFAULT - return I18N_DEFAULT.slice(key) +def _get_hitl_prompt(key: str) -> str: + from crewai.utilities.i18n import I18N_DEFAULT - def _resolve_llm_instance() -> Any: - if llm is None: - from crewai.llm import LLM + return I18N_DEFAULT.slice(key) - return LLM(model="gpt-4o-mini") - if isinstance(llm, str): - from crewai.llm import LLM - return LLM(model=llm) - return llm # already a BaseLLM instance +def _resolve_llm_instance(llm: Any) -> Any: + from crewai.llm import LLM - def _pre_review_with_lessons( - flow_instance: Flow[Any], method_output: Any - ) -> Any: - try: - mem = flow_instance.memory - if mem is None: - return method_output - query = f"human feedback lessons for {func.__name__}: {method_output!s}" - matches = mem.recall(query, source=learn_source) - if not matches: - return method_output + if llm is None: + return LLM(model="gpt-4o-mini") + if isinstance(llm, str): + return LLM(model=llm) + if isinstance(llm, dict): + deserialized = _deserialize_llm_from_context(llm) + return deserialized if deserialized is not None else LLM(model="gpt-4o-mini") + return llm # already a BaseLLM instance - lessons = "\n".join(f"- {m.record.content}" for m in matches) - llm_inst = _resolve_llm_instance() - prompt = _get_hitl_prompt("hitl_pre_review_user").format( - output=str(method_output), - lessons=lessons, - ) - messages = [ - { - "role": "system", - "content": _get_hitl_prompt("hitl_pre_review_system"), - }, - {"role": "user", "content": prompt}, - ] - if getattr(llm_inst, "supports_function_calling", lambda: False)(): - response = llm_inst.call(messages, response_model=PreReviewResult) - if isinstance(response, PreReviewResult): - return response.improved_output - return PreReviewResult.model_validate(response).improved_output - reviewed = llm_inst.call(messages) - return reviewed if isinstance(reviewed, str) else str(reviewed) - except Exception: - if learn_strict: - logger.warning( - "HITL pre-review failed for %s; re-raising (learn_strict=True)", - func.__name__, - exc_info=True, - ) - raise - logger.warning( - "HITL pre-review failed for %s; falling back to raw output", - func.__name__, - exc_info=True, - ) - return method_output - def _distill_and_store_lessons( - flow_instance: Flow[Any], method_output: Any, raw_feedback: str - ) -> None: - try: - mem = flow_instance.memory - if mem is None: - return - llm_inst = _resolve_llm_instance() - prompt = _get_hitl_prompt("hitl_distill_user").format( - method_name=func.__name__, - output=str(method_output), - feedback=raw_feedback, - ) - messages = [ - { - "role": "system", - "content": _get_hitl_prompt("hitl_distill_system"), - }, - {"role": "user", "content": prompt}, - ] +def _pre_review_with_lessons( + flow_instance: Flow[Any], + method_name: str, + method_output: Any, + *, + llm: Any, + learn_source: str, + learn_strict: bool, +) -> Any: + try: + mem = flow_instance.memory + if mem is None: + return method_output + query = f"human feedback lessons for {method_name}: {method_output!s}" + matches = mem.recall(query, source=learn_source) + if not matches: + return method_output - lessons: list[str] = [] - if getattr(llm_inst, "supports_function_calling", lambda: False)(): - response = llm_inst.call(messages, response_model=DistilledLessons) - if isinstance(response, DistilledLessons): - lessons = response.lessons - else: - lessons = DistilledLessons.model_validate(response).lessons - else: - response = llm_inst.call(messages) - if isinstance(response, str): - lessons = [ - line.strip("- ").strip() - for line in response.strip().split("\n") - if line.strip() and line.strip() != "NONE" - ] - - if lessons: - mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr] - except Exception: - if learn_strict: - logger.warning( - "HITL lesson distillation failed for %s; re-raising (learn_strict=True)", - func.__name__, - exc_info=True, - ) - raise - logger.warning( - "HITL lesson distillation failed for %s; no lessons stored", - func.__name__, - exc_info=True, - ) - - def _build_feedback_context( - flow_instance: Flow[Any], method_output: Any - ) -> tuple[Any, Any]: - from crewai.flow.async_feedback.types import PendingFeedbackContext - - context = PendingFeedbackContext( - flow_id=flow_instance.flow_id or "unknown", - flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}", - method_name=func.__name__, - method_output=method_output, - message=message, - emit=list(emit) if emit else None, - default_outcome=default_outcome, - metadata=metadata or {}, - llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm), + lessons = "\n".join(f"- {m.record.content}" for m in matches) + llm_inst = _resolve_llm_instance(llm) + prompt = _get_hitl_prompt("hitl_pre_review_user").format( + output=str(method_output), + lessons=lessons, + ) + messages = [ + { + "role": "system", + "content": _get_hitl_prompt("hitl_pre_review_system"), + }, + {"role": "user", "content": prompt}, + ] + if getattr(llm_inst, "supports_function_calling", lambda: False)(): + response = llm_inst.call(messages, response_model=PreReviewResult) + if isinstance(response, PreReviewResult): + return response.improved_output + return PreReviewResult.model_validate(response).improved_output + reviewed = llm_inst.call(messages) + return reviewed if isinstance(reviewed, str) else str(reviewed) + except Exception: + if learn_strict: + logger.warning( + "HITL pre-review failed for %s; re-raising (learn_strict=True)", + method_name, + exc_info=True, ) + raise + logger.warning( + "HITL pre-review failed for %s; falling back to raw output", + method_name, + exc_info=True, + ) + return method_output - effective_provider = provider - if effective_provider is None: - from crewai.flow.flow_config import flow_config - effective_provider = flow_config.hitl_provider +def _distill_and_store_lessons( + flow_instance: Flow[Any], + method_name: str, + method_output: Any, + raw_feedback: str, + *, + llm: Any, + learn_source: str, + learn_strict: bool, +) -> None: + try: + mem = flow_instance.memory + if mem is None: + return + llm_inst = _resolve_llm_instance(llm) + prompt = _get_hitl_prompt("hitl_distill_user").format( + method_name=method_name, + output=str(method_output), + feedback=raw_feedback, + ) + messages = [ + { + "role": "system", + "content": _get_hitl_prompt("hitl_distill_system"), + }, + {"role": "user", "content": prompt}, + ] - return context, effective_provider - - def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str: - context, effective_provider = _build_feedback_context( - flow_instance, method_output - ) - - if effective_provider is not None: - feedback_result = effective_provider.request_feedback( - context, flow_instance - ) - if asyncio.iscoroutine(feedback_result): - raise TypeError( - f"Provider {type(effective_provider).__name__}.request_feedback() " - "returned a coroutine in a sync flow method. Use an async flow " - "method or a synchronous provider." - ) - return str(feedback_result) - return flow_instance._request_human_feedback( - message=message, - output=method_output, - metadata=metadata, - emit=emit, - ) - - async def _request_feedback_async( - flow_instance: Flow[Any], method_output: Any - ) -> str: - context, effective_provider = _build_feedback_context( - flow_instance, method_output - ) - - if effective_provider is not None: - feedback_result = effective_provider.request_feedback( - context, flow_instance - ) - if asyncio.iscoroutine(feedback_result): - return str(await feedback_result) - return str(feedback_result) - return flow_instance._request_human_feedback( - message=message, - output=method_output, - metadata=metadata, - emit=emit, - ) - - def _process_feedback( - flow_instance: Flow[Any], - method_output: Any, - raw_feedback: str, - ) -> HumanFeedbackResult | str: - collapsed_outcome: str | None = None - - if not raw_feedback.strip(): - if default_outcome: - collapsed_outcome = default_outcome - elif emit: - collapsed_outcome = emit[0] - elif emit: - if llm is not None: - collapsed_outcome = flow_instance._collapse_to_outcome( - feedback=raw_feedback, - outcomes=emit, - llm=llm, - ) - else: - collapsed_outcome = emit[0] - - result = HumanFeedbackResult( - output=method_output, - feedback=raw_feedback, - outcome=collapsed_outcome, - timestamp=datetime.now(), - method_name=func.__name__, - metadata=metadata or {}, - ) - - flow_instance.human_feedback_history.append(result) - flow_instance.last_human_feedback = result - - if emit: - if collapsed_outcome is None: - collapsed_outcome = default_outcome or emit[0] - result.outcome = collapsed_outcome - return collapsed_outcome - return result - - if asyncio.iscoroutinefunction(func): - - @wraps(func) - async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any: - method_output = await func(self, *args, **kwargs) - - if learn and getattr(self, "memory", None) is not None: - method_output = _pre_review_with_lessons(self, method_output) - - raw_feedback = await _request_feedback_async(self, method_output) - result = _process_feedback(self, method_output, raw_feedback) - - if ( - learn - and getattr(self, "memory", None) is not None - and raw_feedback.strip() - ): - _distill_and_store_lessons(self, method_output, raw_feedback) - - # Stash the real method output for final flow result when emit is set: - # result is the collapsed outcome string for routing, but we preserve the - # actual method output as the flow's final result. Uses per-method dict for - # concurrency safety and to handle None returns. - if emit: - self._human_feedback_method_outputs[func.__name__] = method_output - - return result - - wrapper: Any = async_wrapper + lessons: list[str] = [] + if getattr(llm_inst, "supports_function_calling", lambda: False)(): + response = llm_inst.call(messages, response_model=DistilledLessons) + if isinstance(response, DistilledLessons): + lessons = response.lessons + else: + lessons = DistilledLessons.model_validate(response).lessons else: + response = llm_inst.call(messages) + if isinstance(response, str): + lessons = [ + line.strip("- ").strip() + for line in response.strip().split("\n") + if line.strip() and line.strip() != "NONE" + ] - @wraps(func) - def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any: - method_output = func(self, *args, **kwargs) - - if learn and getattr(self, "memory", None) is not None: - method_output = _pre_review_with_lessons(self, method_output) - - raw_feedback = _request_feedback(self, method_output) - result = _process_feedback(self, method_output, raw_feedback) - - if ( - learn - and getattr(self, "memory", None) is not None - and raw_feedback.strip() - ): - _distill_and_store_lessons(self, method_output, raw_feedback) - - # Stash the real method output for final flow result when emit is set: - # result is the collapsed outcome string for routing, but we preserve the - # actual method output as the flow's final result. Uses per-method dict for - # concurrency safety and to handle None returns. - if emit: - self._human_feedback_method_outputs[func.__name__] = method_output - - return result - - wrapper = sync_wrapper - - return wrapper # type: ignore[no-any-return] - - return decorator + if lessons: + mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr] + except Exception: + if learn_strict: + logger.warning( + "HITL lesson distillation failed for %s; re-raising (learn_strict=True)", + method_name, + exc_info=True, + ) + raise + logger.warning( + "HITL lesson distillation failed for %s; no lessons stored", + method_name, + exc_info=True, + ) def human_feedback( diff --git a/lib/crewai/src/crewai/flow/persistence/decorators.py b/lib/crewai/src/crewai/flow/persistence/decorators.py index 65da2cee1..48b917760 100644 --- a/lib/crewai/src/crewai/flow/persistence/decorators.py +++ b/lib/crewai/src/crewai/flow/persistence/decorators.py @@ -25,7 +25,6 @@ Example: from __future__ import annotations from collections.abc import Callable -import functools import logging from types import SimpleNamespace from typing import TYPE_CHECKING, Any, Final, TypeVar @@ -186,20 +185,6 @@ def persist( persistence if persistence is not None else default_flow_persistence() ) - if isinstance(target, type): - _stamp_persistence_metadata(target, actual_persistence, verbose) - original_init = target.__init__ # type: ignore[misc] - - @functools.wraps(original_init) - def new_init(self: Any, *args: Any, **kwargs: Any) -> None: - if "persistence" not in kwargs: - kwargs["persistence"] = actual_persistence - original_init(self, *args, **kwargs) - - target.__init__ = new_init # type: ignore[misc] - return target - - target.__is_flow_method__ = True # type: ignore[attr-defined] _stamp_persistence_metadata(target, actual_persistence, verbose) return target diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index 09e6983b4..ab31068a4 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -21,8 +21,8 @@ from collections.abc import ( from concurrent.futures import Future, ThreadPoolExecutor import contextvars import copy +from datetime import datetime import enum -import importlib import inspect import logging import threading @@ -85,6 +85,11 @@ from crewai.events.types.flow_events import ( MethodExecutionPausedEvent, MethodExecutionStartedEvent, ) +from crewai.flow.async_feedback.types import ( + HumanFeedbackPending, + HumanFeedbackProvider, + PendingFeedbackContext, +) from crewai.flow.dsl._utils import build_flow_definition from crewai.flow.flow_context import ( current_flow_defer_trace_finalization, @@ -95,6 +100,7 @@ from crewai.flow.flow_context import ( from crewai.flow.flow_definition import ( FlowDefinition, FlowDefinitionCondition, + FlowHumanFeedbackDefinition, FlowMethodDefinition, FlowPersistenceDefinition, FlowStateDefinition, @@ -105,10 +111,20 @@ from crewai.flow.flow_wrappers import ( RouterMethod, StartMethod, ) -from crewai.flow.human_feedback import HumanFeedbackResult +from crewai.flow.human_feedback import ( + HumanFeedbackResult, + _deserialize_llm_from_context, + _distill_and_store_lessons, + _pre_review_with_lessons, + _serialize_llm_for_context, +) from crewai.flow.input_provider import InputProvider from crewai.flow.persistence.base import FlowPersistence -from crewai.flow.runtime._action_resolvers import resolve_action +from crewai.flow.runtime._resolvers import ( + resolve_action, + resolve_instance_ref, + resolve_ref, +) from crewai.flow.types import ( FlowExecutionData, FlowMethodName, @@ -128,7 +144,6 @@ if TYPE_CHECKING: from crewai_files import FileInput from crewai.context import ExecutionContext - from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.llms.base_llm import BaseLLM from crewai.flow.visualization import build_flow_structure, render_interactive @@ -176,19 +191,12 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) - def _build_definition_state_model( state_definition: FlowStateDefinition, ) -> BaseModel | None: - kwargs = ( - dict(state_definition.default) - if isinstance(state_definition.default, dict) - else {} - ) + kwargs = dict(state_definition.default or {}) model_class: type[BaseModel] | None = None if state_definition.ref: try: - module_name, _, qualname = state_definition.ref.partition(":") - resolved: Any = importlib.import_module(module_name) - for part in qualname.split("."): - resolved = getattr(resolved, part) + resolved: Any = resolve_ref(state_definition.ref, field="state") except Exception: logger.warning( "Could not import state ref %r", state_definition.ref, exc_info=True @@ -262,8 +270,7 @@ def _resolve_persistence(value: Any) -> Any: if isinstance(value, dict): from crewai.flow.persistence.base import _persistence_registry - type_name = value.get("persistence_type", "SQLiteFlowPersistence") - cls = _persistence_registry.get(type_name) + cls = _persistence_registry.get(value.get("persistence_type", "")) if cls is not None: return cls.model_validate(value) return value @@ -284,7 +291,7 @@ def _validate_input_provider(value: Any) -> Any: if value is None or isinstance(value, InputProvider): return value if isinstance(value, str) and ":" in value: - resolved = _resolve_input_provider_ref(value) + resolved = resolve_instance_ref(value, field="input_provider") else: from crewai.types.callback import _dotted_path_to_instance @@ -297,15 +304,6 @@ def _validate_input_provider(value: Any) -> Any: ) -def _resolve_input_provider_ref(ref: str) -> Any: - from crewai.flow.runtime._action_resolvers import import_ref - - target = import_ref(ref) - if inspect.isclass(target): - return target() - return target - - def _serialize_input_provider(value: Any) -> str | None: if value is None: return None @@ -762,10 +760,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return flow_definition @classmethod - def from_definition(cls, definition: FlowDefinition) -> Flow[Any]: + def from_definition(cls, definition: FlowDefinition, **kwargs: Any) -> Flow[Any]: """Build a runnable Flow directly from a definition; no subclass required.""" return cls.model_validate( - definition.config.model_dump(), + {**definition.config.model_dump(), **kwargs}, context={"flow_definition": definition}, ) @@ -858,12 +856,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ] = Field(default=None) @classmethod - def from_checkpoint(cls, config: CheckpointConfig) -> Flow: # type: ignore[type-arg] + def from_checkpoint( + cls, + config: CheckpointConfig, + *, + definition: FlowDefinition | None = None, + ) -> Flow: # type: ignore[type-arg] """Restore a Flow from a checkpoint. Args: config: Checkpoint configuration with ``restore_from`` set to the path of the checkpoint to load. + definition: The FlowDefinition to restore a definition-built flow + (one created via ``Flow.from_definition``) from; its actions + are re-resolved since checkpoints carry no callables. + Subclasses carry their own definition and don't need this. Returns: A Flow instance ready to resume. @@ -872,7 +879,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): from crewai.events.event_bus import crewai_event_bus from crewai.state.runtime import RuntimeState - state = RuntimeState.from_checkpoint(config, context={"from_checkpoint": True}) + context: dict[str, Any] = {"from_checkpoint": True} + if definition is not None: + context["flow_definition"] = definition + state = RuntimeState.from_checkpoint(config, context=context) crewai_event_bus.set_runtime_state(state) for entity in state.root: if not isinstance(entity, Flow): @@ -882,7 +892,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if isinstance(entity, cls): entity._restore_from_checkpoint() return entity - instance = cls() + instance = ( + cls.from_definition(definition) if definition is not None else cls() + ) instance.checkpoint_completed_methods = entity.checkpoint_completed_methods instance.checkpoint_method_outputs = entity.checkpoint_method_outputs instance.checkpoint_method_counts = entity.checkpoint_method_counts @@ -896,17 +908,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): cls, config: CheckpointConfig, branch: str | None = None, + *, + definition: FlowDefinition | None = None, ) -> Flow: # type: ignore[type-arg] """Fork a Flow from a checkpoint, creating a new execution branch. Args: config: Checkpoint configuration with ``restore_from`` set. branch: Branch label for the fork. Auto-generated if not provided. + definition: The FlowDefinition to restore a definition-built flow + from, as in :meth:`from_checkpoint`. Returns: A Flow instance on the new branch. Call kickoff() to run. """ - flow = cls.from_checkpoint(config) + flow = cls.from_checkpoint(config, definition=definition) state = crewai_event_bus.runtime_state if state is None: raise RuntimeError( @@ -976,7 +992,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): _input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list) _state: Any = PrivateAttr(default=None) _deferred_flow_started_event_id: str | None = PrivateAttr(default=None) - _persist_backends: dict[int, FlowPersistence] = PrivateAttr(default_factory=dict) + _persist_backends: dict[str, FlowPersistence] = PrivateAttr(default_factory=dict) + _persistence_from_definition: bool = PrivateAttr(default=False) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -1022,6 +1039,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): and flow_persist.enabled ): self.persistence = self._resolve_persist_backend(flow_persist) + self._persistence_from_definition = True if self._state is None: self._state = self._create_initial_state() @@ -1075,13 +1093,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]: methods: dict[FlowMethodName, Callable[..., Any]] = {} + missing: list[str] = [] for method_name in self._definition.methods: method = getattr(self, method_name, None) if method is None: + missing.append(method_name) continue if not hasattr(method, "__self__"): method = method.__get__(self, type(self)) methods[FlowMethodName(method_name)] = method + if missing: + raise ValueError( + f"Flow {self._definition.name!r} definition declares methods its " + "class does not provide: " + ", ".join(missing) + ) return methods def recall(self, query: str, **kwargs: Any) -> Any: @@ -1327,6 +1352,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): cls, flow_id: str, persistence: FlowPersistence | None = None, + *, + definition: FlowDefinition | None = None, **kwargs: Any, ) -> Flow[Any]: """Create a Flow instance from a pending feedback state. @@ -1341,6 +1368,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): If not provided, uses ``default_flow_persistence()`` (the registered factory when present, else the built-in SQLite fallback). + definition: The FlowDefinition to restore a definition-built flow + (one created via ``Flow.from_definition``) from. Subclasses + carry their own definition and don't need this. **kwargs: Additional keyword arguments passed to the Flow constructor Returns: @@ -1372,7 +1402,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): state_data, pending_context = loaded - instance = cls(persistence=persistence, **kwargs) + instance = ( + cls.from_definition(definition, persistence=persistence, **kwargs) + if definition is not None + else cls(persistence=persistence, **kwargs) + ) instance._initialize_state(state_data) instance._pending_feedback_context = pending_context instance._is_execution_resuming = True @@ -1465,10 +1499,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): Raises: ValueError: If no pending feedback context exists """ - from datetime import datetime - - from crewai.flow.human_feedback import HumanFeedbackResult - if self._pending_feedback_context is None: raise ValueError( "No pending feedback context. Use from_pending() to restore a paused flow." @@ -1497,59 +1527,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): context = self._pending_feedback_context emit = context.emit - default_outcome = context.default_outcome - # Try to get the live LLM from the re-imported decorator first. - # This preserves the fully-configured object (credentials, safety_settings, etc.) - # for same-process resume. For cross-process resume, fall back to the - # serialized context.llm which is now a dict with full config (or a legacy string). - from crewai.flow.human_feedback import _deserialize_llm_from_context - - llm = None - method = self._methods.get(FlowMethodName(context.method_name)) - if method is not None: - live_llm = getattr(method, "_human_feedback_llm", None) - if live_llm is not None: - from crewai.llms.base_llm import BaseLLM as BaseLLMClass - - if isinstance(live_llm, BaseLLMClass): - llm = live_llm - - if llm is None: - llm = _deserialize_llm_from_context(context.llm) - - collapsed_outcome: str | None = None - - if not feedback.strip(): - if default_outcome: - collapsed_outcome = default_outcome - elif emit: - collapsed_outcome = emit[0] - elif emit: - if llm is not None: - collapsed_outcome = self._collapse_to_outcome( - feedback=feedback, - outcomes=emit, - llm=llm, - ) - else: - collapsed_outcome = emit[0] - - result = HumanFeedbackResult( - output=context.method_output, - feedback=feedback, - outcome=collapsed_outcome, - timestamp=datetime.now(), + # The serialized context carries the full LLM config (a dict, or a + # legacy model string) — the single source for cross- and same-process + # resume. + result = await self._finalize_human_feedback( method_name=context.method_name, + method_output=context.method_output, + raw_feedback=feedback, + emit=emit, + default_outcome=context.default_outcome, + llm=context.llm, metadata=context.metadata, ) - - self.human_feedback_history.append(result) - self.last_human_feedback = result + collapsed_outcome = result.outcome self._completed_methods.add(FlowMethodName(context.method_name)) - self._persist_method_completion(FlowMethodName(context.method_name)) + await asyncio.to_thread( + self._persist_method_completion, FlowMethodName(context.method_name) + ) self._pending_feedback_context = None @@ -1572,10 +1569,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes) self._is_execution_resuming = False - if emit and collapsed_outcome is None: - collapsed_outcome = default_outcome or emit[0] - result.outcome = collapsed_outcome - try: if emit and collapsed_outcome: self._method_outputs.append(collapsed_outcome) @@ -1590,8 +1583,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ) except Exception as e: # Check if flow was paused again for human feedback (loop case) - from crewai.flow.async_feedback.types import HumanFeedbackPending - if isinstance(e, HumanFeedbackPending): self._pending_feedback_context = e.context @@ -1773,11 +1764,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): "Flow %r declares state of unknown type; falling back to dict state", self._definition.name, ) - dict_state: dict[str, Any] = ( - dict(state_definition.default) - if isinstance(state_definition.default, dict) - else {} - ) + dict_state: dict[str, Any] = dict(state_definition.default or {}) if "id" not in dict_state: dict_state["id"] = str(uuid4()) return dict_state @@ -2057,8 +2044,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): result_holder.append(result) except Exception as e: # HumanFeedbackPending is expected control flow, not an error - from crewai.flow.async_feedback.types import HumanFeedbackPending - if isinstance(e, HumanFeedbackPending): result_holder.append(e) else: @@ -2159,8 +2144,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): result_holder.append(result) except Exception as e: # HumanFeedbackPending is expected control flow, not an error - from crewai.flow.async_feedback.types import HumanFeedbackPending - if isinstance(e, HumanFeedbackPending): result_holder.append(e) else: @@ -2382,8 +2365,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): await asyncio.gather(*tasks) except Exception as e: # Check if flow was paused for human feedback - from crewai.flow.async_feedback.types import HumanFeedbackPending - if isinstance(e, HumanFeedbackPending): # Auto-save pending feedback (create default persistence if needed) if self.persistence is None: @@ -2712,6 +2693,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if asyncio.iscoroutine(result): result = await result + method_definition = self._definition.methods[str(method_name)] + if method_definition.human_feedback is not None: + result = await self._run_human_feedback_step( + method_name, method_definition.human_feedback, result + ) + self._method_outputs.append(result) # For @human_feedback methods with emit, the result is the collapsed outcome @@ -2730,7 +2717,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self._completed_methods.add(method_name) - self._persist_method_completion(method_name) + await asyncio.to_thread(self._persist_method_completion, method_name) finished_event_id: str | None = None if not self.suppress_flow_events: @@ -2749,8 +2736,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return result, finished_event_id except Exception as e: # Check if this is a HumanFeedbackPending exception (paused, not failed) - from crewai.flow.async_feedback.types import HumanFeedbackPending - if isinstance(e, HumanFeedbackPending): e.context.method_name = method_name @@ -2791,10 +2776,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): raise e def _persist_method_completion(self, method_name: FlowMethodName) -> None: - method_definition = self._definition.methods.get(method_name) + method_definition = self._definition.methods[str(method_name)] persist_definition = ( method_definition.persist - if method_definition is not None and method_definition.persist is not None + if method_definition.persist is not None else self._definition.persist ) if persist_definition is None or not persist_definition.enabled: @@ -2802,20 +2787,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): from crewai.flow.persistence.decorators import PersistenceDecorator - backend = self.persistence or self._persist_backend_for(persist_definition) + # Backend precedence: user-supplied instance persistence, then the + # method's own declared backend, then the flow-level backend (folded + # into self.persistence from the definition), then the default. + method_declares_backend = ( + method_definition.persist is not None + and method_definition.persist.persistence is not None + ) + if method_declares_backend and self._persistence_from_definition: + backend = self._persist_backends.get(str(method_name)) + if backend is None: + backend = self._resolve_persist_backend(persist_definition) + self._persist_backends[str(method_name)] = backend + else: + backend = self.persistence or self._persist_backends.get(str(method_name)) + if backend is None: + backend = self._resolve_persist_backend(persist_definition) + self._persist_backends[str(method_name)] = backend PersistenceDecorator.persist_state( self, method_name, backend, verbose=persist_definition.verbose ) - def _persist_backend_for( - self, persist_definition: FlowPersistenceDefinition - ) -> FlowPersistence: - cached = self._persist_backends.get(id(persist_definition)) - if cached is None: - cached = self._resolve_persist_backend(persist_definition) - self._persist_backends[id(persist_definition)] = cached - return cached - def _resolve_persist_backend( self, persist_definition: FlowPersistenceDefinition ) -> FlowPersistence: @@ -3105,8 +3097,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): except Exception as e: # Don't log HumanFeedbackPending as an error - it's expected control flow - from crewai.flow.async_feedback.types import HumanFeedbackPending - if not isinstance(e, HumanFeedbackPending): if not getattr(e, "_flow_listener_logged", False): logger.error(f"Error executing listener {listener_name}: {e}") @@ -3216,7 +3206,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): ThreadPoolExecutor, TimeoutError as FuturesTimeoutError, ) - from datetime import datetime from crewai.events.types.flow_events import ( FlowInputReceivedEvent, @@ -3309,6 +3298,157 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return response + async def _run_human_feedback_step( + self, + method_name: FlowMethodName, + feedback_definition: FlowHumanFeedbackDefinition, + method_output: Any, + ) -> Any: + llm = feedback_definition.llm + llm_instance = ( + _deserialize_llm_from_context(llm) if isinstance(llm, (str, dict)) else llm + ) + emit = feedback_definition.emit + default_outcome = feedback_definition.default_outcome + metadata = feedback_definition.metadata + learn = feedback_definition.learn and self.memory is not None + + if learn: + method_output = await asyncio.to_thread( + _pre_review_with_lessons, + self, + method_name, + method_output, + llm=llm_instance, + learn_source=feedback_definition.learn_source, + learn_strict=feedback_definition.learn_strict, + ) + + provider = self._resolve_feedback_provider(feedback_definition) + if provider is not None: + context = PendingFeedbackContext( + flow_id=self.flow_id or "unknown", + flow_class=f"{type(self).__module__}.{type(self).__name__}", + method_name=method_name, + method_output=method_output, + message=feedback_definition.message, + emit=list(emit) if emit else None, + default_outcome=default_outcome, + metadata=metadata or {}, + llm=llm + if llm is None or isinstance(llm, (str, dict)) + else _serialize_llm_for_context(llm), + ) + feedback_value = await asyncio.to_thread( + provider.request_feedback, context, self + ) + if asyncio.iscoroutine(feedback_value): + feedback_value = await feedback_value + raw_feedback = str(feedback_value) + else: + raw_feedback = await asyncio.to_thread( + self._request_human_feedback, + message=feedback_definition.message, + output=method_output, + metadata=metadata, + emit=emit, + ) + + result = await self._finalize_human_feedback( + method_name=method_name, + method_output=method_output, + raw_feedback=raw_feedback, + emit=emit, + default_outcome=default_outcome, + llm=llm_instance, + metadata=metadata or {}, + ) + + if learn and raw_feedback.strip(): + await asyncio.to_thread( + _distill_and_store_lessons, + self, + method_name, + method_output, + raw_feedback, + llm=llm_instance, + learn_source=feedback_definition.learn_source, + learn_strict=feedback_definition.learn_strict, + ) + + if emit: + # Stash the real method output: the collapsed outcome routes + # listeners, but the flow's final result stays the method's + # actual return value. + self._human_feedback_method_outputs[method_name] = method_output + return result.outcome + return result + + async def _finalize_human_feedback( + self, + *, + method_name: str, + method_output: Any, + raw_feedback: str, + emit: list[str] | None, + default_outcome: str | None, + llm: Any, + metadata: dict[str, Any], + ) -> HumanFeedbackResult: + collapsed_outcome: str | None = None + if not raw_feedback.strip(): + if default_outcome: + collapsed_outcome = default_outcome + elif emit: + collapsed_outcome = emit[0] + elif emit: + collapse_llm = ( + _deserialize_llm_from_context(llm) + if isinstance(llm, (str, dict)) + else llm + ) + if collapse_llm is not None: + collapsed_outcome = await asyncio.to_thread( + self._collapse_to_outcome, + feedback=raw_feedback, + outcomes=emit, + llm=collapse_llm, + ) + else: + collapsed_outcome = emit[0] + if emit and collapsed_outcome is None: + collapsed_outcome = default_outcome or emit[0] + + result = HumanFeedbackResult( + output=method_output, + feedback=raw_feedback, + outcome=collapsed_outcome, + method_name=method_name, + metadata=metadata, + ) + self.human_feedback_history.append(result) + self.last_human_feedback = result + return result + + def _resolve_feedback_provider( + self, feedback_definition: FlowHumanFeedbackDefinition + ) -> Any: + + provider = feedback_definition.provider + if isinstance(provider, str): + provider = resolve_instance_ref(provider, field="human_feedback.provider") + if provider is None: + from crewai.flow.flow_config import flow_config + + provider = flow_config.hitl_provider + if provider is not None and not isinstance(provider, HumanFeedbackProvider): + raise ValueError( + f"human_feedback.provider {feedback_definition.provider!r} for flow " + f"{self._definition.name!r} does not implement the " + "HumanFeedbackProvider protocol (missing request_feedback)." + ) + return provider + def _request_human_feedback( self, message: str, diff --git a/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py b/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py deleted file mode 100644 index d71dfacaa..000000000 --- a/lib/crewai/src/crewai/flow/runtime/_action_resolvers.py +++ /dev/null @@ -1,48 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -import importlib -from operator import attrgetter -from typing import TYPE_CHECKING, Any, cast - -from crewai.flow.flow_definition import FlowActionDefinition - - -if TYPE_CHECKING: - from crewai.flow.runtime import Flow - - -class InvalidActionRefError(ValueError): - def __init__(self, ref: str) -> None: - super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'") - - -def import_ref(ref: str) -> Any: - """Import the object a `module:qualname` reference points to.""" - module_name, _, qualname = ref.partition(":") - if "<" in ref or not module_name or not qualname: - raise InvalidActionRefError(ref) - try: - return attrgetter(qualname)(importlib.import_module(module_name)) - except (ImportError, AttributeError) as e: - raise InvalidActionRefError(ref) from e - - -def _resolve_code_action( - flow: Flow[Any], action: FlowActionDefinition -) -> Callable[..., Any]: - ref = action.ref - target = import_ref(ref) - if not callable(target): - raise InvalidActionRefError(ref) - handler = cast(Callable[..., Any], target) - if getattr(handler, "__self__", None) is None: - handler = handler.__get__(flow, type(flow)) - return handler - - -def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]: - """Turn one `do:` action into the callable the flow runs for that node.""" - if action.call == "code": - return _resolve_code_action(flow, action) - raise ValueError(f"unknown call type {action.call!r}") diff --git a/lib/crewai/src/crewai/flow/runtime/_resolvers.py b/lib/crewai/src/crewai/flow/runtime/_resolvers.py new file mode 100644 index 000000000..063009d4f --- /dev/null +++ b/lib/crewai/src/crewai/flow/runtime/_resolvers.py @@ -0,0 +1,70 @@ +"""Resolution of FlowDefinition refs (``module:qualname``) into live objects. + +Every ref-shaped value in a definition — ``do`` actions, ``state.ref``, +``config.input_provider``, ``human_feedback.provider`` — resolves through +:func:`resolve_ref`. Failures are loud and name the field and the ref. +""" + +from __future__ import annotations + +from collections.abc import Callable +import importlib +import inspect +from operator import attrgetter +from typing import TYPE_CHECKING, Any, cast + +from crewai.flow.flow_definition import FlowActionDefinition + + +if TYPE_CHECKING: + from crewai.flow.runtime import Flow + + +class InvalidRefError(ValueError): + """A definition ref that cannot be resolved to a live object.""" + + +def resolve_ref(ref: str, *, field: str) -> Any: + """Import the object a definition's `module:qualname` ref points to.""" + module_name, _, qualname = ref.partition(":") + if "<" in ref or not module_name or not qualname: + raise InvalidRefError( + f"invalid {field} ref {ref!r}; expected 'module:qualname'" + ) + try: + return attrgetter(qualname)(importlib.import_module(module_name)) + except (ImportError, AttributeError) as e: + raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e + + +def resolve_instance_ref(ref: str, *, field: str) -> Any: + """Resolve a ref, auto-instantiating a no-arg class into an instance.""" + target = resolve_ref(ref, field=field) + if not inspect.isclass(target): + return target + try: + return target() + except Exception as e: + raise InvalidRefError( + f"cannot instantiate {field} ref {ref!r} without arguments: {e}" + ) from e + + +def _resolve_code_action( + flow: Flow[Any], action: FlowActionDefinition +) -> Callable[..., Any]: + ref = action.ref + target = resolve_ref(ref, field="do") + if not callable(target): + raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable") + handler = cast(Callable[..., Any], target) + if getattr(handler, "__self__", None) is None: + handler = handler.__get__(flow, type(flow)) + return handler + + +def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]: + """Turn one `do:` action into the callable the flow runs for that node.""" + if action.call == "code": + return _resolve_code_action(flow, action) + raise ValueError(f"unknown call type {action.call!r}") diff --git a/lib/crewai/tests/test_async_human_feedback.py b/lib/crewai/tests/test_async_human_feedback.py index 19b682405..fad77988a 100644 --- a/lib/crewai/tests/test_async_human_feedback.py +++ b/lib/crewai/tests/test_async_human_feedback.py @@ -1168,132 +1168,13 @@ class TestAsyncHumanFeedbackEdgeCases: -class TestLiveLLMPreservationOnResume: - """Tests for preserving the full LLM config across HITL resume.""" - - def test_human_feedback_llm_attribute_set_on_wrapper_with_basellm(self) -> None: - """Test that _human_feedback_llm is set on the wrapper when llm is a BaseLLM instance.""" - from crewai.llms.base_llm import BaseLLM - - mock_llm = MagicMock(spec=BaseLLM) - mock_llm.model = "gemini/gemini-3-flash" - - class TestFlow(Flow): - @start() - @human_feedback( - message="Review:", - emit=["approved", "rejected"], - llm=mock_llm, - ) - def review(self): - return "content" - - flow = TestFlow() - method = flow._methods.get("review") - assert method is not None - assert hasattr(method, "_human_feedback_llm") - assert method._human_feedback_llm is mock_llm - - def test_human_feedback_llm_attribute_set_on_wrapper_with_string(self) -> None: - """Test that _human_feedback_llm is set on the wrapper even when llm is a string.""" - - class TestFlow(Flow): - @start() - @human_feedback( - message="Review:", - emit=["approved", "rejected"], - llm="gpt-4o-mini", - ) - def review(self): - return "content" - - flow = TestFlow() - method = flow._methods.get("review") - assert method is not None - assert hasattr(method, "_human_feedback_llm") - assert method._human_feedback_llm == "gpt-4o-mini" +class TestResumeLLMFromSerializedContext: + """Resume rebuilds the collapse LLM from the serialized context alone.""" @patch("crewai.flow.runtime.crewai_event_bus.emit") - def test_resume_async_uses_live_basellm_over_serialized_string( + def test_resume_builds_llm_from_serialized_context( self, mock_emit: MagicMock ) -> None: - """Test that resume_async uses the live BaseLLM from decorator instead of serialized string. - - This is the main bug fix: when a flow resumes, it should use the fully-configured - LLM from the re-imported decorator (with credentials, project, etc.) instead of - creating a new LLM from just the model string. - """ - with tempfile.TemporaryDirectory() as tmpdir: - db_path = os.path.join(tmpdir, "test_flows.db") - persistence = SQLiteFlowPersistence(db_path) - - from crewai.llms.base_llm import BaseLLM - - # Create a mock BaseLLM with full config (simulating Gemini with service account) - live_llm = MagicMock(spec=BaseLLM) - live_llm.model = "gemini/gemini-3-flash" - - class TestFlow(Flow): - result_path: str = "" - - @start() - @human_feedback( - message="Approve?", - emit=["approved", "rejected"], - llm=live_llm, - ) - def review(self): - return "content" - - @listen("approved") - def handle_approved(self): - self.result_path = "approved" - return "Approved!" - - context = PendingFeedbackContext( - flow_id="live-llm-test", - flow_class="TestFlow", - method_name="review", - method_output="content", - message="Approve?", - emit=["approved", "rejected"], - llm="gemini/gemini-3-flash", # Serialized string, NOT the live object - ) - persistence.save_pending_feedback( - flow_uuid="live-llm-test", - context=context, - state_data={"id": "live-llm-test"}, - ) - - flow = TestFlow.from_pending("live-llm-test", persistence) - - captured_llm = [] - - def capture_llm(feedback, outcomes, llm): - captured_llm.append(llm) - return "approved" - - with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm): - flow.resume("looks good!") - - # NOT the serialized string. The live_llm was captured at class definition - # time and stored on the method wrapper as _human_feedback_llm. - assert len(captured_llm) == 1 - # (which is stored on the method's _human_feedback_llm attribute) - method = flow._methods.get("review") - assert method is not None - assert captured_llm[0] is method._human_feedback_llm - # And verify it's a BaseLLM instance, not a string - assert isinstance(captured_llm[0], BaseLLM) - - @patch("crewai.flow.runtime.crewai_event_bus.emit") - def test_resume_async_falls_back_to_serialized_string_when_no_human_feedback_llm( - self, mock_emit: MagicMock - ) -> None: - """Test that resume_async falls back to context.llm when _human_feedback_llm is not available. - - This ensures backward compatibility with flows that were paused before this fix. - """ with tempfile.TemporaryDirectory() as tmpdir: db_path = os.path.join(tmpdir, "test_flows.db") persistence = SQLiteFlowPersistence(db_path) @@ -1325,11 +1206,6 @@ class TestLiveLLMPreservationOnResume: flow = TestFlow.from_pending("fallback-test", persistence) - # Remove _human_feedback_llm to simulate old decorator without this attribute - method = flow._methods.get("review") - if hasattr(method, "_human_feedback_llm"): - delattr(method, "_human_feedback_llm") - captured_llm = [] def capture_llm(feedback, outcomes, llm): @@ -1343,85 +1219,3 @@ class TestLiveLLMPreservationOnResume: from crewai.llms.base_llm import BaseLLM as BaseLLMClass assert isinstance(captured_llm[0], BaseLLMClass) assert captured_llm[0].model == "gpt-4o-mini" - - @patch("crewai.flow.runtime.crewai_event_bus.emit") - def test_resume_async_uses_string_from_context_when_human_feedback_llm_is_string( - self, mock_emit: MagicMock - ) -> None: - """Test that when _human_feedback_llm is a string (not BaseLLM), we still use context.llm. - - String LLM values offer no benefit over the serialized context.llm, - so we don't prefer them. - """ - with tempfile.TemporaryDirectory() as tmpdir: - db_path = os.path.join(tmpdir, "test_flows.db") - persistence = SQLiteFlowPersistence(db_path) - - class TestFlow(Flow): - @start() - @human_feedback( - message="Approve?", - emit=["approved", "rejected"], - llm="gpt-4o-mini", - ) - def review(self): - return "content" - - context = PendingFeedbackContext( - flow_id="string-llm-test", - flow_class="TestFlow", - method_name="review", - method_output="content", - message="Approve?", - emit=["approved", "rejected"], - llm="gpt-4o-mini", - ) - persistence.save_pending_feedback( - flow_uuid="string-llm-test", - context=context, - state_data={"id": "string-llm-test"}, - ) - - flow = TestFlow.from_pending("string-llm-test", persistence) - - method = flow._methods.get("review") - assert method._human_feedback_llm == "gpt-4o-mini" - - captured_llm = [] - - def capture_llm(feedback, outcomes, llm): - captured_llm.append(llm) - return "approved" - - with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm): - flow.resume("looks good!") - - # _human_feedback_llm is a string, so resume deserializes context.llm into an LLM instance - assert len(captured_llm) == 1 - from crewai.llms.base_llm import BaseLLM as BaseLLMClass - assert isinstance(captured_llm[0], BaseLLMClass) - assert captured_llm[0].model == "gpt-4o-mini" - - def test_human_feedback_llm_set_for_async_wrapper(self) -> None: - """Test that _human_feedback_llm is set on async wrapper functions.""" - import asyncio - from crewai.llms.base_llm import BaseLLM - - mock_llm = MagicMock(spec=BaseLLM) - mock_llm.model = "gemini/gemini-3-flash" - - class TestFlow(Flow): - @start() - @human_feedback( - message="Review:", - emit=["approved", "rejected"], - llm=mock_llm, - ) - async def async_review(self): - return "content" - - flow = TestFlow() - method = flow._methods.get("async_review") - assert method is not None - assert hasattr(method, "_human_feedback_llm") - assert method._human_feedback_llm is mock_llm diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 0df09caa6..9f381977b 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -1,7 +1,9 @@ from __future__ import annotations from collections import defaultdict +from pathlib import Path from typing import Any, ClassVar +from unittest.mock import patch import pytest from pydantic import ValidationError @@ -15,8 +17,9 @@ from crewai.events.types.flow_events import ( MethodExecutionStartedEvent, ) from crewai.flow import Flow, and_, human_feedback, listen, or_, router, start -from crewai.flow.async_feedback import PendingFeedbackContext +from crewai.flow.async_feedback import HumanFeedbackPending, PendingFeedbackContext from crewai.flow.flow import FlowState +from crewai.flow.flow_config import flow_config from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition from crewai.flow.persistence import persist from crewai.flow.persistence.base import FlowPersistence @@ -1012,3 +1015,366 @@ def test_resume_synthetic_completion_persists(): assert result == "done" assert _saved_methods("resume-synthetic") == ["generate"] + + +class ReviewFlow(Flow): + @start() + @human_feedback( + message="Review the draft:", + emit=["approved", "rejected"], + llm="gpt-4o-mini", + default_outcome="rejected", + ) + def draft(self): + return "draft-content" + + @listen("approved") + def publish(self): + return f"published:{self.last_human_feedback.feedback}" + + @listen("rejected") + def discard(self): + return "discarded" + + +REVIEW_YAML = f""" +schema: crewai.flow/v1 +name: ReviewFlow +methods: + draft: + do: + ref: {__name__}:ReviewFlow.draft + start: true + human_feedback: + message: "Review the draft:" + emit: [approved, rejected] + llm: gpt-4o-mini + default_outcome: rejected + publish: + do: + ref: {__name__}:ReviewFlow.publish + listen: approved + discard: + do: + ref: {__name__}:ReviewFlow.discard + listen: rejected +""" + + +def _pending_generate(flow): + return "content" + + +def _pending_process(flow, result): + return f"resumed:{result.feedback}" + + +class PausingProvider: + def request_feedback(self, context, flow): + raise HumanFeedbackPending(context=context) + + +PENDING_REVIEW_YAML = f""" +schema: crewai.flow/v1 +name: PendingReviewFlow +persist: + enabled: true + persistence: + persistence_type: DefinitionStoreBackend + store: hitl-pending +methods: + generate: + do: + ref: {__name__}:_pending_generate + start: true + human_feedback: + message: "Review:" + provider: {__name__}:PausingProvider + process: + do: + ref: {__name__}:_pending_process + listen: generate +""" + + +def test_human_feedback_from_yaml_default_outcome_routes(): + flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML)) + + with patch.object(flow, "_request_human_feedback", return_value="") as request: + result = flow.kickoff() + + assert result == "discarded" + assert request.call_count == 1 + assert flow.last_human_feedback.outcome == "rejected" + assert flow.last_human_feedback.output == "draft-content" + + +def test_human_feedback_from_yaml_collapses_and_routes(): + flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML)) + + with ( + patch.object(flow, "_request_human_feedback", return_value="ship it"), + patch.object(flow, "_collapse_to_outcome", return_value="approved"), + ): + result = flow.kickoff() + + assert result == "published:ship it" + assert [r.outcome for r in flow.human_feedback_history] == ["approved"] + + +def test_round_trip_human_feedback_equivalence(): + class_flow = ReviewFlow() + with patch.object(class_flow, "_request_human_feedback", return_value=""): + class_result = class_flow.kickoff() + + definition = FlowDefinition.from_yaml(ReviewFlow.flow_definition().to_yaml()) + twin = Flow.from_definition(definition) + with patch.object(twin, "_request_human_feedback", return_value=""): + twin_result = twin.kickoff() + + assert twin_result == class_result == "discarded" + assert ( + twin.last_human_feedback.outcome + == class_flow.last_human_feedback.outcome + == "rejected" + ) + + +def test_human_feedback_pending_and_resume_from_yaml(): + definition = FlowDefinition.from_yaml(PENDING_REVIEW_YAML) + + flow = Flow.from_definition(definition) + pending = flow.kickoff() + + assert isinstance(pending, HumanFeedbackPending) + flow_id = pending.context.flow_id + assert flow_id in DefinitionStoreBackend.pending + + resumed = Flow.from_pending( + flow_id, + DefinitionStoreBackend(store="hitl-pending"), + definition=definition, + ) + result = resumed.resume("looks good") + + assert result == "resumed:looks good" + assert resumed.last_human_feedback.feedback == "looks good" + assert flow_id not in DefinitionStoreBackend.pending + + +def test_flow_config_provider_fallback_from_yaml(): + yaml_str = f""" +schema: crewai.flow/v1 +name: ConfigProviderFlow +methods: + generate: + do: + ref: {__name__}:_pending_generate + start: true + human_feedback: + message: "Review:" + process: + do: + ref: {__name__}:_pending_process + listen: generate +""" + + class RecordingProvider: + def __init__(self): + self.requests = [] + + def request_feedback(self, context, flow): + self.requests.append(context.method_name) + return "from-config" + + provider = RecordingProvider() + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + previous = flow_config.hitl_provider + flow_config.hitl_provider = provider + try: + result = flow.kickoff() + finally: + flow_config.hitl_provider = previous + + assert result == "resumed:from-config" + assert provider.requests == ["generate"] + + +# --- PR 7: one resolution story, inert decorator attrs, restore paths --- + + +def test_runtime_package_reads_no_decorator_attrs(): + import crewai.flow.runtime as flow_runtime + + runtime_dir = Path(flow_runtime.__file__).parent + forbidden = ( + "__human_feedback_config__", + "__flow_persistence_config__", + "__flow_method_definition__", + "_human_feedback_llm", + ) + offenders = [ + f"{path.name}: {attr}" + for path in sorted(runtime_dir.rglob("*.py")) + for attr in forbidden + if attr in path.read_text(encoding="utf-8") + ] + assert offenders == [] + + +def test_stamped_decorator_attrs_are_inert_at_runtime(): + class StampFreeFlow(Flow): + @start() + @persist(DefinitionStoreBackend(store="stamp-free")) + def first(self): + return "one" + + @listen(first) + def second(self, result): + return f"{result}-two" + + StampFreeFlow.flow_definition() + stamped = ( + "__flow_method_definition__", + "__flow_persistence_config__", + "__human_feedback_config__", + ) + for name in ("first", "second"): + wrapper = StampFreeFlow.__dict__[name] + for attr in stamped: + if attr in wrapper.__dict__: + delattr(wrapper, attr) + + result = StampFreeFlow().kickoff() + + assert result == "one-two" + assert _saved_methods("stamp-free") == ["first"] + + +def test_class_level_persist_without_instance_kwarg_saves_and_restores(): + before = len(DefinitionStoreBackend.saves["class-decorator"]) + flow = ClassPersistedFlow() + flow.kickoff() + + assert _saved_methods("class-decorator")[before:] == ["first", "second"] + assert flow.state["count"] == 2 + + resumed = ClassPersistedFlow() + resumed.kickoff(inputs={"id": flow.state["id"]}) + assert resumed.state["count"] == 4 + + +def test_input_provider_bad_ref_names_field_and_ref(): + with pytest.raises(ValidationError, match="unresolvable input_provider ref"): + Flow(input_provider="missing_module_xyz:Provider") + + +class _NeedsArgsProvider: + def __init__(self, channel): + self.channel = channel + + def request_feedback(self, context, flow): + return "ok" + + +def test_provider_ref_requiring_ctor_args_fails_loudly(): + yaml_str = f""" +schema: crewai.flow/v1 +name: BadProviderFlow +methods: + generate: + do: + ref: {__name__}:_pending_generate + start: true + human_feedback: + message: "Review:" + provider: {__name__}:_NeedsArgsProvider +""" + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + with pytest.raises( + ValueError, match="cannot instantiate human_feedback.provider ref" + ): + flow.kickoff() + + +def test_unresolvable_provider_ref_names_field_and_ref(): + yaml_str = f""" +schema: crewai.flow/v1 +name: BadProviderFlow +methods: + generate: + do: + ref: {__name__}:_pending_generate + start: true + human_feedback: + message: "Review:" + provider: missing_module_xyz:Provider +""" + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + with pytest.raises( + ValueError, match="unresolvable human_feedback.provider ref" + ): + flow.kickoff() + + +def _checkpoint_chain_flow(tmp_path): + from crewai.state.provider.json_provider import JsonProvider + from crewai.state.runtime import RuntimeState + + definition = FlowDefinition.from_yaml(CHAIN_YAML) + flow = Flow.from_definition(definition) + result = flow.kickoff() + assert result == "confirmed:True" + + state = RuntimeState(root=[flow]) + state._provider = JsonProvider() + location = state.checkpoint(str(tmp_path)) + return definition, flow, CheckpointConfig(restore_from=location) + + +def test_from_checkpoint_with_definition_restores_yaml_flow(tmp_path): + definition, flow, config = _checkpoint_chain_flow(tmp_path) + + restored = Flow.from_checkpoint(config, definition=definition) + + assert restored.state["confirmed"] is True + assert restored.state["id"] == flow.state["id"] + assert restored.kickoff() == "confirmed:True" + + +def test_fork_with_definition_branches_yaml_flow(tmp_path): + definition, flow, config = _checkpoint_chain_flow(tmp_path) + + forked = Flow.fork(config, branch="alt", definition=definition) + + assert forked.state["id"] != flow.state["id"] + assert forked.kickoff() == "confirmed:True" + + +def test_non_dict_state_default_rejected_by_contract(): + yaml_str = """ +schema: crewai.flow/v1 +name: BadStateFlow +state: + type: dict + default: 42 +methods: {} +""" + with pytest.raises(ValidationError, match="default"): + FlowDefinition.from_yaml(yaml_str) + + +def test_definition_method_missing_from_class_fails_loudly(): + class VanishingFlow(Flow): + @start() + def begin(self): + return "one" + + VanishingFlow.flow_definition() + del VanishingFlow.begin + + with pytest.raises(ValueError, match="does not provide: begin"): + VanishingFlow() diff --git a/lib/crewai/tests/test_human_feedback_decorator.py b/lib/crewai/tests/test_human_feedback_decorator.py index 97af330b0..acf7d7e34 100644 --- a/lib/crewai/tests/test_human_feedback_decorator.py +++ b/lib/crewai/tests/test_human_feedback_decorator.py @@ -92,8 +92,8 @@ class TestHumanFeedbackValidation: assert hasattr(test_method, "__human_feedback_config__") assert not hasattr(test_method, "__is_router__") - def test_persist_preserves_human_feedback_llm_attribute(self): - """Test @persist preserves the live LLM stashed by @human_feedback.""" + def test_persist_preserves_human_feedback_config(self): + """Test @persist preserves the config stamped by @human_feedback.""" llm = object() @persist() @@ -105,8 +105,8 @@ class TestHumanFeedbackValidation: def test_method(self): return "output" - assert hasattr(test_method, "_human_feedback_llm") - assert test_method._human_feedback_llm is llm + assert hasattr(test_method, "__human_feedback_config__") + assert test_method.__human_feedback_config__.llm is llm class TestHumanFeedbackConfig: @@ -481,7 +481,7 @@ class TestHumanFeedbackLearn: with patch.object( flow, "_request_human_feedback", return_value="looks good" ): - flow.produce() + flow.kickoff() # memory.recall and memory.remember_many should NOT be called flow.memory.recall.assert_not_called() @@ -516,7 +516,7 @@ class TestHumanFeedbackLearn: ) MockLLM.return_value = mock_llm - flow.produce() + flow.kickoff() # remember_many should be called with the distilled lesson flow.memory.remember_many.assert_called_once() @@ -570,7 +570,7 @@ class TestHumanFeedbackLearn: ] MockLLM.return_value = mock_llm - flow.produce() + flow.kickoff() assert captured_output["shown_to_human"] == "draft with citations added" # recall was called to find past lessons @@ -592,7 +592,7 @@ class TestHumanFeedbackLearn: with patch.object( flow, "_request_human_feedback", return_value="" ): - flow.produce() + flow.kickoff() flow.memory.remember_many.assert_not_called() @@ -645,7 +645,7 @@ class TestHumanFeedbackLearn: mock_llm.call.side_effect = RuntimeError("simulated pre-review failure") MockLLM.return_value = mock_llm - flow.produce() + flow.kickoff() assert captured["shown_to_human"] == "raw draft" assert any( @@ -690,7 +690,7 @@ class TestHumanFeedbackLearn: MockLLM.return_value = mock_llm with pytest.raises(RuntimeError, match="simulated pre-review failure"): - flow.produce() + flow.kickoff() def test_distillation_failure_logs_and_does_not_block_flow(self, caplog): """Distillation LLM failure logs a warning but does not break the flow.""" @@ -717,7 +717,7 @@ class TestHumanFeedbackLearn: mock_llm.call.side_effect = RuntimeError("simulated distill failure") MockLLM.return_value = mock_llm - flow.produce() # must not raise + flow.kickoff() # must not raise flow.memory.remember_many.assert_not_called() assert any( diff --git a/lib/crewai/tests/test_human_feedback_integration.py b/lib/crewai/tests/test_human_feedback_integration.py index 8036fdb90..5c07243e3 100644 --- a/lib/crewai/tests/test_human_feedback_integration.py +++ b/lib/crewai/tests/test_human_feedback_integration.py @@ -778,77 +778,11 @@ class TestEdgeCases: class TestLLMConfigPreservation: """Tests that LLM config is preserved through @human_feedback serialization. - PR #4970 introduced _human_feedback_llm stashing so the live LLM object survives - decorator wrapping for same-process resume. The serialization path - (_serialize_llm_for_context / _deserialize_llm_from_context) preserves - config for cross-process resume. + The flow definition keeps the live LLM object for same-process execution. + The serialization path (_serialize_llm_for_context / + _deserialize_llm_from_context) preserves config for cross-process resume. """ - def test_human_feedback_llm_stashed_on_wrapper_with_llm_instance(self): - """Test that passing an LLM instance stashes it on the wrapper as _human_feedback_llm.""" - from crewai.llm import LLM - - llm_instance = LLM(model="gpt-4o-mini", temperature=0.42) - - class ConfigFlow(Flow): - @start() - @human_feedback( - message="Review:", - emit=["approved", "rejected"], - llm=llm_instance, - ) - def review(self): - return "content" - - method = ConfigFlow.review - assert hasattr(method, "_human_feedback_llm"), "_human_feedback_llm not found on wrapper" - assert method._human_feedback_llm is llm_instance, "_human_feedback_llm is not the same object" - - def test_human_feedback_llm_preserved_on_listen_method(self): - """Test that _human_feedback_llm is preserved when @human_feedback is on a @listen method.""" - from crewai.llm import LLM - - llm_instance = LLM(model="gpt-4o-mini", temperature=0.7) - - class ListenConfigFlow(Flow): - @start() - def generate(self): - return "draft" - - @listen("generate") - @human_feedback( - message="Review:", - emit=["approved", "rejected"], - llm=llm_instance, - ) - def review(self): - return "content" - - method = ListenConfigFlow.review - assert hasattr(method, "_human_feedback_llm") - assert method._human_feedback_llm is llm_instance - - def test_human_feedback_llm_accessible_on_instance(self): - """Test that _human_feedback_llm survives Flow instantiation (bound method access).""" - from crewai.llm import LLM - - llm_instance = LLM(model="gpt-4o-mini", temperature=0.42) - - class InstanceFlow(Flow): - @start() - @human_feedback( - message="Review:", - emit=["approved", "rejected"], - llm=llm_instance, - ) - def review(self): - return "content" - - flow = InstanceFlow() - instance_method = flow.review - assert hasattr(instance_method, "_human_feedback_llm") - assert instance_method._human_feedback_llm is llm_instance - def test_serialize_llm_preserves_config_fields(self): """Test that _serialize_llm_for_context captures temperature, base_url, etc.""" from crewai.flow.human_feedback import _serialize_llm_for_context