fix: preserve method return value as flow output for @human_feedback with emit (#5099)

* fix: preserve method return value as flow output for @human_feedback with emit

When a @human_feedback decorated method with emit= is the final method in a
flow (no downstream listeners triggered), the flow's final output was
incorrectly set to the collapsed outcome string (e.g., 'approved') instead
of the method's actual return value (e.g., a state dict).

Root cause: _process_feedback() returns the collapsed_outcome string when
emit is set, and this string was being stored as the method's result in
_method_outputs.

The fix:
1. In human_feedback.py: After _process_feedback, stash the real method_output
   on the flow instance as _human_feedback_method_output when emit is set.

2. In flow.py: After appending a method result to _method_outputs, check if
   _human_feedback_method_output is set. If so, replace the last entry with
   the stashed real output and clear the stash.

This ensures:
- Routing still works correctly (collapsed outcome used for @listen matching)
- The flow's final result is the actual method return value
- If downstream listeners execute, their results become the final output

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* style: ruff format flow.py

* fix: use per-method dict stash for concurrency safety and None returns

Addresses review comments:
- Replace single flow-level slot with dict keyed by method name,
  safe under concurrent @human_feedback+emit execution
- Dict key presence (not value) indicates stashed output,
  correctly preserving None return values
- Added test for None return value preservation

---------

Co-authored-by: Joao Moura <joao@crewai.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
alex-clawd
2026-03-25 23:28:17 -07:00
committed by GitHub
parent bd03f6cf64
commit 74976b157d
4 changed files with 205 additions and 8 deletions

View File

@@ -883,6 +883,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
self.human_feedback_history: list[HumanFeedbackResult] = []
self.last_human_feedback: HumanFeedbackResult | None = None
self._pending_feedback_context: PendingFeedbackContext | None = None
# Per-method stash for real @human_feedback output (keyed by method name)
# Used to decouple routing outcome from method return value when emit is set
self._human_feedback_method_outputs: dict[str, Any] = {}
self.suppress_flow_events: bool = suppress_flow_events
# User input history (for self.ask())
@@ -2290,6 +2293,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = await result
self._method_outputs.append(result)
# For @human_feedback methods with emit, the result is the collapsed outcome
# (e.g., "approved") used for routing. But we want the actual method output
# to be the stored result (for final flow output). Replace the last entry
# if a stashed output exists. Dict-based stash is concurrency-safe and
# handles None return values (presence in dict = stashed, not value).
if method_name in self._human_feedback_method_outputs:
self._method_outputs[-1] = self._human_feedback_method_outputs.pop(
method_name
)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)

View File

@@ -591,6 +591,13 @@ def human_feedback(
):
_distill_and_store_lessons(self, method_output, raw_feedback)
# Stash the real method output for final flow result when emit is set
# (result is the collapsed outcome string for routing, but we want to
# preserve the actual method output as the flow's final result)
# Uses per-method dict for concurrency safety and to handle None returns
if emit:
self._human_feedback_method_outputs[func.__name__] = method_output
return result
wrapper: Any = async_wrapper
@@ -615,6 +622,13 @@ def human_feedback(
):
_distill_and_store_lessons(self, method_output, raw_feedback)
# Stash the real method output for final flow result when emit is set
# (result is the collapsed outcome string for routing, but we want to
# preserve the actual method output as the flow's final result)
# Uses per-method dict for concurrency safety and to handle None returns
if emit:
self._human_feedback_method_outputs[func.__name__] = method_output
return result
wrapper = sync_wrapper

View File

