Compare commits

...

3 Commits

Author SHA1 Message Date
Vinicius Brasil
7549721ebe Drive human feedback from the flow definition
@human_feedback previously wrapped methods with the full HITL runtime (feedback
request, outcome collapse, learn loop), so flows built from a YAML definition —
which carry no decorated callables — could not pause for or route on human
feedback.
2026-06-11 22:26:28 -07:00
Vinicius Brasil
4ce7cf679f Wire config and persistence from FlowDefinition into the runtime
`from_definition` was silently dropping all config fields; it now passes
`config.model_dump()` so suppress_flow_events, max_method_calls, etc.
actually apply.

Persistence is now engine-driven: `_persist_method_completion` fires
after every method using the definition's persist metadata, so
`@persist` no longer needs to wrap methods — it just stamps them.
2026-06-11 20:07:05 -07:00
Vini Brasil
373dca3d04 Run flows from a definition without a Python subclass (#6104)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
* Read flow dispatch from FlowDefinition

Store the definition in a `_definition` PrivateAttr at post-init and
convert the dispatch helpers (`_start_method_names`, `_listener_methods`,
`_start_condition`, `_listen_condition`, `_is_router`) from classmethods
to instance methods that read it. Event names now fall back to
`self._definition.name` instead of `self.__class__.__name__`.

Behavior is identical for decorator subclasses, but the engine no longer
assumes the definition comes from the class. This is the seam for
`Flow.from_definition`, where an instance runs a definition that was
loaded rather than built from a Python subclass.

* Add Flow.from_definition to run flows without a subclass

A FlowDefinition (e.g. loaded from YAML) was only usable for dispatch on
decorator-authored subclasses. Now each method definition records an
importable `module:qualname` handler ref, and `Flow.from_definition`
resolves and binds those handlers to build a runnable flow directly.

* Build flow state from FlowDefinition

Definition-driven flows previously always started with a bare dict
state.

* Replace handler string with structured FlowActionDefinition

`handler: str | None` was optional and opaque — missing handlers only
surfaced at kickoff time. `do: FlowActionDefinition` is required, so
Pydantic rejects invalid definitions at parse time.

The `call: "code"` discriminator prepares the schema for future
non-Python action types (e.g. MCP tool, crew) without touching
`FlowMethodDefinition`. Resolution logic is extracted to
`runtime/_action_resolvers.py` to keep the dispatch point isolated.

* Fix conversational start router missing required do field

FlowMethodDefinition.do became required when the handler string was
replaced with FlowActionDefinition, but _conversation_start_router still
built its fragment without it, breaking crewai import entirely.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* Add event scoping to flow test

* Change lib/crewai/tests/test_flow_from_definition.py

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:18:49 -07:00
20 changed files with 2288 additions and 976 deletions

View File

@@ -47,7 +47,7 @@ from crewai.flow.conversation import (
receive_user_message as _receive_user_message,
)
from crewai.flow.dsl import listen, start
from crewai.flow.dsl._utils import _set_flow_method_definition
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.utilities.types import LLMMessage
@@ -78,7 +78,7 @@ def _conversation_start_router(func: Callable[..., Any]) -> Any:
wrapper = start()(func)
_set_flow_method_definition(
cast(Any, wrapper),
FlowMethodDefinition(start=True, router=True),
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
)
return wrapper

View File

@@ -3,11 +3,10 @@ from __future__ import annotations
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, TypeVar
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.human_feedback import (
HumanFeedbackConfig,
HumanFeedbackResult,
_build_human_feedback_runtime_decorator,
_validate_human_feedback_options,
)
@@ -21,32 +20,6 @@ F = TypeVar("F", bound=Callable[..., Any])
__all__ = ["HumanFeedbackResult", "human_feedback"]
def _stamp_human_feedback_metadata(
wrapper: Any,
func: Callable[..., Any],
config: HumanFeedbackConfig,
) -> None:
for attr in [
"__is_flow_method__",
"__flow_persistence_config__",
"__flow_method_definition__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
wrapper.__human_feedback_config__ = config
wrapper.__is_flow_method__ = True
if config.emit:
fragment = getattr(wrapper, "__flow_method_definition__", None)
if isinstance(fragment, FlowMethodDefinition):
wrapper.__flow_method_definition__ = fragment.model_copy(
update={"router": True, "emit": list(config.emit)}
)
wrapper._human_feedback_llm = config.llm
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
@@ -58,21 +31,18 @@ def human_feedback(
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback."""
runtime_decorator = _build_human_feedback_runtime_decorator(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
"""Decorator for Flow methods that require human feedback.
The decorator is a pure metadata stamper: it records the feedback
configuration on the method, and the Flow engine collects and routes
feedback after the method completes, driven by the flow's definition.
"""
_validate_human_feedback_options(
emit=emit, llm=llm, default_outcome=default_outcome
)
config = HumanFeedbackConfig(
message=message,
emit=emit,
emit=list(emit) if emit is not None else None,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
@@ -83,8 +53,7 @@ def human_feedback(
)
def decorator(func: F) -> F:
wrapper = runtime_decorator(func)
_stamp_human_feedback_metadata(wrapper, func, config)
return wrapper
func.__human_feedback_config__ = config # type: ignore[attr-defined]
return func
return decorator

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
),
)
return wrapper

View File

@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -148,6 +149,7 @@ def router(
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
router=True,
emit=router_events or None,

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -53,13 +54,17 @@ def start(
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(start=_to_definition_condition(condition)),
)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
start=(
_to_definition_condition(condition)
if condition is not None
else True
),
),
)
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
@@ -17,6 +18,7 @@ from crewai.flow.flow_definition import (
FlowMethodDefinition,
FlowPersistenceDefinition,
FlowStateDefinition,
_object_ref,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -34,15 +36,12 @@ _FLOW_METHOD_METADATA_ATTRS = [
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
"_human_feedback_llm",
]
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
"""Check if the object carries Flow method wrapper metadata."""
return hasattr(obj, "__is_flow_method__") or hasattr(
obj, _FLOW_METHOD_DEFINITION_ATTR
)
return hasattr(obj, _FLOW_METHOD_DEFINITION_ATTR)
def _should_include_flow_method(flow_class: type, method: Any) -> bool:
@@ -80,10 +79,13 @@ def _stamp_inherited_conversational_metadata(
for attr in _FLOW_METHOD_METADATA_ATTRS:
if hasattr(inherited, attr):
setattr(method, attr, getattr(inherited, attr))
method.__is_flow_method__ = True
return method
def _method_action(method: Any) -> FlowActionDefinition:
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -100,13 +102,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None:
return None
def _object_ref(value: Any) -> str:
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
return f"{module}:{qualname}" if module and qualname else repr(value)
def _is_json_serializable(value: Any) -> bool:
try:
json.dumps(value)
@@ -214,16 +209,22 @@ def _build_config_definition(
) -> FlowConfigDefinition:
config_field_names = set(FlowConfigDefinition.model_fields)
field_defaults = {
name: field.default
name: field.get_default(call_default_factory=True)
for name, field in getattr(flow_class, "model_fields", {}).items()
if name in config_field_names
}
values: dict[str, Any] = {}
for field_name, default in field_defaults.items():
value = getattr(flow_class, field_name, default)
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
if field_name == "input_provider":
# A string value is already a ref; only live objects degrade.
values[field_name] = (
value if value is None or isinstance(value, str) else _object_ref(value)
)
else:
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
return FlowConfigDefinition(**values)
@@ -239,38 +240,31 @@ def _build_human_feedback_definition(
return FlowHumanFeedbackDefinition(
message=str(config.message),
emit=[str(value) for value in emit] if emit is not None else None,
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, f"{path}.llm"
),
# llm and provider stay live: the engine consumes them in-process and
# the contract degrades them to serializable forms at JSON dump time.
llm=getattr(config, "llm", None),
default_outcome=getattr(config, "default_outcome", None),
metadata=_serialize_static_value(
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
),
provider=_serialize_static_value(
getattr(config, "provider", None), diagnostics, f"{path}.provider"
),
provider=getattr(config, "provider", None),
learn=bool(getattr(config, "learn", False)),
learn_source=str(getattr(config, "learn_source", "hitl")),
learn_strict=bool(getattr(config, "learn_strict", False)),
)
def _build_persistence_definition(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowPersistenceDefinition | None:
def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | None:
config = getattr(value, "__flow_persistence_config__", None)
if config is None:
return None
persistence = getattr(config, "persistence", None)
verbose = bool(getattr(config, "verbose", False))
return FlowPersistenceDefinition(
enabled=True,
verbose=verbose,
persistence=_serialize_static_value(
persistence, diagnostics, f"{path}.persistence"
),
verbose=bool(getattr(config, "verbose", False)),
# The backend stays live: the engine persists through the exact
# instance the user configured; the contract degrades it to a
# serialized config at JSON dump time.
persistence=getattr(config, "persistence", None),
)
@@ -373,9 +367,11 @@ def _build_method_definition(
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
if fragment is None:
method_definition = FlowMethodDefinition()
method_definition = FlowMethodDefinition(do=_method_action(method))
else:
method_definition = fragment.model_copy(deep=True)
method_definition = fragment.model_copy(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
@@ -386,9 +382,7 @@ def _build_method_definition(
method_definition.router = True
method_definition.emit = None
method_definition.persist = _build_persistence_definition(
method, diagnostics, f"{path}.persist"
)
method_definition.persist = _build_persistence_definition(method)
return method_definition
@@ -472,7 +466,7 @@ def _build_flow_definition_from_class(
description=description,
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
persist=_build_persistence_definition(flow_class),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,

View File

@@ -13,7 +13,7 @@ import json
import logging
from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator
import yaml
from crewai.flow.conversational_definition import (
@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -40,6 +41,14 @@ __all__ = [
]
def _object_ref(value: Any) -> str:
"""Format a class or instance as the canonical ``module:qualname`` ref."""
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
return f"{module}:{qualname}" if module and qualname else repr(value)
class FlowDefinitionDiagnostic(BaseModel):
"""A non-fatal Flow Definition build or validation diagnostic."""
@@ -52,9 +61,10 @@ class FlowDefinitionDiagnostic(BaseModel):
class FlowStateDefinition(BaseModel):
"""Static description of a Flow state contract."""
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
ref: str | None = None
default: Any = None
json_schema: dict[str, Any] | None = None
default: dict[str, Any] | None = None
class FlowConfigDefinition(BaseModel):
@@ -62,22 +72,50 @@ class FlowConfigDefinition(BaseModel):
tracing: bool | None = None
stream: bool = False
memory: Any = None
input_provider: Any = None
memory: dict[str, Any] | None = None
input_provider: str | None = None
suppress_flow_events: bool = False
max_method_calls: int = 100
defer_trace_finalization: bool = False
checkpoint: bool | dict[str, Any] | None = None
class FlowPersistenceDefinition(BaseModel):
"""Static persistence configuration."""
"""Static persistence configuration.
``persistence`` may hold a live backend when the definition is built from
a decorated class — the engine then persists through the exact instance
the user configured; the JSON/YAML projection degrades it to its
serialized config.
"""
enabled: bool = False
verbose: bool = False
persistence: Any = None
@field_serializer("persistence", when_used="json")
def _serialize_persistence(self, value: Any) -> Any:
if value is None or isinstance(value, dict):
return value
if isinstance(value, BaseModel):
try:
return value.model_dump(mode="json")
except Exception:
logger.warning(
"Persistence backend %s is not fully serializable; "
"preserved import reference only.",
_object_ref(value),
)
return {"ref": _object_ref(value)}
class FlowHumanFeedbackDefinition(BaseModel):
"""Static human feedback configuration."""
"""Static human feedback configuration.
``llm`` and ``provider`` may hold live Python objects when the definition
is built from a decorated class; the JSON/YAML projection degrades them to
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
"""
message: str
emit: list[str] | None = None
@@ -89,10 +127,32 @@ class FlowHumanFeedbackDefinition(BaseModel):
learn_source: str = "hitl"
learn_strict: bool = False
@field_serializer("llm", when_used="json")
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
if value is None or isinstance(value, (str, dict)):
return value
from crewai.flow.human_feedback import _serialize_llm_for_context
return _serialize_llm_for_context(value)
@field_serializer("provider", when_used="json")
def _serialize_provider(self, value: Any) -> str | None:
if value is None or isinstance(value, str):
return value
return _object_ref(value)
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
call: TypingLiteral["code"] = "code"
ref: str
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
@@ -100,6 +160,16 @@ class FlowMethodDefinition(BaseModel):
human_feedback: FlowHumanFeedbackDefinition | None = None
persist: FlowPersistenceDefinition | None = None
@model_validator(mode="after")
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
# Canonical shape: a method whose human_feedback declares emit
# outcomes routes like a router, regardless of how the definition
# was authored.
if self.human_feedback is not None and self.human_feedback.emit:
self.router = True
self.emit = None
return self
@property
def is_start(self) -> bool:
"""Whether this method is a start method.
@@ -116,7 +186,9 @@ class FlowDefinition(BaseModel):
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
schema_: str = Field(default="crewai.flow/v1", alias="schema")
schema_: TypingLiteral["crewai.flow/v1"] = Field(
default="crewai.flow/v1", alias="schema"
)
name: str
description: str | None = None
state: FlowStateDefinition | None = None

View File

@@ -83,7 +83,6 @@ class FlowMethod(Generic[P, R]):
"__conversational_only__", # gates registration on Flow.conversational
"__flow_persistence_config__",
"__flow_method_definition__",
"_human_feedback_llm", # Live LLM object for HITL resume
]:
if hasattr(meth, attr):
setattr(self, attr, getattr(meth, attr))

View File

@@ -1,8 +1,11 @@
"""Human feedback decorator for Flow methods.
"""Human feedback support for Flow methods.
This module provides the @human_feedback decorator that enables human-in-the-loop
workflows within CrewAI Flows. It allows collecting human feedback on method outputs
and optionally routing to different listeners based on the feedback.
This module backs the @human_feedback decorator that enables human-in-the-loop
workflows within CrewAI Flows. The decorator is a pure metadata stamper: it
records a :class:`HumanFeedbackConfig` on the method, the Flow definition
builder lifts it into ``FlowHumanFeedbackDefinition``, and the Flow engine
collects feedback after each decorated method completes, driven by the flow's
definition.
Supports both synchronous (blocking) and asynchronous (non-blocking) feedback
collection through the provider parameter.
@@ -55,22 +58,18 @@ Example (asynchronous with custom provider):
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from functools import wraps
import logging
from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import BaseModel, Field
from crewai.flow.flow_wrappers import FlowMethod
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider
from crewai.flow.flow import Flow
from crewai.flow.runtime import Flow
from crewai.llms.base_llm import BaseLLM
@@ -160,8 +159,8 @@ class HumanFeedbackResult:
class HumanFeedbackConfig:
"""Configuration for the @human_feedback decorator.
Stores the parameters passed to the decorator for later use during
method execution and for introspection by visualization tools.
Stores the parameters passed to the decorator for later use by the
Flow definition builder and for introspection by visualization tools.
Attributes:
message: The message shown to the human when requesting feedback.
@@ -183,19 +182,6 @@ class HumanFeedbackConfig:
learn_strict: bool = False
class HumanFeedbackMethod(FlowMethod[Any, Any]):
"""Wrapper for methods decorated with @human_feedback.
This wrapper extends FlowMethod to add human feedback specific attributes
used by the FlowDefinition builder and runtime feedback handling.
Attributes:
__human_feedback_config__: The HumanFeedbackConfig for this method.
"""
__human_feedback_config__: HumanFeedbackConfig | None = None
class PreReviewResult(BaseModel):
"""Structured output from the HITL pre-review LLM call."""
@@ -217,17 +203,11 @@ class DistilledLessons(BaseModel):
)
def _build_human_feedback_runtime_decorator(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
def _validate_human_feedback_options(
emit: Sequence[str] | None,
llm: Any,
default_outcome: str | None,
) -> None:
if emit is not None:
if not llm:
raise ValueError(
@@ -244,295 +224,139 @@ def _build_human_feedback_runtime_decorator(
elif default_outcome is not None:
raise ValueError("default_outcome requires emit to be specified.")
def decorator(func: F) -> F:
def _get_hitl_prompt(key: str) -> str:
from crewai.utilities.i18n import I18N_DEFAULT
return I18N_DEFAULT.slice(key)
def _get_hitl_prompt(key: str) -> str:
from crewai.utilities.i18n import I18N_DEFAULT
def _resolve_llm_instance() -> Any:
if llm is None:
from crewai.llm import LLM
return I18N_DEFAULT.slice(key)
return LLM(model="gpt-4o-mini")
if isinstance(llm, str):
from crewai.llm import LLM
return LLM(model=llm)
return llm # already a BaseLLM instance
def _resolve_llm_instance(llm: Any) -> Any:
from crewai.llm import LLM
def _pre_review_with_lessons(
flow_instance: Flow[Any], method_output: Any
) -> Any:
try:
mem = flow_instance.memory
if mem is None:
return method_output
query = f"human feedback lessons for {func.__name__}: {method_output!s}"
matches = mem.recall(query, source=learn_source)
if not matches:
return method_output
if llm is None:
return LLM(model="gpt-4o-mini")
if isinstance(llm, str):
return LLM(model=llm)
if isinstance(llm, dict):
deserialized = _deserialize_llm_from_context(llm)
return deserialized if deserialized is not None else LLM(model="gpt-4o-mini")
return llm # already a BaseLLM instance
lessons = "\n".join(f"- {m.record.content}" for m in matches)
llm_inst = _resolve_llm_instance()
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
output=str(method_output),
lessons=lessons,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_pre_review_system"),
},
{"role": "user", "content": prompt},
]
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=PreReviewResult)
if isinstance(response, PreReviewResult):
return response.improved_output
return PreReviewResult.model_validate(response).improved_output
reviewed = llm_inst.call(messages)
return reviewed if isinstance(reviewed, str) else str(reviewed)
except Exception:
if learn_strict:
logger.warning(
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
func.__name__,
exc_info=True,
)
raise
logger.warning(
"HITL pre-review failed for %s; falling back to raw output",
func.__name__,
exc_info=True,
)
return method_output
def _distill_and_store_lessons(
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
) -> None:
try:
mem = flow_instance.memory
if mem is None:
return
llm_inst = _resolve_llm_instance()
prompt = _get_hitl_prompt("hitl_distill_user").format(
method_name=func.__name__,
output=str(method_output),
feedback=raw_feedback,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_distill_system"),
},
{"role": "user", "content": prompt},
]
def _pre_review_with_lessons(
flow_instance: Flow[Any],
method_name: str,
method_output: Any,
*,
llm: Any,
learn_source: str,
learn_strict: bool,
) -> Any:
try:
mem = flow_instance.memory
if mem is None:
return method_output
query = f"human feedback lessons for {method_name}: {method_output!s}"
matches = mem.recall(query, source=learn_source)
if not matches:
return method_output
lessons: list[str] = []
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=DistilledLessons)
if isinstance(response, DistilledLessons):
lessons = response.lessons
else:
lessons = DistilledLessons.model_validate(response).lessons
else:
response = llm_inst.call(messages)
if isinstance(response, str):
lessons = [
line.strip("- ").strip()
for line in response.strip().split("\n")
if line.strip() and line.strip() != "NONE"
]
if lessons:
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
except Exception:
if learn_strict:
logger.warning(
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
func.__name__,
exc_info=True,
)
raise
logger.warning(
"HITL lesson distillation failed for %s; no lessons stored",
func.__name__,
exc_info=True,
)
def _build_feedback_context(
flow_instance: Flow[Any], method_output: Any
) -> tuple[Any, Any]:
from crewai.flow.async_feedback.types import PendingFeedbackContext
context = PendingFeedbackContext(
flow_id=flow_instance.flow_id or "unknown",
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
method_name=func.__name__,
method_output=method_output,
message=message,
emit=list(emit) if emit else None,
default_outcome=default_outcome,
metadata=metadata or {},
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
lessons = "\n".join(f"- {m.record.content}" for m in matches)
llm_inst = _resolve_llm_instance(llm)
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
output=str(method_output),
lessons=lessons,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_pre_review_system"),
},
{"role": "user", "content": prompt},
]
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=PreReviewResult)
if isinstance(response, PreReviewResult):
return response.improved_output
return PreReviewResult.model_validate(response).improved_output
reviewed = llm_inst.call(messages)
return reviewed if isinstance(reviewed, str) else str(reviewed)
except Exception:
if learn_strict:
logger.warning(
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
method_name,
exc_info=True,
)
raise
logger.warning(
"HITL pre-review failed for %s; falling back to raw output",
method_name,
exc_info=True,
)
return method_output
effective_provider = provider
if effective_provider is None:
from crewai.flow.flow_config import flow_config
effective_provider = flow_config.hitl_provider
def _distill_and_store_lessons(
flow_instance: Flow[Any],
method_name: str,
method_output: Any,
raw_feedback: str,
*,
llm: Any,
learn_source: str,
learn_strict: bool,
) -> None:
try:
mem = flow_instance.memory
if mem is None:
return
llm_inst = _resolve_llm_instance(llm)
prompt = _get_hitl_prompt("hitl_distill_user").format(
method_name=method_name,
output=str(method_output),
feedback=raw_feedback,
)
messages = [
{
"role": "system",
"content": _get_hitl_prompt("hitl_distill_system"),
},
{"role": "user", "content": prompt},
]
return context, effective_provider
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
raise TypeError(
f"Provider {type(effective_provider).__name__}.request_feedback() "
"returned a coroutine in a sync flow method. Use an async flow "
"method or a synchronous provider."
)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
async def _request_feedback_async(
flow_instance: Flow[Any], method_output: Any
) -> str:
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
return str(await feedback_result)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
def _process_feedback(
flow_instance: Flow[Any],
method_output: Any,
raw_feedback: str,
) -> HumanFeedbackResult | str:
collapsed_outcome: str | None = None
if not raw_feedback.strip():
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
result = HumanFeedbackResult(
output=method_output,
feedback=raw_feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
method_name=func.__name__,
metadata=metadata or {},
)
flow_instance.human_feedback_history.append(result)
flow_instance.last_human_feedback = result
if emit:
if collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
return collapsed_outcome
return result
if asyncio.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
method_output = await func(self, *args, **kwargs)
if learn and getattr(self, "memory", None) is not None:
method_output = _pre_review_with_lessons(self, method_output)
raw_feedback = await _request_feedback_async(self, method_output)
result = _process_feedback(self, method_output, raw_feedback)
if (
learn
and getattr(self, "memory", None) is not None
and raw_feedback.strip()
):
_distill_and_store_lessons(self, method_output, raw_feedback)
# Stash the real method output for final flow result when emit is set:
# result is the collapsed outcome string for routing, but we preserve the
# actual method output as the flow's final result. Uses per-method dict for
# concurrency safety and to handle None returns.
if emit:
self._human_feedback_method_outputs[func.__name__] = method_output
return result
wrapper: Any = async_wrapper
lessons: list[str] = []
if getattr(llm_inst, "supports_function_calling", lambda: False)():
response = llm_inst.call(messages, response_model=DistilledLessons)
if isinstance(response, DistilledLessons):
lessons = response.lessons
else:
lessons = DistilledLessons.model_validate(response).lessons
else:
response = llm_inst.call(messages)
if isinstance(response, str):
lessons = [
line.strip("- ").strip()
for line in response.strip().split("\n")
if line.strip() and line.strip() != "NONE"
]
@wraps(func)
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
method_output = func(self, *args, **kwargs)
if learn and getattr(self, "memory", None) is not None:
method_output = _pre_review_with_lessons(self, method_output)
raw_feedback = _request_feedback(self, method_output)
result = _process_feedback(self, method_output, raw_feedback)
if (
learn
and getattr(self, "memory", None) is not None
and raw_feedback.strip()
):
_distill_and_store_lessons(self, method_output, raw_feedback)
# Stash the real method output for final flow result when emit is set:
# result is the collapsed outcome string for routing, but we preserve the
# actual method output as the flow's final result. Uses per-method dict for
# concurrency safety and to handle None returns.
if emit:
self._human_feedback_method_outputs[func.__name__] = method_output
return result
wrapper = sync_wrapper
return wrapper # type: ignore[no-any-return]
return decorator
if lessons:
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
except Exception:
if learn_strict:
logger.warning(
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
method_name,
exc_info=True,
)
raise
logger.warning(
"HITL lesson distillation failed for %s; no lessons stored",
method_name,
exc_info=True,
)
def human_feedback(

View File

@@ -24,12 +24,10 @@ Example:
from __future__ import annotations
import asyncio
from collections.abc import Callable
import functools
import logging
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
from typing import TYPE_CHECKING, Any, Final, TypeVar
from crewai_core.printer import PRINTER
from pydantic import BaseModel
@@ -39,7 +37,7 @@ from crewai.flow.persistence.factory import default_flow_persistence
if TYPE_CHECKING:
from crewai.flow.flow import Flow
from crewai.flow.runtime import Flow
logger = logging.getLogger(__name__)
@@ -66,14 +64,6 @@ def _stamp_persistence_metadata(
)
_PRESERVED_FLOW_ATTRS: Final[tuple[str, ...]] = (
"__human_feedback_config__",
"__flow_persistence_config__",
"__flow_method_definition__",
"_human_feedback_llm",
)
class PersistenceDecorator:
"""Class to handle flow state persistence with consistent logging."""
@@ -164,6 +154,10 @@ def persist(
states. When applied at the method level, it persists only that method's
state.
The decorator is a pure metadata stamper: it records the persistence
configuration on the class or method, and the Flow engine saves state
after each persisted method completes, driven by the flow's definition.
Args:
persistence: Optional FlowPersistence implementation to use.
If not provided, uses ``default_flow_persistence()`` (the
@@ -191,122 +185,7 @@ def persist(
persistence if persistence is not None else default_flow_persistence()
)
if isinstance(target, type):
_stamp_persistence_metadata(target, actual_persistence, verbose)
original_init = target.__init__ # type: ignore[misc]
@functools.wraps(original_init)
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
if "persistence" not in kwargs:
kwargs["persistence"] = actual_persistence
original_init(self, *args, **kwargs)
target.__init__ = new_init # type: ignore[misc]
# Preserve original methods' decorators
original_methods = {
name: method
for name, method in target.__dict__.items()
if callable(method)
and (
hasattr(method, "__is_flow_method__")
or hasattr(method, "__flow_method_definition__")
)
}
for name, method in original_methods.items():
if asyncio.iscoroutinefunction(method):
# Closure captures the current name and method
def create_async_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
@functools.wraps(original_method)
async def method_wrapper(
self: Any, *args: Any, **kwargs: Any
) -> Any:
result = await original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(
self, method_name, actual_persistence, verbose
)
return result
return method_wrapper
wrapped = create_async_wrapper(name, method)
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
setattr(target, name, wrapped)
else:
def create_sync_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
@functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(
self, method_name, actual_persistence, verbose
)
return result
return method_wrapper
wrapped = create_sync_wrapper(name, method)
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
setattr(target, name, wrapped)
return target
method = target
method.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(method, actual_persistence, verbose)
if asyncio.iscoroutinefunction(method):
@functools.wraps(method)
async def method_async_wrapper(
flow_instance: Any, *args: Any, **kwargs: Any
) -> T:
method_coro = method(flow_instance, *args, **kwargs)
if asyncio.iscoroutine(method_coro):
result = await method_coro
else:
result = method_coro
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return cast(T, result)
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(method_async_wrapper, attr, getattr(method, attr))
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(
method_async_wrapper, actual_persistence, verbose
)
return cast(Callable[..., T], method_async_wrapper)
@functools.wraps(method)
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
result = method(flow_instance, *args, **kwargs)
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return result
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(method_sync_wrapper, attr, getattr(method, attr))
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(method_sync_wrapper, actual_persistence, verbose)
return cast(Callable[..., T], method_sync_wrapper)
_stamp_persistence_metadata(target, actual_persistence, verbose)
return target
return decorator

View File

@@ -0,0 +1,70 @@
"""Resolution of FlowDefinition refs (``module:qualname``) into live objects.
Every ref-shaped value in a definition — ``do`` actions, ``state.ref``,
``config.input_provider``, ``human_feedback.provider`` — resolves through
:func:`resolve_ref`. Failures are loud and name the field and the ref.
"""
from __future__ import annotations
from collections.abc import Callable
import importlib
import inspect
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidRefError(ValueError):
"""A definition ref that cannot be resolved to a live object."""
def resolve_ref(ref: str, *, field: str) -> Any:
"""Import the object a definition's `module:qualname` ref points to."""
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidRefError(
f"invalid {field} ref {ref!r}; expected 'module:qualname'"
)
try:
return attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e
def resolve_instance_ref(ref: str, *, field: str) -> Any:
"""Resolve a ref, auto-instantiating a no-arg class into an instance."""
target = resolve_ref(ref, field=field)
if not inspect.isclass(target):
return target
try:
return target()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate {field} ref {ref!r} without arguments: {e}"
) from e
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
target = resolve_ref(ref, field="do")
if not callable(target):
raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable")
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
if examples:
schema_extra["examples"] = examples
default = ... if is_required else None
default = (
json_schema["default"]
if "default" in json_schema
else (... if is_required else None)
)
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:

View File

@@ -1168,132 +1168,13 @@ class TestAsyncHumanFeedbackEdgeCases:
class TestLiveLLMPreservationOnResume:
"""Tests for preserving the full LLM config across HITL resume."""
def test_human_feedback_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
"""Test that _human_feedback_llm is set on the wrapper when llm is a BaseLLM instance."""
from crewai.llms.base_llm import BaseLLM
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm is mock_llm
def test_human_feedback_llm_attribute_set_on_wrapper_with_string(self) -> None:
"""Test that _human_feedback_llm is set on the wrapper even when llm is a string."""
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm == "gpt-4o-mini"
class TestResumeLLMFromSerializedContext:
"""Resume rebuilds the collapse LLM from the serialized context alone."""
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_uses_live_basellm_over_serialized_string(
def test_resume_builds_llm_from_serialized_context(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
This is the main bug fix: when a flow resumes, it should use the fully-configured
LLM from the re-imported decorator (with credentials, project, etc.) instead of
creating a new LLM from just the model string.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
from crewai.llms.base_llm import BaseLLM
# Create a mock BaseLLM with full config (simulating Gemini with service account)
live_llm = MagicMock(spec=BaseLLM)
live_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
result_path: str = ""
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm=live_llm,
)
def review(self):
return "content"
@listen("approved")
def handle_approved(self):
self.result_path = "approved"
return "Approved!"
context = PendingFeedbackContext(
flow_id="live-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
)
persistence.save_pending_feedback(
flow_uuid="live-llm-test",
context=context,
state_data={"id": "live-llm-test"},
)
flow = TestFlow.from_pending("live-llm-test", persistence)
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# NOT the serialized string. The live_llm was captured at class definition
# time and stored on the method wrapper as _human_feedback_llm.
assert len(captured_llm) == 1
# (which is stored on the method's _human_feedback_llm attribute)
method = flow._methods.get("review")
assert method is not None
assert captured_llm[0] is method._human_feedback_llm
# And verify it's a BaseLLM instance, not a string
assert isinstance(captured_llm[0], BaseLLM)
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_falls_back_to_serialized_string_when_no_human_feedback_llm(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async falls back to context.llm when _human_feedback_llm is not available.
This ensures backward compatibility with flows that were paused before this fix.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
@@ -1325,11 +1206,6 @@ class TestLiveLLMPreservationOnResume:
flow = TestFlow.from_pending("fallback-test", persistence)
# Remove _human_feedback_llm to simulate old decorator without this attribute
method = flow._methods.get("review")
if hasattr(method, "_human_feedback_llm"):
delattr(method, "_human_feedback_llm")
captured_llm = []
def capture_llm(feedback, outcomes, llm):
@@ -1343,85 +1219,3 @@ class TestLiveLLMPreservationOnResume:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
@patch("crewai.flow.runtime.crewai_event_bus.emit")
def test_resume_async_uses_string_from_context_when_human_feedback_llm_is_string(
self, mock_emit: MagicMock
) -> None:
"""Test that when _human_feedback_llm is a string (not BaseLLM), we still use context.llm.
String LLM values offer no benefit over the serialized context.llm,
so we don't prefer them.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow):
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
context = PendingFeedbackContext(
flow_id="string-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
persistence.save_pending_feedback(
flow_uuid="string-llm-test",
context=context,
state_data={"id": "string-llm-test"},
)
flow = TestFlow.from_pending("string-llm-test", persistence)
method = flow._methods.get("review")
assert method._human_feedback_llm == "gpt-4o-mini"
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# _human_feedback_llm is a string, so resume deserializes context.llm into an LLM instance
assert len(captured_llm) == 1
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
def test_human_feedback_llm_set_for_async_wrapper(self) -> None:
"""Test that _human_feedback_llm is set on async wrapper functions."""
import asyncio
from crewai.llms.base_llm import BaseLLM
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
async def async_review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("async_review")
assert method is not None
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm is mock_llm

View File

@@ -1157,6 +1157,26 @@ def test_flow_name():
assert flow.name == "MyFlow"
def test_flow_custom_name_overrides_class_name_in_events():
class InternalFlowClass(Flow):
name = "PublicName"
@start()
def begin(self):
return "done"
received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def handle(source, event):
received.append(event)
InternalFlowClass().kickoff()
assert received[0].flow_name == "PublicName"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).

View File

@@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit():
"start",
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
}
@@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"start": False,
"emit": ["continue"],
@@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
"schema": "crewai.flow/v1",
"name": "TypoFlow",
"methods": {
"begin": {"start": True},
"handle": {"listen": "begni"},
"begin": {
"do": {"ref": "loaded_flows:TypoFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:TypoFlow.handle"},
"listen": "begni",
},
},
}
)
@@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
"methods": {
"begin": {"start": True},
"handle": {"start": False, "listen": "begin"},
"begin": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
"start": False,
"listen": "begin",
},
},
}
)
@@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"emit": ["continue"],
}

File diff suppressed because it is too large Load Diff

View File

@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
return "complete"
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
def _attach_flow_definition(
flow_class: type[Flow], methods: dict[str, dict[str, object]]
) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
"methods": methods,
"methods": {
name: {
"do": {
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
},
**spec,
}
for name, spec in methods.items()
},
}
)
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
"methods": {
"begin": {"start": True},
"begin": {
"do": {"ref": "defined_flows:DefinedFlow.begin"},
"start": True,
},
"decide": {
"do": {"ref": "defined_flows:DefinedFlow.decide"},
"listen": "begin",
"router": True,
"emit": ["done"],
},
"finish": {"listen": "done"},
"finish": {
"do": {"ref": "defined_flows:DefinedFlow.finish"},
"listen": "done",
},
},
}
)

View File

@@ -92,8 +92,8 @@ class TestHumanFeedbackValidation:
assert hasattr(test_method, "__human_feedback_config__")
assert not hasattr(test_method, "__is_router__")
def test_persist_preserves_human_feedback_llm_attribute(self):
"""Test @persist preserves the live LLM stashed by @human_feedback."""
def test_persist_preserves_human_feedback_config(self):
"""Test @persist preserves the config stamped by @human_feedback."""
llm = object()
@persist()
@@ -105,8 +105,8 @@ class TestHumanFeedbackValidation:
def test_method(self):
return "output"
assert hasattr(test_method, "_human_feedback_llm")
assert test_method._human_feedback_llm is llm
assert hasattr(test_method, "__human_feedback_config__")
assert test_method.__human_feedback_config__.llm is llm
class TestHumanFeedbackConfig:
@@ -481,7 +481,7 @@ class TestHumanFeedbackLearn:
with patch.object(
flow, "_request_human_feedback", return_value="looks good"
):
flow.produce()
flow.kickoff()
# memory.recall and memory.remember_many should NOT be called
flow.memory.recall.assert_not_called()
@@ -516,7 +516,7 @@ class TestHumanFeedbackLearn:
)
MockLLM.return_value = mock_llm
flow.produce()
flow.kickoff()
# remember_many should be called with the distilled lesson
flow.memory.remember_many.assert_called_once()
@@ -570,7 +570,7 @@ class TestHumanFeedbackLearn:
]
MockLLM.return_value = mock_llm
flow.produce()
flow.kickoff()
assert captured_output["shown_to_human"] == "draft with citations added"
# recall was called to find past lessons
@@ -592,7 +592,7 @@ class TestHumanFeedbackLearn:
with patch.object(
flow, "_request_human_feedback", return_value=""
):
flow.produce()
flow.kickoff()
flow.memory.remember_many.assert_not_called()
@@ -645,7 +645,7 @@ class TestHumanFeedbackLearn:
mock_llm.call.side_effect = RuntimeError("simulated pre-review failure")
MockLLM.return_value = mock_llm
flow.produce()
flow.kickoff()
assert captured["shown_to_human"] == "raw draft"
assert any(
@@ -690,7 +690,7 @@ class TestHumanFeedbackLearn:
MockLLM.return_value = mock_llm
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
flow.produce()
flow.kickoff()
def test_distillation_failure_logs_and_does_not_block_flow(self, caplog):
"""Distillation LLM failure logs a warning but does not break the flow."""
@@ -717,7 +717,7 @@ class TestHumanFeedbackLearn:
mock_llm.call.side_effect = RuntimeError("simulated distill failure")
MockLLM.return_value = mock_llm
flow.produce() # must not raise
flow.kickoff() # must not raise
flow.memory.remember_many.assert_not_called()
assert any(

View File

@@ -778,77 +778,11 @@ class TestEdgeCases:
class TestLLMConfigPreservation:
"""Tests that LLM config is preserved through @human_feedback serialization.
PR #4970 introduced _human_feedback_llm stashing so the live LLM object survives
decorator wrapping for same-process resume. The serialization path
(_serialize_llm_for_context / _deserialize_llm_from_context) preserves
config for cross-process resume.
The flow definition keeps the live LLM object for same-process execution.
The serialization path (_serialize_llm_for_context /
_deserialize_llm_from_context) preserves config for cross-process resume.
"""
def test_human_feedback_llm_stashed_on_wrapper_with_llm_instance(self):
"""Test that passing an LLM instance stashes it on the wrapper as _human_feedback_llm."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
class ConfigFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
method = ConfigFlow.review
assert hasattr(method, "_human_feedback_llm"), "_human_feedback_llm not found on wrapper"
assert method._human_feedback_llm is llm_instance, "_human_feedback_llm is not the same object"
def test_human_feedback_llm_preserved_on_listen_method(self):
"""Test that _human_feedback_llm is preserved when @human_feedback is on a @listen method."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)
class ListenConfigFlow(Flow):
@start()
def generate(self):
return "draft"
@listen("generate")
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
method = ListenConfigFlow.review
assert hasattr(method, "_human_feedback_llm")
assert method._human_feedback_llm is llm_instance
def test_human_feedback_llm_accessible_on_instance(self):
"""Test that _human_feedback_llm survives Flow instantiation (bound method access)."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
class InstanceFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
flow = InstanceFlow()
instance_method = flow.review
assert hasattr(instance_method, "_human_feedback_llm")
assert instance_method._human_feedback_llm is llm_instance
def test_serialize_llm_preserves_config_fields(self):
"""Test that _serialize_llm_for_context captures temperature, base_url, etc."""
from crewai.flow.human_feedback import _serialize_llm_for_context