mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-12 11:48:11 +00:00
Compare commits
3 Commits
1.14.7
...
flow-defin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7549721ebe | ||
|
|
4ce7cf679f | ||
|
|
373dca3d04 |
@@ -47,7 +47,7 @@ from crewai.flow.conversation import (
|
||||
receive_user_message as _receive_user_message,
|
||||
)
|
||||
from crewai.flow.dsl import listen, start
|
||||
from crewai.flow.dsl._utils import _set_flow_method_definition
|
||||
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
from crewai.utilities.types import LLMMessage
|
||||
|
||||
@@ -78,7 +78,7 @@ def _conversation_start_router(func: Callable[..., Any]) -> Any:
|
||||
wrapper = start()(func)
|
||||
_set_flow_method_definition(
|
||||
cast(Any, wrapper),
|
||||
FlowMethodDefinition(start=True, router=True),
|
||||
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
@@ -3,11 +3,10 @@ from __future__ import annotations
|
||||
from collections.abc import Callable, Sequence
|
||||
from typing import TYPE_CHECKING, Any, TypeVar
|
||||
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
from crewai.flow.human_feedback import (
|
||||
HumanFeedbackConfig,
|
||||
HumanFeedbackResult,
|
||||
_build_human_feedback_runtime_decorator,
|
||||
_validate_human_feedback_options,
|
||||
)
|
||||
|
||||
|
||||
@@ -21,32 +20,6 @@ F = TypeVar("F", bound=Callable[..., Any])
|
||||
__all__ = ["HumanFeedbackResult", "human_feedback"]
|
||||
|
||||
|
||||
def _stamp_human_feedback_metadata(
|
||||
wrapper: Any,
|
||||
func: Callable[..., Any],
|
||||
config: HumanFeedbackConfig,
|
||||
) -> None:
|
||||
for attr in [
|
||||
"__is_flow_method__",
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
]:
|
||||
if hasattr(func, attr):
|
||||
setattr(wrapper, attr, getattr(func, attr))
|
||||
|
||||
wrapper.__human_feedback_config__ = config
|
||||
wrapper.__is_flow_method__ = True
|
||||
|
||||
if config.emit:
|
||||
fragment = getattr(wrapper, "__flow_method_definition__", None)
|
||||
if isinstance(fragment, FlowMethodDefinition):
|
||||
wrapper.__flow_method_definition__ = fragment.model_copy(
|
||||
update={"router": True, "emit": list(config.emit)}
|
||||
)
|
||||
|
||||
wrapper._human_feedback_llm = config.llm
|
||||
|
||||
|
||||
def human_feedback(
|
||||
message: str,
|
||||
emit: Sequence[str] | None = None,
|
||||
@@ -58,21 +31,18 @@ def human_feedback(
|
||||
learn_source: str = "hitl",
|
||||
learn_strict: bool = False,
|
||||
) -> Callable[[F], F]:
|
||||
"""Decorator for Flow methods that require human feedback."""
|
||||
runtime_decorator = _build_human_feedback_runtime_decorator(
|
||||
message=message,
|
||||
emit=emit,
|
||||
llm=llm,
|
||||
default_outcome=default_outcome,
|
||||
metadata=metadata,
|
||||
provider=provider,
|
||||
learn=learn,
|
||||
learn_source=learn_source,
|
||||
learn_strict=learn_strict,
|
||||
"""Decorator for Flow methods that require human feedback.
|
||||
|
||||
The decorator is a pure metadata stamper: it records the feedback
|
||||
configuration on the method, and the Flow engine collects and routes
|
||||
feedback after the method completes, driven by the flow's definition.
|
||||
"""
|
||||
_validate_human_feedback_options(
|
||||
emit=emit, llm=llm, default_outcome=default_outcome
|
||||
)
|
||||
config = HumanFeedbackConfig(
|
||||
message=message,
|
||||
emit=emit,
|
||||
emit=list(emit) if emit is not None else None,
|
||||
llm=llm,
|
||||
default_outcome=default_outcome,
|
||||
metadata=metadata,
|
||||
@@ -83,8 +53,7 @@ def human_feedback(
|
||||
)
|
||||
|
||||
def decorator(func: F) -> F:
|
||||
wrapper = runtime_decorator(func)
|
||||
_stamp_human_feedback_metadata(wrapper, func, config)
|
||||
return wrapper
|
||||
func.__human_feedback_config__ = config # type: ignore[attr-defined]
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
|
||||
wrapper = ListenMethod(func)
|
||||
|
||||
_set_flow_method_definition(
|
||||
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
listen=_to_definition_condition(condition),
|
||||
),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -148,6 +149,7 @@ def router(
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
listen=_to_definition_condition(condition),
|
||||
router=True,
|
||||
emit=router_events or None,
|
||||
|
||||
@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -53,13 +54,17 @@ def start(
|
||||
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
|
||||
wrapper = StartMethod(func)
|
||||
|
||||
if condition is not None:
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(start=_to_definition_condition(condition)),
|
||||
)
|
||||
else:
|
||||
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
start=(
|
||||
_to_definition_condition(condition)
|
||||
if condition is not None
|
||||
else True
|
||||
),
|
||||
),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
return cast(FlowMethodDecorator, decorator)
|
||||
|
||||
@@ -8,6 +8,7 @@ from pydantic import BaseModel
|
||||
from typing_extensions import TypeIs
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowActionDefinition,
|
||||
FlowConfigDefinition,
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
@@ -17,6 +18,7 @@ from crewai.flow.flow_definition import (
|
||||
FlowMethodDefinition,
|
||||
FlowPersistenceDefinition,
|
||||
FlowStateDefinition,
|
||||
_object_ref,
|
||||
)
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowMethod,
|
||||
@@ -34,15 +36,12 @@ _FLOW_METHOD_METADATA_ATTRS = [
|
||||
"__flow_method_definition__",
|
||||
"__flow_persistence_config__",
|
||||
"__human_feedback_config__",
|
||||
"_human_feedback_llm",
|
||||
]
|
||||
|
||||
|
||||
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
|
||||
"""Check if the object carries Flow method wrapper metadata."""
|
||||
return hasattr(obj, "__is_flow_method__") or hasattr(
|
||||
obj, _FLOW_METHOD_DEFINITION_ATTR
|
||||
)
|
||||
return hasattr(obj, _FLOW_METHOD_DEFINITION_ATTR)
|
||||
|
||||
|
||||
def _should_include_flow_method(flow_class: type, method: Any) -> bool:
|
||||
@@ -80,10 +79,13 @@ def _stamp_inherited_conversational_metadata(
|
||||
for attr in _FLOW_METHOD_METADATA_ATTRS:
|
||||
if hasattr(inherited, attr):
|
||||
setattr(method, attr, getattr(inherited, attr))
|
||||
method.__is_flow_method__ = True
|
||||
return method
|
||||
|
||||
|
||||
def _method_action(method: Any) -> FlowActionDefinition:
|
||||
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
|
||||
|
||||
|
||||
def _set_flow_method_definition(
|
||||
wrapper: FlowMethod[P, R],
|
||||
definition: FlowMethodDefinition,
|
||||
@@ -100,13 +102,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None:
|
||||
return None
|
||||
|
||||
|
||||
def _object_ref(value: Any) -> str:
|
||||
target = value if isinstance(value, type) else type(value)
|
||||
module = getattr(target, "__module__", "")
|
||||
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
|
||||
return f"{module}:{qualname}" if module and qualname else repr(value)
|
||||
|
||||
|
||||
def _is_json_serializable(value: Any) -> bool:
|
||||
try:
|
||||
json.dumps(value)
|
||||
@@ -214,16 +209,22 @@ def _build_config_definition(
|
||||
) -> FlowConfigDefinition:
|
||||
config_field_names = set(FlowConfigDefinition.model_fields)
|
||||
field_defaults = {
|
||||
name: field.default
|
||||
name: field.get_default(call_default_factory=True)
|
||||
for name, field in getattr(flow_class, "model_fields", {}).items()
|
||||
if name in config_field_names
|
||||
}
|
||||
values: dict[str, Any] = {}
|
||||
for field_name, default in field_defaults.items():
|
||||
value = getattr(flow_class, field_name, default)
|
||||
values[field_name] = _serialize_static_value(
|
||||
value, diagnostics, f"config.{field_name}"
|
||||
)
|
||||
if field_name == "input_provider":
|
||||
# A string value is already a ref; only live objects degrade.
|
||||
values[field_name] = (
|
||||
value if value is None or isinstance(value, str) else _object_ref(value)
|
||||
)
|
||||
else:
|
||||
values[field_name] = _serialize_static_value(
|
||||
value, diagnostics, f"config.{field_name}"
|
||||
)
|
||||
return FlowConfigDefinition(**values)
|
||||
|
||||
|
||||
@@ -239,38 +240,31 @@ def _build_human_feedback_definition(
|
||||
return FlowHumanFeedbackDefinition(
|
||||
message=str(config.message),
|
||||
emit=[str(value) for value in emit] if emit is not None else None,
|
||||
llm=_serialize_static_value(
|
||||
getattr(config, "llm", None), diagnostics, f"{path}.llm"
|
||||
),
|
||||
# llm and provider stay live: the engine consumes them in-process and
|
||||
# the contract degrades them to serializable forms at JSON dump time.
|
||||
llm=getattr(config, "llm", None),
|
||||
default_outcome=getattr(config, "default_outcome", None),
|
||||
metadata=_serialize_static_value(
|
||||
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
|
||||
),
|
||||
provider=_serialize_static_value(
|
||||
getattr(config, "provider", None), diagnostics, f"{path}.provider"
|
||||
),
|
||||
provider=getattr(config, "provider", None),
|
||||
learn=bool(getattr(config, "learn", False)),
|
||||
learn_source=str(getattr(config, "learn_source", "hitl")),
|
||||
learn_strict=bool(getattr(config, "learn_strict", False)),
|
||||
)
|
||||
|
||||
|
||||
def _build_persistence_definition(
|
||||
value: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowPersistenceDefinition | None:
|
||||
def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | None:
|
||||
config = getattr(value, "__flow_persistence_config__", None)
|
||||
if config is None:
|
||||
return None
|
||||
persistence = getattr(config, "persistence", None)
|
||||
verbose = bool(getattr(config, "verbose", False))
|
||||
return FlowPersistenceDefinition(
|
||||
enabled=True,
|
||||
verbose=verbose,
|
||||
persistence=_serialize_static_value(
|
||||
persistence, diagnostics, f"{path}.persistence"
|
||||
),
|
||||
verbose=bool(getattr(config, "verbose", False)),
|
||||
# The backend stays live: the engine persists through the exact
|
||||
# instance the user configured; the contract degrades it to a
|
||||
# serialized config at JSON dump time.
|
||||
persistence=getattr(config, "persistence", None),
|
||||
)
|
||||
|
||||
|
||||
@@ -373,9 +367,11 @@ def _build_method_definition(
|
||||
) -> FlowMethodDefinition:
|
||||
fragment = _get_flow_method_definition(method)
|
||||
if fragment is None:
|
||||
method_definition = FlowMethodDefinition()
|
||||
method_definition = FlowMethodDefinition(do=_method_action(method))
|
||||
else:
|
||||
method_definition = fragment.model_copy(deep=True)
|
||||
method_definition = fragment.model_copy(
|
||||
deep=True, update={"do": _method_action(method)}
|
||||
)
|
||||
|
||||
human_feedback = _build_human_feedback_definition(
|
||||
method, diagnostics, f"{path}.human_feedback"
|
||||
@@ -386,9 +382,7 @@ def _build_method_definition(
|
||||
method_definition.router = True
|
||||
method_definition.emit = None
|
||||
|
||||
method_definition.persist = _build_persistence_definition(
|
||||
method, diagnostics, f"{path}.persist"
|
||||
)
|
||||
method_definition.persist = _build_persistence_definition(method)
|
||||
|
||||
return method_definition
|
||||
|
||||
@@ -472,7 +466,7 @@ def _build_flow_definition_from_class(
|
||||
description=description,
|
||||
state=_build_state_definition(flow_class, diagnostics),
|
||||
config=_build_config_definition(flow_class, diagnostics),
|
||||
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
|
||||
persist=_build_persistence_definition(flow_class),
|
||||
conversational=_build_conversational_definition(flow_class, diagnostics),
|
||||
methods=methods,
|
||||
diagnostics=diagnostics,
|
||||
|
||||
@@ -13,7 +13,7 @@ import json
|
||||
import logging
|
||||
from typing import Any, Literal as TypingLiteral
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator
|
||||
import yaml
|
||||
|
||||
from crewai.flow.conversational_definition import (
|
||||
@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
|
||||
FlowDefinitionCondition = str | dict[str, Any]
|
||||
|
||||
__all__ = [
|
||||
"FlowActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
@@ -40,6 +41,14 @@ __all__ = [
|
||||
]
|
||||
|
||||
|
||||
def _object_ref(value: Any) -> str:
|
||||
"""Format a class or instance as the canonical ``module:qualname`` ref."""
|
||||
target = value if isinstance(value, type) else type(value)
|
||||
module = getattr(target, "__module__", "")
|
||||
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
|
||||
return f"{module}:{qualname}" if module and qualname else repr(value)
|
||||
|
||||
|
||||
class FlowDefinitionDiagnostic(BaseModel):
|
||||
"""A non-fatal Flow Definition build or validation diagnostic."""
|
||||
|
||||
@@ -52,9 +61,10 @@ class FlowDefinitionDiagnostic(BaseModel):
|
||||
class FlowStateDefinition(BaseModel):
|
||||
"""Static description of a Flow state contract."""
|
||||
|
||||
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
|
||||
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
|
||||
ref: str | None = None
|
||||
default: Any = None
|
||||
json_schema: dict[str, Any] | None = None
|
||||
default: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class FlowConfigDefinition(BaseModel):
|
||||
@@ -62,22 +72,50 @@ class FlowConfigDefinition(BaseModel):
|
||||
|
||||
tracing: bool | None = None
|
||||
stream: bool = False
|
||||
memory: Any = None
|
||||
input_provider: Any = None
|
||||
memory: dict[str, Any] | None = None
|
||||
input_provider: str | None = None
|
||||
suppress_flow_events: bool = False
|
||||
max_method_calls: int = 100
|
||||
defer_trace_finalization: bool = False
|
||||
checkpoint: bool | dict[str, Any] | None = None
|
||||
|
||||
|
||||
class FlowPersistenceDefinition(BaseModel):
|
||||
"""Static persistence configuration."""
|
||||
"""Static persistence configuration.
|
||||
|
||||
``persistence`` may hold a live backend when the definition is built from
|
||||
a decorated class — the engine then persists through the exact instance
|
||||
the user configured; the JSON/YAML projection degrades it to its
|
||||
serialized config.
|
||||
"""
|
||||
|
||||
enabled: bool = False
|
||||
verbose: bool = False
|
||||
persistence: Any = None
|
||||
|
||||
@field_serializer("persistence", when_used="json")
|
||||
def _serialize_persistence(self, value: Any) -> Any:
|
||||
if value is None or isinstance(value, dict):
|
||||
return value
|
||||
if isinstance(value, BaseModel):
|
||||
try:
|
||||
return value.model_dump(mode="json")
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Persistence backend %s is not fully serializable; "
|
||||
"preserved import reference only.",
|
||||
_object_ref(value),
|
||||
)
|
||||
return {"ref": _object_ref(value)}
|
||||
|
||||
|
||||
class FlowHumanFeedbackDefinition(BaseModel):
|
||||
"""Static human feedback configuration."""
|
||||
"""Static human feedback configuration.
|
||||
|
||||
``llm`` and ``provider`` may hold live Python objects when the definition
|
||||
is built from a decorated class; the JSON/YAML projection degrades them to
|
||||
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
|
||||
"""
|
||||
|
||||
message: str
|
||||
emit: list[str] | None = None
|
||||
@@ -89,10 +127,32 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
learn_source: str = "hitl"
|
||||
learn_strict: bool = False
|
||||
|
||||
@field_serializer("llm", when_used="json")
|
||||
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
|
||||
if value is None or isinstance(value, (str, dict)):
|
||||
return value
|
||||
from crewai.flow.human_feedback import _serialize_llm_for_context
|
||||
|
||||
return _serialize_llm_for_context(value)
|
||||
|
||||
@field_serializer("provider", when_used="json")
|
||||
def _serialize_provider(self, value: Any) -> str | None:
|
||||
if value is None or isinstance(value, str):
|
||||
return value
|
||||
return _object_ref(value)
|
||||
|
||||
|
||||
class FlowActionDefinition(BaseModel):
|
||||
"""What a Flow method node executes, independent of when it fires."""
|
||||
|
||||
call: TypingLiteral["code"] = "code"
|
||||
ref: str
|
||||
|
||||
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
"""Static definition of one Flow method and its execution roles."""
|
||||
|
||||
do: FlowActionDefinition
|
||||
start: bool | FlowDefinitionCondition | None = None
|
||||
listen: FlowDefinitionCondition | None = None
|
||||
router: bool = False
|
||||
@@ -100,6 +160,16 @@ class FlowMethodDefinition(BaseModel):
|
||||
human_feedback: FlowHumanFeedbackDefinition | None = None
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
|
||||
# Canonical shape: a method whose human_feedback declares emit
|
||||
# outcomes routes like a router, regardless of how the definition
|
||||
# was authored.
|
||||
if self.human_feedback is not None and self.human_feedback.emit:
|
||||
self.router = True
|
||||
self.emit = None
|
||||
return self
|
||||
|
||||
@property
|
||||
def is_start(self) -> bool:
|
||||
"""Whether this method is a start method.
|
||||
@@ -116,7 +186,9 @@ class FlowDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
|
||||
|
||||
schema_: str = Field(default="crewai.flow/v1", alias="schema")
|
||||
schema_: TypingLiteral["crewai.flow/v1"] = Field(
|
||||
default="crewai.flow/v1", alias="schema"
|
||||
)
|
||||
name: str
|
||||
description: str | None = None
|
||||
state: FlowStateDefinition | None = None
|
||||
|
||||
@@ -83,7 +83,6 @@ class FlowMethod(Generic[P, R]):
|
||||
"__conversational_only__", # gates registration on Flow.conversational
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
"_human_feedback_llm", # Live LLM object for HITL resume
|
||||
]:
|
||||
if hasattr(meth, attr):
|
||||
setattr(self, attr, getattr(meth, attr))
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
"""Human feedback decorator for Flow methods.
|
||||
"""Human feedback support for Flow methods.
|
||||
|
||||
This module provides the @human_feedback decorator that enables human-in-the-loop
|
||||
workflows within CrewAI Flows. It allows collecting human feedback on method outputs
|
||||
and optionally routing to different listeners based on the feedback.
|
||||
This module backs the @human_feedback decorator that enables human-in-the-loop
|
||||
workflows within CrewAI Flows. The decorator is a pure metadata stamper: it
|
||||
records a :class:`HumanFeedbackConfig` on the method, the Flow definition
|
||||
builder lifts it into ``FlowHumanFeedbackDefinition``, and the Flow engine
|
||||
collects feedback after each decorated method completes, driven by the flow's
|
||||
definition.
|
||||
|
||||
Supports both synchronous (blocking) and asynchronous (non-blocking) feedback
|
||||
collection through the provider parameter.
|
||||
@@ -55,22 +58,18 @@ Example (asynchronous with custom provider):
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from functools import wraps
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.flow.flow_wrappers import FlowMethod
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackProvider
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.flow.runtime import Flow
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
|
||||
@@ -160,8 +159,8 @@ class HumanFeedbackResult:
|
||||
class HumanFeedbackConfig:
|
||||
"""Configuration for the @human_feedback decorator.
|
||||
|
||||
Stores the parameters passed to the decorator for later use during
|
||||
method execution and for introspection by visualization tools.
|
||||
Stores the parameters passed to the decorator for later use by the
|
||||
Flow definition builder and for introspection by visualization tools.
|
||||
|
||||
Attributes:
|
||||
message: The message shown to the human when requesting feedback.
|
||||
@@ -183,19 +182,6 @@ class HumanFeedbackConfig:
|
||||
learn_strict: bool = False
|
||||
|
||||
|
||||
class HumanFeedbackMethod(FlowMethod[Any, Any]):
|
||||
"""Wrapper for methods decorated with @human_feedback.
|
||||
|
||||
This wrapper extends FlowMethod to add human feedback specific attributes
|
||||
used by the FlowDefinition builder and runtime feedback handling.
|
||||
|
||||
Attributes:
|
||||
__human_feedback_config__: The HumanFeedbackConfig for this method.
|
||||
"""
|
||||
|
||||
__human_feedback_config__: HumanFeedbackConfig | None = None
|
||||
|
||||
|
||||
class PreReviewResult(BaseModel):
|
||||
"""Structured output from the HITL pre-review LLM call."""
|
||||
|
||||
@@ -217,17 +203,11 @@ class DistilledLessons(BaseModel):
|
||||
)
|
||||
|
||||
|
||||
def _build_human_feedback_runtime_decorator(
|
||||
message: str,
|
||||
emit: Sequence[str] | None = None,
|
||||
llm: str | BaseLLM | None = "gpt-4o-mini",
|
||||
default_outcome: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
provider: HumanFeedbackProvider | None = None,
|
||||
learn: bool = False,
|
||||
learn_source: str = "hitl",
|
||||
learn_strict: bool = False,
|
||||
) -> Callable[[F], F]:
|
||||
def _validate_human_feedback_options(
|
||||
emit: Sequence[str] | None,
|
||||
llm: Any,
|
||||
default_outcome: str | None,
|
||||
) -> None:
|
||||
if emit is not None:
|
||||
if not llm:
|
||||
raise ValueError(
|
||||
@@ -244,295 +224,139 @@ def _build_human_feedback_runtime_decorator(
|
||||
elif default_outcome is not None:
|
||||
raise ValueError("default_outcome requires emit to be specified.")
|
||||
|
||||
def decorator(func: F) -> F:
|
||||
def _get_hitl_prompt(key: str) -> str:
|
||||
from crewai.utilities.i18n import I18N_DEFAULT
|
||||
|
||||
return I18N_DEFAULT.slice(key)
|
||||
def _get_hitl_prompt(key: str) -> str:
|
||||
from crewai.utilities.i18n import I18N_DEFAULT
|
||||
|
||||
def _resolve_llm_instance() -> Any:
|
||||
if llm is None:
|
||||
from crewai.llm import LLM
|
||||
return I18N_DEFAULT.slice(key)
|
||||
|
||||
return LLM(model="gpt-4o-mini")
|
||||
if isinstance(llm, str):
|
||||
from crewai.llm import LLM
|
||||
|
||||
return LLM(model=llm)
|
||||
return llm # already a BaseLLM instance
|
||||
def _resolve_llm_instance(llm: Any) -> Any:
|
||||
from crewai.llm import LLM
|
||||
|
||||
def _pre_review_with_lessons(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> Any:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return method_output
|
||||
query = f"human feedback lessons for {func.__name__}: {method_output!s}"
|
||||
matches = mem.recall(query, source=learn_source)
|
||||
if not matches:
|
||||
return method_output
|
||||
if llm is None:
|
||||
return LLM(model="gpt-4o-mini")
|
||||
if isinstance(llm, str):
|
||||
return LLM(model=llm)
|
||||
if isinstance(llm, dict):
|
||||
deserialized = _deserialize_llm_from_context(llm)
|
||||
return deserialized if deserialized is not None else LLM(model="gpt-4o-mini")
|
||||
return llm # already a BaseLLM instance
|
||||
|
||||
lessons = "\n".join(f"- {m.record.content}" for m in matches)
|
||||
llm_inst = _resolve_llm_instance()
|
||||
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
|
||||
output=str(method_output),
|
||||
lessons=lessons,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_pre_review_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=PreReviewResult)
|
||||
if isinstance(response, PreReviewResult):
|
||||
return response.improved_output
|
||||
return PreReviewResult.model_validate(response).improved_output
|
||||
reviewed = llm_inst.call(messages)
|
||||
return reviewed if isinstance(reviewed, str) else str(reviewed)
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; falling back to raw output",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
return method_output
|
||||
|
||||
def _distill_and_store_lessons(
|
||||
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
|
||||
) -> None:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return
|
||||
llm_inst = _resolve_llm_instance()
|
||||
prompt = _get_hitl_prompt("hitl_distill_user").format(
|
||||
method_name=func.__name__,
|
||||
output=str(method_output),
|
||||
feedback=raw_feedback,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_distill_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
def _pre_review_with_lessons(
|
||||
flow_instance: Flow[Any],
|
||||
method_name: str,
|
||||
method_output: Any,
|
||||
*,
|
||||
llm: Any,
|
||||
learn_source: str,
|
||||
learn_strict: bool,
|
||||
) -> Any:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return method_output
|
||||
query = f"human feedback lessons for {method_name}: {method_output!s}"
|
||||
matches = mem.recall(query, source=learn_source)
|
||||
if not matches:
|
||||
return method_output
|
||||
|
||||
lessons: list[str] = []
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=DistilledLessons)
|
||||
if isinstance(response, DistilledLessons):
|
||||
lessons = response.lessons
|
||||
else:
|
||||
lessons = DistilledLessons.model_validate(response).lessons
|
||||
else:
|
||||
response = llm_inst.call(messages)
|
||||
if isinstance(response, str):
|
||||
lessons = [
|
||||
line.strip("- ").strip()
|
||||
for line in response.strip().split("\n")
|
||||
if line.strip() and line.strip() != "NONE"
|
||||
]
|
||||
|
||||
if lessons:
|
||||
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; no lessons stored",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
def _build_feedback_context(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> tuple[Any, Any]:
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id=flow_instance.flow_id or "unknown",
|
||||
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
|
||||
method_name=func.__name__,
|
||||
method_output=method_output,
|
||||
message=message,
|
||||
emit=list(emit) if emit else None,
|
||||
default_outcome=default_outcome,
|
||||
metadata=metadata or {},
|
||||
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
|
||||
lessons = "\n".join(f"- {m.record.content}" for m in matches)
|
||||
llm_inst = _resolve_llm_instance(llm)
|
||||
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
|
||||
output=str(method_output),
|
||||
lessons=lessons,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_pre_review_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=PreReviewResult)
|
||||
if isinstance(response, PreReviewResult):
|
||||
return response.improved_output
|
||||
return PreReviewResult.model_validate(response).improved_output
|
||||
reviewed = llm_inst.call(messages)
|
||||
return reviewed if isinstance(reviewed, str) else str(reviewed)
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; falling back to raw output",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
return method_output
|
||||
|
||||
effective_provider = provider
|
||||
if effective_provider is None:
|
||||
from crewai.flow.flow_config import flow_config
|
||||
|
||||
effective_provider = flow_config.hitl_provider
|
||||
def _distill_and_store_lessons(
|
||||
flow_instance: Flow[Any],
|
||||
method_name: str,
|
||||
method_output: Any,
|
||||
raw_feedback: str,
|
||||
*,
|
||||
llm: Any,
|
||||
learn_source: str,
|
||||
learn_strict: bool,
|
||||
) -> None:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return
|
||||
llm_inst = _resolve_llm_instance(llm)
|
||||
prompt = _get_hitl_prompt("hitl_distill_user").format(
|
||||
method_name=method_name,
|
||||
output=str(method_output),
|
||||
feedback=raw_feedback,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_distill_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
|
||||
return context, effective_provider
|
||||
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
raise TypeError(
|
||||
f"Provider {type(effective_provider).__name__}.request_feedback() "
|
||||
"returned a coroutine in a sync flow method. Use an async flow "
|
||||
"method or a synchronous provider."
|
||||
)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
async def _request_feedback_async(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> str:
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
return str(await feedback_result)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
def _process_feedback(
|
||||
flow_instance: Flow[Any],
|
||||
method_output: Any,
|
||||
raw_feedback: str,
|
||||
) -> HumanFeedbackResult | str:
|
||||
collapsed_outcome: str | None = None
|
||||
|
||||
if not raw_feedback.strip():
|
||||
if default_outcome:
|
||||
collapsed_outcome = default_outcome
|
||||
elif emit:
|
||||
collapsed_outcome = emit[0]
|
||||
elif emit:
|
||||
if llm is not None:
|
||||
collapsed_outcome = flow_instance._collapse_to_outcome(
|
||||
feedback=raw_feedback,
|
||||
outcomes=emit,
|
||||
llm=llm,
|
||||
)
|
||||
else:
|
||||
collapsed_outcome = emit[0]
|
||||
|
||||
result = HumanFeedbackResult(
|
||||
output=method_output,
|
||||
feedback=raw_feedback,
|
||||
outcome=collapsed_outcome,
|
||||
timestamp=datetime.now(),
|
||||
method_name=func.__name__,
|
||||
metadata=metadata or {},
|
||||
)
|
||||
|
||||
flow_instance.human_feedback_history.append(result)
|
||||
flow_instance.last_human_feedback = result
|
||||
|
||||
if emit:
|
||||
if collapsed_outcome is None:
|
||||
collapsed_outcome = default_outcome or emit[0]
|
||||
result.outcome = collapsed_outcome
|
||||
return collapsed_outcome
|
||||
return result
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
|
||||
@wraps(func)
|
||||
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
|
||||
method_output = await func(self, *args, **kwargs)
|
||||
|
||||
if learn and getattr(self, "memory", None) is not None:
|
||||
method_output = _pre_review_with_lessons(self, method_output)
|
||||
|
||||
raw_feedback = await _request_feedback_async(self, method_output)
|
||||
result = _process_feedback(self, method_output, raw_feedback)
|
||||
|
||||
if (
|
||||
learn
|
||||
and getattr(self, "memory", None) is not None
|
||||
and raw_feedback.strip()
|
||||
):
|
||||
_distill_and_store_lessons(self, method_output, raw_feedback)
|
||||
|
||||
# Stash the real method output for final flow result when emit is set:
|
||||
# result is the collapsed outcome string for routing, but we preserve the
|
||||
# actual method output as the flow's final result. Uses per-method dict for
|
||||
# concurrency safety and to handle None returns.
|
||||
if emit:
|
||||
self._human_feedback_method_outputs[func.__name__] = method_output
|
||||
|
||||
return result
|
||||
|
||||
wrapper: Any = async_wrapper
|
||||
lessons: list[str] = []
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=DistilledLessons)
|
||||
if isinstance(response, DistilledLessons):
|
||||
lessons = response.lessons
|
||||
else:
|
||||
lessons = DistilledLessons.model_validate(response).lessons
|
||||
else:
|
||||
response = llm_inst.call(messages)
|
||||
if isinstance(response, str):
|
||||
lessons = [
|
||||
line.strip("- ").strip()
|
||||
for line in response.strip().split("\n")
|
||||
if line.strip() and line.strip() != "NONE"
|
||||
]
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
|
||||
method_output = func(self, *args, **kwargs)
|
||||
|
||||
if learn and getattr(self, "memory", None) is not None:
|
||||
method_output = _pre_review_with_lessons(self, method_output)
|
||||
|
||||
raw_feedback = _request_feedback(self, method_output)
|
||||
result = _process_feedback(self, method_output, raw_feedback)
|
||||
|
||||
if (
|
||||
learn
|
||||
and getattr(self, "memory", None) is not None
|
||||
and raw_feedback.strip()
|
||||
):
|
||||
_distill_and_store_lessons(self, method_output, raw_feedback)
|
||||
|
||||
# Stash the real method output for final flow result when emit is set:
|
||||
# result is the collapsed outcome string for routing, but we preserve the
|
||||
# actual method output as the flow's final result. Uses per-method dict for
|
||||
# concurrency safety and to handle None returns.
|
||||
if emit:
|
||||
self._human_feedback_method_outputs[func.__name__] = method_output
|
||||
|
||||
return result
|
||||
|
||||
wrapper = sync_wrapper
|
||||
|
||||
return wrapper # type: ignore[no-any-return]
|
||||
|
||||
return decorator
|
||||
if lessons:
|
||||
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; no lessons stored",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
def human_feedback(
|
||||
|
||||
@@ -24,12 +24,10 @@ Example:
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
import functools
|
||||
import logging
|
||||
from types import SimpleNamespace
|
||||
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
|
||||
from typing import TYPE_CHECKING, Any, Final, TypeVar
|
||||
|
||||
from crewai_core.printer import PRINTER
|
||||
from pydantic import BaseModel
|
||||
@@ -39,7 +37,7 @@ from crewai.flow.persistence.factory import default_flow_persistence
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -66,14 +64,6 @@ def _stamp_persistence_metadata(
|
||||
)
|
||||
|
||||
|
||||
_PRESERVED_FLOW_ATTRS: Final[tuple[str, ...]] = (
|
||||
"__human_feedback_config__",
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
"_human_feedback_llm",
|
||||
)
|
||||
|
||||
|
||||
class PersistenceDecorator:
|
||||
"""Class to handle flow state persistence with consistent logging."""
|
||||
|
||||
@@ -164,6 +154,10 @@ def persist(
|
||||
states. When applied at the method level, it persists only that method's
|
||||
state.
|
||||
|
||||
The decorator is a pure metadata stamper: it records the persistence
|
||||
configuration on the class or method, and the Flow engine saves state
|
||||
after each persisted method completes, driven by the flow's definition.
|
||||
|
||||
Args:
|
||||
persistence: Optional FlowPersistence implementation to use.
|
||||
If not provided, uses ``default_flow_persistence()`` (the
|
||||
@@ -191,122 +185,7 @@ def persist(
|
||||
persistence if persistence is not None else default_flow_persistence()
|
||||
)
|
||||
|
||||
if isinstance(target, type):
|
||||
_stamp_persistence_metadata(target, actual_persistence, verbose)
|
||||
original_init = target.__init__ # type: ignore[misc]
|
||||
|
||||
@functools.wraps(original_init)
|
||||
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
|
||||
if "persistence" not in kwargs:
|
||||
kwargs["persistence"] = actual_persistence
|
||||
original_init(self, *args, **kwargs)
|
||||
|
||||
target.__init__ = new_init # type: ignore[misc]
|
||||
|
||||
# Preserve original methods' decorators
|
||||
original_methods = {
|
||||
name: method
|
||||
for name, method in target.__dict__.items()
|
||||
if callable(method)
|
||||
and (
|
||||
hasattr(method, "__is_flow_method__")
|
||||
or hasattr(method, "__flow_method_definition__")
|
||||
)
|
||||
}
|
||||
|
||||
for name, method in original_methods.items():
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
# Closure captures the current name and method
|
||||
def create_async_wrapper(
|
||||
method_name: str, original_method: Callable[..., Any]
|
||||
) -> Callable[..., Any]:
|
||||
@functools.wraps(original_method)
|
||||
async def method_wrapper(
|
||||
self: Any, *args: Any, **kwargs: Any
|
||||
) -> Any:
|
||||
result = await original_method(self, *args, **kwargs)
|
||||
PersistenceDecorator.persist_state(
|
||||
self, method_name, actual_persistence, verbose
|
||||
)
|
||||
return result
|
||||
|
||||
return method_wrapper
|
||||
|
||||
wrapped = create_async_wrapper(name, method)
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(wrapped, attr, getattr(method, attr))
|
||||
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
|
||||
setattr(target, name, wrapped)
|
||||
else:
|
||||
|
||||
def create_sync_wrapper(
|
||||
method_name: str, original_method: Callable[..., Any]
|
||||
) -> Callable[..., Any]:
|
||||
@functools.wraps(original_method)
|
||||
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
result = original_method(self, *args, **kwargs)
|
||||
PersistenceDecorator.persist_state(
|
||||
self, method_name, actual_persistence, verbose
|
||||
)
|
||||
return result
|
||||
|
||||
return method_wrapper
|
||||
|
||||
wrapped = create_sync_wrapper(name, method)
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(wrapped, attr, getattr(method, attr))
|
||||
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
|
||||
setattr(target, name, wrapped)
|
||||
|
||||
return target
|
||||
method = target
|
||||
method.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(method, actual_persistence, verbose)
|
||||
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
|
||||
@functools.wraps(method)
|
||||
async def method_async_wrapper(
|
||||
flow_instance: Any, *args: Any, **kwargs: Any
|
||||
) -> T:
|
||||
method_coro = method(flow_instance, *args, **kwargs)
|
||||
if asyncio.iscoroutine(method_coro):
|
||||
result = await method_coro
|
||||
else:
|
||||
result = method_coro
|
||||
PersistenceDecorator.persist_state(
|
||||
flow_instance, method.__name__, actual_persistence, verbose
|
||||
)
|
||||
return cast(T, result)
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(method_async_wrapper, attr, getattr(method, attr))
|
||||
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(
|
||||
method_async_wrapper, actual_persistence, verbose
|
||||
)
|
||||
return cast(Callable[..., T], method_async_wrapper)
|
||||
|
||||
@functools.wraps(method)
|
||||
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
|
||||
result = method(flow_instance, *args, **kwargs)
|
||||
PersistenceDecorator.persist_state(
|
||||
flow_instance, method.__name__, actual_persistence, verbose
|
||||
)
|
||||
return result
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(method_sync_wrapper, attr, getattr(method, attr))
|
||||
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(method_sync_wrapper, actual_persistence, verbose)
|
||||
return cast(Callable[..., T], method_sync_wrapper)
|
||||
_stamp_persistence_metadata(target, actual_persistence, verbose)
|
||||
return target
|
||||
|
||||
return decorator
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
70
lib/crewai/src/crewai/flow/runtime/_resolvers.py
Normal file
70
lib/crewai/src/crewai/flow/runtime/_resolvers.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Resolution of FlowDefinition refs (``module:qualname``) into live objects.
|
||||
|
||||
Every ref-shaped value in a definition — ``do`` actions, ``state.ref``,
|
||||
``config.input_provider``, ``human_feedback.provider`` — resolves through
|
||||
:func:`resolve_ref`. Failures are loud and name the field and the ref.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import importlib
|
||||
import inspect
|
||||
from operator import attrgetter
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from crewai.flow.flow_definition import FlowActionDefinition
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
class InvalidRefError(ValueError):
|
||||
"""A definition ref that cannot be resolved to a live object."""
|
||||
|
||||
|
||||
def resolve_ref(ref: str, *, field: str) -> Any:
|
||||
"""Import the object a definition's `module:qualname` ref points to."""
|
||||
module_name, _, qualname = ref.partition(":")
|
||||
if "<" in ref or not module_name or not qualname:
|
||||
raise InvalidRefError(
|
||||
f"invalid {field} ref {ref!r}; expected 'module:qualname'"
|
||||
)
|
||||
try:
|
||||
return attrgetter(qualname)(importlib.import_module(module_name))
|
||||
except (ImportError, AttributeError) as e:
|
||||
raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e
|
||||
|
||||
|
||||
def resolve_instance_ref(ref: str, *, field: str) -> Any:
|
||||
"""Resolve a ref, auto-instantiating a no-arg class into an instance."""
|
||||
target = resolve_ref(ref, field=field)
|
||||
if not inspect.isclass(target):
|
||||
return target
|
||||
try:
|
||||
return target()
|
||||
except Exception as e:
|
||||
raise InvalidRefError(
|
||||
f"cannot instantiate {field} ref {ref!r} without arguments: {e}"
|
||||
) from e
|
||||
|
||||
|
||||
def _resolve_code_action(
|
||||
flow: Flow[Any], action: FlowActionDefinition
|
||||
) -> Callable[..., Any]:
|
||||
ref = action.ref
|
||||
target = resolve_ref(ref, field="do")
|
||||
if not callable(target):
|
||||
raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable")
|
||||
handler = cast(Callable[..., Any], target)
|
||||
if getattr(handler, "__self__", None) is None:
|
||||
handler = handler.__get__(flow, type(flow))
|
||||
return handler
|
||||
|
||||
|
||||
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
|
||||
"""Turn one `do:` action into the callable the flow runs for that node."""
|
||||
if action.call == "code":
|
||||
return _resolve_code_action(flow, action)
|
||||
raise ValueError(f"unknown call type {action.call!r}")
|
||||
@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
|
||||
if examples:
|
||||
schema_extra["examples"] = examples
|
||||
|
||||
default = ... if is_required else None
|
||||
default = (
|
||||
json_schema["default"]
|
||||
if "default" in json_schema
|
||||
else (... if is_required else None)
|
||||
)
|
||||
|
||||
if isinstance(type_, type) and issubclass(type_, (int, float)):
|
||||
if "minimum" in json_schema:
|
||||
|
||||
@@ -1168,132 +1168,13 @@ class TestAsyncHumanFeedbackEdgeCases:
|
||||
|
||||
|
||||
|
||||
class TestLiveLLMPreservationOnResume:
|
||||
"""Tests for preserving the full LLM config across HITL resume."""
|
||||
|
||||
def test_human_feedback_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on the wrapper when llm is a BaseLLM instance."""
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is mock_llm
|
||||
|
||||
def test_human_feedback_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on the wrapper even when llm is a string."""
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm == "gpt-4o-mini"
|
||||
class TestResumeLLMFromSerializedContext:
|
||||
"""Resume rebuilds the collapse LLM from the serialized context alone."""
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_live_basellm_over_serialized_string(
|
||||
def test_resume_builds_llm_from_serialized_context(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
|
||||
|
||||
This is the main bug fix: when a flow resumes, it should use the fully-configured
|
||||
LLM from the re-imported decorator (with credentials, project, etc.) instead of
|
||||
creating a new LLM from just the model string.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
# Create a mock BaseLLM with full config (simulating Gemini with service account)
|
||||
live_llm = MagicMock(spec=BaseLLM)
|
||||
live_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
result_path: str = ""
|
||||
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm=live_llm,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approved(self):
|
||||
self.result_path = "approved"
|
||||
return "Approved!"
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="live-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="live-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "live-llm-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("live-llm-test", persistence)
|
||||
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# NOT the serialized string. The live_llm was captured at class definition
|
||||
# time and stored on the method wrapper as _human_feedback_llm.
|
||||
assert len(captured_llm) == 1
|
||||
# (which is stored on the method's _human_feedback_llm attribute)
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert captured_llm[0] is method._human_feedback_llm
|
||||
# And verify it's a BaseLLM instance, not a string
|
||||
assert isinstance(captured_llm[0], BaseLLM)
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_falls_back_to_serialized_string_when_no_human_feedback_llm(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async falls back to context.llm when _human_feedback_llm is not available.
|
||||
|
||||
This ensures backward compatibility with flows that were paused before this fix.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
@@ -1325,11 +1206,6 @@ class TestLiveLLMPreservationOnResume:
|
||||
|
||||
flow = TestFlow.from_pending("fallback-test", persistence)
|
||||
|
||||
# Remove _human_feedback_llm to simulate old decorator without this attribute
|
||||
method = flow._methods.get("review")
|
||||
if hasattr(method, "_human_feedback_llm"):
|
||||
delattr(method, "_human_feedback_llm")
|
||||
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
@@ -1343,85 +1219,3 @@ class TestLiveLLMPreservationOnResume:
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
assert isinstance(captured_llm[0], BaseLLMClass)
|
||||
assert captured_llm[0].model == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_string_from_context_when_human_feedback_llm_is_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that when _human_feedback_llm is a string (not BaseLLM), we still use context.llm.
|
||||
|
||||
String LLM values offer no benefit over the serialized context.llm,
|
||||
so we don't prefer them.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="string-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="string-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "string-llm-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("string-llm-test", persistence)
|
||||
|
||||
method = flow._methods.get("review")
|
||||
assert method._human_feedback_llm == "gpt-4o-mini"
|
||||
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# _human_feedback_llm is a string, so resume deserializes context.llm into an LLM instance
|
||||
assert len(captured_llm) == 1
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
assert isinstance(captured_llm[0], BaseLLMClass)
|
||||
assert captured_llm[0].model == "gpt-4o-mini"
|
||||
|
||||
def test_human_feedback_llm_set_for_async_wrapper(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on async wrapper functions."""
|
||||
import asyncio
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
async def async_review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("async_review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is mock_llm
|
||||
|
||||
@@ -1157,6 +1157,26 @@ def test_flow_name():
|
||||
assert flow.name == "MyFlow"
|
||||
|
||||
|
||||
def test_flow_custom_name_overrides_class_name_in_events():
|
||||
class InternalFlowClass(Flow):
|
||||
name = "PublicName"
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "done"
|
||||
|
||||
received = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle(source, event):
|
||||
received.append(event)
|
||||
|
||||
InternalFlowClass().kickoff()
|
||||
|
||||
assert received[0].flow_name == "PublicName"
|
||||
|
||||
|
||||
def test_nested_and_or_conditions():
|
||||
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit():
|
||||
"start",
|
||||
}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
@@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
|
||||
"name": "LoadedDiagnosticsFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
@@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
|
||||
"router": True,
|
||||
"start": False,
|
||||
"emit": ["continue"],
|
||||
@@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "TypoFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"listen": "begni"},
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:TypoFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"handle": {
|
||||
"do": {"ref": "loaded_flows:TypoFlow.handle"},
|
||||
"listen": "begni",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ExplicitNonStartFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"start": False, "listen": "begin"},
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"handle": {
|
||||
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
|
||||
"start": False,
|
||||
"listen": "begin",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
|
||||
1380
lib/crewai/tests/test_flow_from_definition.py
Normal file
1380
lib/crewai/tests/test_flow_from_definition.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
|
||||
return "complete"
|
||||
|
||||
|
||||
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
|
||||
def _attach_flow_definition(
|
||||
flow_class: type[Flow], methods: dict[str, dict[str, object]]
|
||||
) -> None:
|
||||
flow_class._flow_definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": flow_class.__name__,
|
||||
"methods": methods,
|
||||
"methods": {
|
||||
name: {
|
||||
"do": {
|
||||
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
|
||||
},
|
||||
**spec,
|
||||
}
|
||||
for name, spec in methods.items()
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "DefinedFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"begin": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"decide": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.decide"},
|
||||
"listen": "begin",
|
||||
"router": True,
|
||||
"emit": ["done"],
|
||||
},
|
||||
"finish": {"listen": "done"},
|
||||
"finish": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.finish"},
|
||||
"listen": "done",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -92,8 +92,8 @@ class TestHumanFeedbackValidation:
|
||||
assert hasattr(test_method, "__human_feedback_config__")
|
||||
assert not hasattr(test_method, "__is_router__")
|
||||
|
||||
def test_persist_preserves_human_feedback_llm_attribute(self):
|
||||
"""Test @persist preserves the live LLM stashed by @human_feedback."""
|
||||
def test_persist_preserves_human_feedback_config(self):
|
||||
"""Test @persist preserves the config stamped by @human_feedback."""
|
||||
llm = object()
|
||||
|
||||
@persist()
|
||||
@@ -105,8 +105,8 @@ class TestHumanFeedbackValidation:
|
||||
def test_method(self):
|
||||
return "output"
|
||||
|
||||
assert hasattr(test_method, "_human_feedback_llm")
|
||||
assert test_method._human_feedback_llm is llm
|
||||
assert hasattr(test_method, "__human_feedback_config__")
|
||||
assert test_method.__human_feedback_config__.llm is llm
|
||||
|
||||
|
||||
class TestHumanFeedbackConfig:
|
||||
@@ -481,7 +481,7 @@ class TestHumanFeedbackLearn:
|
||||
with patch.object(
|
||||
flow, "_request_human_feedback", return_value="looks good"
|
||||
):
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
# memory.recall and memory.remember_many should NOT be called
|
||||
flow.memory.recall.assert_not_called()
|
||||
@@ -516,7 +516,7 @@ class TestHumanFeedbackLearn:
|
||||
)
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
# remember_many should be called with the distilled lesson
|
||||
flow.memory.remember_many.assert_called_once()
|
||||
@@ -570,7 +570,7 @@ class TestHumanFeedbackLearn:
|
||||
]
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_output["shown_to_human"] == "draft with citations added"
|
||||
# recall was called to find past lessons
|
||||
@@ -592,7 +592,7 @@ class TestHumanFeedbackLearn:
|
||||
with patch.object(
|
||||
flow, "_request_human_feedback", return_value=""
|
||||
):
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
flow.memory.remember_many.assert_not_called()
|
||||
|
||||
@@ -645,7 +645,7 @@ class TestHumanFeedbackLearn:
|
||||
mock_llm.call.side_effect = RuntimeError("simulated pre-review failure")
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured["shown_to_human"] == "raw draft"
|
||||
assert any(
|
||||
@@ -690,7 +690,7 @@ class TestHumanFeedbackLearn:
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
def test_distillation_failure_logs_and_does_not_block_flow(self, caplog):
|
||||
"""Distillation LLM failure logs a warning but does not break the flow."""
|
||||
@@ -717,7 +717,7 @@ class TestHumanFeedbackLearn:
|
||||
mock_llm.call.side_effect = RuntimeError("simulated distill failure")
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce() # must not raise
|
||||
flow.kickoff() # must not raise
|
||||
|
||||
flow.memory.remember_many.assert_not_called()
|
||||
assert any(
|
||||
|
||||
@@ -778,77 +778,11 @@ class TestEdgeCases:
|
||||
class TestLLMConfigPreservation:
|
||||
"""Tests that LLM config is preserved through @human_feedback serialization.
|
||||
|
||||
PR #4970 introduced _human_feedback_llm stashing so the live LLM object survives
|
||||
decorator wrapping for same-process resume. The serialization path
|
||||
(_serialize_llm_for_context / _deserialize_llm_from_context) preserves
|
||||
config for cross-process resume.
|
||||
The flow definition keeps the live LLM object for same-process execution.
|
||||
The serialization path (_serialize_llm_for_context /
|
||||
_deserialize_llm_from_context) preserves config for cross-process resume.
|
||||
"""
|
||||
|
||||
def test_human_feedback_llm_stashed_on_wrapper_with_llm_instance(self):
|
||||
"""Test that passing an LLM instance stashes it on the wrapper as _human_feedback_llm."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
|
||||
|
||||
class ConfigFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=llm_instance,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
method = ConfigFlow.review
|
||||
assert hasattr(method, "_human_feedback_llm"), "_human_feedback_llm not found on wrapper"
|
||||
assert method._human_feedback_llm is llm_instance, "_human_feedback_llm is not the same object"
|
||||
|
||||
def test_human_feedback_llm_preserved_on_listen_method(self):
|
||||
"""Test that _human_feedback_llm is preserved when @human_feedback is on a @listen method."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)
|
||||
|
||||
class ListenConfigFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "draft"
|
||||
|
||||
@listen("generate")
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=llm_instance,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
method = ListenConfigFlow.review
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is llm_instance
|
||||
|
||||
def test_human_feedback_llm_accessible_on_instance(self):
|
||||
"""Test that _human_feedback_llm survives Flow instantiation (bound method access)."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
|
||||
|
||||
class InstanceFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=llm_instance,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = InstanceFlow()
|
||||
instance_method = flow.review
|
||||
assert hasattr(instance_method, "_human_feedback_llm")
|
||||
assert instance_method._human_feedback_llm is llm_instance
|
||||
|
||||
def test_serialize_llm_preserves_config_fields(self):
|
||||
"""Test that _serialize_llm_for_context captures temperature, base_url, etc."""
|
||||
from crewai.flow.human_feedback import _serialize_llm_for_context
|
||||
|
||||
Reference in New Issue
Block a user