Files
crewAI/lib/crewai/tests/test_human_feedback_integration.py
alex-clawd 74976b157d fix: preserve method return value as flow output for @human_feedback with emit (#5099)
* fix: preserve method return value as flow output for @human_feedback with emit

When a @human_feedback decorated method with emit= is the final method in a
flow (no downstream listeners triggered), the flow's final output was
incorrectly set to the collapsed outcome string (e.g., 'approved') instead
of the method's actual return value (e.g., a state dict).

Root cause: _process_feedback() returns the collapsed_outcome string when
emit is set, and this string was being stored as the method's result in
_method_outputs.

The fix:
1. In human_feedback.py: After _process_feedback, stash the real method_output
   on the flow instance as _human_feedback_method_output when emit is set.

2. In flow.py: After appending a method result to _method_outputs, check if
   _human_feedback_method_output is set. If so, replace the last entry with
   the stashed real output and clear the stash.

This ensures:
- Routing still works correctly (collapsed outcome used for @listen matching)
- The flow's final result is the actual method return value
- If downstream listeners execute, their results become the final output

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* style: ruff format flow.py

* fix: use per-method dict stash for concurrency safety and None returns

Addresses review comments:
- Replace single flow-level slot with dict keyed by method name,
  safe under concurrent @human_feedback+emit execution
- Dict key presence (not value) indicates stashed output,
  correctly preserving None return values
- Added test for None return value preservation

---------

Co-authored-by: Joao Moura <joao@crewai.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 03:28:17 -03:00

982 lines
33 KiB
Python

"""Integration tests for the @human_feedback decorator with Flow.
This module tests the integration of @human_feedback with @listen,
routing behavior, multi-step flows, and state management.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from pydantic import BaseModel
from crewai.flow import Flow, HumanFeedbackResult, human_feedback, listen, or_, start
from crewai.flow.flow import FlowState
class TestRoutingIntegration:
    """Tests for routing integration with @listen decorators."""

    @patch("builtins.input", return_value="I approve")
    @patch("builtins.print")
    def test_routes_to_matching_listener(self, mock_print, mock_input):
        """Test that collapsed outcome routes to the matching @listen method.

        The matching listener is the last method to run, so its return value
        must also be the flow's final output.
        """
        execution_order = []

        class ReviewFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm="gpt-4o-mini",
            )
            def generate(self):
                execution_order.append("generate")
                return "content"

            @listen("approved")
            def on_approved(self):
                execution_order.append("on_approved")
                return "published"

            @listen("rejected")
            def on_rejected(self):
                execution_order.append("on_rejected")
                return "discarded"

        flow = ReviewFlow()
        # Replace the interactive prompt and the LLM-based collapse step so the
        # routing outcome is fully deterministic.
        with (
            patch.object(flow, "_request_human_feedback", return_value="Approved!"),
            patch.object(flow, "_collapse_to_outcome", return_value="approved"),
        ):
            result = flow.kickoff()

        assert "generate" in execution_order
        assert "on_approved" in execution_order
        assert "on_rejected" not in execution_order
        # The downstream listener ran last, so its return value is the output.
        assert result == "published"

    @patch("builtins.input", return_value="")
    @patch("builtins.print")
    def test_default_outcome_routes_correctly(self, mock_print, mock_input):
        """Test that default_outcome routes when no feedback provided."""
        executed_listener = []

        class ReviewFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "needs_work"],
                llm="gpt-4o-mini",
                default_outcome="needs_work",
            )
            def generate(self):
                return "content"

            @listen("approved")
            def on_approved(self):
                executed_listener.append("approved")

            @listen("needs_work")
            def on_needs_work(self):
                executed_listener.append("needs_work")

        flow = ReviewFlow()
        # Empty feedback should fall back to default_outcome without an LLM call.
        with patch.object(flow, "_request_human_feedback", return_value=""):
            flow.kickoff()

        assert "needs_work" in executed_listener
        assert "approved" not in executed_listener
class TestMultiStepFlows:
    """Tests for multi-step flows with multiple @human_feedback decorators."""

    # The builtins.input values mirror the injected feedback below so the
    # fallback patch cannot mislead a reader; input() is not actually reached
    # because _request_human_feedback is patched.
    @patch("builtins.input", side_effect=["Good draft", "Approved"])
    @patch("builtins.print")
    def test_multiple_feedback_steps(self, mock_print, mock_input):
        """Test a flow with multiple human feedback steps."""

        class MultiStepFlow(Flow):
            @start()
            @human_feedback(message="Review draft:")
            def draft(self):
                return "Draft content"

            @listen(draft)
            @human_feedback(message="Final review:")
            def final_review(self, prev_result: HumanFeedbackResult):
                return f"Final content based on: {prev_result.feedback}"

        flow = MultiStepFlow()
        with patch.object(
            flow, "_request_human_feedback", side_effect=["Good draft", "Approved"]
        ):
            flow.kickoff()

        # Both feedbacks should be recorded, in execution order
        assert len(flow.human_feedback_history) == 2
        assert flow.human_feedback_history[0].method_name == "draft"
        assert flow.human_feedback_history[0].feedback == "Good draft"
        assert flow.human_feedback_history[1].method_name == "final_review"
        assert flow.human_feedback_history[1].feedback == "Approved"

    @patch("builtins.input", return_value="feedback")
    @patch("builtins.print")
    def test_mixed_feedback_and_regular_methods(self, mock_print, mock_input):
        """Test flow with both @human_feedback and regular methods."""
        execution_order = []

        class MixedFlow(Flow):
            @start()
            def generate(self):
                execution_order.append("generate")
                return "generated"

            @listen(generate)
            @human_feedback(message="Review:")
            def review(self):
                execution_order.append("review")
                return "reviewed"

            @listen(review)
            def finalize(self, result):
                execution_order.append("finalize")
                return "finalized"

        flow = MixedFlow()
        with patch.object(flow, "_request_human_feedback", return_value="feedback"):
            flow.kickoff()

        assert execution_order == ["generate", "review", "finalize"]

    def test_chained_router_feedback_steps(self):
        """Test that a router outcome can trigger another router method.

        Regression test: @listen("outcome") combined with @human_feedback(emit=...)
        creates a method that is both a listener and a router. The flow must find
        and execute it when the upstream router emits the matching outcome.
        """
        execution_order: list[str] = []

        class ChainedRouterFlow(Flow):
            @start()
            @human_feedback(
                message="First review:",
                emit=["approved", "rejected"],
                llm="gpt-4o-mini",
            )
            def draft(self):
                execution_order.append("draft")
                return "draft content"

            @listen("approved")
            @human_feedback(
                message="Final review:",
                emit=["publish", "revise"],
                llm="gpt-4o-mini",
            )
            def final_review(self, prev: HumanFeedbackResult):
                execution_order.append("final_review")
                return "final content"

            @listen("rejected")
            def on_rejected(self, prev: HumanFeedbackResult):
                execution_order.append("on_rejected")
                return "rejected"

            @listen("publish")
            def on_publish(self, prev: HumanFeedbackResult):
                execution_order.append("on_publish")
                return "published"

            @listen("revise")
            def on_revise(self, prev: HumanFeedbackResult):
                execution_order.append("on_revise")
                return "revised"

        flow = ChainedRouterFlow()
        # One feedback string and one collapsed outcome per router invocation.
        with (
            patch.object(
                flow,
                "_request_human_feedback",
                side_effect=["looks good", "ship it"],
            ),
            patch.object(
                flow,
                "_collapse_to_outcome",
                side_effect=["approved", "publish"],
            ),
        ):
            result = flow.kickoff()

        assert execution_order == ["draft", "final_review", "on_publish"]
        assert result == "published"
        assert len(flow.human_feedback_history) == 2
        assert flow.human_feedback_history[0].outcome == "approved"
        assert flow.human_feedback_history[1].outcome == "publish"

    def test_chained_router_rejected_path(self):
        """Test that a start-router outcome routes to a non-router listener."""
        execution_order: list[str] = []

        class ChainedRouterFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm="gpt-4o-mini",
            )
            def draft(self):
                execution_order.append("draft")
                return "draft"

            @listen("approved")
            @human_feedback(
                message="Final:",
                emit=["publish", "revise"],
                llm="gpt-4o-mini",
            )
            def final_review(self, prev: HumanFeedbackResult):
                execution_order.append("final_review")
                return "final"

            @listen("rejected")
            def on_rejected(self, prev: HumanFeedbackResult):
                execution_order.append("on_rejected")
                return "rejected"

        flow = ChainedRouterFlow()
        with (
            patch.object(flow, "_request_human_feedback", return_value="bad"),
            patch.object(flow, "_collapse_to_outcome", return_value="rejected"),
        ):
            result = flow.kickoff()

        assert execution_order == ["draft", "on_rejected"]
        assert result == "rejected"
        assert len(flow.human_feedback_history) == 1
        assert flow.human_feedback_history[0].outcome == "rejected"

    def test_hitl_self_loop_routes_back_to_same_method(self):
        """Test that a HITL router can loop back to itself via its own emit outcome.

        Pattern: review_work listens to or_("do_work", "review") and emits
        ["review", "approved"]. When the human rejects (outcome="review"),
        the method should re-execute. When approved, the flow should continue
        to the approve_work listener.
        """
        execution_order: list[str] = []

        class SelfLoopFlow(Flow):
            @start()
            def initial_func(self):
                execution_order.append("initial_func")
                return "initial"

            @listen(initial_func)
            def do_work(self):
                execution_order.append("do_work")
                return "work output"

            @human_feedback(
                message="Do you approve this content?",
                emit=["review", "approved"],
                llm="gpt-4o-mini",
                default_outcome="approved",
            )
            @listen(or_("do_work", "review"))
            def review_work(self):
                execution_order.append("review_work")
                return "content for review"

            @listen("approved")
            def approve_work(self):
                execution_order.append("approve_work")
                return "published"

        flow = SelfLoopFlow()
        # First call: human rejects (outcome="review") -> self-loop
        # Second call: human approves (outcome="approved") -> continue
        with (
            patch.object(
                flow,
                "_request_human_feedback",
                side_effect=["needs changes", "looks good"],
            ),
            patch.object(
                flow,
                "_collapse_to_outcome",
                side_effect=["review", "approved"],
            ),
        ):
            result = flow.kickoff()

        assert execution_order == [
            "initial_func",
            "do_work",
            "review_work",  # first review -> rejected (review)
            "review_work",  # second review -> approved
            "approve_work",
        ]
        assert result == "published"
        assert len(flow.human_feedback_history) == 2
        assert flow.human_feedback_history[0].outcome == "review"
        assert flow.human_feedback_history[1].outcome == "approved"

    def test_hitl_self_loop_multiple_rejections(self):
        """Test that a HITL router can loop back multiple times before approving.

        Verifies the self-loop works for more than one rejection cycle.
        """
        execution_order: list[str] = []

        class MultiRejectFlow(Flow):
            @start()
            def generate(self):
                execution_order.append("generate")
                return "draft"

            @human_feedback(
                message="Review this content:",
                emit=["revise", "approved"],
                llm="gpt-4o-mini",
                default_outcome="approved",
            )
            @listen(or_("generate", "revise"))
            def review(self):
                execution_order.append("review")
                # Version number tracks how many times review has run.
                return "content v" + str(execution_order.count("review"))

            @listen("approved")
            def publish(self):
                execution_order.append("publish")
                return "published"

        flow = MultiRejectFlow()
        # Three rejections, then approval
        with (
            patch.object(
                flow,
                "_request_human_feedback",
                side_effect=["bad", "still bad", "not yet", "great"],
            ),
            patch.object(
                flow,
                "_collapse_to_outcome",
                side_effect=["revise", "revise", "revise", "approved"],
            ),
        ):
            result = flow.kickoff()

        assert execution_order == [
            "generate",
            "review",  # 1st review -> revise
            "review",  # 2nd review -> revise
            "review",  # 3rd review -> revise
            "review",  # 4th review -> approved
            "publish",
        ]
        assert result == "published"
        assert len(flow.human_feedback_history) == 4
        assert [r.outcome for r in flow.human_feedback_history] == [
            "revise",
            "revise",
            "revise",
            "approved",
        ]

    def test_hitl_self_loop_immediate_approval(self):
        """Test that a HITL self-loop flow works when approved on the first try.

        No looping occurs -- the flow should proceed straight through.
        """
        execution_order: list[str] = []

        class ImmediateApprovalFlow(Flow):
            @start()
            def generate(self):
                execution_order.append("generate")
                return "perfect draft"

            @human_feedback(
                message="Review:",
                emit=["revise", "approved"],
                llm="gpt-4o-mini",
            )
            @listen(or_("generate", "revise"))
            def review(self):
                execution_order.append("review")
                return "content"

            @listen("approved")
            def publish(self):
                execution_order.append("publish")
                return "published"

        flow = ImmediateApprovalFlow()
        with (
            patch.object(
                flow,
                "_request_human_feedback",
                return_value="perfect",
            ),
            patch.object(
                flow,
                "_collapse_to_outcome",
                return_value="approved",
            ),
        ):
            result = flow.kickoff()

        assert execution_order == ["generate", "review", "publish"]
        assert result == "published"
        assert len(flow.human_feedback_history) == 1
        assert flow.human_feedback_history[0].outcome == "approved"

    def test_router_and_non_router_listeners_for_same_outcome(self):
        """Test that both router and non-router listeners fire for the same outcome."""
        execution_order: list[str] = []

        class MixedListenerFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm="gpt-4o-mini",
            )
            def draft(self):
                execution_order.append("draft")
                return "draft"

            @listen("approved")
            @human_feedback(
                message="Final:",
                emit=["publish", "revise"],
                llm="gpt-4o-mini",
            )
            def router_listener(self, prev: HumanFeedbackResult):
                execution_order.append("router_listener")
                return "final"

            @listen("approved")
            def plain_listener(self, prev: HumanFeedbackResult):
                execution_order.append("plain_listener")
                return "logged"

            @listen("publish")
            def on_publish(self, prev: HumanFeedbackResult):
                execution_order.append("on_publish")
                return "published"

        flow = MixedListenerFlow()
        with (
            patch.object(
                flow,
                "_request_human_feedback",
                side_effect=["approve it", "publish it"],
            ),
            patch.object(
                flow,
                "_collapse_to_outcome",
                side_effect=["approved", "publish"],
            ),
        ):
            flow.kickoff()

        assert "draft" in execution_order
        assert "router_listener" in execution_order
        assert "plain_listener" in execution_order
        assert "on_publish" in execution_order
        # Exactly the two @human_feedback methods recorded feedback, in order.
        assert [r.outcome for r in flow.human_feedback_history] == [
            "approved",
            "publish",
        ]
class TestStateManagement:
    """Human-feedback records must stay reachable from the flow while it runs."""

    @patch("builtins.input", return_value="approved")
    @patch("builtins.print")
    def test_feedback_available_in_listener(self, mock_print, mock_input):
        """A listener fired by the routed outcome can read ``last_human_feedback``."""
        seen = []

        class StateFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm="gpt-4o-mini",
            )
            def review(self):
                return "Content to review"

            @listen("approved")
            def on_approved(self):
                # Snapshot the latest feedback through the public property.
                seen.append(self.last_human_feedback)
                return "done"

        flow = StateFlow()
        request_patch = patch.object(
            flow, "_request_human_feedback", return_value="Great content!"
        )
        collapse_patch = patch.object(
            flow, "_collapse_to_outcome", return_value="approved"
        )
        with request_patch, collapse_patch:
            flow.kickoff()

        assert len(seen) == 1
        feedback = seen[0]
        assert isinstance(feedback, HumanFeedbackResult)
        assert feedback.output == "Content to review"
        assert feedback.feedback == "Great content!"
        assert feedback.outcome == "approved"

    @patch("builtins.input", return_value="")
    @patch("builtins.print")
    def test_history_preserved_across_steps(self, mock_print, mock_input):
        """Earlier feedback entries are still in the history when a later step runs."""

        class HistoryFlow(Flow):
            @start()
            @human_feedback(message="Step 1:")
            def step1(self):
                return "Step 1"

            @listen(step1)
            @human_feedback(message="Step 2:")
            def step2(self, result):
                return "Step 2"

            @listen(step2)
            def final(self, result):
                # The flow output is how many feedback entries exist at this point.
                return len(self.human_feedback_history)

        flow = HistoryFlow()
        with patch.object(flow, "_request_human_feedback", return_value="feedback"):
            entries_seen = flow.kickoff()

        # step1 and step2 each contributed one entry before `final` ran.
        assert entries_seen == 2
class TestAsyncFlowIntegration:
    """@human_feedback applied to coroutine methods and driven via ``kickoff_async``."""

    @pytest.mark.asyncio
    async def test_async_flow_with_human_feedback(self):
        """An async start method decorated with @human_feedback runs and records feedback."""
        calls = []

        class AsyncFlow(Flow):
            @start()
            @human_feedback(message="Review:")
            async def async_review(self):
                calls.append("async_review")
                await asyncio.sleep(0.01)  # Simulate async work
                return "async content"

        flow = AsyncFlow()
        with patch.object(flow, "_request_human_feedback", return_value="feedback"):
            await flow.kickoff_async()

        assert "async_review" in calls
        latest = flow.last_human_feedback
        assert latest is not None
        # The recorded output is the awaited coroutine result, not the coroutine.
        assert latest.output == "async content"
class TestWithStructuredState:
    """Tests for flows with structured (Pydantic) state."""

    @patch("builtins.input", return_value="approved")
    @patch("builtins.print")
    def test_with_pydantic_state(self, mock_print, mock_input):
        """Test human feedback with structured Pydantic state.

        State written inside the @human_feedback method must be visible to the
        routed listener, whose return value becomes the flow's final output.
        """

        class ReviewState(FlowState):
            content: str = ""
            review_count: int = 0

        class StructuredFlow(Flow[ReviewState]):
            initial_state = ReviewState

            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm="gpt-4o-mini",
            )
            def review(self):
                self.state.content = "Generated content"
                self.state.review_count += 1
                return self.state.content

            @listen("approved")
            def on_approved(self):
                return f"Approved: {self.state.content}"

        flow = StructuredFlow()
        with (
            patch.object(flow, "_request_human_feedback", return_value="LGTM"),
            patch.object(flow, "_collapse_to_outcome", return_value="approved"),
        ):
            result = flow.kickoff()

        # The listener saw the state mutated by the feedback method.
        assert result == "Approved: Generated content"
        # The feedback-decorated method ran exactly once.
        assert flow.state.review_count == 1
        assert flow.last_human_feedback is not None
        assert flow.last_human_feedback.feedback == "LGTM"
class TestMetadataPassthrough:
    """The ``metadata=`` argument of @human_feedback must reach the recorded result."""

    @patch("builtins.input", return_value="")
    @patch("builtins.print")
    def test_metadata_included_in_result(self, mock_print, mock_input):
        """Metadata given to the decorator shows up on ``last_human_feedback``."""

        class MetadataFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                metadata={"channel": "slack", "priority": "high"},
            )
            def review(self):
                return "content"

        flow = MetadataFlow()
        with patch.object(flow, "_request_human_feedback", return_value="feedback"):
            flow.kickoff()

        recorded = flow.last_human_feedback
        assert recorded is not None
        # Compare against a fresh literal rather than the decorator's dict object.
        assert recorded.metadata == {"channel": "slack", "priority": "high"}
class TestEventEmission:
    """Smoke test for the console/event path taken while feedback is requested."""

    @patch("builtins.input", return_value="test feedback")
    @patch("builtins.print")
    def test_events_emitted_on_feedback_request(self, mock_print, mock_input):
        """A flow requesting real (input()-backed) feedback completes cleanly."""
        from crewai.events.event_listener import event_listener

        class EventFlow(Flow):
            @start()
            @human_feedback(message="Review:")
            def review(self):
                return "content"

        flow = EventFlow()
        formatter = event_listener.formatter
        # Event payloads are hard to capture here, so silence the formatter's
        # live-update toggling and only check that the flow runs to completion.
        with patch.object(formatter, "pause_live_updates", return_value=None):
            with patch.object(formatter, "resume_live_updates", return_value=None):
                flow.kickoff()

        assert flow.last_human_feedback is not None
class TestEdgeCases:
    """Tests for edge cases and error handling."""

    @patch("builtins.input", return_value="")
    @patch("builtins.print")
    def test_empty_feedback_first_outcome_fallback(self, mock_print, mock_input):
        """Test that empty feedback without default uses first outcome for routing, but returns method output."""

        class FallbackFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["first", "second", "third"],
                llm="gpt-4o-mini",
                # No default_outcome specified
            )
            def review(self):
                return "content"

        flow = FallbackFlow()
        with patch.object(flow, "_request_human_feedback", return_value=""):
            result = flow.kickoff()

        # Flow result is the method's return value, NOT the collapsed outcome
        assert result == "content"
        # Fail with a clear assertion (not AttributeError) if nothing was recorded
        assert flow.last_human_feedback is not None
        # But outcome is still set to first for routing purposes
        assert flow.last_human_feedback.outcome == "first"

    @patch("builtins.input", return_value="whitespace only ")
    @patch("builtins.print")
    def test_whitespace_only_feedback_treated_as_empty(self, mock_print, mock_input):
        """Test that whitespace-only feedback is treated as empty for routing, but returns method output."""

        class WhitespaceFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approve", "reject"],
                llm="gpt-4o-mini",
                default_outcome="reject",
            )
            def review(self):
                return "content"

        flow = WhitespaceFlow()
        with patch.object(flow, "_request_human_feedback", return_value=" "):
            result = flow.kickoff()

        # Flow result is the method's return value, NOT the collapsed outcome
        assert result == "content"
        # Fail with a clear assertion (not AttributeError) if nothing was recorded
        assert flow.last_human_feedback is not None
        # But outcome is set to default because feedback is empty after strip
        assert flow.last_human_feedback.outcome == "reject"

    @patch("builtins.input", return_value="feedback")
    @patch("builtins.print")
    def test_feedback_result_without_routing(self, mock_print, mock_input):
        """Test that HumanFeedbackResult is returned when not routing."""

        class NoRoutingFlow(Flow):
            @start()
            @human_feedback(message="Review:")
            def review(self):
                return "content"

        flow = NoRoutingFlow()
        with patch.object(flow, "_request_human_feedback", return_value="feedback"):
            result = flow.kickoff()

        # Result should be HumanFeedbackResult when not routing
        assert isinstance(result, HumanFeedbackResult)
        assert result.output == "content"
        assert result.feedback == "feedback"
        assert result.outcome is None  # No routing, no outcome
class TestLLMConfigPreservation:
    """Tests that LLM config is preserved through @human_feedback serialization.

    PR #4970 introduced _hf_llm stashing so the live LLM object survives
    decorator wrapping for same-process resume. The serialization path
    (_serialize_llm_for_context / _deserialize_llm_from_context) preserves
    config for cross-process resume.
    """

    def test_hf_llm_stashed_on_wrapper_with_llm_instance(self):
        """Test that passing an LLM instance stashes it on the wrapper as _hf_llm."""
        from crewai.llm import LLM

        llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)

        class ConfigFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm=llm_instance,
            )
            def review(self):
                return "content"

        method = ConfigFlow.review
        assert hasattr(method, "_hf_llm"), "_hf_llm not found on wrapper"
        assert method._hf_llm is llm_instance, "_hf_llm is not the same object"

    def test_hf_llm_preserved_on_listen_method(self):
        """Test that _hf_llm is preserved when @human_feedback is on a @listen method."""
        from crewai.llm import LLM

        llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)

        class ListenConfigFlow(Flow):
            @start()
            def generate(self):
                return "draft"

            @listen("generate")
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm=llm_instance,
            )
            def review(self):
                return "content"

        method = ListenConfigFlow.review
        assert hasattr(method, "_hf_llm")
        assert method._hf_llm is llm_instance

    def test_hf_llm_accessible_on_instance(self):
        """Test that _hf_llm survives Flow instantiation (bound method access)."""
        from crewai.llm import LLM

        llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)

        class InstanceFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm=llm_instance,
            )
            def review(self):
                return "content"

        flow = InstanceFlow()
        instance_method = flow.review
        assert hasattr(instance_method, "_hf_llm")
        assert instance_method._hf_llm is llm_instance

    def test_serialize_llm_preserves_config_fields(self):
        """Test that _serialize_llm_for_context captures temperature, base_url, etc."""
        from crewai.flow.human_feedback import _serialize_llm_for_context
        from crewai.llm import LLM

        llm = LLM(
            model="gpt-4o-mini",
            temperature=0.42,
            base_url="https://custom.example.com/v1",
        )
        serialized = _serialize_llm_for_context(llm)
        assert isinstance(serialized, dict), f"Expected dict, got {type(serialized)}"
        # The serialized model name is provider-qualified.
        assert serialized["model"] == "openai/gpt-4o-mini"
        assert serialized["temperature"] == 0.42
        assert serialized["base_url"] == "https://custom.example.com/v1"

    def test_serialize_llm_excludes_api_key(self):
        """Test that api_key is NOT included in serialized output (security)."""
        from crewai.flow.human_feedback import _serialize_llm_for_context
        from crewai.llm import LLM

        llm = LLM(model="gpt-4o-mini")
        serialized = _serialize_llm_for_context(llm)
        assert isinstance(serialized, dict)
        assert "api_key" not in serialized

    def test_deserialize_round_trip_preserves_config(self):
        """Test that serialize → deserialize round-trip preserves all config."""
        from crewai.flow.human_feedback import (
            _deserialize_llm_from_context,
            _serialize_llm_for_context,
        )
        from crewai.llm import LLM

        original = LLM(
            model="gpt-4o-mini",
            temperature=0.42,
            base_url="https://custom.example.com/v1",
        )
        serialized = _serialize_llm_for_context(original)
        reconstructed = _deserialize_llm_from_context(serialized)
        assert reconstructed is not None
        assert reconstructed.model == original.model
        assert reconstructed.temperature == original.temperature
        assert reconstructed.base_url == original.base_url

    def test_deserialize_handles_legacy_string_format(self):
        """Test backward compat: plain string still reconstructs an LLM."""
        from crewai.flow.human_feedback import _deserialize_llm_from_context

        reconstructed = _deserialize_llm_from_context("openai/gpt-4o-mini")
        assert reconstructed is not None
        assert reconstructed.model == "gpt-4o-mini"

    def test_deserialize_returns_none_for_none(self):
        """Test that None input returns None."""
        from crewai.flow.human_feedback import _deserialize_llm_from_context

        assert _deserialize_llm_from_context(None) is None

    def test_serialize_llm_preserves_provider_specific_fields(self):
        """Test that provider-specific fields like project/location are serialized."""
        from crewai.flow.human_feedback import _serialize_llm_for_context
        from crewai.llm import LLM

        # Create a Gemini-style LLM with project and non-default location
        llm = LLM(
            model="gemini-2.0-flash",
            provider="gemini",
            project="my-project",
            location="europe-west1",
            temperature=0.3,
        )
        serialized = _serialize_llm_for_context(llm)
        assert isinstance(serialized, dict)
        assert serialized.get("project") == "my-project"
        assert serialized.get("location") == "europe-west1"
        assert serialized.get("temperature") == 0.3

    def test_config_preserved_through_full_flow_execution(self):
        """Test that the LLM with custom config is used during outcome collapsing."""
        from crewai.llm import LLM

        llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
        collapse_calls = []

        class FullFlow(Flow):
            @start()
            @human_feedback(
                message="Review:",
                emit=["approved", "rejected"],
                llm=llm_instance,
            )
            def review(self):
                return "content"

            @listen("approved")
            def on_approved(self):
                return "done"

        flow = FullFlow()

        def spy_collapse(feedback, outcomes, llm):
            # Record which LLM object the decorator handed to the collapse step,
            # then route deterministically without calling a real model.
            collapse_calls.append(llm)
            return "approved"

        with (
            patch.object(flow, "_request_human_feedback", return_value="looks good"),
            patch.object(flow, "_collapse_to_outcome", side_effect=spy_collapse),
        ):
            flow.kickoff()

        assert len(collapse_calls) == 1
        # The LLM passed to _collapse_to_outcome should be the original instance
        assert collapse_calls[0] is llm_instance