diff --git a/lib/crewai/src/crewai/flow/async_feedback/providers.py b/lib/crewai/src/crewai/flow/async_feedback/providers.py index 43443046f..021fbb4a2 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/providers.py +++ b/lib/crewai/src/crewai/flow/async_feedback/providers.py @@ -182,7 +182,7 @@ class ConsoleProvider: console.print(message, style="yellow") console.print() - response = input(">>> \n").strip() + response = input(">>> ").strip() else: response = input(f"{message} ").strip() diff --git a/lib/crewai/src/crewai/flow/async_feedback/types.py b/lib/crewai/src/crewai/flow/async_feedback/types.py index 50bac22a6..911624cd9 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/types.py +++ b/lib/crewai/src/crewai/flow/async_feedback/types.py @@ -63,6 +63,32 @@ class PendingFeedbackContext: llm: dict[str, Any] | str | None = None requested_at: datetime = field(default_factory=datetime.now) + @staticmethod + def _make_json_safe(value: Any) -> Any: + """Convert a value to a JSON-serializable form. + + Handles Pydantic models, dataclasses, and arbitrary objects by + progressively falling back to string representation. + """ + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, (list, tuple)): + return [PendingFeedbackContext._make_json_safe(v) for v in value] + if isinstance(value, dict): + return { + k: PendingFeedbackContext._make_json_safe(v) for k, v in value.items() + } + + from pydantic import BaseModel + + if isinstance(value, BaseModel): + return value.model_dump(mode="json") + import dataclasses + + if dataclasses.is_dataclass(value) and not isinstance(value, type): + return PendingFeedbackContext._make_json_safe(dataclasses.asdict(value)) + return str(value) + def to_dict(self) -> dict[str, Any]: """Serialize context to a dictionary for persistence. @@ -73,11 +99,11 @@ class PendingFeedbackContext: "flow_id": self.flow_id, "flow_class": self.flow_class, "method_name": self.method_name, - "method_output": self.method_output, + "method_output": self._make_json_safe(self.method_output), "message": self.message, "emit": self.emit, "default_outcome": self.default_outcome, - "metadata": self.metadata, + "metadata": self._make_json_safe(self.metadata), "llm": self.llm, "requested_at": self.requested_at.isoformat(), } diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 1c1aa90b5..f1e75e617 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1223,9 +1223,6 @@ class Flow(Generic[T], metaclass=FlowMeta): # Mark that we're resuming execution instance._is_execution_resuming = True - # Mark the method as completed (it ran before pausing) - instance._completed_methods.add(FlowMethodName(pending_context.method_name)) - return instance @property @@ -1380,7 +1377,8 @@ class Flow(Generic[T], metaclass=FlowMeta): self.human_feedback_history.append(result) self.last_human_feedback = result - # Clear pending context after processing + self._completed_methods.add(FlowMethodName(context.method_name)) + self._pending_feedback_context = None # Clear pending feedback from persistence @@ -1403,7 +1401,10 @@ class Flow(Generic[T], metaclass=FlowMeta): # This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes) self._is_execution_resuming = False - final_result: Any = result + if emit and collapsed_outcome is None: + collapsed_outcome = default_outcome or emit[0] + result.outcome = collapsed_outcome + try: if emit and collapsed_outcome: self._method_outputs.append(collapsed_outcome) @@ -1421,7 +1422,8 @@ class Flow(Generic[T], metaclass=FlowMeta): from crewai.flow.async_feedback.types import HumanFeedbackPending if isinstance(e, HumanFeedbackPending): - # Auto-save pending feedback (create default persistence if needed) + self._pending_feedback_context = e.context + if self._persistence is None: from crewai.flow.persistence import SQLiteFlowPersistence @@ -1455,6 +1457,8 @@ class Flow(Generic[T], metaclass=FlowMeta): return e raise + final_result = self._method_outputs[-1] if self._method_outputs else result + # Emit flow finished crewai_event_bus.emit( self, @@ -2314,7 +2318,6 @@ class Flow(Generic[T], metaclass=FlowMeta): if isinstance(e, HumanFeedbackPending): e.context.method_name = method_name - # Auto-save pending feedback (create default persistence if needed) if self._persistence is None: from crewai.flow.persistence import SQLiteFlowPersistence @@ -3133,10 +3136,16 @@ class Flow(Generic[T], metaclass=FlowMeta): if outcome.lower() == response_clean.lower(): return outcome - # Partial match + # Partial match (longest wins, first on length ties) + response_lower = response_clean.lower() + best_outcome: str | None = None + best_len = -1 for outcome in outcomes: - if outcome.lower() in response_clean.lower(): - return outcome + if outcome.lower() in response_lower and len(outcome) > best_len: + best_outcome = outcome + best_len = len(outcome) + if best_outcome is not None: + return best_outcome # Fallback to first outcome logger.warning( diff --git a/lib/crewai/src/crewai/flow/human_feedback.py b/lib/crewai/src/crewai/flow/human_feedback.py index 9bace438e..e43fc3337 100644 --- a/lib/crewai/src/crewai/flow/human_feedback.py +++ b/lib/crewai/src/crewai/flow/human_feedback.py @@ -116,10 +116,11 @@ def _deserialize_llm_from_context( return LLM(model=llm_data) if isinstance(llm_data, dict): - model = llm_data.pop("model", None) + data = dict(llm_data) + model = data.pop("model", None) if not model: return None - return LLM(model=model, **llm_data) + return LLM(model=model, **data) return None @@ -450,12 +451,12 @@ def human_feedback( # -- Core feedback helpers ------------------------------------ - def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str: - """Request feedback using provider or default console.""" + def _build_feedback_context( + flow_instance: Flow[Any], method_output: Any + ) -> tuple[Any, Any]: + """Build the PendingFeedbackContext and resolve the effective provider.""" from crewai.flow.async_feedback.types import PendingFeedbackContext - # Build context for provider - # Use flow_id property which handles both dict and BaseModel states context = PendingFeedbackContext( flow_id=flow_instance.flow_id or "unknown", flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}", @@ -468,15 +469,53 @@ def human_feedback( llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm), ) - # Determine effective provider: effective_provider = provider if effective_provider is None: from crewai.flow.flow_config import flow_config effective_provider = flow_config.hitl_provider + return context, effective_provider + + def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str: + """Request feedback using provider or default console (sync).""" + context, effective_provider = _build_feedback_context( + flow_instance, method_output + ) + if effective_provider is not None: - return effective_provider.request_feedback(context, flow_instance) + feedback_result = effective_provider.request_feedback( + context, flow_instance + ) + if asyncio.iscoroutine(feedback_result): + raise TypeError( + f"Provider {type(effective_provider).__name__}.request_feedback() " + "returned a coroutine in a sync flow method. Use an async flow " + "method or a synchronous provider." + ) + return str(feedback_result) + return flow_instance._request_human_feedback( + message=message, + output=method_output, + metadata=metadata, + emit=emit, + ) + + async def _request_feedback_async( + flow_instance: Flow[Any], method_output: Any + ) -> str: + """Request feedback, awaiting the provider if it returns a coroutine.""" + context, effective_provider = _build_feedback_context( + flow_instance, method_output + ) + + if effective_provider is not None: + feedback_result = effective_provider.request_feedback( + context, flow_instance + ) + if asyncio.iscoroutine(feedback_result): + return str(await feedback_result) + return str(feedback_result) return flow_instance._request_human_feedback( message=message, output=method_output, @@ -524,10 +563,11 @@ def human_feedback( flow_instance.human_feedback_history.append(result) flow_instance.last_human_feedback = result - # Return based on mode if emit: - # Return outcome for routing - return collapsed_outcome # type: ignore[return-value] + if collapsed_outcome is None: + collapsed_outcome = default_outcome or emit[0] + result.outcome = collapsed_outcome + return collapsed_outcome return result if asyncio.iscoroutinefunction(func): @@ -540,7 +580,7 @@ def human_feedback( if learn and getattr(self, "memory", None) is not None: method_output = _pre_review_with_lessons(self, method_output) - raw_feedback = _request_feedback(self, method_output) + raw_feedback = await _request_feedback_async(self, method_output) result = _process_feedback(self, method_output, raw_feedback) # Distill: extract lessons from output + feedback, store in memory