mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-17 08:22:39 +00:00
fix: resolve multiple bugs in HITL flow system
This commit is contained in:
@@ -182,7 +182,7 @@ class ConsoleProvider:
|
||||
console.print(message, style="yellow")
|
||||
console.print()
|
||||
|
||||
response = input(">>> \n").strip()
|
||||
response = input(">>> ").strip()
|
||||
else:
|
||||
response = input(f"{message} ").strip()
|
||||
|
||||
|
||||
@@ -63,6 +63,32 @@ class PendingFeedbackContext:
|
||||
llm: dict[str, Any] | str | None = None
|
||||
requested_at: datetime = field(default_factory=datetime.now)
|
||||
|
||||
@staticmethod
|
||||
def _make_json_safe(value: Any) -> Any:
|
||||
"""Convert a value to a JSON-serializable form.
|
||||
|
||||
Handles Pydantic models, dataclasses, and arbitrary objects by
|
||||
progressively falling back to string representation.
|
||||
"""
|
||||
if value is None or isinstance(value, (str, int, float, bool)):
|
||||
return value
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [PendingFeedbackContext._make_json_safe(v) for v in value]
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
k: PendingFeedbackContext._make_json_safe(v) for k, v in value.items()
|
||||
}
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
if isinstance(value, BaseModel):
|
||||
return value.model_dump(mode="json")
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(value) and not isinstance(value, type):
|
||||
return PendingFeedbackContext._make_json_safe(dataclasses.asdict(value))
|
||||
return str(value)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Serialize context to a dictionary for persistence.
|
||||
|
||||
@@ -73,11 +99,11 @@ class PendingFeedbackContext:
|
||||
"flow_id": self.flow_id,
|
||||
"flow_class": self.flow_class,
|
||||
"method_name": self.method_name,
|
||||
"method_output": self.method_output,
|
||||
"method_output": self._make_json_safe(self.method_output),
|
||||
"message": self.message,
|
||||
"emit": self.emit,
|
||||
"default_outcome": self.default_outcome,
|
||||
"metadata": self.metadata,
|
||||
"metadata": self._make_json_safe(self.metadata),
|
||||
"llm": self.llm,
|
||||
"requested_at": self.requested_at.isoformat(),
|
||||
}
|
||||
|
||||
@@ -1223,9 +1223,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
# Mark that we're resuming execution
|
||||
instance._is_execution_resuming = True
|
||||
|
||||
# Mark the method as completed (it ran before pausing)
|
||||
instance._completed_methods.add(FlowMethodName(pending_context.method_name))
|
||||
|
||||
return instance
|
||||
|
||||
@property
|
||||
@@ -1380,7 +1377,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self.human_feedback_history.append(result)
|
||||
self.last_human_feedback = result
|
||||
|
||||
# Clear pending context after processing
|
||||
self._completed_methods.add(FlowMethodName(context.method_name))
|
||||
|
||||
self._pending_feedback_context = None
|
||||
|
||||
# Clear pending feedback from persistence
|
||||
@@ -1403,7 +1401,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
|
||||
self._is_execution_resuming = False
|
||||
|
||||
final_result: Any = result
|
||||
if emit and collapsed_outcome is None:
|
||||
collapsed_outcome = default_outcome or emit[0]
|
||||
result.outcome = collapsed_outcome
|
||||
|
||||
try:
|
||||
if emit and collapsed_outcome:
|
||||
self._method_outputs.append(collapsed_outcome)
|
||||
@@ -1421,7 +1422,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackPending
|
||||
|
||||
if isinstance(e, HumanFeedbackPending):
|
||||
# Auto-save pending feedback (create default persistence if needed)
|
||||
self._pending_feedback_context = e.context
|
||||
|
||||
if self._persistence is None:
|
||||
from crewai.flow.persistence import SQLiteFlowPersistence
|
||||
|
||||
@@ -1455,6 +1457,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return e
|
||||
raise
|
||||
|
||||
final_result = self._method_outputs[-1] if self._method_outputs else result
|
||||
|
||||
# Emit flow finished
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
@@ -2314,7 +2318,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if isinstance(e, HumanFeedbackPending):
|
||||
e.context.method_name = method_name
|
||||
|
||||
# Auto-save pending feedback (create default persistence if needed)
|
||||
if self._persistence is None:
|
||||
from crewai.flow.persistence import SQLiteFlowPersistence
|
||||
|
||||
@@ -3133,10 +3136,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if outcome.lower() == response_clean.lower():
|
||||
return outcome
|
||||
|
||||
# Partial match
|
||||
# Partial match (longest wins, first on length ties)
|
||||
response_lower = response_clean.lower()
|
||||
best_outcome: str | None = None
|
||||
best_len = -1
|
||||
for outcome in outcomes:
|
||||
if outcome.lower() in response_clean.lower():
|
||||
return outcome
|
||||
if outcome.lower() in response_lower and len(outcome) > best_len:
|
||||
best_outcome = outcome
|
||||
best_len = len(outcome)
|
||||
if best_outcome is not None:
|
||||
return best_outcome
|
||||
|
||||
# Fallback to first outcome
|
||||
logger.warning(
|
||||
|
||||
@@ -116,10 +116,11 @@ def _deserialize_llm_from_context(
|
||||
return LLM(model=llm_data)
|
||||
|
||||
if isinstance(llm_data, dict):
|
||||
model = llm_data.pop("model", None)
|
||||
data = dict(llm_data)
|
||||
model = data.pop("model", None)
|
||||
if not model:
|
||||
return None
|
||||
return LLM(model=model, **llm_data)
|
||||
return LLM(model=model, **data)
|
||||
return None
|
||||
|
||||
|
||||
@@ -450,12 +451,12 @@ def human_feedback(
|
||||
|
||||
# -- Core feedback helpers ------------------------------------
|
||||
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
"""Request feedback using provider or default console."""
|
||||
def _build_feedback_context(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> tuple[Any, Any]:
|
||||
"""Build the PendingFeedbackContext and resolve the effective provider."""
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
# Build context for provider
|
||||
# Use flow_id property which handles both dict and BaseModel states
|
||||
context = PendingFeedbackContext(
|
||||
flow_id=flow_instance.flow_id or "unknown",
|
||||
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
|
||||
@@ -468,15 +469,53 @@ def human_feedback(
|
||||
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
|
||||
)
|
||||
|
||||
# Determine effective provider:
|
||||
effective_provider = provider
|
||||
if effective_provider is None:
|
||||
from crewai.flow.flow_config import flow_config
|
||||
|
||||
effective_provider = flow_config.hitl_provider
|
||||
|
||||
return context, effective_provider
|
||||
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
"""Request feedback using provider or default console (sync)."""
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
return effective_provider.request_feedback(context, flow_instance)
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
raise TypeError(
|
||||
f"Provider {type(effective_provider).__name__}.request_feedback() "
|
||||
"returned a coroutine in a sync flow method. Use an async flow "
|
||||
"method or a synchronous provider."
|
||||
)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
async def _request_feedback_async(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> str:
|
||||
"""Request feedback, awaiting the provider if it returns a coroutine."""
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
return str(await feedback_result)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
@@ -524,10 +563,11 @@ def human_feedback(
|
||||
flow_instance.human_feedback_history.append(result)
|
||||
flow_instance.last_human_feedback = result
|
||||
|
||||
# Return based on mode
|
||||
if emit:
|
||||
# Return outcome for routing
|
||||
return collapsed_outcome # type: ignore[return-value]
|
||||
if collapsed_outcome is None:
|
||||
collapsed_outcome = default_outcome or emit[0]
|
||||
result.outcome = collapsed_outcome
|
||||
return collapsed_outcome
|
||||
return result
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
@@ -540,7 +580,7 @@ def human_feedback(
|
||||
if learn and getattr(self, "memory", None) is not None:
|
||||
method_output = _pre_review_with_lessons(self, method_output)
|
||||
|
||||
raw_feedback = _request_feedback(self, method_output)
|
||||
raw_feedback = await _request_feedback_async(self, method_output)
|
||||
result = _process_feedback(self, method_output, raw_feedback)
|
||||
|
||||
# Distill: extract lessons from output + feedback, store in memory
|
||||
|
||||
Reference in New Issue
Block a user