fix(flow): log HITL pre-review and distillation failures, add learn_strict

This commit is contained in:
Greyson LaLonde
2026-05-12 00:26:31 +08:00
committed by GitHub
parent b0d4dd256d
commit 5d757cb626
2 changed files with 168 additions and 3 deletions

View File

@@ -60,6 +60,7 @@ from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from functools import wraps
import logging
from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import BaseModel, Field
@@ -73,6 +74,8 @@ if TYPE_CHECKING:
from crewai.llms.base_llm import BaseLLM
logger = logging.getLogger(__name__)
F = TypeVar("F", bound=Callable[..., Any])
@@ -188,6 +191,7 @@ class HumanFeedbackConfig:
provider: HumanFeedbackProvider | None = None
learn: bool = False
learn_source: str = "hitl"
learn_strict: bool = False
class HumanFeedbackMethod(FlowMethod[Any, Any]):
@@ -237,6 +241,7 @@ def human_feedback(
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback.
@@ -275,6 +280,14 @@ def human_feedback(
external systems like Slack, Teams, or webhooks. When the
provider raises HumanFeedbackPending, the flow pauses and
can be resumed later with Flow.resume().
learn: Enable HITL learning. Recall past lessons to pre-review
output before the human sees it, and distill new lessons
from feedback after.
learn_source: Memory source tag for stored/recalled lessons.
learn_strict: When True, re-raise exceptions from the pre-review
and distillation steps instead of falling back to raw output.
Default False preserves graceful degradation; failures are
always logged via ``logger.warning`` regardless of this flag.
Returns:
A decorator function that wraps the method with human feedback
@@ -404,7 +417,19 @@ def human_feedback(
reviewed = llm_inst.call(messages)
return reviewed if isinstance(reviewed, str) else str(reviewed)
except Exception:
return method_output # fallback to raw output on any failure
if learn_strict:
logger.warning(
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
func.__name__,
exc_info=True,
)
raise
logger.warning(
"HITL pre-review failed for %s; falling back to raw output",
func.__name__,
exc_info=True,
)
return method_output
def _distill_and_store_lessons(
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
@@ -446,8 +471,19 @@ def human_feedback(
if lessons:
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
except Exception: # noqa: S110
pass # non-critical: don't fail the flow because lesson storage failed
except Exception:
if learn_strict:
logger.warning(
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
func.__name__,
exc_info=True,
)
raise
logger.warning(
"HITL lesson distillation failed for %s; no lessons stored",
func.__name__,
exc_info=True,
)
# -- Core feedback helpers ------------------------------------
@@ -654,6 +690,7 @@ def human_feedback(
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
wrapper.__is_flow_method__ = True

View File

@@ -596,6 +596,134 @@ class TestHumanFeedbackLearn:
# llm defaults to "gpt-4o-mini" at the function level
assert config.llm == "gpt-4o-mini"
def test_pre_review_failure_logs_and_returns_raw_output(self, caplog):
"""Pre-review LLM failure falls back to raw output AND logs a warning."""
from crewai.memory.types import MemoryMatch, MemoryRecord
class LearnFlow(Flow):
@start()
@human_feedback(message="Review:", llm="gpt-4o-mini", learn=True)
def produce(self):
return "raw draft"
flow = LearnFlow()
flow.memory = MagicMock()
flow.memory.recall.return_value = [
MemoryMatch(
record=MemoryRecord(content="some lesson", embedding=[]),
score=0.9,
match_reasons=["semantic"],
)
]
captured: dict[str, Any] = {}
def capture_feedback(message, output, metadata=None, emit=None):
captured["shown_to_human"] = output
return "" # empty -> no distillation path
with (
patch.object(flow, "_request_human_feedback", side_effect=capture_feedback),
patch("crewai.llm.LLM") as MockLLM,
caplog.at_level("WARNING", logger="crewai.flow.human_feedback"),
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError("simulated pre-review failure")
MockLLM.return_value = mock_llm
flow.produce()
assert captured["shown_to_human"] == "raw draft"
assert any(
"HITL pre-review failed" in rec.message
and rec.levelname == "WARNING"
and rec.exc_info is not None
for rec in caplog.records
)
def test_pre_review_failure_strict_reraises(self):
"""When learn_strict=True, pre-review failures propagate instead of falling back."""
from crewai.memory.types import MemoryMatch, MemoryRecord
class LearnFlow(Flow):
@start()
@human_feedback(
message="Review:",
llm="gpt-4o-mini",
learn=True,
learn_strict=True,
)
def produce(self):
return "raw draft"
flow = LearnFlow()
flow.memory = MagicMock()
flow.memory.recall.return_value = [
MemoryMatch(
record=MemoryRecord(content="some lesson", embedding=[]),
score=0.9,
match_reasons=["semantic"],
)
]
with (
patch.object(flow, "_request_human_feedback", return_value=""),
patch("crewai.llm.LLM") as MockLLM,
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError("simulated pre-review failure")
MockLLM.return_value = mock_llm
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
flow.produce()
def test_distillation_failure_logs_and_does_not_block_flow(self, caplog):
"""Distillation LLM failure logs a warning but does not break the flow."""
class LearnFlow(Flow):
@start()
@human_feedback(message="Review:", llm="gpt-4o-mini", learn=True)
def produce(self):
return "raw draft"
flow = LearnFlow()
flow.memory = MagicMock()
flow.memory.recall.return_value = [] # no pre-review path
with (
patch.object(
flow, "_request_human_feedback", return_value="please add citations"
),
patch("crewai.llm.LLM") as MockLLM,
caplog.at_level("WARNING", logger="crewai.flow.human_feedback"),
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError("simulated distill failure")
MockLLM.return_value = mock_llm
flow.produce() # must not raise
flow.memory.remember_many.assert_not_called()
assert any(
"HITL lesson distillation failed" in rec.message
and rec.levelname == "WARNING"
for rec in caplog.records
)
def test_learn_strict_config_propagates(self):
"""learn_strict is captured on the decorator config."""
@human_feedback(message="Review:", learn=True, learn_strict=True)
def test_method(self):
return "output"
config = test_method.__human_feedback_config__
assert config is not None
assert config.learn_strict is True
class TestHumanFeedbackFinalOutputPreservation:
"""Tests for preserving method return value as flow's final output when @human_feedback with emit is terminal.