@@ -246,7 +246,7 @@ class TestHumanFeedbackExecution:
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_empty_feedback_with_default_outcome(self, mock_print, mock_input):
"""Test empty feedback uses default_outcome."""
"""Test empty feedback uses default_outcome for routing, but flow returns method output."""
class TestFlow(Flow):
@start()
@@ -264,14 +264,16 @@ class TestHumanFeedbackExecution:
with patch.object(flow, "_request_human_feedback", return_value=""):
result = flow.kickoff()
assert result == "needs_work"
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "Content"
assert flow.last_human_feedback is not None
# But the outcome is still correctly set for routing purposes
assert flow.last_human_feedback.outcome == "needs_work"
@patch("builtins.input", return_value="Approved!")
@patch("builtins.print")
def test_feedback_collapsing(self, mock_print, mock_input):
"""Test that feedback is collapsed to an outcome."""
"""Test that feedback is collapsed to an outcome for routing, but flow returns method output."""
class TestFlow(Flow):
@start()
@@ -291,8 +293,10 @@ class TestHumanFeedbackExecution:
):
result = flow.kickoff()
assert result == "approved"
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "Content"
assert flow.last_human_feedback is not None
# But the outcome is still correctly set for routing purposes
assert flow.last_human_feedback.outcome == "approved"
@@ -591,3 +595,162 @@ class TestHumanFeedbackLearn:
assert config.learn is True
# llm defaults to "gpt-4o-mini" at the function level
assert config.llm == "gpt-4o-mini"
class TestHumanFeedbackFinalOutputPreservation:
"""Tests for preserving method return value as flow's final output when @human_feedback with emit is terminal.
This addresses the bug where the flow's final output was the collapsed outcome string (e.g., 'approved')
instead of the method's actual return value when a @human_feedback method with emit is the final method.
"""
@patch("builtins.input", return_value="Looks good!")
@patch("builtins.print")
def test_final_output_is_method_return_not_collapsed_outcome(
self, mock_print, mock_input
):
"""When @human_feedback with emit is the final method, flow output is the method's return value."""
class FinalHumanFeedbackFlow(Flow):
@start()
@human_feedback(
message="Review this content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def generate_and_review(self):
# This dict should be the final output, NOT the string 'approved'
return {"title": "My Article", "content": "Article content here", "status": "ready"}
flow = FinalHumanFeedbackFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="Looks great, approved!"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
# The final output should be the actual method return value, not the collapsed outcome
assert isinstance(result, dict), f"Expected dict, got {type(result).__name__}: {result}"
assert result == {"title": "My Article", "content": "Article content here", "status": "ready"}
# But the outcome should still be tracked in last_human_feedback
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "approved"
@patch("builtins.input", return_value="approved")
@patch("builtins.print")
def test_routing_still_works_with_downstream_listener(self, mock_print, mock_input):
"""When @human_feedback has a downstream listener, routing still triggers the listener."""
publish_called = []
class RoutingFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return {"content": "original content"}
@listen("approved")
def publish(self):
publish_called.append(True)
return {"published": True, "timestamp": "2024-01-01"}
flow = RoutingFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="LGTM"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
# The downstream listener should have been triggered
assert len(publish_called) == 1, "publish() should have been called"
# The final output should be from the listener, not the human_feedback method
assert result == {"published": True, "timestamp": "2024-01-01"}
@patch("builtins.input", return_value="")
@patch("builtins.print")
@pytest.mark.asyncio
async def test_async_human_feedback_final_output_preserved(self, mock_print, mock_input):
"""Async @human_feedback methods also preserve the real return value."""
class AsyncFinalFlow(Flow):
@start()
@human_feedback(
message="Review async content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
default_outcome="approved",
)
async def async_generate(self):
return {"async_data": "value", "computed": 42}
flow = AsyncFinalFlow()
with (
patch.object(flow, "_request_human_feedback", return_value=""),
):
result = await flow.kickoff_async()
# The final output should be the dict, not "approved"
assert isinstance(result, dict), f"Expected dict, got {type(result).__name__}: {result}"
assert result == {"async_data": "value", "computed": 42}
assert flow.last_human_feedback.outcome == "approved"
@patch("builtins.input", return_value="feedback")
@patch("builtins.print")
def test_method_outputs_contains_real_output(self, mock_print, mock_input):
"""The _method_outputs list should contain the real method output, not the collapsed outcome."""
class OutputTrackingFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def generate(self):
return {"data": "real output"}
flow = OutputTrackingFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="approved"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
flow.kickoff()
# _method_outputs should contain the real output
assert len(flow._method_outputs) == 1
assert flow._method_outputs[0] == {"data": "real output"}
@patch("builtins.input", return_value="looks good")
@patch("builtins.print")
def test_none_return_value_is_preserved(self, mock_print, mock_input):
"""A method returning None should preserve None as flow output, not the outcome string."""
class NoneReturnFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def process(self):
# Method does work but returns None (implicit)
pass
flow = NoneReturnFlow()
with (
patch.object(flow, "_request_human_feedback", return_value=""),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
# Final output should be None (the method's real return), not "approved"
assert result is None, f"Expected None, got {result!r}"
assert flow.last_human_feedback.outcome == "approved"

View File

@@ -708,7 +708,7 @@ class TestEdgeCases:
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_empty_feedback_first_outcome_fallback(self, mock_print, mock_input):
"""Test that empty feedback without default uses first outcome."""
"""Test that empty feedback without default uses first outcome for routing, but returns method output."""
class FallbackFlow(Flow):
@start()
@@ -726,12 +726,15 @@ class TestEdgeCases:
with patch.object(flow, "_request_human_feedback", return_value=""):
result = flow.kickoff()
assert result == "first" # Falls back to first outcome
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "content"
# But outcome is still set to first for routing purposes
assert flow.last_human_feedback.outcome == "first"
@patch("builtins.input", return_value="whitespace only ")
@patch("builtins.print")
def test_whitespace_only_feedback_treated_as_empty(self, mock_print, mock_input):
"""Test that whitespace-only feedback is treated as empty."""
"""Test that whitespace-only feedback is treated as empty for routing, but returns method output."""
class WhitespaceFlow(Flow):
@start()
@@ -749,7 +752,10 @@ class TestEdgeCases:
with patch.object(flow, "_request_human_feedback", return_value=" "):
result = flow.kickoff()
assert result == "reject" # Uses default because feedback is empty after strip
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "content"
# But outcome is set to default because feedback is empty after strip
assert flow.last_human_feedback.outcome == "reject"
@patch("builtins.input", return_value="feedback")
@patch("builtins.print")