Merge branch 'main' into feat/per-user-token-tracing

This commit is contained in:
Devasy Patel
2025-12-27 23:20:38 +05:30
committed by GitHub
27 changed files with 5717 additions and 69 deletions

View File

@@ -38,9 +38,11 @@ from crewai.events.types.crew_events import (
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPausedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
@@ -363,6 +365,28 @@ class EventListener(BaseEventListener):
)
self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(MethodExecutionPausedEvent)
def on_method_execution_paused(
_: Any, event: MethodExecutionPausedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status(
method_branch,
self.formatter.current_flow_tree,
event.method_name,
"paused",
)
self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(FlowPausedEvent)
def on_flow_paused(_: Any, event: FlowPausedEvent) -> None:
self.formatter.update_flow_status(
self.formatter.current_flow_tree,
event.flow_name,
event.flow_id,
"paused",
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)

View File

@@ -58,6 +58,29 @@ class MethodExecutionFailedEvent(FlowEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
class MethodExecutionPausedEvent(FlowEvent):
"""Event emitted when a flow method is paused waiting for human feedback.
This event is emitted when a @human_feedback decorated method with an
async provider raises HumanFeedbackPending to pause execution.
Attributes:
flow_name: Name of the flow that is paused.
method_name: Name of the method waiting for feedback.
state: Current flow state when paused.
flow_id: Unique identifier for this flow execution.
message: The message shown when requesting feedback.
emit: Optional list of possible outcomes for routing.
"""
method_name: str
state: dict[str, Any] | BaseModel
flow_id: str
message: str
emit: list[str] | None = None
type: str = "method_execution_paused"
class FlowFinishedEvent(FlowEvent):
"""Event emitted when a flow completes execution"""
@@ -67,8 +90,71 @@ class FlowFinishedEvent(FlowEvent):
state: dict[str, Any] | BaseModel
class FlowPausedEvent(FlowEvent):
"""Event emitted when a flow is paused waiting for human feedback.
This event is emitted when a flow is paused due to a @human_feedback
decorated method with an async provider raising HumanFeedbackPending.
Attributes:
flow_name: Name of the flow that is paused.
flow_id: Unique identifier for this flow execution.
method_name: Name of the method waiting for feedback.
state: Current flow state when paused.
message: The message shown when requesting feedback.
emit: Optional list of possible outcomes for routing.
"""
flow_id: str
method_name: str
state: dict[str, Any] | BaseModel
message: str
emit: list[str] | None = None
type: str = "flow_paused"
class FlowPlotEvent(FlowEvent):
"""Event emitted when a flow plot is created"""
flow_name: str
type: str = "flow_plot"
class HumanFeedbackRequestedEvent(FlowEvent):
"""Event emitted when human feedback is requested.
This event is emitted when a @human_feedback decorated method
requires input from a human reviewer.
Attributes:
flow_name: Name of the flow requesting feedback.
method_name: Name of the method decorated with @human_feedback.
output: The method output shown to the human for review.
message: The message displayed when requesting feedback.
emit: Optional list of possible outcomes for routing.
"""
method_name: str
output: Any
message: str
emit: list[str] | None = None
type: str = "human_feedback_requested"
class HumanFeedbackReceivedEvent(FlowEvent):
"""Event emitted when human feedback is received.
This event is emitted after a human provides feedback in response
to a @human_feedback decorated method.
Attributes:
flow_name: Name of the flow that received feedback.
method_name: Name of the method that received feedback.
feedback: The raw text feedback provided by the human.
outcome: The collapsed outcome string (if emit was specified).
"""
method_name: str
feedback: str
outcome: str | None = None
type: str = "human_feedback_received"

View File

@@ -453,41 +453,48 @@ To enable tracing, do any one of these:
if flow_tree is None:
return
# Determine status-specific labels and styles
if status == "completed":
label_prefix = "✅ Flow Finished:"
style = "green"
node_text = "✅ Flow Completed"
content_text = "Flow Execution Completed"
panel_title = "Flow Completion"
elif status == "paused":
label_prefix = "⏳ Flow Paused:"
style = "cyan"
node_text = "⏳ Waiting for Human Feedback"
content_text = "Flow Paused - Waiting for Feedback"
panel_title = "Flow Paused"
else:
label_prefix = "❌ Flow Failed:"
style = "red"
node_text = "❌ Flow Failed"
content_text = "Flow Execution Failed"
panel_title = "Flow Failure"
# Update main flow label
self.update_tree_label(
flow_tree,
"✅ Flow Finished:" if status == "completed" else "❌ Flow Failed:",
label_prefix,
flow_name,
"green" if status == "completed" else "red",
style,
)
# Update initialization node status
for child in flow_tree.children:
if "Starting Flow" in str(child.label):
child.label = Text(
(
"✅ Flow Completed"
if status == "completed"
else "❌ Flow Failed"
),
style="green" if status == "completed" else "red",
)
child.label = Text(node_text, style=style)
break
content = self.create_status_content(
(
"Flow Execution Completed"
if status == "completed"
else "Flow Execution Failed"
),
content_text,
flow_name,
"green" if status == "completed" else "red",
style,
ID=flow_id,
)
self.print(flow_tree)
self.print_panel(
content, "Flow Completion", "green" if status == "completed" else "red"
)
self.print_panel(content, panel_title, style)
def update_method_status(
self,
@@ -508,6 +515,12 @@ To enable tracing, do any one of these:
if "Starting Flow" in str(child.label):
child.label = Text("Flow Method Step", style="white")
break
elif status == "paused":
prefix, style = "⏳ Paused:", "cyan"
for child in flow_tree.children:
if "Starting Flow" in str(child.label):
child.label = Text("⏳ Waiting for Feedback", style="cyan")
break
else:
prefix, style = "❌ Failed:", "red"
for child in flow_tree.children:

View File

@@ -1,4 +1,11 @@
from crewai.flow.async_feedback import (
ConsoleProvider,
HumanFeedbackPending,
HumanFeedbackProvider,
PendingFeedbackContext,
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.persistence import persist
from crewai.flow.visualization import (
FlowStructure,
@@ -8,10 +15,16 @@ from crewai.flow.visualization import (
__all__ = [
"ConsoleProvider",
"Flow",
"FlowStructure",
"HumanFeedbackPending",
"HumanFeedbackProvider",
"HumanFeedbackResult",
"PendingFeedbackContext",
"and_",
"build_flow_structure",
"human_feedback",
"listen",
"or_",
"persist",

View File

@@ -0,0 +1,41 @@
"""Async human feedback support for CrewAI Flows.
This module provides abstractions for non-blocking human-in-the-loop workflows,
allowing integration with external systems like Slack, Teams, webhooks, or APIs.
Example:
```python
from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
self.send_slack_notification(context)
raise HumanFeedbackPending(context=context)
class MyFlow(Flow):
@start()
@human_feedback(
message="Review this:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
provider=SlackProvider(),
)
def review(self):
return "Content to review"
```
"""
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
HumanFeedbackProvider,
PendingFeedbackContext,
)
from crewai.flow.async_feedback.providers import ConsoleProvider
__all__ = [
"ConsoleProvider",
"HumanFeedbackPending",
"HumanFeedbackProvider",
"PendingFeedbackContext",
]

View File

@@ -0,0 +1,124 @@
"""Default provider implementations for human feedback.
This module provides the ConsoleProvider, which is the default synchronous
provider that collects feedback via console input.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.flow.async_feedback.types import PendingFeedbackContext
if TYPE_CHECKING:
from crewai.flow.flow import Flow
class ConsoleProvider:
"""Default synchronous console-based feedback provider.
This provider blocks execution and waits for console input from the user.
It displays the method output with formatting and prompts for feedback.
This is the default provider used when no custom provider is specified
in the @human_feedback decorator.
Example:
```python
from crewai.flow.async_feedback import ConsoleProvider
# Explicitly use console provider
@human_feedback(
message="Review this:",
provider=ConsoleProvider(),
)
def my_method(self):
return "Content to review"
```
"""
def __init__(self, verbose: bool = True):
"""Initialize the console provider.
Args:
verbose: Whether to display formatted output. If False, only
shows the prompt message.
"""
self.verbose = verbose
def request_feedback(
self,
context: PendingFeedbackContext,
flow: Flow,
) -> str:
"""Request feedback via console input (blocking).
Displays the method output with formatting and waits for the user
to type their feedback. Press Enter to skip (returns empty string).
Args:
context: The pending feedback context with output and message.
flow: The Flow instance (used for event emission).
Returns:
The user's feedback as a string, or empty string if skipped.
"""
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import event_listener
from crewai.events.types.flow_events import (
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
)
# Emit feedback requested event
crewai_event_bus.emit(
flow,
HumanFeedbackRequestedEvent(
type="human_feedback_requested",
flow_name=flow.name or flow.__class__.__name__,
method_name=context.method_name,
output=context.method_output,
message=context.message,
emit=context.emit,
),
)
# Pause live updates during human input
formatter = event_listener.formatter
formatter.pause_live_updates()
try:
console = formatter.console
if self.verbose:
# Display output with formatting using Rich console
console.print("\n" + "" * 50, style="bold cyan")
console.print(" OUTPUT FOR REVIEW", style="bold cyan")
console.print("" * 50 + "\n", style="bold cyan")
console.print(context.method_output)
console.print("\n" + "" * 50 + "\n", style="bold cyan")
# Show message and prompt for feedback
console.print(context.message, style="yellow")
console.print(
"(Press Enter to skip, or type your feedback)\n", style="cyan"
)
feedback = input("Your feedback: ").strip()
# Emit feedback received event
crewai_event_bus.emit(
flow,
HumanFeedbackReceivedEvent(
type="human_feedback_received",
flow_name=flow.name or flow.__class__.__name__,
method_name=context.method_name,
feedback=feedback,
outcome=None, # Will be determined after collapsing
),
)
return feedback
finally:
# Resume live updates
formatter.resume_live_updates()
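
As a complement to `ConsoleProvider`, here is a minimal sketch of a non-interactive provider (handy in tests or CI) that satisfies the same `HumanFeedbackProvider` protocol by returning canned answers instead of blocking on console input. The class name and queue are illustrative, and unlike `ConsoleProvider` it does not emit feedback events.

```python
from __future__ import annotations

from collections import deque
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from crewai.flow.async_feedback.types import PendingFeedbackContext
    from crewai.flow.flow import Flow


class ScriptedProvider:
    """Returns pre-seeded answers instead of prompting a human."""

    def __init__(self, answers: list[str]):
        # Answers are consumed in order, one per feedback request.
        self._answers = deque(answers)

    def request_feedback(self, context: "PendingFeedbackContext", flow: "Flow") -> str:
        # An empty string is treated as "no feedback" by the decorator.
        return self._answers.popleft() if self._answers else ""
```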

View File

@@ -0,0 +1,264 @@
"""Core types for async human feedback in Flows.
This module defines the protocol, exception, and context types used for
non-blocking human-in-the-loop workflows.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING:
from crewai.flow.flow import Flow
@dataclass
class PendingFeedbackContext:
"""Context capturing everything needed to resume a paused flow.
When a flow is paused waiting for async human feedback, this dataclass
stores all the information needed to:
1. Identify which flow execution is waiting
2. Record which method triggered the feedback request
3. Record what was shown to the human
4. Route the response when it arrives
Attributes:
flow_id: Unique identifier for the flow instance (from state.id)
flow_class: Fully qualified class name (e.g., "myapp.flows.ReviewFlow")
method_name: Name of the method that triggered feedback request
method_output: The output that was shown to the human for review
message: The message displayed when requesting feedback
emit: Optional list of outcome strings for routing
default_outcome: Outcome to use when no feedback is provided
metadata: Optional metadata for external system integration
llm: LLM model string for outcome collapsing
requested_at: When the feedback was requested
Example:
```python
context = PendingFeedbackContext(
flow_id="abc-123",
flow_class="myapp.ReviewFlow",
method_name="review_content",
method_output={"title": "Draft", "body": "..."},
message="Please review and approve or reject:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
```
"""
flow_id: str
flow_class: str
method_name: str
method_output: Any
message: str
emit: list[str] | None = None
default_outcome: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
llm: str | None = None
requested_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict[str, Any]:
"""Serialize context to a dictionary for persistence.
Returns:
Dictionary representation suitable for JSON serialization.
"""
return {
"flow_id": self.flow_id,
"flow_class": self.flow_class,
"method_name": self.method_name,
"method_output": self.method_output,
"message": self.message,
"emit": self.emit,
"default_outcome": self.default_outcome,
"metadata": self.metadata,
"llm": self.llm,
"requested_at": self.requested_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PendingFeedbackContext:
"""Deserialize context from a dictionary.
Args:
data: Dictionary representation of the context.
Returns:
Reconstructed PendingFeedbackContext instance.
"""
requested_at = data.get("requested_at")
if isinstance(requested_at, str):
requested_at = datetime.fromisoformat(requested_at)
elif requested_at is None:
requested_at = datetime.now()
return cls(
flow_id=data["flow_id"],
flow_class=data["flow_class"],
method_name=data["method_name"],
method_output=data.get("method_output"),
message=data.get("message", ""),
emit=data.get("emit"),
default_outcome=data.get("default_outcome"),
metadata=data.get("metadata", {}),
llm=data.get("llm"),
requested_at=requested_at,
)
class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control flow signal
"""Signal that flow execution should pause for async human feedback.
When raised by a provider, the flow framework will:
1. Stop execution at the current method
2. Automatically persist state and context (if persistence is configured)
3. Return this object to the caller (not re-raise it)
The caller receives this as a return value from `flow.kickoff()`, enabling
graceful handling of the paused state without try/except blocks:
```python
result = flow.kickoff()
if isinstance(result, HumanFeedbackPending):
# Flow is paused, handle async feedback
print(f"Waiting for feedback: {result.context.flow_id}")
else:
# Normal completion
print(f"Flow completed: {result}")
```
Note:
The flow framework automatically saves pending feedback when this
exception is raised. Providers do NOT need to call `save_pending_feedback`
manually - just raise this exception and the framework handles persistence.
Attributes:
context: The PendingFeedbackContext with all details needed to resume
callback_info: Optional dict with information for external systems
(e.g., webhook URL, ticket ID, Slack thread ID)
Example:
```python
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
# Send notification to external system
ticket_id = self.create_slack_thread(context)
# Raise to pause - framework handles persistence automatically
raise HumanFeedbackPending(
context=context,
callback_info={
"slack_channel": "#reviews",
"thread_id": ticket_id,
}
)
```
"""
def __init__(
self,
context: PendingFeedbackContext,
callback_info: dict[str, Any] | None = None,
message: str | None = None,
):
"""Initialize the pending feedback exception.
Args:
context: The pending feedback context with flow details
callback_info: Optional information for external system callbacks
message: Optional custom message (defaults to descriptive message)
"""
self.context = context
self.callback_info = callback_info or {}
if message is None:
message = (
f"Human feedback pending for flow '{context.flow_id}' "
f"at method '{context.method_name}'"
)
super().__init__(message)
@runtime_checkable
class HumanFeedbackProvider(Protocol):
"""Protocol for human feedback collection strategies.
Implement this protocol to create custom feedback providers that integrate
with external systems like Slack, Teams, email, or custom APIs.
Providers can be either:
- **Synchronous (blocking)**: Return feedback string directly
- **Asynchronous (non-blocking)**: Raise HumanFeedbackPending to pause
The default ConsoleProvider is synchronous and blocks waiting for input.
For async workflows, implement a provider that raises HumanFeedbackPending.
Note:
The flow framework automatically handles state persistence when
HumanFeedbackPending is raised. Providers only need to:
1. Notify the external system (Slack, email, webhook, etc.)
2. Raise HumanFeedbackPending with the context and callback info
Example synchronous provider:
```python
class ConsoleProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
print(context.method_output)
return input("Your feedback: ")
```
Example async provider:
```python
class SlackProvider(HumanFeedbackProvider):
def __init__(self, channel: str):
self.channel = channel
def request_feedback(self, context, flow):
# Send notification to Slack
thread_id = self.post_to_slack(
channel=self.channel,
message=context.message,
content=context.method_output,
)
# Raise to pause - framework handles persistence automatically
raise HumanFeedbackPending(
context=context,
callback_info={
"channel": self.channel,
"thread_id": thread_id,
}
)
```
"""
def request_feedback(
self,
context: PendingFeedbackContext,
flow: Flow,
) -> str:
"""Request feedback from a human.
For synchronous providers, block and return the feedback string.
For async providers, notify the external system and raise
HumanFeedbackPending to pause the flow.
Args:
context: The pending feedback context containing all details
about what feedback is needed and how to route the response.
flow: The Flow instance, providing access to state and name.
Returns:
The human's feedback as a string (synchronous providers only).
Raises:
HumanFeedbackPending: To signal that the flow should pause and
wait for external feedback. The framework will automatically
persist state when this is raised.
"""
...
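
A short sketch of how the `to_dict`/`from_dict` pair might be used to park a pending context in an external store while waiting for a reviewer; the JSON file path is illustrative, and `method_output` must be JSON-serializable for this to work as written.

```python
import json

from crewai.flow.async_feedback import PendingFeedbackContext

context = PendingFeedbackContext(
    flow_id="abc-123",
    flow_class="myapp.ReviewFlow",
    method_name="review_content",
    method_output="Draft body...",
    message="Please review:",
    emit=["approved", "rejected"],
)

# Serialize for storage alongside a webhook or ticket reference.
with open("pending_feedback.json", "w") as f:
    json.dump(context.to_dict(), f)

# Later, rebuild the context before resuming the paused flow.
with open("pending_feedback.json") as f:
    restored = PendingFeedbackContext.from_dict(json.load(f))

assert restored.flow_id == context.flow_id
```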

View File

@@ -7,12 +7,13 @@ for building event-driven workflows with conditional execution and routing.
from __future__ import annotations
import asyncio
from collections.abc import Callable
from collections.abc import Callable, Sequence
from concurrent.futures import Future
import copy
import inspect
import logging
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
@@ -41,10 +42,12 @@ from crewai.events.listeners.tracing.utils import (
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPausedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
@@ -69,9 +72,14 @@ from crewai.flow.utils import (
is_flow_method_name,
is_simple_flow_condition,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult
from crewai.llms.base_llm import BaseLLM
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.utilities.printer import Printer, PrinterColor
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
@@ -443,6 +451,23 @@ class FlowMeta(type):
else:
router_paths[attr_name] = []
# Handle start methods that are also routers (e.g., @human_feedback with emit)
if (
hasattr(attr_value, "__is_start_method__")
and hasattr(attr_value, "__is_router__")
and attr_value.__is_router__
):
routers.add(attr_name)
# Get router paths from the decorator attribute
if hasattr(attr_value, "__router_paths__") and attr_value.__router_paths__:
router_paths[attr_name] = attr_value.__router_paths__
else:
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
else:
router_paths[attr_name] = []
cls._start_methods = start_methods # type: ignore[attr-defined]
cls._listeners = listeners # type: ignore[attr-defined]
cls._routers = routers # type: ignore[attr-defined]
@@ -456,8 +481,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
_printer: ClassVar[Printer] = Printer()
_start_methods: ClassVar[list[FlowMethodName]] = []
_listeners: ClassVar[dict[FlowMethodName, SimpleFlowCondition | FlowCondition]] = {}
_routers: ClassVar[set[FlowMethodName]] = set()
@@ -499,6 +522,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._is_execution_resuming: bool = False
self._event_futures: list[Future[None]] = []
# Human feedback storage
self.human_feedback_history: list[HumanFeedbackResult] = []
self.last_human_feedback: HumanFeedbackResult | None = None
self._pending_feedback_context: PendingFeedbackContext | None = None
# Initialize state with initial values
self._state = self._create_initial_state()
self.tracing = tracing
@@ -529,6 +557,295 @@ class Flow(Generic[T], metaclass=FlowMeta):
method = method.__get__(self, self.__class__)
self._methods[method.__name__] = method
@classmethod
def from_pending(
cls,
flow_id: str,
persistence: FlowPersistence | None = None,
**kwargs: Any,
) -> "Flow[Any]":
"""Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting
for async human feedback. It loads the persisted state and pending
feedback context, then returns a flow instance ready to resume.
Args:
flow_id: The unique identifier of the paused flow (from state.id)
persistence: The persistence backend where the state was saved.
If not provided, defaults to SQLiteFlowPersistence().
**kwargs: Additional keyword arguments passed to the Flow constructor
Returns:
A new Flow instance with restored state, ready to call resume()
Raises:
ValueError: If no pending feedback exists for the given flow_id
Example:
```python
# Simple usage with default persistence:
flow = MyFlow.from_pending("abc-123")
result = flow.resume("looks good!")
# Or with custom persistence:
persistence = SQLiteFlowPersistence("custom.db")
flow = MyFlow.from_pending("abc-123", persistence)
result = flow.resume("looks good!")
```
"""
if persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
persistence = SQLiteFlowPersistence()
# Load pending feedback context and state
loaded = persistence.load_pending_feedback(flow_id)
if loaded is None:
raise ValueError(f"No pending feedback found for flow_id: {flow_id}")
state_data, pending_context = loaded
# Create flow instance with persistence
instance = cls(persistence=persistence, **kwargs)
# Restore state
instance._initialize_state(state_data)
# Store pending context for resume
instance._pending_feedback_context = pending_context
# Mark that we're resuming execution
instance._is_execution_resuming = True
# Mark the method as completed (it ran before pausing)
instance._completed_methods.add(FlowMethodName(pending_context.method_name))
return instance
@property
def pending_feedback(self) -> "PendingFeedbackContext | None":
"""Get the pending feedback context if this flow is waiting for feedback.
Returns:
The PendingFeedbackContext if the flow is paused waiting for feedback,
None otherwise.
Example:
```python
flow = MyFlow.from_pending("abc-123", persistence)
if flow.pending_feedback:
print(f"Waiting for feedback on: {flow.pending_feedback.method_name}")
```
"""
return self._pending_feedback_context
def resume(self, feedback: str = "") -> Any:
"""Resume flow execution, optionally with human feedback.
This method continues flow execution after a flow was paused for
async human feedback. It processes the feedback (including LLM-based
outcome collapsing if emit was specified), stores the result, and
triggers downstream listeners.
Note:
If called from within an async context (running event loop),
use `await flow.resume_async(feedback)` instead.
Args:
feedback: The human's feedback as a string. If empty, uses
default_outcome or the first emit option.
Returns:
The final output from the flow execution, or HumanFeedbackPending
if another feedback point is reached.
Raises:
ValueError: If no pending feedback context exists (flow wasn't paused)
RuntimeError: If called from within a running event loop (use resume_async instead)
Example:
```python
# In a sync webhook handler:
def handle_feedback(flow_id: str, feedback: str):
flow = MyFlow.from_pending(flow_id)
result = flow.resume(feedback)
return result
# In an async handler, use resume_async instead:
async def handle_feedback_async(flow_id: str, feedback: str):
flow = MyFlow.from_pending(flow_id)
result = await flow.resume_async(feedback)
return result
```
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None:
raise RuntimeError(
"resume() cannot be called from within an async context. "
"Use 'await flow.resume_async(feedback)' instead."
)
return asyncio.run(self.resume_async(feedback))
async def resume_async(self, feedback: str = "") -> Any:
"""Async version of resume.
Resume flow execution, optionally with human feedback asynchronously.
Args:
feedback: The human's feedback as a string. If empty, uses
default_outcome or the first emit option.
Returns:
The final output from the flow execution, or HumanFeedbackPending
if another feedback point is reached.
Raises:
ValueError: If no pending feedback context exists
"""
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
)
context = self._pending_feedback_context
emit = context.emit
default_outcome = context.default_outcome
llm = context.llm
# Determine outcome
collapsed_outcome: str | None = None
if not feedback.strip():
# Empty feedback
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
# No default and no feedback - use first outcome
collapsed_outcome = emit[0]
elif emit:
# Collapse feedback to outcome using LLM
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
# Create result
result = HumanFeedbackResult(
output=context.method_output,
feedback=feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
method_name=context.method_name,
metadata=context.metadata,
)
# Store in flow instance
self.human_feedback_history.append(result)
self.last_human_feedback = result
# Clear pending context after processing
self._pending_feedback_context = None
# Clear pending feedback from persistence
if self._persistence:
self._persistence.clear_pending_feedback(context.flow_id)
# Emit feedback received event
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
# Clear resumption flag before triggering listeners
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
# Determine what to pass to listeners
try:
if emit and collapsed_outcome:
# Router behavior - the outcome itself triggers listeners
# First, add the outcome to method outputs as a router would
self._method_outputs.append(collapsed_outcome)
# Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved"))
final_result = await self._execute_listeners(
FlowMethodName(collapsed_outcome), # Use outcome as trigger
result, # Pass HumanFeedbackResult to listeners
)
else:
# Normal behavior - pass the HumanFeedbackResult
final_result = await self._execute_listeners(
FlowMethodName(context.method_name),
result,
)
except Exception as e:
# Check if flow was paused again for human feedback (loop case)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
state_data = (
self._state
if isinstance(self._state, dict)
else self._state.model_dump()
)
self._persistence.save_pending_feedback(
flow_uuid=e.context.flow_id,
context=e.context,
state_data=state_data,
)
# Emit flow paused event
crewai_event_bus.emit(
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
message=e.context.message,
emit=e.context.emit,
),
)
# Return the pending exception instead of raising
return e
raise
# Emit flow finished
crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._state,
),
)
return final_result
def _create_initial_state(self) -> T:
"""Create and initialize flow state with UUID and default values.
@@ -544,19 +861,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
state_type = self._initial_state_t
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
# Create instance without id, then set it
# Create instance - FlowState auto-generates id via default_factory
instance = state_type()
if not hasattr(instance, "id"):
instance.id = str(uuid4())
# Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4()))
return cast(T, instance)
if issubclass(state_type, BaseModel):
# Create a new type that includes the ID field
class StateWithId(state_type, FlowState): # type: ignore
# Create a new type with FlowState first for proper id default
class StateWithId(FlowState, state_type): # type: ignore
pass
instance = StateWithId()
if not hasattr(instance, "id"):
instance.id = str(uuid4())
# Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4()))
return cast(T, instance)
if state_type is dict:
return cast(T, {"id": str(uuid4())})
@@ -574,7 +893,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
model_fields = getattr(self.initial_state, "model_fields", None)
if not model_fields or "id" not in model_fields:
raise ValueError("Flow state model must have an 'id' field")
return self.initial_state() # Uses model defaults
instance = self.initial_state()
# Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4()))
return instance
if self.initial_state is dict:
return cast(T, {"id": str(uuid4())})
@@ -604,6 +927,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
k: v for k, v in model.__dict__.items() if not k.startswith("_")
}
# Ensure id is set - generate UUID if empty
if not state_dict.get("id"):
state_dict["id"] = str(uuid4())
# Create new instance of the same class
model_class = type(model)
return cast(T, model_class(**state_dict))
@@ -686,16 +1013,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
TypeError: If state is neither BaseModel nor dictionary
"""
if isinstance(self._state, dict):
# For dict states, preserve existing fields unless overridden
# For dict states, update with inputs
# If inputs contains an id, use it (for restoring from persistence)
# Otherwise preserve the current id or generate a new one
current_id = self._state.get("id")
# Only update specified fields
inputs_has_id = "id" in inputs
# Update specified fields
for k, v in inputs.items():
self._state[k] = v
# Ensure ID is preserved or generated
if current_id:
self._state["id"] = current_id
elif "id" not in self._state:
self._state["id"] = str(uuid4())
# Ensure ID is set: prefer inputs id, then current id, then generate
if not inputs_has_id:
if current_id:
self._state["id"] = current_id
elif "id" not in self._state:
self._state["id"] = str(uuid4())
elif isinstance(self._state, BaseModel):
# For BaseModel states, preserve existing fields unless overridden
try:
@@ -985,17 +1318,73 @@ class Flow(Generic[T], metaclass=FlowMeta):
if future:
self._event_futures.append(future)
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
f"Flow started with ID: {self.flow_id}", color="bold magenta"
)
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
try:
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
state_data = (
self._state
if isinstance(self._state, dict)
else self._state.model_dump()
)
self._persistence.save_pending_feedback(
flow_uuid=e.context.flow_id,
context=e.context,
state_data=state_data,
)
# Emit flow paused event
future = crewai_event_bus.emit(
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
message=e.context.message,
emit=e.context.emit,
),
)
if future and isinstance(future, Future):
self._event_futures.append(future)
# Wait for events to be processed
if self._event_futures:
await asyncio.gather(
*[
asyncio.wrap_future(f)
for f in self._event_futures
if isinstance(f, Future)
]
)
self._event_futures.clear()
# Return the pending exception instead of raising
# This allows the caller to handle the paused state gracefully
return e
# Re-raise other exceptions
raise
# Clear the resumption flag after initial execution completes
self._is_execution_resuming = False
@@ -1075,7 +1464,30 @@ class Flow(Generic[T], metaclass=FlowMeta):
enhanced_method = self._inject_trigger_payload_for_start_method(method)
result = await self._execute_method(start_method_name, enhanced_method)
await self._execute_listeners(start_method_name, result)
# If start method is a router, use its result as an additional trigger
if start_method_name in self._routers and result is not None:
# Execute listeners for the start method name first
await self._execute_listeners(start_method_name, result)
# Then execute listeners for the router result (e.g., "approved")
router_result_trigger = FlowMethodName(str(result))
listeners_for_result = self._find_triggered_methods(
router_result_trigger, router_only=False
)
if listeners_for_result:
# Pass the HumanFeedbackResult if available
listener_result = (
self.last_human_feedback
if self.last_human_feedback is not None
else result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
else:
await self._execute_listeners(start_method_name, result)
def _inject_trigger_payload_for_start_method(
self, original_method: Callable[..., Any]
@@ -1166,6 +1578,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result
except Exception as e:
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Emit paused event instead of failed
future = crewai_event_bus.emit(
self,
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
emit=e.context.emit,
),
)
if future:
self._event_futures.append(future)
raise e
# Regular failure
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
@@ -1210,7 +1644,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""
# First, handle routers repeatedly until no router triggers anymore
router_results = []
router_result_to_feedback: dict[str, Any] = {} # Map outcome -> HumanFeedbackResult
current_trigger = trigger_method
current_result = result # Track the result to pass to each router
while True:
routers_triggered = self._find_triggered_methods(
@@ -1220,13 +1656,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# For routers triggered by a router outcome, pass the HumanFeedbackResult
router_input = router_result_to_feedback.get(
str(current_trigger), current_result
)
await self._execute_single_listener(router_name, router_input)
# After executing router, the router's result is the path
router_result = (
self._method_outputs[-1] if self._method_outputs else None
)
if router_result: # Only add non-None results
router_results.append(router_result)
# If this was a human_feedback router, map the outcome to the feedback
if self.last_human_feedback is not None:
router_result_to_feedback[str(router_result)] = (
self.last_human_feedback
)
current_trigger = (
FlowMethodName(str(router_result))
if router_result is not None
@@ -1242,8 +1687,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
current_trigger, router_only=False
)
if listeners_triggered:
# Determine what result to pass to listeners
# For router outcomes, pass the HumanFeedbackResult if available
listener_result = router_result_to_feedback.get(
str(current_trigger), result
)
tasks = [
self._execute_single_listener(listener_name, result)
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
@@ -1435,14 +1885,223 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Execute listeners (and possibly routers) of this listener
await self._execute_listeners(listener_name, listener_result)
# If this listener is also a router (e.g., has @human_feedback with emit),
# we need to trigger listeners for the router result as well
if listener_name in self._routers and listener_result is not None:
router_result_trigger = FlowMethodName(str(listener_result))
listeners_for_result = self._find_triggered_methods(
router_result_trigger, router_only=False
)
if listeners_for_result:
# Pass the HumanFeedbackResult if available
feedback_result = (
self.last_human_feedback
if self.last_human_feedback is not None
else listener_result
)
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"Error executing listener {listener_name}: {e}")
# Don't log HumanFeedbackPending as an error - it's expected control flow
from crewai.flow.async_feedback.types import HumanFeedbackPending
if not isinstance(e, HumanFeedbackPending):
logger.error(f"Error executing listener {listener_name}: {e}")
raise
def _request_human_feedback(
self,
message: str,
output: Any,
metadata: dict[str, Any] | None = None,
emit: Sequence[str] | None = None,
) -> str:
"""Request feedback from a human.
Args:
message: The message to display when requesting feedback.
output: The method output to show the human for review.
metadata: Optional metadata for enterprise integrations.
emit: Optional list of possible outcomes for routing.
Returns:
The human's feedback as a string. Empty string if no feedback provided.
"""
from crewai.events.event_listener import event_listener
from crewai.events.types.flow_events import (
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
)
# Emit feedback requested event
crewai_event_bus.emit(
self,
HumanFeedbackRequestedEvent(
type="human_feedback_requested",
flow_name=self.name or self.__class__.__name__,
method_name="", # Will be set by decorator if needed
output=output,
message=message,
emit=list(emit) if emit else None,
),
)
# Pause live updates during human input
formatter = event_listener.formatter
formatter.pause_live_updates()
try:
# Display output with formatting using centralized Rich console
formatter.console.print("\n" + "" * 50, style="bold cyan")
formatter.console.print(" OUTPUT FOR REVIEW", style="bold cyan")
formatter.console.print("" * 50 + "\n", style="bold cyan")
formatter.console.print(output)
formatter.console.print("\n" + "" * 50 + "\n", style="bold cyan")
# Show message and prompt for feedback
formatter.console.print(message, style="yellow")
formatter.console.print("(Press Enter to skip, or type your feedback)\n", style="cyan")
feedback = input("Your feedback: ").strip()
# Emit feedback received event
crewai_event_bus.emit(
self,
HumanFeedbackReceivedEvent(
type="human_feedback_received",
flow_name=self.name or self.__class__.__name__,
method_name="", # Will be set by decorator if needed
feedback=feedback,
outcome=None, # Will be determined after collapsing
),
)
return feedback
finally:
# Resume live updates
formatter.resume_live_updates()
def _collapse_to_outcome(
self,
feedback: str,
outcomes: Sequence[str],
llm: str | BaseLLM,
) -> str:
"""Collapse free-form feedback to a predefined outcome using LLM.
This method uses the specified LLM to interpret the human's feedback
and map it to one of the predefined outcomes for routing purposes.
Uses structured outputs (function calling) when supported by the LLM
to guarantee the response is one of the valid outcomes. Falls back
to simple prompting if structured outputs fail.
Args:
feedback: The raw human feedback text.
outcomes: Sequence of valid outcome strings to choose from.
llm: The LLM model to use. Can be a model string or BaseLLM instance.
Returns:
One of the outcome strings that best matches the feedback intent.
"""
from typing import Literal
from pydantic import BaseModel, Field
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
from crewai.utilities.i18n import get_i18n
# Get or create LLM instance
if isinstance(llm, str):
llm_instance = LLM(model=llm)
elif isinstance(llm, BaseLLMClass):
llm_instance = llm
else:
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
# Dynamically create a Pydantic model with constrained outcomes
outcomes_tuple = tuple(outcomes)
class FeedbackOutcome(BaseModel):
"""The outcome that best matches the human's feedback intent."""
outcome: Literal[outcomes_tuple] = Field( # type: ignore[valid-type]
description=f"The outcome that best matches the feedback. Must be one of: {', '.join(outcomes)}"
)
# Load prompt from translations (using cached instance)
i18n = get_i18n()
prompt_template = i18n.slice("human_feedback_collapse")
prompt = prompt_template.format(
feedback=feedback,
outcomes=", ".join(outcomes),
)
try:
# Try structured output first (function calling)
# Note: LLM.call with response_model returns JSON string, not Pydantic model
response = llm_instance.call(
messages=[{"role": "user", "content": prompt}],
response_model=FeedbackOutcome,
)
# Parse the response - LLM returns JSON string when using response_model
if isinstance(response, str):
import json
try:
parsed = json.loads(response)
return parsed.get("outcome", outcomes[0])
except json.JSONDecodeError:
# Not valid JSON, might be raw outcome string
response_clean = response.strip()
for outcome in outcomes:
if outcome.lower() == response_clean.lower():
return outcome
return outcomes[0]
elif isinstance(response, FeedbackOutcome):
return response.outcome
elif hasattr(response, "outcome"):
return response.outcome
else:
# Unexpected type, fall back to first outcome
logger.warning(f"Unexpected response type: {type(response)}")
return outcomes[0]
except Exception as e:
# Fallback to simple prompting if structured output fails
logger.warning(
f"Structured output failed, falling back to simple prompting: {e}"
)
response = llm_instance.call(messages=prompt)
response_clean = str(response).strip()
# Exact match (case-insensitive)
for outcome in outcomes:
if outcome.lower() == response_clean.lower():
return outcome
# Partial match
for outcome in outcomes:
if outcome.lower() in response_clean.lower():
return outcome
# Fallback to first outcome
logger.warning(
f"Could not match LLM response '{response_clean}' to outcomes {list(outcomes)}. "
f"Falling back to first outcome: {outcomes[0]}"
)
return outcomes[0]
def _log_flow_event(
self,
message: str,
color: PrinterColor = "yellow",
color: str = "yellow",
level: Literal["info", "warning"] = "info",
) -> None:
"""Centralized logging method for flow events.
@@ -1452,20 +2111,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
Args:
message: The message to log
color: Color to use for console output (default: yellow)
Available colors: purple, red, bold_green, bold_purple,
bold_blue, yellow, yellow
color: Rich style for console output (default: "yellow")
Examples: "yellow", "red", "bold green", "bold magenta"
level: Log level to use (default: info)
Supported levels: info, warning
Note:
This method uses the Printer utility for colored console output
This method uses the centralized Rich console formatter for output
and the standard logging module for log level support.
"""
self._printer.print(message, color=color)
from crewai.events.event_listener import event_listener
event_listener.formatter.console.print(message, style=color)
if level == "info":
logger.info(message)
logger.warning(message)
else:
logger.warning(message)
def plot(self, filename: str = "crewai_flow.html", show: bool = True) -> str:
"""Create interactive HTML visualization of Flow structure.

View File

@@ -70,6 +70,15 @@ class FlowMethod(Generic[P, R]):
self._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore[attr-defined]
# Preserve flow-related attributes from wrapped method (e.g., from @human_feedback)
for attr in [
"__is_router__",
"__router_paths__",
"__human_feedback_config__",
]:
if hasattr(meth, attr):
setattr(self, attr, getattr(meth, attr))
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
"""Call the wrapped method.

View File

@@ -0,0 +1,400 @@
"""Human feedback decorator for Flow methods.
This module provides the @human_feedback decorator that enables human-in-the-loop
workflows within CrewAI Flows. It allows collecting human feedback on method outputs
and optionally routing to different listeners based on the feedback.
Supports both synchronous (blocking) and asynchronous (non-blocking) feedback
collection through the provider parameter.
Example (synchronous, default):
```python
from crewai.flow import Flow, start, listen, human_feedback
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Please review this content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def generate_content(self):
return {"title": "Article", "body": "Content..."}
@listen("approved")
def publish(self):
result = self.human_feedback
print(f"Publishing: {result.output}")
```
Example (asynchronous with custom provider):
```python
from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
self.send_notification(context)
raise HumanFeedbackPending(context=context)
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Review this:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
provider=SlackProvider(),
)
def generate_content(self):
return "Content..."
```
"""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from functools import wraps
from typing import TYPE_CHECKING, Any, TypeVar
from crewai.flow.flow_wrappers import FlowMethod
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider
from crewai.flow.flow import Flow
from crewai.llms.base_llm import BaseLLM
F = TypeVar("F", bound=Callable[..., Any])
@dataclass
class HumanFeedbackResult:
"""Result from a @human_feedback decorated method.
This dataclass captures all information about a human feedback interaction,
including the original method output, the human's feedback, and any
collapsed outcome for routing purposes.
Attributes:
output: The original return value from the decorated method that was
shown to the human for review.
feedback: The raw text feedback provided by the human. Empty string
if no feedback was provided.
outcome: The collapsed outcome string when emit is specified.
This is determined by the LLM based on the human's feedback.
None if emit was not specified.
timestamp: When the feedback was received.
method_name: The name of the decorated method that triggered feedback.
metadata: Optional metadata for enterprise integrations. Can be used
to pass additional context like channel, assignee, etc.
Example:
```python
@listen("approved")
def handle_approval(self):
result = self.human_feedback
print(f"Output: {result.output}")
print(f"Feedback: {result.feedback}")
print(f"Outcome: {result.outcome}") # "approved"
```
"""
output: Any
feedback: str
outcome: str | None = None
timestamp: datetime = field(default_factory=datetime.now)
method_name: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class HumanFeedbackConfig:
"""Configuration for the @human_feedback decorator.
Stores the parameters passed to the decorator for later use during
method execution and for introspection by visualization tools.
Attributes:
message: The message shown to the human when requesting feedback.
emit: Optional sequence of outcome strings for routing.
llm: The LLM model to use for collapsing feedback to outcomes.
default_outcome: The outcome to use when no feedback is provided.
metadata: Optional metadata for enterprise integrations.
provider: Optional custom feedback provider for async workflows.
"""
message: str
emit: Sequence[str] | None = None
llm: str | BaseLLM | None = None
default_outcome: str | None = None
metadata: dict[str, Any] | None = None
provider: HumanFeedbackProvider | None = None
class HumanFeedbackMethod(FlowMethod[Any, Any]):
"""Wrapper for methods decorated with @human_feedback.
This wrapper extends FlowMethod to add human feedback specific attributes
that are used by FlowMeta for routing and by visualization tools.
Attributes:
__is_router__: True when emit is specified, enabling router behavior.
__router_paths__: List of possible outcomes when acting as a router.
__human_feedback_config__: The HumanFeedbackConfig for this method.
"""
__is_router__: bool = False
__router_paths__: list[str] | None = None
__human_feedback_config__: HumanFeedbackConfig | None = None
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = None,
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback.
This decorator wraps a Flow method to:
1. Execute the method and capture its output
2. Display the output to the human with a feedback request
3. Collect the human's free-form feedback
4. Optionally collapse the feedback to a predefined outcome using an LLM
5. Store the result for access by downstream methods
When `emit` is specified, the decorator acts as a router, and the
collapsed outcome triggers the appropriate @listen decorated method.
Supports both synchronous (blocking) and asynchronous (non-blocking)
feedback collection through the `provider` parameter. If no provider
is specified, defaults to synchronous console input.
Args:
message: The message shown to the human when requesting feedback.
This should clearly explain what kind of feedback is expected.
emit: Optional sequence of outcome strings. When provided, the
human's feedback will be collapsed to one of these outcomes
using the specified LLM. The outcome then triggers @listen
methods that match.
llm: The LLM model to use for collapsing feedback to outcomes.
Required when emit is specified. Can be a model string
like "gpt-4o-mini" or a BaseLLM instance.
default_outcome: The outcome to use when the human provides no
feedback (empty input). Must be one of the emit values
if emit is specified.
metadata: Optional metadata for enterprise integrations. This is
passed through to the HumanFeedbackResult and can be used
by enterprise forks for features like Slack/Teams integration.
provider: Optional HumanFeedbackProvider for custom feedback
collection. Use this for async workflows that integrate with
external systems like Slack, Teams, or webhooks. When the
provider raises HumanFeedbackPending, the flow pauses and
can be resumed later with Flow.resume().
Returns:
A decorator function that wraps the method with human feedback
collection logic.
Raises:
ValueError: If emit is specified but llm is not provided.
ValueError: If default_outcome is specified but emit is not.
ValueError: If default_outcome is not in the emit list.
HumanFeedbackPending: When an async provider pauses execution.
Example:
Basic feedback without routing:
```python
@start()
@human_feedback(message="Please review this output:")
def generate_content(self):
return "Generated content..."
```
With routing based on feedback:
```python
@start()
@human_feedback(
message="Review and approve or reject:",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def review_document(self):
return document_content
@listen("approved")
def publish(self):
print(f"Publishing: {self.last_human_feedback.output}")
```
Async feedback with custom provider:
```python
@start()
@human_feedback(
message="Review this content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
provider=SlackProvider(channel="#reviews"),
)
def generate_content(self):
return "Content to review..."
```
"""
# Validation at decoration time
if emit is not None:
if not llm:
raise ValueError(
"llm is required when emit is specified. "
"Provide an LLM model string (e.g., 'gpt-4o-mini') or a BaseLLM instance."
)
if default_outcome is not None and default_outcome not in emit:
raise ValueError(
f"default_outcome '{default_outcome}' must be one of the "
f"emit options: {list(emit)}"
)
elif default_outcome is not None:
raise ValueError("default_outcome requires emit to be specified.")
def decorator(func: F) -> F:
"""Inner decorator that wraps the function."""
def _request_feedback(flow_instance: Flow, method_output: Any) -> str:
"""Request feedback using provider or default console."""
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Build context for provider
# Use flow_id property which handles both dict and BaseModel states
context = PendingFeedbackContext(
flow_id=flow_instance.flow_id or "unknown",
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
method_name=func.__name__,
method_output=method_output,
message=message,
emit=list(emit) if emit else None,
default_outcome=default_outcome,
metadata=metadata or {},
llm=llm if isinstance(llm, str) else None,
)
if provider is not None:
# Use custom provider (may raise HumanFeedbackPending)
return provider.request_feedback(context, flow_instance)
else:
# Use default console input
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
def _process_feedback(
flow_instance: Flow,
method_output: Any,
raw_feedback: str,
) -> HumanFeedbackResult | str:
"""Process feedback and return result or outcome."""
# Determine outcome
collapsed_outcome: str | None = None
if not raw_feedback.strip():
# Empty feedback
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
# No default and no feedback - use first outcome
collapsed_outcome = emit[0]
elif emit:
# Collapse feedback to outcome using LLM
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
# Create result
result = HumanFeedbackResult(
output=method_output,
feedback=raw_feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
method_name=func.__name__,
metadata=metadata or {},
)
# Store in flow instance
flow_instance.human_feedback_history.append(result)
flow_instance.last_human_feedback = result
# Return based on mode
if emit:
# Return outcome for routing
return collapsed_outcome # type: ignore[return-value]
return result
if asyncio.iscoroutinefunction(func):
# Async wrapper
@wraps(func)
async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
# Execute the original method
method_output = await func(self, *args, **kwargs)
# Request human feedback (may raise HumanFeedbackPending)
raw_feedback = _request_feedback(self, method_output)
# Process and return
return _process_feedback(self, method_output, raw_feedback)
wrapper: Any = async_wrapper
else:
# Sync wrapper
@wraps(func)
def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
# Execute the original method
method_output = func(self, *args, **kwargs)
# Request human feedback (may raise HumanFeedbackPending)
raw_feedback = _request_feedback(self, method_output)
# Process and return
return _process_feedback(self, method_output, raw_feedback)
wrapper = sync_wrapper
# Preserve existing Flow decorator attributes
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__trigger_condition__",
"__is_flow_method__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
# Add human feedback specific attributes (create config inline to avoid race conditions)
wrapper.__human_feedback_config__ = HumanFeedbackConfig(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
)
wrapper.__is_flow_method__ = True
# Make it a router if emit specified
if emit:
wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit)
return wrapper # type: ignore[return-value]
return decorator

View File

@@ -1,16 +1,26 @@
"""Base class for flow state persistence."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
class FlowPersistence(ABC):
"""Abstract base class for flow state persistence.
This class defines the interface that all persistence implementations must follow.
It supports both structured (Pydantic BaseModel) and unstructured (dict) states.
For async human feedback support, implementations can optionally override:
- save_pending_feedback(): Saves state with pending feedback context
- load_pending_feedback(): Loads state and pending feedback context
- clear_pending_feedback(): Clears pending feedback after resume
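Example:
    A minimal in-memory backend overriding the optional hooks. This is an
    illustrative sketch only: the class name is hypothetical and the
    abstract save_state()/load_state() implementations are elided.

    ```python
    class InMemoryFlowPersistence(FlowPersistence):
        def __init__(self) -> None:
            self._pending: dict[str, tuple[dict, PendingFeedbackContext]] = {}

        # ... save_state() / load_state() implementations go here ...

        def save_pending_feedback(self, flow_uuid, context, state_data):
            state = (
                state_data.model_dump()
                if isinstance(state_data, BaseModel)
                else dict(state_data)
            )
            self._pending[flow_uuid] = (state, context)

        def load_pending_feedback(self, flow_uuid):
            return self._pending.get(flow_uuid)

        def clear_pending_feedback(self, flow_uuid):
            self._pending.pop(flow_uuid, None)
    ```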
"""
@abstractmethod
@@ -45,3 +55,52 @@ class FlowPersistence(ABC):
Returns:
The most recent state as a dictionary, or None if no state exists
"""
def save_pending_feedback(
self,
flow_uuid: str,
context: PendingFeedbackContext,
state_data: dict[str, Any] | BaseModel,
) -> None:
"""Save state with a pending feedback marker.
This method is called when a flow is paused waiting for async human
feedback. The default implementation just saves the state without
the pending feedback context. Override to store the context.
Args:
flow_uuid: Unique identifier for the flow instance
context: The pending feedback context with all resume information
state_data: Current state data
"""
# Default: just save the state without pending context
self.save_state(flow_uuid, context.method_name, state_data)
def load_pending_feedback(
self,
flow_uuid: str,
) -> tuple[dict[str, Any], PendingFeedbackContext] | None:
"""Load state and pending feedback context.
This method is called when resuming a paused flow. Override to
load both the state and the pending feedback context.
Args:
flow_uuid: Unique identifier for the flow instance
Returns:
Tuple of (state_data, pending_context) if pending feedback exists,
None otherwise.
"""
return None
def clear_pending_feedback(self, flow_uuid: str) -> None: # noqa: B027
"""Clear the pending feedback marker after successful resume.
This is called after feedback is received and the flow resumes.
Optional override to remove the pending feedback marker.
Args:
flow_uuid: Unique identifier for the flow instance
"""
pass

View File

@@ -2,17 +2,22 @@
SQLite-based implementation of flow state persistence.
"""
from __future__ import annotations
from datetime import datetime, timezone
import json
from pathlib import Path
import sqlite3
from typing import Any
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence
from crewai.utilities.paths import db_storage_path
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
class SQLiteFlowPersistence(FlowPersistence):
"""SQLite-based implementation of flow state persistence.
@@ -20,6 +25,28 @@ class SQLiteFlowPersistence(FlowPersistence):
This class provides a simple, file-based persistence implementation using SQLite.
It's suitable for development and testing, or for production use cases with
moderate performance requirements.
This implementation supports async human feedback by storing pending feedback
context in a separate table. When a flow is paused waiting for feedback,
use save_pending_feedback() to persist the context. Later, use
load_pending_feedback() to retrieve it when resuming.
Example:
```python
persistence = SQLiteFlowPersistence("flows.db")
# Start a flow with async feedback
try:
flow = MyFlow(persistence=persistence)
result = flow.kickoff()
except HumanFeedbackPending as e:
# Flow is paused, state is already persisted
print(f"Waiting for feedback: {e.context.flow_id}")
# Later, resume with feedback
flow = MyFlow.from_pending("abc-123", persistence)
result = flow.resume("looks good!")
```
"""
def __init__(self, db_path: str | None = None) -> None:
@@ -45,6 +72,7 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
# Main state table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states (
@@ -64,6 +92,26 @@ class SQLiteFlowPersistence(FlowPersistence):
"""
)
# Pending feedback table for async HITL
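# flow_uuid is UNIQUE, so the INSERT OR REPLACE in save_pending_feedback
# keeps at most one pending entry per flow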
conn.execute(
"""
CREATE TABLE IF NOT EXISTS pending_feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL UNIQUE,
context_json TEXT NOT NULL,
state_json TEXT NOT NULL,
created_at DATETIME NOT NULL
)
"""
)
# Add index for faster UUID lookups on pending feedback
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_pending_feedback_uuid
ON pending_feedback(flow_uuid)
"""
)
def save_state(
self,
flow_uuid: str,
@@ -130,3 +178,104 @@ class SQLiteFlowPersistence(FlowPersistence):
if row:
return json.loads(row[0])
return None
def save_pending_feedback(
self,
flow_uuid: str,
context: PendingFeedbackContext,
state_data: dict[str, Any] | BaseModel,
) -> None:
"""Save state with a pending feedback marker.
This method stores both the flow state and the pending feedback context,
allowing the flow to be resumed later when feedback is received.
Args:
flow_uuid: Unique identifier for the flow instance
context: The pending feedback context with all resume information
state_data: Current state data
"""
# Convert state_data to dict
if isinstance(state_data, BaseModel):
state_dict = state_data.model_dump()
elif isinstance(state_data, dict):
state_dict = state_data
else:
raise ValueError(
f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}"
)
# Also save to regular state table for consistency
self.save_state(flow_uuid, context.method_name, state_data)
# Save pending feedback context
with sqlite3.connect(self.db_path) as conn:
# Use INSERT OR REPLACE to handle re-triggering feedback on same flow
conn.execute(
"""
INSERT OR REPLACE INTO pending_feedback (
flow_uuid,
context_json,
state_json,
created_at
) VALUES (?, ?, ?, ?)
""",
(
flow_uuid,
json.dumps(context.to_dict()),
json.dumps(state_dict),
datetime.now(timezone.utc).isoformat(),
),
)
def load_pending_feedback(
self,
flow_uuid: str,
) -> tuple[dict[str, Any], PendingFeedbackContext] | None:
"""Load state and pending feedback context.
Args:
flow_uuid: Unique identifier for the flow instance
Returns:
Tuple of (state_data, pending_context) if pending feedback exists,
None otherwise.
"""
# Import here to avoid circular imports
from crewai.flow.async_feedback.types import PendingFeedbackContext
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
SELECT state_json, context_json
FROM pending_feedback
WHERE flow_uuid = ?
""",
(flow_uuid,),
)
row = cursor.fetchone()
if row:
state_dict = json.loads(row[0])
context_dict = json.loads(row[1])
context = PendingFeedbackContext.from_dict(context_dict)
return (state_dict, context)
return None
def clear_pending_feedback(self, flow_uuid: str) -> None:
"""Clear the pending feedback marker after successful resume.
Args:
flow_uuid: Unique identifier for the flow instance
"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
DELETE FROM pending_feedback
WHERE flow_uuid = ?
""",
(flow_uuid,),
)

View File

@@ -29,7 +29,8 @@
"lite_agent_system_prompt_without_tools": "You are {role}. {backstory}\nYour personal goal is: {goal}\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"lite_agent_response_format": "Ensure your final answer strictly adheres to the following OpenAPI schema: {response_format}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python.",
"knowledge_search_query": "The original query is: {task_prompt}.",
"knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions."
"knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions.",
"human_feedback_collapse": "Based on the following human feedback, determine which outcome best matches their intent.\n\nFeedback: {feedback}\n\nPossible outcomes: {outcomes}\n\nRespond with ONLY one of the exact outcome values listed above, nothing else."
},
"errors": {
"force_final_answer_error": "You can't keep going, here is the best final answer you generated:\n\n {formatted_answer}",

File diff suppressed because it is too large

View File

@@ -0,0 +1,401 @@
"""Unit tests for the @human_feedback decorator.
This module tests the @human_feedback decorator's validation logic,
async support, and attribute preservation functionality.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from crewai.flow import Flow, human_feedback, listen, start
from crewai.flow.human_feedback import (
HumanFeedbackConfig,
HumanFeedbackResult,
)
class TestHumanFeedbackValidation:
"""Tests for decorator parameter validation."""
def test_emit_requires_llm(self):
"""Test that specifying emit without llm raises ValueError."""
with pytest.raises(ValueError) as exc_info:
@human_feedback(
message="Review this:",
emit=["approve", "reject"],
# llm not provided
)
def test_method(self):
return "output"
assert "llm is required" in str(exc_info.value)
def test_default_outcome_requires_emit(self):
"""Test that specifying default_outcome without emit raises ValueError."""
with pytest.raises(ValueError) as exc_info:
@human_feedback(
message="Review this:",
default_outcome="approve",
# emit not provided
)
def test_method(self):
return "output"
assert "requires emit" in str(exc_info.value)
def test_default_outcome_must_be_in_emit(self):
"""Test that default_outcome must be one of the emit values."""
with pytest.raises(ValueError) as exc_info:
@human_feedback(
message="Review this:",
emit=["approve", "reject"],
llm="gpt-4o-mini",
default_outcome="invalid_outcome",
)
def test_method(self):
return "output"
assert "must be one of" in str(exc_info.value)
def test_valid_configuration_with_routing(self):
"""Test that valid configuration with routing doesn't raise."""
@human_feedback(
message="Review this:",
emit=["approve", "reject"],
llm="gpt-4o-mini",
default_outcome="reject",
)
def test_method(self):
return "output"
# Should not raise
assert hasattr(test_method, "__human_feedback_config__")
assert test_method.__is_router__ is True
assert test_method.__router_paths__ == ["approve", "reject"]
def test_valid_configuration_without_routing(self):
"""Test that valid configuration without routing doesn't raise."""
@human_feedback(message="Review this:")
def test_method(self):
return "output"
# Should not raise
assert hasattr(test_method, "__human_feedback_config__")
assert not hasattr(test_method, "__is_router__") or not test_method.__is_router__
class TestHumanFeedbackConfig:
"""Tests for HumanFeedbackConfig dataclass."""
def test_config_creation(self):
"""Test HumanFeedbackConfig can be created with all parameters."""
config = HumanFeedbackConfig(
message="Test message",
emit=["a", "b"],
llm="gpt-4",
default_outcome="a",
metadata={"key": "value"},
)
assert config.message == "Test message"
assert config.emit == ["a", "b"]
assert config.llm == "gpt-4"
assert config.default_outcome == "a"
assert config.metadata == {"key": "value"}
class TestHumanFeedbackResult:
"""Tests for HumanFeedbackResult dataclass."""
def test_result_creation(self):
"""Test HumanFeedbackResult can be created with all fields."""
result = HumanFeedbackResult(
output={"title": "Test"},
feedback="Looks good",
outcome="approved",
method_name="test_method",
)
assert result.output == {"title": "Test"}
assert result.feedback == "Looks good"
assert result.outcome == "approved"
assert result.method_name == "test_method"
assert isinstance(result.timestamp, datetime)
assert result.metadata == {}
def test_result_with_metadata(self):
"""Test HumanFeedbackResult with custom metadata."""
result = HumanFeedbackResult(
output="test",
feedback="feedback",
metadata={"channel": "slack", "user": "test_user"},
)
assert result.metadata == {"channel": "slack", "user": "test_user"}
class TestDecoratorAttributePreservation:
"""Tests for preserving Flow decorator attributes."""
def test_preserves_start_method_attributes(self):
"""Test that @human_feedback preserves @start decorator attributes."""
class TestFlow(Flow):
@start()
@human_feedback(message="Review:")
def my_start_method(self):
return "output"
# Check that start method attributes are preserved
flow = TestFlow()
method = flow._methods.get("my_start_method")
assert method is not None
assert hasattr(method, "__is_start_method__") or "my_start_method" in flow._start_methods
def test_preserves_listen_method_attributes(self):
"""Test that @human_feedback preserves @listen decorator attributes."""
class TestFlow(Flow):
@start()
def begin(self):
return "start"
@listen("begin")
@human_feedback(message="Review:")
def review(self):
return "review output"
flow = TestFlow()
# The method should be registered as a listener
assert "review" in flow._listeners or any(
"review" in str(v) for v in flow._listeners.values()
)
def test_sets_router_attributes_when_emit_specified(self):
"""Test that router attributes are set when emit is specified."""
# Test the decorator directly without @start wrapping
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_method(self):
return "output"
assert review_method.__is_router__ is True
assert review_method.__router_paths__ == ["approved", "rejected"]
class TestAsyncSupport:
"""Tests for async method support."""
def test_async_method_detection(self):
"""Test that async methods are properly detected and wrapped."""
@human_feedback(message="Review:")
async def async_method(self):
return "async output"
assert asyncio.iscoroutinefunction(async_method)
def test_sync_method_remains_sync(self):
"""Test that sync methods remain synchronous."""
@human_feedback(message="Review:")
def sync_method(self):
return "sync output"
assert not asyncio.iscoroutinefunction(sync_method)
class TestHumanFeedbackExecution:
"""Tests for actual human feedback execution."""
@patch("builtins.input", return_value="This looks great!")
@patch("builtins.print")
def test_basic_feedback_collection(self, mock_print, mock_input):
"""Test basic feedback collection without routing."""
class TestFlow(Flow):
@start()
@human_feedback(message="Please review:")
def generate(self):
return "Generated content"
flow = TestFlow()
with patch.object(flow, "_request_human_feedback", return_value="Great job!"):
result = flow.kickoff()
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.output == "Generated content"
assert flow.last_human_feedback.feedback == "Great job!"
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_empty_feedback_with_default_outcome(self, mock_print, mock_input):
"""Test empty feedback uses default_outcome."""
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "needs_work"],
llm="gpt-4o-mini",
default_outcome="needs_work",
)
def review(self):
return "Content"
flow = TestFlow()
with patch.object(flow, "_request_human_feedback", return_value=""):
result = flow.kickoff()
assert result == "needs_work"
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "needs_work"
@patch("builtins.input", return_value="Approved!")
@patch("builtins.print")
def test_feedback_collapsing(self, mock_print, mock_input):
"""Test that feedback is collapsed to an outcome."""
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "Content"
flow = TestFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="Looks great, approved!"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
assert result == "approved"
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "approved"
class TestHumanFeedbackHistory:
"""Tests for human feedback history tracking."""
@patch("builtins.input", return_value="feedback")
@patch("builtins.print")
def test_history_accumulates(self, mock_print, mock_input):
"""Test that multiple feedbacks are stored in history."""
class TestFlow(Flow):
@start()
@human_feedback(message="Review step 1:")
def step1(self):
return "Step 1 output"
@listen(step1)
@human_feedback(message="Review step 2:")
def step2(self, prev):
return "Step 2 output"
flow = TestFlow()
with patch.object(flow, "_request_human_feedback", return_value="feedback"):
flow.kickoff()
# Both feedbacks should be in history
assert len(flow.human_feedback_history) == 2
assert flow.human_feedback_history[0].method_name == "step1"
assert flow.human_feedback_history[1].method_name == "step2"
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_human_feedback_property_returns_last(self, mock_print, mock_input):
"""Test that human_feedback property returns the last result."""
class TestFlow(Flow):
@start()
@human_feedback(message="Review:")
def generate(self):
return "output"
flow = TestFlow()
with patch.object(flow, "_request_human_feedback", return_value="last feedback"):
flow.kickoff()
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.feedback == "last feedback"
assert flow.human_feedback_history[-1] is flow.last_human_feedback
class TestCollapseToOutcome:
"""Tests for the _collapse_to_outcome method."""
def test_exact_match(self):
"""Test exact match returns the correct outcome."""
flow = Flow()
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
mock_llm.call.return_value = "approved"
MockLLM.return_value = mock_llm
result = flow._collapse_to_outcome(
feedback="I approve this",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result == "approved"
def test_partial_match(self):
"""Test partial match finds the outcome in the response."""
flow = Flow()
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
mock_llm.call.return_value = "The outcome is approved based on the feedback"
MockLLM.return_value = mock_llm
result = flow._collapse_to_outcome(
feedback="Looks good",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result == "approved"
def test_fallback_to_first(self):
"""Test that unmatched response falls back to first outcome."""
flow = Flow()
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
mock_llm.call.return_value = "something completely different"
MockLLM.return_value = mock_llm
result = flow._collapse_to_outcome(
feedback="Unclear feedback",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result == "approved" # First in list

View File

@@ -0,0 +1,428 @@
"""Integration tests for the @human_feedback decorator with Flow.
This module tests the integration of @human_feedback with @listen,
routing behavior, multi-step flows, and state management.
"""
from __future__ import annotations
import asyncio
from unittest.mock import patch
import pytest
from crewai.flow import Flow, HumanFeedbackResult, human_feedback, listen, start
from crewai.flow.flow import FlowState
class TestRoutingIntegration:
"""Tests for routing integration with @listen decorators."""
@patch("builtins.input", return_value="I approve")
@patch("builtins.print")
def test_routes_to_matching_listener(self, mock_print, mock_input):
"""Test that collapsed outcome routes to the matching @listen method."""
execution_order = []
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def generate(self):
execution_order.append("generate")
return "content"
@listen("approved")
def on_approved(self):
execution_order.append("on_approved")
return "published"
@listen("rejected")
def on_rejected(self):
execution_order.append("on_rejected")
return "discarded"
flow = ReviewFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="Approved!"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
assert "generate" in execution_order
assert "on_approved" in execution_order
assert "on_rejected" not in execution_order
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_default_outcome_routes_correctly(self, mock_print, mock_input):
"""Test that default_outcome routes when no feedback provided."""
executed_listener = []
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "needs_work"],
llm="gpt-4o-mini",
default_outcome="needs_work",
)
def generate(self):
return "content"
@listen("approved")
def on_approved(self):
executed_listener.append("approved")
@listen("needs_work")
def on_needs_work(self):
executed_listener.append("needs_work")
flow = ReviewFlow()
with patch.object(flow, "_request_human_feedback", return_value=""):
flow.kickoff()
assert "needs_work" in executed_listener
assert "approved" not in executed_listener
class TestMultiStepFlows:
"""Tests for multi-step flows with multiple @human_feedback decorators."""
@patch("builtins.input", side_effect=["Good draft", "Final approved"])
@patch("builtins.print")
def test_multiple_feedback_steps(self, mock_print, mock_input):
"""Test a flow with multiple human feedback steps."""
class MultiStepFlow(Flow):
@start()
@human_feedback(message="Review draft:")
def draft(self):
return "Draft content"
@listen(draft)
@human_feedback(message="Final review:")
def final_review(self, prev_result: HumanFeedbackResult):
return f"Final content based on: {prev_result.feedback}"
flow = MultiStepFlow()
with patch.object(
flow, "_request_human_feedback", side_effect=["Good draft", "Approved"]
):
flow.kickoff()
# Both feedbacks should be recorded
assert len(flow.human_feedback_history) == 2
assert flow.human_feedback_history[0].method_name == "draft"
assert flow.human_feedback_history[0].feedback == "Good draft"
assert flow.human_feedback_history[1].method_name == "final_review"
assert flow.human_feedback_history[1].feedback == "Approved"
@patch("builtins.input", return_value="feedback")
@patch("builtins.print")
def test_mixed_feedback_and_regular_methods(self, mock_print, mock_input):
"""Test flow with both @human_feedback and regular methods."""
execution_order = []
class MixedFlow(Flow):
@start()
def generate(self):
execution_order.append("generate")
return "generated"
@listen(generate)
@human_feedback(message="Review:")
def review(self):
execution_order.append("review")
return "reviewed"
@listen(review)
def finalize(self, result):
execution_order.append("finalize")
return "finalized"
flow = MixedFlow()
with patch.object(flow, "_request_human_feedback", return_value="feedback"):
flow.kickoff()
assert execution_order == ["generate", "review", "finalize"]
class TestStateManagement:
"""Tests for state management with human feedback."""
@patch("builtins.input", return_value="approved")
@patch("builtins.print")
def test_feedback_available_in_listener(self, mock_print, mock_input):
"""Test that feedback is accessible in downstream listeners."""
captured_feedback = []
class StateFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "Content to review"
@listen("approved")
def on_approved(self):
# Access the feedback via property
captured_feedback.append(self.last_human_feedback)
return "done"
flow = StateFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="Great content!"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
flow.kickoff()
assert len(captured_feedback) == 1
result = captured_feedback[0]
assert isinstance(result, HumanFeedbackResult)
assert result.output == "Content to review"
assert result.feedback == "Great content!"
assert result.outcome == "approved"
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_history_preserved_across_steps(self, mock_print, mock_input):
"""Test that feedback history is preserved across flow execution."""
class HistoryFlow(Flow):
@start()
@human_feedback(message="Step 1:")
def step1(self):
return "Step 1"
@listen(step1)
@human_feedback(message="Step 2:")
def step2(self, result):
return "Step 2"
@listen(step2)
def final(self, result):
# Access history
return len(self.human_feedback_history)
flow = HistoryFlow()
with patch.object(flow, "_request_human_feedback", return_value="feedback"):
result = flow.kickoff()
# Final method should see 2 feedback entries
assert result == 2
class TestAsyncFlowIntegration:
"""Tests for async flow integration."""
@pytest.mark.asyncio
async def test_async_flow_with_human_feedback(self):
"""Test that @human_feedback works with async flows."""
executed = []
class AsyncFlow(Flow):
@start()
@human_feedback(message="Review:")
async def async_review(self):
executed.append("async_review")
await asyncio.sleep(0.01) # Simulate async work
return "async content"
flow = AsyncFlow()
with patch.object(flow, "_request_human_feedback", return_value="feedback"):
await flow.kickoff_async()
assert "async_review" in executed
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.output == "async content"
class TestWithStructuredState:
"""Tests for flows with structured (Pydantic) state."""
@patch("builtins.input", return_value="approved")
@patch("builtins.print")
def test_with_pydantic_state(self, mock_print, mock_input):
"""Test human feedback with structured Pydantic state."""
class ReviewState(FlowState):
content: str = ""
review_count: int = 0
class StructuredFlow(Flow[ReviewState]):
initial_state = ReviewState
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
self.state.content = "Generated content"
self.state.review_count += 1
return self.state.content
@listen("approved")
def on_approved(self):
return f"Approved: {self.state.content}"
flow = StructuredFlow()
with (
patch.object(flow, "_request_human_feedback", return_value="LGTM"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()
assert flow.state.review_count == 1
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.feedback == "LGTM"
class TestMetadataPassthrough:
"""Tests for metadata passthrough functionality."""
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_metadata_included_in_result(self, mock_print, mock_input):
"""Test that metadata is passed through to HumanFeedbackResult."""
class MetadataFlow(Flow):
@start()
@human_feedback(
message="Review:",
metadata={"channel": "slack", "priority": "high"},
)
def review(self):
return "content"
flow = MetadataFlow()
with patch.object(flow, "_request_human_feedback", return_value="feedback"):
flow.kickoff()
result = flow.last_human_feedback
assert result is not None
assert result.metadata == {"channel": "slack", "priority": "high"}
class TestEventEmission:
"""Tests for event emission during human feedback."""
@patch("builtins.input", return_value="test feedback")
@patch("builtins.print")
def test_events_emitted_on_feedback_request(self, mock_print, mock_input):
"""Test that events are emitted when feedback is requested."""
from crewai.events.event_listener import event_listener
class EventFlow(Flow):
@start()
@human_feedback(message="Review:")
def review(self):
return "content"
flow = EventFlow()
# We can't easily capture events in tests, but we can verify
# the flow executes without errors
with (
patch.object(
event_listener.formatter, "pause_live_updates", return_value=None
),
patch.object(
event_listener.formatter, "resume_live_updates", return_value=None
),
):
flow.kickoff()
assert flow.last_human_feedback is not None
class TestEdgeCases:
"""Tests for edge cases and error handling."""
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_empty_feedback_first_outcome_fallback(self, mock_print, mock_input):
"""Test that empty feedback without default uses first outcome."""
class FallbackFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["first", "second", "third"],
llm="gpt-4o-mini",
# No default_outcome specified
)
def review(self):
return "content"
flow = FallbackFlow()
with patch.object(flow, "_request_human_feedback", return_value=""):
result = flow.kickoff()
assert result == "first" # Falls back to first outcome
@patch("builtins.input", return_value="whitespace only ")
@patch("builtins.print")
def test_whitespace_only_feedback_treated_as_empty(self, mock_print, mock_input):
"""Test that whitespace-only feedback is treated as empty."""
class WhitespaceFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approve", "reject"],
llm="gpt-4o-mini",
default_outcome="reject",
)
def review(self):
return "content"
flow = WhitespaceFlow()
with patch.object(flow, "_request_human_feedback", return_value=" "):
result = flow.kickoff()
assert result == "reject" # Uses default because feedback is empty after strip
@patch("builtins.input", return_value="feedback")
@patch("builtins.print")
def test_feedback_result_without_routing(self, mock_print, mock_input):
"""Test that HumanFeedbackResult is returned when not routing."""
class NoRoutingFlow(Flow):
@start()
@human_feedback(message="Review:")
def review(self):
return "content"
flow = NoRoutingFlow()
with patch.object(flow, "_request_human_feedback", return_value="feedback"):
result = flow.kickoff()
# Result should be HumanFeedbackResult when not routing
assert isinstance(result, HumanFeedbackResult)
assert result.output == "content"
assert result.feedback == "feedback"
assert result.outcome is None # No routing, no outcome