Mirror of https://github.com/crewAIInc/crewAI.git, synced 2026-04-30 23:02:50 +00:00
fix: preserve full LLM config across HITL resume for non-OpenAI providers (#4970)
Some checks failed
Check Documentation Broken Links / Check broken links (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
When a flow with @human_feedback(llm=create_llm()) pauses for HITL and later resumes:

1. The LLM object was being serialized to just a model string via _serialize_llm_for_context() (e.g. 'gemini/gemini-3.1-flash-lite-preview')
2. On resume, resume_async() was creating LLM(model=string) with NO credentials, project, location, safety_settings, or client_params
3. OpenAI worked by accident (OPENAI_API_KEY from env), but Gemini with service accounts broke

This fix:
- Stashes the live LLM object on the wrapper as the _hf_llm attribute
- On resume, looks up the method and retrieves the live LLM if available
- Falls back to the serialized string for backward compatibility
- Preserves _hf_llm through FlowMethod wrapper decorators

Co-authored-by: Joao Moura <joao@crewai.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
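For context, the affected pattern looks roughly like this (a minimal sketch; the human_feedback import path and the create_llm helper are illustrative, and the provider-specific kwargs are deliberately omitted because they depend on the Gemini/Vertex setup):

from crewai import LLM
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback  # illustrative import path


def create_llm() -> LLM:
    # A fully configured non-OpenAI LLM. In the failing scenario this carried
    # service-account credentials, project, location, safety_settings, and
    # client_params; those kwargs are omitted here because they vary by setup.
    return LLM(model="gemini/gemini-3-flash")


class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Approve this draft?",
        emit=["approved", "rejected"],
        llm=create_llm(),  # a full LLM object, not just a model string
    )
    def draft(self):
        return "draft content"

    @listen("approved")
    def publish(self):
        return "published"

Before this fix, pausing at draft() and resuming later rebuilt the LLM from the serialized model string alone, dropping everything create_llm() had configured.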
@@ -1315,7 +1315,25 @@ class Flow(Generic[T], metaclass=FlowMeta):
         context = self._pending_feedback_context
         emit = context.emit
         default_outcome = context.default_outcome
-        llm = context.llm
+
+        # Try to get the live LLM from the re-imported decorator instead of the
+        # serialized string. When a flow pauses for HITL and resumes (possibly in
+        # a different process), context.llm only contains a model string like
+        # 'gemini/gemini-3-flash-preview'. This loses credentials, project,
+        # location, safety_settings, and client_params. By looking up the method
+        # on the re-imported flow class, we can retrieve the fully-configured LLM
+        # that was passed to the @human_feedback decorator.
+        llm = context.llm  # fallback to serialized string
+        method = self._methods.get(FlowMethodName(context.method_name))
+        if method is not None:
+            live_llm = getattr(method, "_hf_llm", None)
+            if live_llm is not None:
+                from crewai.llms.base_llm import BaseLLM as BaseLLMClass
+
+                # Only use live LLM if it's a BaseLLM instance (not a string)
+                # String values offer no benefit over the serialized context.llm
+                if isinstance(live_llm, BaseLLMClass):
+                    llm = live_llm
 
         # Determine outcome
         collapsed_outcome: str | None = None
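In isolation, the resolution order introduced above amounts to the following (a small sketch rather than the actual Flow internals; context and method stand in for the pending-feedback context and the re-imported method object):

from crewai.llms.base_llm import BaseLLM


def resolve_resume_llm(context, method):
    # Start from the serialized value, e.g. "gemini/gemini-3-flash".
    llm = context.llm
    if method is not None:
        live_llm = getattr(method, "_hf_llm", None)
        # A live BaseLLM instance carries credentials and client settings;
        # a plain string adds nothing over the serialized context.llm.
        if isinstance(live_llm, BaseLLM):
            llm = live_llm
    return llm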
@@ -75,6 +75,7 @@ class FlowMethod(Generic[P, R]):
            "__is_router__",
            "__router_paths__",
            "__human_feedback_config__",
+           "_hf_llm",  # Live LLM object for HITL resume
        ]:
            if hasattr(meth, attr):
                setattr(self, attr, getattr(meth, attr))
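The attribute has to be in that forwarded list because stacked decorators (e.g. @start() on top of @human_feedback) end up wrapping the inner function, so whatever @human_feedback stashes must be copied onto the outer FlowMethod for the resume-time getattr(method, "_hf_llm", None) to find it. A toy illustration of that forwarding, not the real FlowMethod:

class ToyFlowMethod:
    # Minimal stand-in for FlowMethod's attribute forwarding.
    FORWARDED = ("__human_feedback_config__", "_hf_llm")

    def __init__(self, meth):
        self.meth = meth
        for attr in self.FORWARDED:
            if hasattr(meth, attr):
                setattr(self, attr, getattr(meth, attr))


def inner():
    return "content"


inner._hf_llm = object()               # what @human_feedback stashes
outer = ToyFlowMethod(inner)           # what an outer wrapping decorator produces
assert outer._hf_llm is inner._hf_llm  # the live LLM survives the extra wrapper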
@@ -572,6 +572,14 @@ def human_feedback(
        wrapper.__is_router__ = True
        wrapper.__router_paths__ = list(emit)
+
+       # Stash the live LLM object for HITL resume to retrieve.
+       # When a flow pauses for human feedback and later resumes (possibly in a
+       # different process), the serialized context only contains a model string.
+       # By storing the original LLM on the wrapper, resume_async can retrieve
+       # the fully-configured LLM (with credentials, project, safety_settings, etc.)
+       # instead of creating a bare LLM from just the model string.
+       wrapper._hf_llm = llm
 
        return wrapper  # type: ignore[no-any-return]
 
    return decorator
@@ -1216,3 +1216,275 @@ class TestAsyncHumanFeedbackEdgeCases:
 
         assert flow.last_human_feedback.outcome == "approved"
         assert flow.last_human_feedback.feedback == ""
+
+
+# =============================================================================
+# Tests for _hf_llm attribute and live LLM resolution on resume
+# =============================================================================
+
+
+class TestLiveLLMPreservationOnResume:
+    """Tests for preserving the full LLM config across HITL resume."""
+
+    def test_hf_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
+        """Test that _hf_llm is set on the wrapper when llm is a BaseLLM instance."""
+        from crewai.llms.base_llm import BaseLLM
+
+        # Create a mock BaseLLM object
+        mock_llm = MagicMock(spec=BaseLLM)
+        mock_llm.model = "gemini/gemini-3-flash"
+
+        class TestFlow(Flow):
+            @start()
+            @human_feedback(
+                message="Review:",
+                emit=["approved", "rejected"],
+                llm=mock_llm,
+            )
+            def review(self):
+                return "content"
+
+        flow = TestFlow()
+        method = flow._methods.get("review")
+        assert method is not None
+        assert hasattr(method, "_hf_llm")
+        assert method._hf_llm is mock_llm
+
+    def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
+        """Test that _hf_llm is set on the wrapper even when llm is a string."""
+
+        class TestFlow(Flow):
+            @start()
+            @human_feedback(
+                message="Review:",
+                emit=["approved", "rejected"],
+                llm="gpt-4o-mini",
+            )
+            def review(self):
+                return "content"
+
+        flow = TestFlow()
+        method = flow._methods.get("review")
+        assert method is not None
+        assert hasattr(method, "_hf_llm")
+        assert method._hf_llm == "gpt-4o-mini"
+
+    @patch("crewai.flow.flow.crewai_event_bus.emit")
+    def test_resume_async_uses_live_basellm_over_serialized_string(
+        self, mock_emit: MagicMock
+    ) -> None:
+        """Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
+
+        This is the main bug fix: when a flow resumes, it should use the fully-configured
+        LLM from the re-imported decorator (with credentials, project, etc.) instead of
+        creating a new LLM from just the model string.
+        """
+        with tempfile.TemporaryDirectory() as tmpdir:
+            db_path = os.path.join(tmpdir, "test_flows.db")
+            persistence = SQLiteFlowPersistence(db_path)
+
+            from crewai.llms.base_llm import BaseLLM
+
+            # Create a mock BaseLLM with full config (simulating Gemini with service account)
+            live_llm = MagicMock(spec=BaseLLM)
+            live_llm.model = "gemini/gemini-3-flash"
+
+            class TestFlow(Flow):
+                result_path: str = ""
+
+                @start()
+                @human_feedback(
+                    message="Approve?",
+                    emit=["approved", "rejected"],
+                    llm=live_llm,  # Full LLM object with credentials
+                )
+                def review(self):
+                    return "content"
+
+                @listen("approved")
+                def handle_approved(self):
+                    self.result_path = "approved"
+                    return "Approved!"
+
+            # Save pending feedback with just a model STRING (simulating serialization)
+            context = PendingFeedbackContext(
+                flow_id="live-llm-test",
+                flow_class="TestFlow",
+                method_name="review",
+                method_output="content",
+                message="Approve?",
+                emit=["approved", "rejected"],
+                llm="gemini/gemini-3-flash",  # Serialized string, NOT the live object
+            )
+            persistence.save_pending_feedback(
+                flow_uuid="live-llm-test",
+                context=context,
+                state_data={"id": "live-llm-test"},
+            )
+
+            # Restore flow - this re-imports the class with the live LLM
+            flow = TestFlow.from_pending("live-llm-test", persistence)
+
+            # Mock _collapse_to_outcome to capture what LLM it receives
+            captured_llm = []
+
+            def capture_llm(feedback, outcomes, llm):
+                captured_llm.append(llm)
+                return "approved"
+
+            with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
+                flow.resume("looks good!")
+
+            # The key assertion: _collapse_to_outcome received the LIVE BaseLLM object,
+            # NOT the serialized string. The live_llm was captured at class definition
+            # time and stored on the method wrapper as _hf_llm.
+            assert len(captured_llm) == 1
+            # Verify it's the same object that was passed to the decorator
+            # (which is stored on the method's _hf_llm attribute)
+            method = flow._methods.get("review")
+            assert method is not None
+            assert captured_llm[0] is method._hf_llm
+            # And verify it's a BaseLLM instance, not a string
+            assert isinstance(captured_llm[0], BaseLLM)
+
+    @patch("crewai.flow.flow.crewai_event_bus.emit")
+    def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
+        self, mock_emit: MagicMock
+    ) -> None:
+        """Test that resume_async falls back to context.llm when _hf_llm is not available.
+
+        This ensures backward compatibility with flows that were paused before this fix.
+        """
+        with tempfile.TemporaryDirectory() as tmpdir:
+            db_path = os.path.join(tmpdir, "test_flows.db")
+            persistence = SQLiteFlowPersistence(db_path)
+
+            class TestFlow(Flow):
+                @start()
+                @human_feedback(
+                    message="Approve?",
+                    emit=["approved", "rejected"],
+                    llm="gpt-4o-mini",
+                )
+                def review(self):
+                    return "content"
+
+            # Save pending feedback
+            context = PendingFeedbackContext(
+                flow_id="fallback-test",
+                flow_class="TestFlow",
+                method_name="review",
+                method_output="content",
+                message="Approve?",
+                emit=["approved", "rejected"],
+                llm="gpt-4o-mini",
+            )
+            persistence.save_pending_feedback(
+                flow_uuid="fallback-test",
+                context=context,
+                state_data={"id": "fallback-test"},
+            )
+
+            flow = TestFlow.from_pending("fallback-test", persistence)
+
+            # Remove _hf_llm to simulate old decorator without this attribute
+            method = flow._methods.get("review")
+            if hasattr(method, "_hf_llm"):
+                delattr(method, "_hf_llm")
+
+            # Mock _collapse_to_outcome to capture what LLM it receives
+            captured_llm = []
+
+            def capture_llm(feedback, outcomes, llm):
+                captured_llm.append(llm)
+                return "approved"
+
+            with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
+                flow.resume("looks good!")
+
+            # Should fall back to the serialized string
+            assert len(captured_llm) == 1
+            assert captured_llm[0] == "gpt-4o-mini"
+
+    @patch("crewai.flow.flow.crewai_event_bus.emit")
+    def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
+        self, mock_emit: MagicMock
+    ) -> None:
+        """Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
+
+        String LLM values offer no benefit over the serialized context.llm,
+        so we don't prefer them.
+        """
+        with tempfile.TemporaryDirectory() as tmpdir:
+            db_path = os.path.join(tmpdir, "test_flows.db")
+            persistence = SQLiteFlowPersistence(db_path)
+
+            class TestFlow(Flow):
+                @start()
+                @human_feedback(
+                    message="Approve?",
+                    emit=["approved", "rejected"],
+                    llm="gpt-4o-mini",  # String LLM
+                )
+                def review(self):
+                    return "content"
+
+            # Save pending feedback
+            context = PendingFeedbackContext(
+                flow_id="string-llm-test",
+                flow_class="TestFlow",
+                method_name="review",
+                method_output="content",
+                message="Approve?",
+                emit=["approved", "rejected"],
+                llm="gpt-4o-mini",
+            )
+            persistence.save_pending_feedback(
+                flow_uuid="string-llm-test",
+                context=context,
+                state_data={"id": "string-llm-test"},
+            )
+
+            flow = TestFlow.from_pending("string-llm-test", persistence)
+
+            # Verify _hf_llm is a string
+            method = flow._methods.get("review")
+            assert method._hf_llm == "gpt-4o-mini"
+
+            # Mock _collapse_to_outcome to capture what LLM it receives
+            captured_llm = []
+
+            def capture_llm(feedback, outcomes, llm):
+                captured_llm.append(llm)
+                return "approved"
+
+            with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
+                flow.resume("looks good!")
+
+            # Should use context.llm since _hf_llm is a string (not BaseLLM)
+            assert len(captured_llm) == 1
+            assert captured_llm[0] == "gpt-4o-mini"
+
+    def test_hf_llm_set_for_async_wrapper(self) -> None:
+        """Test that _hf_llm is set on async wrapper functions."""
+        import asyncio
+
+        from crewai.llms.base_llm import BaseLLM
+
+        mock_llm = MagicMock(spec=BaseLLM)
+        mock_llm.model = "gemini/gemini-3-flash"
+
+        class TestFlow(Flow):
+            @start()
+            @human_feedback(
+                message="Review:",
+                emit=["approved", "rejected"],
+                llm=mock_llm,
+            )
+            async def async_review(self):
+                return "content"
+
+        flow = TestFlow()
+        method = flow._methods.get("async_review")
+        assert method is not None
+        assert hasattr(method, "_hf_llm")
+        assert method._hf_llm is mock_llm
||||||
|
from crewai.llms.base_llm import BaseLLM
|
||||||
|
|
||||||
|
# Create a mock BaseLLM object
|
||||||
|
mock_llm = MagicMock(spec=BaseLLM)
|
||||||
|
mock_llm.model = "gemini/gemini-3-flash"
|
||||||
|
|
||||||
|
class TestFlow(Flow):
|
||||||
|
@start()
|
||||||
|
@human_feedback(
|
||||||
|
message="Review:",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm=mock_llm,
|
||||||
|
)
|
||||||
|
def review(self):
|
||||||
|
return "content"
|
||||||
|
|
||||||
|
flow = TestFlow()
|
||||||
|
method = flow._methods.get("review")
|
||||||
|
assert method is not None
|
||||||
|
assert hasattr(method, "_hf_llm")
|
||||||
|
assert method._hf_llm is mock_llm
|
||||||
|
|
||||||
|
def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||||
|
"""Test that _hf_llm is set on the wrapper even when llm is a string."""
|
||||||
|
|
||||||
|
class TestFlow(Flow):
|
||||||
|
@start()
|
||||||
|
@human_feedback(
|
||||||
|
message="Review:",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm="gpt-4o-mini",
|
||||||
|
)
|
||||||
|
def review(self):
|
||||||
|
return "content"
|
||||||
|
|
||||||
|
flow = TestFlow()
|
||||||
|
method = flow._methods.get("review")
|
||||||
|
assert method is not None
|
||||||
|
assert hasattr(method, "_hf_llm")
|
||||||
|
assert method._hf_llm == "gpt-4o-mini"
|
||||||
|
|
||||||
|
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||||
|
def test_resume_async_uses_live_basellm_over_serialized_string(
|
||||||
|
self, mock_emit: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
|
||||||
|
|
||||||
|
This is the main bug fix: when a flow resumes, it should use the fully-configured
|
||||||
|
LLM from the re-imported decorator (with credentials, project, etc.) instead of
|
||||||
|
creating a new LLM from just the model string.
|
||||||
|
"""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||||
|
persistence = SQLiteFlowPersistence(db_path)
|
||||||
|
|
||||||
|
from crewai.llms.base_llm import BaseLLM
|
||||||
|
|
||||||
|
# Create a mock BaseLLM with full config (simulating Gemini with service account)
|
||||||
|
live_llm = MagicMock(spec=BaseLLM)
|
||||||
|
live_llm.model = "gemini/gemini-3-flash"
|
||||||
|
|
||||||
|
class TestFlow(Flow):
|
||||||
|
result_path: str = ""
|
||||||
|
|
||||||
|
@start()
|
||||||
|
@human_feedback(
|
||||||
|
message="Approve?",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm=live_llm, # Full LLM object with credentials
|
||||||
|
)
|
||||||
|
def review(self):
|
||||||
|
return "content"
|
||||||
|
|
||||||
|
@listen("approved")
|
||||||
|
def handle_approved(self):
|
||||||
|
self.result_path = "approved"
|
||||||
|
return "Approved!"
|
||||||
|
|
||||||
|
# Save pending feedback with just a model STRING (simulating serialization)
|
||||||
|
context = PendingFeedbackContext(
|
||||||
|
flow_id="live-llm-test",
|
||||||
|
flow_class="TestFlow",
|
||||||
|
method_name="review",
|
||||||
|
method_output="content",
|
||||||
|
message="Approve?",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
|
||||||
|
)
|
||||||
|
persistence.save_pending_feedback(
|
||||||
|
flow_uuid="live-llm-test",
|
||||||
|
context=context,
|
||||||
|
state_data={"id": "live-llm-test"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Restore flow - this re-imports the class with the live LLM
|
||||||
|
flow = TestFlow.from_pending("live-llm-test", persistence)
|
||||||
|
|
||||||
|
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||||
|
captured_llm = []
|
||||||
|
|
||||||
|
def capture_llm(feedback, outcomes, llm):
|
||||||
|
captured_llm.append(llm)
|
||||||
|
return "approved"
|
||||||
|
|
||||||
|
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||||
|
flow.resume("looks good!")
|
||||||
|
|
||||||
|
# The key assertion: _collapse_to_outcome received the LIVE BaseLLM object,
|
||||||
|
# NOT the serialized string. The live_llm was captured at class definition
|
||||||
|
# time and stored on the method wrapper as _hf_llm.
|
||||||
|
assert len(captured_llm) == 1
|
||||||
|
# Verify it's the same object that was passed to the decorator
|
||||||
|
# (which is stored on the method's _hf_llm attribute)
|
||||||
|
method = flow._methods.get("review")
|
||||||
|
assert method is not None
|
||||||
|
assert captured_llm[0] is method._hf_llm
|
||||||
|
# And verify it's a BaseLLM instance, not a string
|
||||||
|
assert isinstance(captured_llm[0], BaseLLM)
|
||||||
|
|
||||||
|
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||||
|
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
|
||||||
|
self, mock_emit: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test that resume_async falls back to context.llm when _hf_llm is not available.
|
||||||
|
|
||||||
|
This ensures backward compatibility with flows that were paused before this fix.
|
||||||
|
"""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||||
|
persistence = SQLiteFlowPersistence(db_path)
|
||||||
|
|
||||||
|
class TestFlow(Flow):
|
||||||
|
@start()
|
||||||
|
@human_feedback(
|
||||||
|
message="Approve?",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm="gpt-4o-mini",
|
||||||
|
)
|
||||||
|
def review(self):
|
||||||
|
return "content"
|
||||||
|
|
||||||
|
# Save pending feedback
|
||||||
|
context = PendingFeedbackContext(
|
||||||
|
flow_id="fallback-test",
|
||||||
|
flow_class="TestFlow",
|
||||||
|
method_name="review",
|
||||||
|
method_output="content",
|
||||||
|
message="Approve?",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm="gpt-4o-mini",
|
||||||
|
)
|
||||||
|
persistence.save_pending_feedback(
|
||||||
|
flow_uuid="fallback-test",
|
||||||
|
context=context,
|
||||||
|
state_data={"id": "fallback-test"},
|
||||||
|
)
|
||||||
|
|
||||||
|
flow = TestFlow.from_pending("fallback-test", persistence)
|
||||||
|
|
||||||
|
# Remove _hf_llm to simulate old decorator without this attribute
|
||||||
|
method = flow._methods.get("review")
|
||||||
|
if hasattr(method, "_hf_llm"):
|
||||||
|
delattr(method, "_hf_llm")
|
||||||
|
|
||||||
|
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||||
|
captured_llm = []
|
||||||
|
|
||||||
|
def capture_llm(feedback, outcomes, llm):
|
||||||
|
captured_llm.append(llm)
|
||||||
|
return "approved"
|
||||||
|
|
||||||
|
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||||
|
flow.resume("looks good!")
|
||||||
|
|
||||||
|
# Should fall back to the serialized string
|
||||||
|
assert len(captured_llm) == 1
|
||||||
|
assert captured_llm[0] == "gpt-4o-mini"
|
||||||
|
|
||||||
|
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||||
|
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
|
||||||
|
self, mock_emit: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
|
||||||
|
|
||||||
|
String LLM values offer no benefit over the serialized context.llm,
|
||||||
|
so we don't prefer them.
|
||||||
|
"""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||||
|
persistence = SQLiteFlowPersistence(db_path)
|
||||||
|
|
||||||
|
class TestFlow(Flow):
|
||||||
|
@start()
|
||||||
|
@human_feedback(
|
||||||
|
message="Approve?",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm="gpt-4o-mini", # String LLM
|
||||||
|
)
|
||||||
|
def review(self):
|
||||||
|
return "content"
|
||||||
|
|
||||||
|
# Save pending feedback
|
||||||
|
context = PendingFeedbackContext(
|
||||||
|
flow_id="string-llm-test",
|
||||||
|
flow_class="TestFlow",
|
||||||
|
method_name="review",
|
||||||
|
method_output="content",
|
||||||
|
message="Approve?",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm="gpt-4o-mini",
|
||||||
|
)
|
||||||
|
persistence.save_pending_feedback(
|
||||||
|
flow_uuid="string-llm-test",
|
||||||
|
context=context,
|
||||||
|
state_data={"id": "string-llm-test"},
|
||||||
|
)
|
||||||
|
|
||||||
|
flow = TestFlow.from_pending("string-llm-test", persistence)
|
||||||
|
|
||||||
|
# Verify _hf_llm is a string
|
||||||
|
method = flow._methods.get("review")
|
||||||
|
assert method._hf_llm == "gpt-4o-mini"
|
||||||
|
|
||||||
|
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||||
|
captured_llm = []
|
||||||
|
|
||||||
|
def capture_llm(feedback, outcomes, llm):
|
||||||
|
captured_llm.append(llm)
|
||||||
|
return "approved"
|
||||||
|
|
||||||
|
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||||
|
flow.resume("looks good!")
|
||||||
|
|
||||||
|
# Should use context.llm since _hf_llm is a string (not BaseLLM)
|
||||||
|
assert len(captured_llm) == 1
|
||||||
|
assert captured_llm[0] == "gpt-4o-mini"
|
||||||
|
|
||||||
|
def test_hf_llm_set_for_async_wrapper(self) -> None:
|
||||||
|
"""Test that _hf_llm is set on async wrapper functions."""
|
||||||
|
import asyncio
|
||||||
|
from crewai.llms.base_llm import BaseLLM
|
||||||
|
|
||||||
|
mock_llm = MagicMock(spec=BaseLLM)
|
||||||
|
mock_llm.model = "gemini/gemini-3-flash"
|
||||||
|
|
||||||
|
class TestFlow(Flow):
|
||||||
|
@start()
|
||||||
|
@human_feedback(
|
||||||
|
message="Review:",
|
||||||
|
emit=["approved", "rejected"],
|
||||||
|
llm=mock_llm,
|
||||||
|
)
|
||||||
|
async def async_review(self):
|
||||||
|
return "content"
|
||||||
|
|
||||||
|
flow = TestFlow()
|
||||||
|
method = flow._methods.get("async_review")
|
||||||
|
assert method is not None
|
||||||
|
assert hasattr(method, "_hf_llm")
|
||||||
|
assert method._hf_llm is mock_llm
|
||||||
|
|||||||
Reference in New Issue
Block a user