Drive human feedback from the flow definition

@human_feedback previously wrapped methods with the full HITL runtime (feedback
request, outcome collapse, learn loop), so flows built from a YAML definition —
which carry no decorated callables — could not pause for or route on human
feedback.
This commit is contained in:
Vinicius Brasil
2026-06-11 22:26:28 -07:00
parent 5a04033d4c
commit b0a71bb861
13 changed files with 939 additions and 860 deletions

View File

@@ -3,11 +3,10 @@ from __future__ import annotations
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, TypeVar
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.human_feedback import (
HumanFeedbackConfig,
HumanFeedbackResult,
_build_human_feedback_runtime_decorator,
_validate_human_feedback_options,
)
@@ -21,32 +20,6 @@ F = TypeVar("F", bound=Callable[..., Any])
__all__ = ["HumanFeedbackResult", "human_feedback"]
def _stamp_human_feedback_metadata(
wrapper: Any,
func: Callable[..., Any],
config: HumanFeedbackConfig,
) -> None:
for attr in [
"__is_flow_method__",
"__flow_persistence_config__",
"__flow_method_definition__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
wrapper.__human_feedback_config__ = config
wrapper.__is_flow_method__ = True
if config.emit:
fragment = getattr(wrapper, "__flow_method_definition__", None)
if isinstance(fragment, FlowMethodDefinition):
wrapper.__flow_method_definition__ = fragment.model_copy(
update={"router": True, "emit": list(config.emit)}
)
wrapper._human_feedback_llm = config.llm
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
@@ -58,21 +31,18 @@ def human_feedback(
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback."""
runtime_decorator = _build_human_feedback_runtime_decorator(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
"""Decorator for Flow methods that require human feedback.
The decorator is a pure metadata stamper: it records the feedback
configuration on the method, and the Flow engine collects and routes
feedback after the method completes, driven by the flow's definition.
"""
_validate_human_feedback_options(
emit=emit, llm=llm, default_outcome=default_outcome
)
config = HumanFeedbackConfig(
message=message,
emit=emit,
emit=list(emit) if emit is not None else None,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
@@ -83,8 +53,7 @@ def human_feedback(
)
def decorator(func: F) -> F:
wrapper = runtime_decorator(func)
_stamp_human_feedback_metadata(wrapper, func, config)
return wrapper
func.__human_feedback_config__ = config # type: ignore[attr-defined]
return func
return decorator

View File

@@ -18,6 +18,7 @@ from crewai.flow.flow_definition import (
FlowMethodDefinition,
FlowPersistenceDefinition,
FlowStateDefinition,
_object_ref,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -35,15 +36,12 @@ _FLOW_METHOD_METADATA_ATTRS = [
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
"_human_feedback_llm",
]
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
"""Check if the object carries Flow method wrapper metadata."""
return hasattr(obj, "__is_flow_method__") or hasattr(
obj, _FLOW_METHOD_DEFINITION_ATTR
)
return hasattr(obj, _FLOW_METHOD_DEFINITION_ATTR)
def _should_include_flow_method(flow_class: type, method: Any) -> bool:
@@ -81,7 +79,6 @@ def _stamp_inherited_conversational_metadata(
for attr in _FLOW_METHOD_METADATA_ATTRS:
if hasattr(inherited, attr):
setattr(method, attr, getattr(inherited, attr))
method.__is_flow_method__ = True
return method
@@ -105,13 +102,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None:
return None
def _object_ref(value: Any) -> str:
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
return f"{module}:{qualname}" if module and qualname else repr(value)
def _is_json_serializable(value: Any) -> bool:
try:
json.dumps(value)
@@ -227,7 +217,10 @@ def _build_config_definition(
for field_name, default in field_defaults.items():
value = getattr(flow_class, field_name, default)
if field_name == "input_provider":
values[field_name] = None if value is None else _object_ref(value)
# A string value is already a ref; only live objects degrade.
values[field_name] = (
value if value is None or isinstance(value, str) else _object_ref(value)
)
else:
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
@@ -247,38 +240,31 @@ def _build_human_feedback_definition(
return FlowHumanFeedbackDefinition(
message=str(config.message),
emit=[str(value) for value in emit] if emit is not None else None,
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, f"{path}.llm"
),
# llm and provider stay live: the engine consumes them in-process and
# the contract degrades them to serializable forms at JSON dump time.
llm=getattr(config, "llm", None),
default_outcome=getattr(config, "default_outcome", None),
metadata=_serialize_static_value(
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
),
provider=_serialize_static_value(
getattr(config, "provider", None), diagnostics, f"{path}.provider"
),
provider=getattr(config, "provider", None),
learn=bool(getattr(config, "learn", False)),
learn_source=str(getattr(config, "learn_source", "hitl")),
learn_strict=bool(getattr(config, "learn_strict", False)),
)
def _build_persistence_definition(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowPersistenceDefinition | None:
def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | None:
config = getattr(value, "__flow_persistence_config__", None)
if config is None:
return None
persistence = getattr(config, "persistence", None)
verbose = bool(getattr(config, "verbose", False))
return FlowPersistenceDefinition(
enabled=True,
verbose=verbose,
persistence=_serialize_static_value(
persistence, diagnostics, f"{path}.persistence"
),
verbose=bool(getattr(config, "verbose", False)),
# The backend stays live: the engine persists through the exact
# instance the user configured; the contract degrades it to a
# serialized config at JSON dump time.
persistence=getattr(config, "persistence", None),
)
@@ -396,9 +382,7 @@ def _build_method_definition(
method_definition.router = True
method_definition.emit = None
method_definition.persist = _build_persistence_definition(
method, diagnostics, f"{path}.persist"
)
method_definition.persist = _build_persistence_definition(method)
return method_definition
@@ -482,7 +466,7 @@ def _build_flow_definition_from_class(
description=description,
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
persist=_build_persistence_definition(flow_class),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,

View File

@@ -13,7 +13,7 @@ import json
import logging
from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator
import yaml
from crewai.flow.conversational_definition import (
@@ -41,6 +41,14 @@ __all__ = [
]
def _object_ref(value: Any) -> str:
"""Format a class or instance as the canonical ``module:qualname`` ref."""
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
return f"{module}:{qualname}" if module and qualname else repr(value)
class FlowDefinitionDiagnostic(BaseModel):
"""A non-fatal Flow Definition build or validation diagnostic."""
@@ -56,7 +64,7 @@ class FlowStateDefinition(BaseModel):
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
ref: str | None = None
json_schema: dict[str, Any] | None = None
default: Any = None
default: dict[str, Any] | None = None
class FlowConfigDefinition(BaseModel):
@@ -73,15 +81,41 @@ class FlowConfigDefinition(BaseModel):
class FlowPersistenceDefinition(BaseModel):
"""Static persistence configuration."""
"""Static persistence configuration.
``persistence`` may hold a live backend when the definition is built from
a decorated class — the engine then persists through the exact instance
the user configured; the JSON/YAML projection degrades it to its
serialized config.
"""
enabled: bool = False
verbose: bool = False
persistence: dict[str, Any] | None = None
persistence: Any = None
@field_serializer("persistence", when_used="json")
def _serialize_persistence(self, value: Any) -> Any:
if value is None or isinstance(value, dict):
return value
if isinstance(value, BaseModel):
try:
return value.model_dump(mode="json")
except Exception:
logger.warning(
"Persistence backend %s is not fully serializable; "
"preserved import reference only.",
_object_ref(value),
)
return {"ref": _object_ref(value)}
class FlowHumanFeedbackDefinition(BaseModel):
"""Static human feedback configuration."""
"""Static human feedback configuration.
``llm`` and ``provider`` may hold live Python objects when the definition
is built from a decorated class; the JSON/YAML projection degrades them to
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
"""
message: str
emit: list[str] | None = None
@@ -93,6 +127,20 @@ class FlowHumanFeedbackDefinition(BaseModel):
learn_source: str = "hitl"
learn_strict: bool = False
@field_serializer("llm", when_used="json")
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
if value is None or isinstance(value, (str, dict)):
return value
from crewai.flow.human_feedback import _serialize_llm_for_context
return _serialize_llm_for_context(value)
@field_serializer("provider", when_used="json")
def _serialize_provider(self, value: Any) -> str | None:
if value is None or isinstance(value, str):
return value
return _object_ref(value)
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
@@ -112,6 +160,16 @@ class FlowMethodDefinition(BaseModel):
human_feedback: FlowHumanFeedbackDefinition | None = None
persist: FlowPersistenceDefinition | None = None
@model_validator(mode="after")
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
# Canonical shape: a method whose human_feedback declares emit
# outcomes routes like a router, regardless of how the definition
# was authored.
if self.human_feedback is not None and self.human_feedback.emit:
self.router = True
self.emit = None
return self
@property
def is_start(self) -> bool:
"""Whether this method is a start method.

View File

@@ -83,7 +83,6 @@ class FlowMethod(Generic[P, R]):
"__conversational_only__", # gates registration on Flow.conversational
"__flow_persistence_config__",
"__flow_method_definition__",
"_human_feedback_llm", # Live LLM object for HITL resume
]:
if hasattr(meth, attr):
setattr(self, attr, getattr(meth, attr))

View File

@@ -1,8 +1,11 @@
"""Human feedback decorator for Flow methods.
"""Human feedback support for Flow methods.
This module provides the @human_feedback decorator that enables human-in-the-loop
workflows within CrewAI Flows. It allows collecting human feedback on method outputs
and optionally routing to different listeners based on the feedback.
This module backs the @human_feedback decorator that enables human-in-the-loop
workflows within CrewAI Flows. The decorator is a pure metadata stamper: it
records a :class:`HumanFeedbackConfig` on the method, the Flow definition
builder lifts it into ``FlowHumanFeedbackDefinition``, and the Flow engine
collects feedback after each decorated method completes, driven by the flow's
definition.
Supports both synchronous (blocking) and asynchronous (non-blocking) feedback
collection through the provider parameter.
@@ -55,22 +58,18 @@ Example (asynchronous with custom provider):
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from functools import wraps
import logging
from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import BaseModel, Field
from crewai.flow.flow_wrappers import FlowMethod
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider
from crewai.flow.flow import Flow
from crewai.flow.runtime import Flow
from crewai.llms.base_llm import BaseLLM
@@ -160,8 +159,8 @@ class HumanFeedbackResult:
class HumanFeedbackConfig:
"""Configuration for the @human_feedback decorator.
Stores the parameters passed to the decorator for later use during
method execution and for introspection by visualization tools.
Stores the parameters passed to the decorator for later use by the
Flow definition builder and for introspection by visualization tools.
Attributes:
message: The message shown to the human when requesting feedback.
@@ -183,19 +182,6 @@ class HumanFeedbackConfig:
learn_strict: bool = False
class HumanFeedbackMethod(FlowMethod[Any, Any]):
"""Wrapper for methods decorated with @human_feedback.
This wrapper extends FlowMethod to add human feedback specific attributes
used by the FlowDefinition builder and runtime feedback handling.
Attributes:
__human_feedback_config__: The HumanFeedbackConfig for this method.
"""
__human_feedback_config__: HumanFeedbackConfig | None = None
class PreReviewResult(BaseModel):
"""Structured output from the HITL pre-review LLM call."""
@@ -217,17 +203,11 @@ class DistilledLessons(BaseModel):
)
def _build_human_feedback_runtime_decorator(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
def _validate_human_feedback_options(
emit: Sequence[str] | None,
llm: Any,
default_outcome: str | None,
) -> None:
if emit is not None:
if not llm:
raise ValueError(
@@ -244,295 +224,139 @@ def _build_human_feedback_runtime_decorator(
elif default_outcome is not None:
raise ValueError("default_outcome requires emit to be specified.")
def decorator(func: F) -> F:
def _get_hitl_prompt(key: str) -> str:
from crewai.utilities.i18n import I18N_DEFAULT
return I18N_DEFAULT.slice(key)
def _get_hitl_prompt(key: str) -> str:
from crewai.utilities.i18n import I18N_DEFAULT
def _resolve_llm_instance() -> Any:
if llm is None:
from crewai.llm import LLM
return I18N_DEFAULT.slice(key)
return LLM(model="gpt-4o-mini")
if isinstance(llm, str):
from crewai.llm import LLM
return LLM(model=llm)
return llm # already a BaseLLM instance
def _resolve_llm_instance(llm: Any) -> Any:
from crewai.llm import LLM
def _pre_review_with_lessons(
flow_instance: Flow[Any], method_output: Any
) -> Any:
try:
mem = flow_instance.memory
if mem is None:
return method_output
query = f"human feedback lessons for {func.__name__}: {method_output!s}"
matches = mem.recall(query, source=learn_source)
if not matches:
return method_output
if llm is None:
return LLM(model="gpt-4o-mini")
if isinstance(llm, str):
return LLM(model=llm)
if isinstance(llm, dict):
deserialized = _deserialize_llm_from_context(llm)
return deserialized if deserialized is not None else LLM(model="gpt-4o-mini")
return llm # already a BaseLLM instance
lessons = "\n".join(f"- {m.record.content}" for m in matches)
llm_inst = _resolve_llm_instance()
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
output=str(method_output),
lessons=lessons,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_pre_review_system"),
},
{"role": "user", "content": prompt},
]
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=PreReviewResult)
if isinstance(response, PreReviewResult):
return response.improved_output
return PreReviewResult.model_validate(response).improved_output
reviewed = llm_inst.call(messages)
return reviewed if isinstance(reviewed, str) else str(reviewed)
except Exception:
if learn_strict:
logger.warning(
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
func.__name__,
exc_info=True,
)
raise
logger.warning(
"HITL pre-review failed for %s; falling back to raw output",
func.__name__,
exc_info=True,
)
return method_output
def _distill_and_store_lessons(
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
) -> None:
try:
mem = flow_instance.memory
if mem is None:
return
llm_inst = _resolve_llm_instance()
prompt = _get_hitl_prompt("hitl_distill_user").format(
method_name=func.__name__,
output=str(method_output),
feedback=raw_feedback,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_distill_system"),
},
{"role": "user", "content": prompt},
]
def _pre_review_with_lessons(
flow_instance: Flow[Any],
method_name: str,
method_output: Any,
*,
llm: Any,
learn_source: str,
learn_strict: bool,
) -> Any:
try:
mem = flow_instance.memory
if mem is None:
return method_output
query = f"human feedback lessons for {method_name}: {method_output!s}"
matches = mem.recall(query, source=learn_source)
if not matches:
return method_output
lessons: list[str] = []
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=DistilledLessons)
if isinstance(response, DistilledLessons):
lessons = response.lessons
else:
lessons = DistilledLessons.model_validate(response).lessons
else:
response = llm_inst.call(messages)
if isinstance(response, str):
lessons = [
line.strip("- ").strip()
for line in response.strip().split("\n")
if line.strip() and line.strip() != "NONE"
]
if lessons:
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
except Exception:
if learn_strict:
logger.warning(
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
func.__name__,
exc_info=True,
)
raise
logger.warning(
"HITL lesson distillation failed for %s; no lessons stored",
func.__name__,
exc_info=True,
)
def _build_feedback_context(
flow_instance: Flow[Any], method_output: Any
) -> tuple[Any, Any]:
from crewai.flow.async_feedback.types import PendingFeedbackContext
context = PendingFeedbackContext(
flow_id=flow_instance.flow_id or "unknown",
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
method_name=func.__name__,
method_output=method_output,
message=message,
emit=list(emit) if emit else None,
default_outcome=default_outcome,
metadata=metadata or {},
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
lessons = "\n".join(f"- {m.record.content}" for m in matches)
llm_inst = _resolve_llm_instance(llm)
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
output=str(method_output),
lessons=lessons,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_pre_review_system"),
},
{"role": "user", "content": prompt},
]
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=PreReviewResult)
if isinstance(response, PreReviewResult):
return response.improved_output
return PreReviewResult.model_validate(response).improved_output
reviewed = llm_inst.call(messages)
return reviewed if isinstance(reviewed, str) else str(reviewed)
except Exception:
if learn_strict:
logger.warning(
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
method_name,
exc_info=True,
)
raise
logger.warning(
"HITL pre-review failed for %s; falling back to raw output",
method_name,
exc_info=True,
)
return method_output
effective_provider = provider
if effective_provider is None:
from crewai.flow.flow_config import flow_config
effective_provider = flow_config.hitl_provider
def _distill_and_store_lessons(
flow_instance: Flow[Any],
method_name: str,
method_output: Any,
raw_feedback: str,
*,
llm: Any,
learn_source: str,
learn_strict: bool,
) -> None:
try:
mem = flow_instance.memory
if mem is None:
return
llm_inst = _resolve_llm_instance(llm)
prompt = _get_hitl_prompt("hitl_distill_user").format(
method_name=method_name,
output=str(method_output),
feedback=raw_feedback,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_distill_system"),
},
{"role": "user", "content": prompt},
]
return context, effective_provider
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
raise TypeError(
f"Provider {type(effective_provider).__name__}.request_feedback() "
"returned a coroutine in a sync flow method. Use an async flow "
"method or a synchronous provider."
)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
async def _request_feedback_async(
flow_instance: Flow[Any], method_output: Any
) -> str:
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
return str(await feedback_result)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
def _process_feedback(
flow_instance: Flow[Any],
method_output: Any,
raw_feedback: str,
) -> HumanFeedbackResult | str:
collapsed_outcome: str | None = None
if not raw_feedback.strip():
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
result = HumanFeedbackResult(
output=method_output,
feedback=raw_feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
method_name=func.__name__,
metadata=metadata or {},
)
flow_instance.human_feedback_history.append(result)
flow_instance.last_human_feedback = result
if emit:
if collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
return collapsed_outcome
return result
if asyncio.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
method_output = await func(self, *args, **kwargs)
if learn and getattr(self, "memory", None) is not None:
method_output = _pre_review_with_lessons(self, method_output)
raw_feedback = await _request_feedback_async(self, method_output)
result = _process_feedback(self, method_output, raw_feedback)
if (
learn
and getattr(self, "memory", None) is not None
and raw_feedback.strip()
):
_distill_and_store_lessons(self, method_output, raw_feedback)
# Stash the real method output for final flow result when emit is set:
# result is the collapsed outcome string for routing, but we preserve the
# actual method output as the flow's final result. Uses per-method dict for
# concurrency safety and to handle None returns.
if emit:
self._human_feedback_method_outputs[func.__name__] = method_output
return result
wrapper: Any = async_wrapper
lessons: list[str] = []
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=DistilledLessons)
if isinstance(response, DistilledLessons):
lessons = response.lessons
else:
lessons = DistilledLessons.model_validate(response).lessons
else:
response = llm_inst.call(messages)
if isinstance(response, str):
lessons = [
line.strip("- ").strip()
for line in response.strip().split("\n")
if line.strip() and line.strip() != "NONE"
]
@wraps(func)
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
method_output = func(self, *args, **kwargs)
if learn and getattr(self, "memory", None) is not None:
method_output = _pre_review_with_lessons(self, method_output)
raw_feedback = _request_feedback(self, method_output)
result = _process_feedback(self, method_output, raw_feedback)
if (
learn
and getattr(self, "memory", None) is not None
and raw_feedback.strip()
):
_distill_and_store_lessons(self, method_output, raw_feedback)
# Stash the real method output for final flow result when emit is set:
# result is the collapsed outcome string for routing, but we preserve the
# actual method output as the flow's final result. Uses per-method dict for
# concurrency safety and to handle None returns.
if emit:
self._human_feedback_method_outputs[func.__name__] = method_output
return result
wrapper = sync_wrapper
return wrapper # type: ignore[no-any-return]
return decorator
if lessons:
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
except Exception:
if learn_strict:
logger.warning(
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
method_name,
exc_info=True,
)
raise
logger.warning(
"HITL lesson distillation failed for %s; no lessons stored",
method_name,
exc_info=True,
)
def human_feedback(

View File

@@ -25,7 +25,6 @@ Example:
from __future__ import annotations
from collections.abc import Callable
import functools
import logging
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Final, TypeVar
@@ -186,20 +185,6 @@ def persist(
persistence if persistence is not None else default_flow_persistence()
)
if isinstance(target, type):
_stamp_persistence_metadata(target, actual_persistence, verbose)
original_init = target.__init__ # type: ignore[misc]
@functools.wraps(original_init)
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
if "persistence" not in kwargs:
kwargs["persistence"] = actual_persistence
original_init(self, *args, **kwargs)
target.__init__ = new_init # type: ignore[misc]
return target
target.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(target, actual_persistence, verbose)
return target

View File

@@ -21,8 +21,8 @@ from collections.abc import (
from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
from datetime import datetime
import enum
import importlib
import inspect
import logging
import threading
@@ -86,6 +86,11 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallCompletedEvent
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
HumanFeedbackProvider,
PendingFeedbackContext,
)
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
@@ -96,6 +101,7 @@ from crewai.flow.flow_context import (
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
FlowHumanFeedbackDefinition,
FlowMethodDefinition,
FlowPersistenceDefinition,
FlowStateDefinition,
@@ -106,10 +112,20 @@ from crewai.flow.flow_wrappers import (
RouterMethod,
StartMethod,
)
from crewai.flow.human_feedback import HumanFeedbackResult
from crewai.flow.human_feedback import (
HumanFeedbackResult,
_deserialize_llm_from_context,
_distill_and_store_lessons,
_pre_review_with_lessons,
_serialize_llm_for_context,
)
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._action_resolvers import resolve_action
from crewai.flow.runtime._resolvers import (
resolve_action,
resolve_instance_ref,
resolve_ref,
)
from crewai.flow.types import (
FlowExecutionData,
FlowMethodName,
@@ -129,7 +145,6 @@ if TYPE_CHECKING:
from crewai_files import FileInput
from crewai.context import ExecutionContext
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.llms.base_llm import BaseLLM
from crewai.flow.visualization import build_flow_structure, render_interactive
@@ -178,19 +193,12 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -
def _build_definition_state_model(
state_definition: FlowStateDefinition,
) -> BaseModel | None:
kwargs = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
kwargs = dict(state_definition.default or {})
model_class: type[BaseModel] | None = None
if state_definition.ref:
try:
module_name, _, qualname = state_definition.ref.partition(":")
resolved: Any = importlib.import_module(module_name)
for part in qualname.split("."):
resolved = getattr(resolved, part)
resolved: Any = resolve_ref(state_definition.ref, field="state")
except Exception:
logger.warning(
"Could not import state ref %r", state_definition.ref, exc_info=True
@@ -274,8 +282,7 @@ def _resolve_persistence(value: Any) -> Any:
if isinstance(value, dict):
from crewai.flow.persistence.base import _persistence_registry
type_name = value.get("persistence_type", "SQLiteFlowPersistence")
cls = _persistence_registry.get(type_name)
cls = _persistence_registry.get(value.get("persistence_type", ""))
if cls is not None:
return cls.model_validate(value)
return value
@@ -296,7 +303,7 @@ def _validate_input_provider(value: Any) -> Any:
if value is None or isinstance(value, InputProvider):
return value
if isinstance(value, str) and ":" in value:
resolved = _resolve_input_provider_ref(value)
resolved = resolve_instance_ref(value, field="input_provider")
else:
from crewai.types.callback import _dotted_path_to_instance
@@ -309,15 +316,6 @@ def _validate_input_provider(value: Any) -> Any:
)
def _resolve_input_provider_ref(ref: str) -> Any:
from crewai.flow.runtime._action_resolvers import import_ref
target = import_ref(ref)
if inspect.isclass(target):
return target()
return target
def _serialize_input_provider(value: Any) -> str | None:
if value is None:
return None
@@ -774,10 +772,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return flow_definition
@classmethod
def from_definition(cls, definition: FlowDefinition) -> Flow[Any]:
def from_definition(cls, definition: FlowDefinition, **kwargs: Any) -> Flow[Any]:
"""Build a runnable Flow directly from a definition; no subclass required."""
return cls.model_validate(
definition.config.model_dump(),
{**definition.config.model_dump(), **kwargs},
context={"flow_definition": definition},
)
@@ -870,12 +868,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
] = Field(default=None)
@classmethod
def from_checkpoint(cls, config: CheckpointConfig) -> Flow: # type: ignore[type-arg]
def from_checkpoint(
cls,
config: CheckpointConfig,
*,
definition: FlowDefinition | None = None,
) -> Flow: # type: ignore[type-arg]
"""Restore a Flow from a checkpoint.
Args:
config: Checkpoint configuration with ``restore_from`` set to
the path of the checkpoint to load.
definition: The FlowDefinition to restore a definition-built flow
(one created via ``Flow.from_definition``) from; its actions
are re-resolved since checkpoints carry no callables.
Subclasses carry their own definition and don't need this.
Returns:
A Flow instance ready to resume.
@@ -884,7 +891,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
from crewai.events.event_bus import crewai_event_bus
from crewai.state.runtime import RuntimeState
state = RuntimeState.from_checkpoint(config, context={"from_checkpoint": True})
context: dict[str, Any] = {"from_checkpoint": True}
if definition is not None:
context["flow_definition"] = definition
state = RuntimeState.from_checkpoint(config, context=context)
crewai_event_bus.set_runtime_state(state)
for entity in state.root:
if not isinstance(entity, Flow):
@@ -894,7 +904,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if isinstance(entity, cls):
entity._restore_from_checkpoint()
return entity
instance = cls()
instance = (
cls.from_definition(definition) if definition is not None else cls()
)
instance.checkpoint_completed_methods = entity.checkpoint_completed_methods
instance.checkpoint_method_outputs = entity.checkpoint_method_outputs
instance.checkpoint_method_counts = entity.checkpoint_method_counts
@@ -908,17 +920,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
cls,
config: CheckpointConfig,
branch: str | None = None,
*,
definition: FlowDefinition | None = None,
) -> Flow: # type: ignore[type-arg]
"""Fork a Flow from a checkpoint, creating a new execution branch.
Args:
config: Checkpoint configuration with ``restore_from`` set.
branch: Branch label for the fork. Auto-generated if not provided.
definition: The FlowDefinition to restore a definition-built flow
from, as in :meth:`from_checkpoint`.
Returns:
A Flow instance on the new branch. Call kickoff() to run.
"""
flow = cls.from_checkpoint(config)
flow = cls.from_checkpoint(config, definition=definition)
state = crewai_event_bus.runtime_state
if state is None:
raise RuntimeError(
@@ -992,7 +1008,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_usage_metrics_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_flow_match_id: str | None = PrivateAttr(default=None)
_usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None)
_persist_backends: dict[int, FlowPersistence] = PrivateAttr(default_factory=dict)
_persist_backends: dict[str, FlowPersistence] = PrivateAttr(default_factory=dict)
_persistence_from_definition: bool = PrivateAttr(default=False)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -1038,6 +1055,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
and flow_persist.enabled
):
self.persistence = self._resolve_persist_backend(flow_persist)
self._persistence_from_definition = True
if self._state is None:
self._state = self._create_initial_state()
@@ -1091,13 +1109,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
methods: dict[FlowMethodName, Callable[..., Any]] = {}
missing: list[str] = []
for method_name in self._definition.methods:
method = getattr(self, method_name, None)
if method is None:
missing.append(method_name)
continue
if not hasattr(method, "__self__"):
method = method.__get__(self, type(self))
methods[FlowMethodName(method_name)] = method
if missing:
raise ValueError(
f"Flow {self._definition.name!r} definition declares methods its "
"class does not provide: " + ", ".join(missing)
)
return methods
def _attach_usage_aggregation_listener(self) -> None:
@@ -1408,6 +1433,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
cls,
flow_id: str,
persistence: FlowPersistence | None = None,
*,
definition: FlowDefinition | None = None,
**kwargs: Any,
) -> Flow[Any]:
"""Create a Flow instance from a pending feedback state.
@@ -1422,6 +1449,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
If not provided, uses ``default_flow_persistence()`` (the
registered factory when present, else the built-in SQLite
fallback).
definition: The FlowDefinition to restore a definition-built flow
(one created via ``Flow.from_definition``) from. Subclasses
carry their own definition and don't need this.
**kwargs: Additional keyword arguments passed to the Flow constructor
Returns:
@@ -1453,7 +1483,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
state_data, pending_context = loaded
instance = cls(persistence=persistence, **kwargs)
instance = (
cls.from_definition(definition, persistence=persistence, **kwargs)
if definition is not None
else cls(persistence=persistence, **kwargs)
)
instance._initialize_state(state_data)
instance._pending_feedback_context = pending_context
instance._is_execution_resuming = True
@@ -1605,59 +1639,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"No pending feedback context. Use from_pending() to restore a paused flow."
)
emit = context.emit
default_outcome = context.default_outcome
# Try to get the live LLM from the re-imported decorator first.
# This preserves the fully-configured object (credentials, safety_settings, etc.)
# for same-process resume. For cross-process resume, fall back to the
# serialized context.llm which is now a dict with full config (or a legacy string).
from crewai.flow.human_feedback import _deserialize_llm_from_context
llm = None
method = self._methods.get(FlowMethodName(context.method_name))
if method is not None:
live_llm = getattr(method, "_human_feedback_llm", None)
if live_llm is not None:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
if isinstance(live_llm, BaseLLMClass):
llm = live_llm
if llm is None:
llm = _deserialize_llm_from_context(context.llm)
collapsed_outcome: str | None = None
if not feedback.strip():
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
result = HumanFeedbackResult(
output=context.method_output,
feedback=feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
# The serialized context carries the full LLM config (a dict, or a
# legacy model string) — the single source for cross- and same-process
# resume.
result = await self._finalize_human_feedback(
method_name=context.method_name,
method_output=context.method_output,
raw_feedback=feedback,
emit=emit,
default_outcome=context.default_outcome,
llm=context.llm,
metadata=context.metadata,
)
self.human_feedback_history.append(result)
self.last_human_feedback = result
collapsed_outcome = result.outcome
self._completed_methods.add(FlowMethodName(context.method_name))
self._persist_method_completion(FlowMethodName(context.method_name))
await asyncio.to_thread(
self._persist_method_completion, FlowMethodName(context.method_name)
)
self._pending_feedback_context = None
@@ -1680,10 +1681,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
if emit and collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
try:
if emit and collapsed_outcome:
self._method_outputs.append(collapsed_outcome)
@@ -1698,8 +1695,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
except Exception as e:
# Check if flow was paused again for human feedback (loop case)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
self._pending_feedback_context = e.context
@@ -1881,11 +1876,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"Flow %r declares state of unknown type; falling back to dict state",
self._definition.name,
)
dict_state: dict[str, Any] = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
dict_state: dict[str, Any] = dict(state_definition.default or {})
if "id" not in dict_state:
dict_state["id"] = str(uuid4())
return dict_state
@@ -2165,8 +2156,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
@@ -2267,8 +2256,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
@@ -2500,8 +2487,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self.persistence is None:
@@ -2838,6 +2823,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if asyncio.iscoroutine(result):
result = await result
method_definition = self._definition.methods[str(method_name)]
if method_definition.human_feedback is not None:
result = await self._run_human_feedback_step(
method_name, method_definition.human_feedback, result
)
self._method_outputs.append(result)
# For @human_feedback methods with emit, the result is the collapsed outcome
@@ -2856,7 +2847,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
self._persist_method_completion(method_name)
await asyncio.to_thread(self._persist_method_completion, method_name)
finished_event_id: str | None = None
if not self.suppress_flow_events:
@@ -2875,8 +2866,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return result, finished_event_id
except Exception as e:
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
e.context.method_name = method_name
@@ -2917,10 +2906,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
raise e
def _persist_method_completion(self, method_name: FlowMethodName) -> None:
method_definition = self._definition.methods.get(method_name)
method_definition = self._definition.methods[str(method_name)]
persist_definition = (
method_definition.persist
if method_definition is not None and method_definition.persist is not None
if method_definition.persist is not None
else self._definition.persist
)
if persist_definition is None or not persist_definition.enabled:
@@ -2928,20 +2917,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
from crewai.flow.persistence.decorators import PersistenceDecorator
backend = self.persistence or self._persist_backend_for(persist_definition)
# Backend precedence: user-supplied instance persistence, then the
# method's own declared backend, then the flow-level backend (folded
# into self.persistence from the definition), then the default.
method_declares_backend = (
method_definition.persist is not None
and method_definition.persist.persistence is not None
)
if method_declares_backend and self._persistence_from_definition:
backend = self._persist_backends.get(str(method_name))
if backend is None:
backend = self._resolve_persist_backend(persist_definition)
self._persist_backends[str(method_name)] = backend
else:
backend = self.persistence or self._persist_backends.get(str(method_name))
if backend is None:
backend = self._resolve_persist_backend(persist_definition)
self._persist_backends[str(method_name)] = backend
PersistenceDecorator.persist_state(
self, method_name, backend, verbose=persist_definition.verbose
)
def _persist_backend_for(
self, persist_definition: FlowPersistenceDefinition
) -> FlowPersistence:
cached = self._persist_backends.get(id(persist_definition))
if cached is None:
cached = self._resolve_persist_backend(persist_definition)
self._persist_backends[id(persist_definition)] = cached
return cached
def _resolve_persist_backend(
self, persist_definition: FlowPersistenceDefinition
) -> FlowPersistence:
@@ -3231,8 +3227,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow
from crewai.flow.async_feedback.types import HumanFeedbackPending
if not isinstance(e, HumanFeedbackPending):
if not getattr(e, "_flow_listener_logged", False):
logger.error(f"Error executing listener {listener_name}: {e}")
@@ -3342,7 +3336,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
ThreadPoolExecutor,
TimeoutError as FuturesTimeoutError,
)
from datetime import datetime
from crewai.events.types.flow_events import (
FlowInputReceivedEvent,
@@ -3435,6 +3428,157 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return response
async def _run_human_feedback_step(
self,
method_name: FlowMethodName,
feedback_definition: FlowHumanFeedbackDefinition,
method_output: Any,
) -> Any:
llm = feedback_definition.llm
llm_instance = (
_deserialize_llm_from_context(llm) if isinstance(llm, (str, dict)) else llm
)
emit = feedback_definition.emit
default_outcome = feedback_definition.default_outcome
metadata = feedback_definition.metadata
learn = feedback_definition.learn and self.memory is not None
if learn:
method_output = await asyncio.to_thread(
_pre_review_with_lessons,
self,
method_name,
method_output,
llm=llm_instance,
learn_source=feedback_definition.learn_source,
learn_strict=feedback_definition.learn_strict,
)
provider = self._resolve_feedback_provider(feedback_definition)
if provider is not None:
context = PendingFeedbackContext(
flow_id=self.flow_id or "unknown",
flow_class=f"{type(self).__module__}.{type(self).__name__}",
method_name=method_name,
method_output=method_output,
message=feedback_definition.message,
emit=list(emit) if emit else None,
default_outcome=default_outcome,
metadata=metadata or {},
llm=llm
if llm is None or isinstance(llm, (str, dict))
else _serialize_llm_for_context(llm),
)
feedback_value = await asyncio.to_thread(
provider.request_feedback, context, self
)
if asyncio.iscoroutine(feedback_value):
feedback_value = await feedback_value
raw_feedback = str(feedback_value)
else:
raw_feedback = await asyncio.to_thread(
self._request_human_feedback,
message=feedback_definition.message,
output=method_output,
metadata=metadata,
emit=emit,
)
result = await self._finalize_human_feedback(
method_name=method_name,
method_output=method_output,
raw_feedback=raw_feedback,
emit=emit,
default_outcome=default_outcome,
llm=llm_instance,
metadata=metadata or {},
)
if learn and raw_feedback.strip():
await asyncio.to_thread(
_distill_and_store_lessons,
self,
method_name,
method_output,
raw_feedback,
llm=llm_instance,
learn_source=feedback_definition.learn_source,
learn_strict=feedback_definition.learn_strict,
)
if emit:
# Stash the real method output: the collapsed outcome routes
# listeners, but the flow's final result stays the method's
# actual return value.
self._human_feedback_method_outputs[method_name] = method_output
return result.outcome
return result
async def _finalize_human_feedback(
self,
*,
method_name: str,
method_output: Any,
raw_feedback: str,
emit: list[str] | None,
default_outcome: str | None,
llm: Any,
metadata: dict[str, Any],
) -> HumanFeedbackResult:
collapsed_outcome: str | None = None
if not raw_feedback.strip():
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
collapsed_outcome = emit[0]
elif emit:
collapse_llm = (
_deserialize_llm_from_context(llm)
if isinstance(llm, (str, dict))
else llm
)
if collapse_llm is not None:
collapsed_outcome = await asyncio.to_thread(
self._collapse_to_outcome,
feedback=raw_feedback,
outcomes=emit,
llm=collapse_llm,
)
else:
collapsed_outcome = emit[0]
if emit and collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result = HumanFeedbackResult(
output=method_output,
feedback=raw_feedback,
outcome=collapsed_outcome,
method_name=method_name,
metadata=metadata,
)
self.human_feedback_history.append(result)
self.last_human_feedback = result
return result
def _resolve_feedback_provider(
self, feedback_definition: FlowHumanFeedbackDefinition
) -> Any:
provider = feedback_definition.provider
if isinstance(provider, str):
provider = resolve_instance_ref(provider, field="human_feedback.provider")
if provider is None:
from crewai.flow.flow_config import flow_config
provider = flow_config.hitl_provider
if provider is not None and not isinstance(provider, HumanFeedbackProvider):
raise ValueError(
f"human_feedback.provider {feedback_definition.provider!r} for flow "
f"{self._definition.name!r} does not implement the "
"HumanFeedbackProvider protocol (missing request_feedback)."
)
return provider
def _request_human_feedback(
self,
message: str,

View File

@@ -1,48 +0,0 @@
from __future__ import annotations
from collections.abc import Callable
import importlib
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidActionRefError(ValueError):
def __init__(self, ref: str) -> None:
super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'")
def import_ref(ref: str) -> Any:
"""Import the object a `module:qualname` reference points to."""
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidActionRefError(ref)
try:
return attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidActionRefError(ref) from e
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
target = import_ref(ref)
if not callable(target):
raise InvalidActionRefError(ref)
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -0,0 +1,70 @@
"""Resolution of FlowDefinition refs (``module:qualname``) into live objects.
Every ref-shaped value in a definition — ``do`` actions, ``state.ref``,
``config.input_provider``, ``human_feedback.provider`` — resolves through
:func:`resolve_ref`. Failures are loud and name the field and the ref.
"""
from __future__ import annotations
from collections.abc import Callable
import importlib
import inspect
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidRefError(ValueError):
"""A definition ref that cannot be resolved to a live object."""
def resolve_ref(ref: str, *, field: str) -> Any:
"""Import the object a definition's `module:qualname` ref points to."""
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidRefError(
f"invalid {field} ref {ref!r}; expected 'module:qualname'"
)
try:
return attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e
def resolve_instance_ref(ref: str, *, field: str) -> Any:
"""Resolve a ref, auto-instantiating a no-arg class into an instance."""
target = resolve_ref(ref, field=field)
if not inspect.isclass(target):
return target
try:
return target()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate {field} ref {ref!r} without arguments: {e}"
) from e
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
target = resolve_ref(ref, field="do")
if not callable(target):
raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable")
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -1168,132 +1168,13 @@ class TestAsyncHumanFeedbackEdgeCases:
class TestLiveLLMPreservationOnResume:
"""Tests for preserving the full LLM config across HITL resume."""
def test_human_feedback_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
"""Test that _human_feedback_llm is set on the wrapper when llm is a BaseLLM instance."""
from crewai.llms.base_llm import BaseLLM
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm is mock_llm
def test_human_feedback_llm_attribute_set_on_wrapper_with_string(self) -> None:
"""Test that _human_feedback_llm is set on the wrapper even when llm is a string."""
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm == "gpt-4o-mini"
class TestResumeLLMFromSerializedContext:
"""Resume rebuilds the collapse LLM from the serialized context alone."""
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_uses_live_basellm_over_serialized_string(
def test_resume_builds_llm_from_serialized_context(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
This is the main bug fix: when a flow resumes, it should use the fully-configured
LLM from the re-imported decorator (with credentials, project, etc.) instead of
creating a new LLM from just the model string.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
from crewai.llms.base_llm import BaseLLM
# Create a mock BaseLLM with full config (simulating Gemini with service account)
live_llm = MagicMock(spec=BaseLLM)
live_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
result_path: str = ""
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm=live_llm,
)
def review(self):
return "content"
@listen("approved")
def handle_approved(self):
self.result_path = "approved"
return "Approved!"
context = PendingFeedbackContext(
flow_id="live-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
)
persistence.save_pending_feedback(
flow_uuid="live-llm-test",
context=context,
state_data={"id": "live-llm-test"},
)
flow = TestFlow.from_pending("live-llm-test", persistence)
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# NOT the serialized string. The live_llm was captured at class definition
# time and stored on the method wrapper as _human_feedback_llm.
assert len(captured_llm) == 1
# (which is stored on the method's _human_feedback_llm attribute)
method = flow._methods.get("review")
assert method is not None
assert captured_llm[0] is method._human_feedback_llm
# And verify it's a BaseLLM instance, not a string
assert isinstance(captured_llm[0], BaseLLM)
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_falls_back_to_serialized_string_when_no_human_feedback_llm(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async falls back to context.llm when _human_feedback_llm is not available.
This ensures backward compatibility with flows that were paused before this fix.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
@@ -1325,11 +1206,6 @@ class TestLiveLLMPreservationOnResume:
flow = TestFlow.from_pending("fallback-test", persistence)
# Remove _human_feedback_llm to simulate old decorator without this attribute
method = flow._methods.get("review")
if hasattr(method, "_human_feedback_llm"):
delattr(method, "_human_feedback_llm")
captured_llm = []
def capture_llm(feedback, outcomes, llm):
@@ -1343,85 +1219,3 @@ class TestLiveLLMPreservationOnResume:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_uses_string_from_context_when_human_feedback_llm_is_string(
self, mock_emit: MagicMock
) -> None:
"""Test that when _human_feedback_llm is a string (not BaseLLM), we still use context.llm.
String LLM values offer no benefit over the serialized context.llm,
so we don't prefer them.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow):
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
context = PendingFeedbackContext(
flow_id="string-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
persistence.save_pending_feedback(
flow_uuid="string-llm-test",
context=context,
state_data={"id": "string-llm-test"},
)
flow = TestFlow.from_pending("string-llm-test", persistence)
method = flow._methods.get("review")
assert method._human_feedback_llm == "gpt-4o-mini"
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# _human_feedback_llm is a string, so resume deserializes context.llm into an LLM instance
assert len(captured_llm) == 1
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
def test_human_feedback_llm_set_for_async_wrapper(self) -> None:
"""Test that _human_feedback_llm is set on async wrapper functions."""
import asyncio
from crewai.llms.base_llm import BaseLLM
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
async def async_review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("async_review")
assert method is not None
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm is mock_llm

View File

@@ -1,7 +1,9 @@
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import Any, ClassVar
from unittest.mock import patch
import pytest
from pydantic import ValidationError
@@ -15,8 +17,9 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.flow import Flow, and_, human_feedback, listen, or_, router, start
from crewai.flow.async_feedback import PendingFeedbackContext
from crewai.flow.async_feedback import HumanFeedbackPending, PendingFeedbackContext
from crewai.flow.flow import FlowState
from crewai.flow.flow_config import flow_config
from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition
from crewai.flow.persistence import persist
from crewai.flow.persistence.base import FlowPersistence
@@ -1012,3 +1015,366 @@ def test_resume_synthetic_completion_persists():
assert result == "done"
assert _saved_methods("resume-synthetic") == ["generate"]
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Review the draft:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
default_outcome="rejected",
)
def draft(self):
return "draft-content"
@listen("approved")
def publish(self):
return f"published:{self.last_human_feedback.feedback}"
@listen("rejected")
def discard(self):
return "discarded"
REVIEW_YAML = f"""
schema: crewai.flow/v1
name: ReviewFlow
methods:
draft:
do:
ref: {__name__}:ReviewFlow.draft
start: true
human_feedback:
message: "Review the draft:"
emit: [approved, rejected]
llm: gpt-4o-mini
default_outcome: rejected
publish:
do:
ref: {__name__}:ReviewFlow.publish
listen: approved
discard:
do:
ref: {__name__}:ReviewFlow.discard
listen: rejected
"""
def _pending_generate(flow):
return "content"
def _pending_process(flow, result):
return f"resumed:{result.feedback}"
class PausingProvider:
def request_feedback(self, context, flow):
raise HumanFeedbackPending(context=context)
PENDING_REVIEW_YAML = f"""
schema: crewai.flow/v1
name: PendingReviewFlow
persist:
enabled: true
persistence:
persistence_type: DefinitionStoreBackend
store: hitl-pending
methods:
generate:
do:
ref: {__name__}:_pending_generate
start: true
human_feedback:
message: "Review:"
provider: {__name__}:PausingProvider
process:
do:
ref: {__name__}:_pending_process
listen: generate
"""
def test_human_feedback_from_yaml_default_outcome_routes():
flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML))
with patch.object(flow, "_request_human_feedback", return_value="") as request:
result = flow.kickoff()
assert result == "discarded"
assert request.call_count == 1
assert flow.last_human_feedback.outcome == "rejected"
assert flow.last_human_feedback.output == "draft-content"
def test_human_feedback_from_yaml_collapses_and_routes():
flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML))
with (
patch.object(flow, "_request_human_feedback", return_value="ship it"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
assert result == "published:ship it"
assert [r.outcome for r in flow.human_feedback_history] == ["approved"]
def test_round_trip_human_feedback_equivalence():
class_flow = ReviewFlow()
with patch.object(class_flow, "_request_human_feedback", return_value=""):
class_result = class_flow.kickoff()
definition = FlowDefinition.from_yaml(ReviewFlow.flow_definition().to_yaml())
twin = Flow.from_definition(definition)
with patch.object(twin, "_request_human_feedback", return_value=""):
twin_result = twin.kickoff()
assert twin_result == class_result == "discarded"
assert (
twin.last_human_feedback.outcome
== class_flow.last_human_feedback.outcome
== "rejected"
)
def test_human_feedback_pending_and_resume_from_yaml():
definition = FlowDefinition.from_yaml(PENDING_REVIEW_YAML)
flow = Flow.from_definition(definition)
pending = flow.kickoff()
assert isinstance(pending, HumanFeedbackPending)
flow_id = pending.context.flow_id
assert flow_id in DefinitionStoreBackend.pending
resumed = Flow.from_pending(
flow_id,
DefinitionStoreBackend(store="hitl-pending"),
definition=definition,
)
result = resumed.resume("looks good")
assert result == "resumed:looks good"
assert resumed.last_human_feedback.feedback == "looks good"
assert flow_id not in DefinitionStoreBackend.pending
def test_flow_config_provider_fallback_from_yaml():
yaml_str = f"""
schema: crewai.flow/v1
name: ConfigProviderFlow
methods:
generate:
do:
ref: {__name__}:_pending_generate
start: true
human_feedback:
message: "Review:"
process:
do:
ref: {__name__}:_pending_process
listen: generate
"""
class RecordingProvider:
def __init__(self):
self.requests = []
def request_feedback(self, context, flow):
self.requests.append(context.method_name)
return "from-config"
provider = RecordingProvider()
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
previous = flow_config.hitl_provider
flow_config.hitl_provider = provider
try:
result = flow.kickoff()
finally:
flow_config.hitl_provider = previous
assert result == "resumed:from-config"
assert provider.requests == ["generate"]
# --- PR 7: one resolution story, inert decorator attrs, restore paths ---
def test_runtime_package_reads_no_decorator_attrs():
import crewai.flow.runtime as flow_runtime
runtime_dir = Path(flow_runtime.__file__).parent
forbidden = (
"__human_feedback_config__",
"__flow_persistence_config__",
"__flow_method_definition__",
"_human_feedback_llm",
)
offenders = [
f"{path.name}: {attr}"
for path in sorted(runtime_dir.rglob("*.py"))
for attr in forbidden
if attr in path.read_text(encoding="utf-8")
]
assert offenders == []
def test_stamped_decorator_attrs_are_inert_at_runtime():
class StampFreeFlow(Flow):
@start()
@persist(DefinitionStoreBackend(store="stamp-free"))
def first(self):
return "one"
@listen(first)
def second(self, result):
return f"{result}-two"
StampFreeFlow.flow_definition()
stamped = (
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
)
for name in ("first", "second"):
wrapper = StampFreeFlow.__dict__[name]
for attr in stamped:
if attr in wrapper.__dict__:
delattr(wrapper, attr)
result = StampFreeFlow().kickoff()
assert result == "one-two"
assert _saved_methods("stamp-free") == ["first"]
def test_class_level_persist_without_instance_kwarg_saves_and_restores():
before = len(DefinitionStoreBackend.saves["class-decorator"])
flow = ClassPersistedFlow()
flow.kickoff()
assert _saved_methods("class-decorator")[before:] == ["first", "second"]
assert flow.state["count"] == 2
resumed = ClassPersistedFlow()
resumed.kickoff(inputs={"id": flow.state["id"]})
assert resumed.state["count"] == 4
def test_input_provider_bad_ref_names_field_and_ref():
with pytest.raises(ValidationError, match="unresolvable input_provider ref"):
Flow(input_provider="missing_module_xyz:Provider")
class _NeedsArgsProvider:
def __init__(self, channel):
self.channel = channel
def request_feedback(self, context, flow):
return "ok"
def test_provider_ref_requiring_ctor_args_fails_loudly():
yaml_str = f"""
schema: crewai.flow/v1
name: BadProviderFlow
methods:
generate:
do:
ref: {__name__}:_pending_generate
start: true
human_feedback:
message: "Review:"
provider: {__name__}:_NeedsArgsProvider
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(
ValueError, match="cannot instantiate human_feedback.provider ref"
):
flow.kickoff()
def test_unresolvable_provider_ref_names_field_and_ref():
yaml_str = f"""
schema: crewai.flow/v1
name: BadProviderFlow
methods:
generate:
do:
ref: {__name__}:_pending_generate
start: true
human_feedback:
message: "Review:"
provider: missing_module_xyz:Provider
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(
ValueError, match="unresolvable human_feedback.provider ref"
):
flow.kickoff()
def _checkpoint_chain_flow(tmp_path):
from crewai.state.provider.json_provider import JsonProvider
from crewai.state.runtime import RuntimeState
definition = FlowDefinition.from_yaml(CHAIN_YAML)
flow = Flow.from_definition(definition)
result = flow.kickoff()
assert result == "confirmed:True"
state = RuntimeState(root=[flow])
state._provider = JsonProvider()
location = state.checkpoint(str(tmp_path))
return definition, flow, CheckpointConfig(restore_from=location)
def test_from_checkpoint_with_definition_restores_yaml_flow(tmp_path):
definition, flow, config = _checkpoint_chain_flow(tmp_path)
restored = Flow.from_checkpoint(config, definition=definition)
assert restored.state["confirmed"] is True
assert restored.state["id"] == flow.state["id"]
assert restored.kickoff() == "confirmed:True"
def test_fork_with_definition_branches_yaml_flow(tmp_path):
definition, flow, config = _checkpoint_chain_flow(tmp_path)
forked = Flow.fork(config, branch="alt", definition=definition)
assert forked.state["id"] != flow.state["id"]
assert forked.kickoff() == "confirmed:True"
def test_non_dict_state_default_rejected_by_contract():
yaml_str = """
schema: crewai.flow/v1
name: BadStateFlow
state:
type: dict
default: 42
methods: {}
"""
with pytest.raises(ValidationError, match="default"):
FlowDefinition.from_yaml(yaml_str)
def test_definition_method_missing_from_class_fails_loudly():
class VanishingFlow(Flow):
@start()
def begin(self):
return "one"
VanishingFlow.flow_definition()
del VanishingFlow.begin
with pytest.raises(ValueError, match="does not provide: begin"):
VanishingFlow()

View File

@@ -92,8 +92,8 @@ class TestHumanFeedbackValidation:
assert hasattr(test_method, "__human_feedback_config__")
assert not hasattr(test_method, "__is_router__")
def test_persist_preserves_human_feedback_llm_attribute(self):
"""Test @persist preserves the live LLM stashed by @human_feedback."""
def test_persist_preserves_human_feedback_config(self):
"""Test @persist preserves the config stamped by @human_feedback."""
llm = object()
@persist()
@@ -105,8 +105,8 @@ class TestHumanFeedbackValidation:
def test_method(self):
return "output"
assert hasattr(test_method, "_human_feedback_llm")
assert test_method._human_feedback_llm is llm
assert hasattr(test_method, "__human_feedback_config__")
assert test_method.__human_feedback_config__.llm is llm
class TestHumanFeedbackConfig:
@@ -481,7 +481,7 @@ class TestHumanFeedbackLearn:
with patch.object(
flow, "_request_human_feedback", return_value="looks good"
):
flow.produce()
flow.kickoff()
# memory.recall and memory.remember_many should NOT be called
flow.memory.recall.assert_not_called()
@@ -516,7 +516,7 @@ class TestHumanFeedbackLearn:
)
MockLLM.return_value = mock_llm
flow.produce()
flow.kickoff()
# remember_many should be called with the distilled lesson
flow.memory.remember_many.assert_called_once()
@@ -570,7 +570,7 @@ class TestHumanFeedbackLearn:
]
MockLLM.return_value = mock_llm
flow.produce()
flow.kickoff()
assert captured_output["shown_to_human"] == "draft with citations added"
# recall was called to find past lessons
@@ -592,7 +592,7 @@ class TestHumanFeedbackLearn:
with patch.object(
flow, "_request_human_feedback", return_value=""
):
flow.produce()
flow.kickoff()
flow.memory.remember_many.assert_not_called()
@@ -645,7 +645,7 @@ class TestHumanFeedbackLearn:
mock_llm.call.side_effect = RuntimeError("simulated pre-review failure")
MockLLM.return_value = mock_llm
flow.produce()
flow.kickoff()
assert captured["shown_to_human"] == "raw draft"
assert any(
@@ -690,7 +690,7 @@ class TestHumanFeedbackLearn:
MockLLM.return_value = mock_llm
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
flow.produce()
flow.kickoff()
def test_distillation_failure_logs_and_does_not_block_flow(self, caplog):
"""Distillation LLM failure logs a warning but does not break the flow."""
@@ -717,7 +717,7 @@ class TestHumanFeedbackLearn:
mock_llm.call.side_effect = RuntimeError("simulated distill failure")
MockLLM.return_value = mock_llm
flow.produce() # must not raise
flow.kickoff() # must not raise
flow.memory.remember_many.assert_not_called()
assert any(

View File

@@ -778,77 +778,11 @@ class TestEdgeCases:
class TestLLMConfigPreservation:
"""Tests that LLM config is preserved through @human_feedback serialization.
PR #4970 introduced _human_feedback_llm stashing so the live LLM object survives
decorator wrapping for same-process resume. The serialization path
(_serialize_llm_for_context / _deserialize_llm_from_context) preserves
config for cross-process resume.
The flow definition keeps the live LLM object for same-process execution.
The serialization path (_serialize_llm_for_context /
_deserialize_llm_from_context) preserves config for cross-process resume.
"""
def test_human_feedback_llm_stashed_on_wrapper_with_llm_instance(self):
"""Test that passing an LLM instance stashes it on the wrapper as _human_feedback_llm."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
class ConfigFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
method = ConfigFlow.review
assert hasattr(method, "_human_feedback_llm"), "_human_feedback_llm not found on wrapper"
assert method._human_feedback_llm is llm_instance, "_human_feedback_llm is not the same object"
def test_human_feedback_llm_preserved_on_listen_method(self):
"""Test that _human_feedback_llm is preserved when @human_feedback is on a @listen method."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)
class ListenConfigFlow(Flow):
@start()
def generate(self):
return "draft"
@listen("generate")
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
method = ListenConfigFlow.review
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm is llm_instance
def test_human_feedback_llm_accessible_on_instance(self):
"""Test that _human_feedback_llm survives Flow instantiation (bound method access)."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
class InstanceFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
flow = InstanceFlow()
instance_method = flow.review
assert hasattr(instance_method, "_human_feedback_llm")
assert instance_method._human_feedback_llm is llm_instance
def test_serialize_llm_preserves_config_fields(self):
"""Test that _serialize_llm_for_context captures temperature, base_url, etc."""
from crewai.flow.human_feedback import _serialize_llm_for_context