mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-22 11:48:16 +00:00
Compare commits
1 Commits
devin/1774
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09b84dd2b0 |
@@ -1315,7 +1315,25 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
context = self._pending_feedback_context
|
||||
emit = context.emit
|
||||
default_outcome = context.default_outcome
|
||||
llm = context.llm
|
||||
|
||||
# Try to get the live LLM from the re-imported decorator instead of the
|
||||
# serialized string. When a flow pauses for HITL and resumes (possibly in
|
||||
# a different process), context.llm only contains a model string like
|
||||
# 'gemini/gemini-3-flash-preview'. This loses credentials, project,
|
||||
# location, safety_settings, and client_params. By looking up the method
|
||||
# on the re-imported flow class, we can retrieve the fully-configured LLM
|
||||
# that was passed to the @human_feedback decorator.
|
||||
llm = context.llm # fallback to serialized string
|
||||
method = self._methods.get(FlowMethodName(context.method_name))
|
||||
if method is not None:
|
||||
live_llm = getattr(method, "_hf_llm", None)
|
||||
if live_llm is not None:
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
|
||||
# Only use live LLM if it's a BaseLLM instance (not a string)
|
||||
# String values offer no benefit over the serialized context.llm
|
||||
if isinstance(live_llm, BaseLLMClass):
|
||||
llm = live_llm
|
||||
|
||||
# Determine outcome
|
||||
collapsed_outcome: str | None = None
|
||||
|
||||
@@ -75,6 +75,7 @@ class FlowMethod(Generic[P, R]):
|
||||
"__is_router__",
|
||||
"__router_paths__",
|
||||
"__human_feedback_config__",
|
||||
"_hf_llm", # Live LLM object for HITL resume
|
||||
]:
|
||||
if hasattr(meth, attr):
|
||||
setattr(self, attr, getattr(meth, attr))
|
||||
|
||||
@@ -572,6 +572,14 @@ def human_feedback(
|
||||
wrapper.__is_router__ = True
|
||||
wrapper.__router_paths__ = list(emit)
|
||||
|
||||
# Stash the live LLM object for HITL resume to retrieve.
|
||||
# When a flow pauses for human feedback and later resumes (possibly in a
|
||||
# different process), the serialized context only contains a model string.
|
||||
# By storing the original LLM on the wrapper, resume_async can retrieve
|
||||
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
|
||||
# instead of creating a bare LLM from just the model string.
|
||||
wrapper._hf_llm = llm
|
||||
|
||||
return wrapper # type: ignore[no-any-return]
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -1216,3 +1216,275 @@ class TestAsyncHumanFeedbackEdgeCases:
|
||||
|
||||
assert flow.last_human_feedback.outcome == "approved"
|
||||
assert flow.last_human_feedback.feedback == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for _hf_llm attribute and live LLM resolution on resume
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestLiveLLMPreservationOnResume:
|
||||
"""Tests for preserving the full LLM config across HITL resume."""
|
||||
|
||||
def test_hf_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
|
||||
"""Test that _hf_llm is set on the wrapper when llm is a BaseLLM instance."""
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
# Create a mock BaseLLM object
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is mock_llm
|
||||
|
||||
def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||
"""Test that _hf_llm is set on the wrapper even when llm is a string."""
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_live_basellm_over_serialized_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
|
||||
|
||||
This is the main bug fix: when a flow resumes, it should use the fully-configured
|
||||
LLM from the re-imported decorator (with credentials, project, etc.) instead of
|
||||
creating a new LLM from just the model string.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
# Create a mock BaseLLM with full config (simulating Gemini with service account)
|
||||
live_llm = MagicMock(spec=BaseLLM)
|
||||
live_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
result_path: str = ""
|
||||
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm=live_llm, # Full LLM object with credentials
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approved(self):
|
||||
self.result_path = "approved"
|
||||
return "Approved!"
|
||||
|
||||
# Save pending feedback with just a model STRING (simulating serialization)
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="live-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="live-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "live-llm-test"},
|
||||
)
|
||||
|
||||
# Restore flow - this re-imports the class with the live LLM
|
||||
flow = TestFlow.from_pending("live-llm-test", persistence)
|
||||
|
||||
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# The key assertion: _collapse_to_outcome received the LIVE BaseLLM object,
|
||||
# NOT the serialized string. The live_llm was captured at class definition
|
||||
# time and stored on the method wrapper as _hf_llm.
|
||||
assert len(captured_llm) == 1
|
||||
# Verify it's the same object that was passed to the decorator
|
||||
# (which is stored on the method's _hf_llm attribute)
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert captured_llm[0] is method._hf_llm
|
||||
# And verify it's a BaseLLM instance, not a string
|
||||
assert isinstance(captured_llm[0], BaseLLM)
|
||||
|
||||
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async falls back to context.llm when _hf_llm is not available.
|
||||
|
||||
This ensures backward compatibility with flows that were paused before this fix.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
# Save pending feedback
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="fallback-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="fallback-test",
|
||||
context=context,
|
||||
state_data={"id": "fallback-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("fallback-test", persistence)
|
||||
|
||||
# Remove _hf_llm to simulate old decorator without this attribute
|
||||
method = flow._methods.get("review")
|
||||
if hasattr(method, "_hf_llm"):
|
||||
delattr(method, "_hf_llm")
|
||||
|
||||
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# Should fall back to the serialized string
|
||||
assert len(captured_llm) == 1
|
||||
assert captured_llm[0] == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
|
||||
|
||||
String LLM values offer no benefit over the serialized context.llm,
|
||||
so we don't prefer them.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini", # String LLM
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
# Save pending feedback
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="string-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="string-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "string-llm-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("string-llm-test", persistence)
|
||||
|
||||
# Verify _hf_llm is a string
|
||||
method = flow._methods.get("review")
|
||||
assert method._hf_llm == "gpt-4o-mini"
|
||||
|
||||
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# Should use context.llm since _hf_llm is a string (not BaseLLM)
|
||||
assert len(captured_llm) == 1
|
||||
assert captured_llm[0] == "gpt-4o-mini"
|
||||
|
||||
def test_hf_llm_set_for_async_wrapper(self) -> None:
|
||||
"""Test that _hf_llm is set on async wrapper functions."""
|
||||
import asyncio
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
async def async_review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("async_review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is mock_llm
|
||||
|
||||
Reference in New Issue
Block a user