---
title: Human Feedback in Flows
description: Learn how to integrate human feedback directly into your CrewAI Flows using the @human_feedback decorator
icon: user-check
mode: "wide"
---

## Overview

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:

- **Quality assurance**: Review AI-generated content before it's used downstream
- **Decision gates**: Let humans make critical decisions in automated workflows
- **Approval workflows**: Implement approve/reject/revise patterns
- **Interactive refinement**: Collect feedback to improve outputs iteratively

```mermaid
flowchart LR
    A[Flow Method] --> B[Output Generated]
    B --> C[Human Reviews]
    C --> D{Feedback}
    D -->|emit specified| E[LLM Collapses to Outcome]
    D -->|no emit| F[HumanFeedbackResult]
    E --> G["@listen('approved')"]
    E --> H["@listen('rejected')"]
    F --> I[Next Listener]
```

## Quick Start

Here's the simplest way to add human feedback to a flow:

```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback


class SimpleReviewFlow(Flow):
    @start()
    @human_feedback(message="Please review this content:")
    def generate_content(self):
        return "This is AI-generated content that needs review."

    @listen(generate_content)
    def process_feedback(self, result):
        print(f"Content: {result.output}")
        print(f"Human said: {result.feedback}")


flow = SimpleReviewFlow()
flow.kickoff()
```

When this flow runs, it will:

1. Execute `generate_content` and return the string
2. Display the output to the user along with the message you provided
3. Wait for the user to type feedback (or press Enter to skip)
4. Pass a `HumanFeedbackResult` object to `process_feedback`
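
A run of this flow looks roughly like the following. The console formatting here is illustrative and may differ slightly between versions; it mirrors the review banner shown in the complete example later on this page:

```text Output
==================================================
OUTPUT FOR REVIEW:
==================================================
This is AI-generated content that needs review.
==================================================

Please review this content:
(Press Enter to skip, or type your feedback)

Your feedback: Tighten the opening sentence.

Content: This is AI-generated content that needs review.
Human said: Tighten the opening sentence.
```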

## The @human_feedback Decorator

### Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `message` | `str` | Yes | The message shown to the human alongside the method output |
| `emit` | `Sequence[str]` | No | List of possible outcomes. Feedback is collapsed to one of these, which triggers `@listen` decorators |
| `llm` | `str \| BaseLLM` | When `emit` is specified | LLM used to interpret feedback and map it to an outcome |
| `default_outcome` | `str` | No | Outcome to use if no feedback is provided. Must be in `emit` |
| `metadata` | `dict` | No | Additional data for enterprise integrations |
| `provider` | `HumanFeedbackProvider` | No | Custom provider for async/non-blocking feedback. See [Async Human Feedback](#async-human-feedback-non-blocking) |
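
Most of these parameters appear in the examples below; `metadata` is the exception, so here is a small sketch of attaching it. The key names are arbitrary, and the dict is carried through to the `metadata` field on the resulting `HumanFeedbackResult`:

```python Code
@human_feedback(
    message="Sign off on this report:",
    metadata={"team": "finance", "priority": "high"},  # arbitrary keys, surfaced on result.metadata
)
def draft_report(self):
    return "Q3 report draft..."
```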

### Basic Usage (No Routing)

When you don't specify `emit`, the decorator simply collects feedback and passes a `HumanFeedbackResult` to the next listener:

```python Code
@start()
@human_feedback(message="What do you think of this analysis?")
def analyze_data(self):
    return "Analysis results: Revenue up 15%, costs down 8%"

@listen(analyze_data)
def handle_feedback(self, result):
    # result is a HumanFeedbackResult
    print(f"Analysis: {result.output}")
    print(f"Feedback: {result.feedback}")
```

### Routing with emit

When you specify `emit`, the decorator becomes a router. The human's free-form feedback is interpreted by an LLM and collapsed into one of the specified outcomes:

```python Code
@start()
@human_feedback(
    message="Do you approve this content for publication?",
    emit=["approved", "rejected", "needs_revision"],
    llm="gpt-4o-mini",
    default_outcome="needs_revision",
)
def review_content(self):
    return "Draft blog post content here..."

@listen("approved")
def publish(self, result):
    print(f"Publishing! User said: {result.feedback}")

@listen("rejected")
def discard(self, result):
    print(f"Discarding. Reason: {result.feedback}")

@listen("needs_revision")
def revise(self, result):
    print(f"Revising based on: {result.feedback}")
```

<Tip>
The LLM uses structured outputs (function calling) when available to guarantee the response is one of your specified outcomes. This makes routing reliable and predictable.
</Tip>
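
For intuition, given `emit=["approved", "rejected", "needs_revision"]`, free-form replies collapse roughly like this. The pairings below are illustrative only; the actual interpretation is performed by the LLM you pass via `llm`:

```python Code
# Illustrative pairings only; the configured LLM does the real interpretation.
likely_outcomes = {
    "Looks great, ship it!": "approved",
    "This misses the point entirely.": "rejected",
    "Good start, but expand the pricing section.": "needs_revision",
}
```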

## HumanFeedbackResult

The `HumanFeedbackResult` dataclass contains all information about a human feedback interaction:

```python Code
from crewai.flow.human_feedback import HumanFeedbackResult

# The class has this shape:
@dataclass
class HumanFeedbackResult:
    output: Any          # The original method output shown to the human
    feedback: str        # The raw feedback text from the human
    outcome: str | None  # The collapsed outcome (if emit was specified)
    timestamp: datetime  # When the feedback was received
    method_name: str     # Name of the decorated method
    metadata: dict       # Any metadata passed to the decorator
```

### Accessing in Listeners

When a listener is triggered by a `@human_feedback` method with `emit`, it receives the `HumanFeedbackResult`:

```python Code
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
    print(f"Original output: {result.output}")
    print(f"User feedback: {result.feedback}")
    print(f"Outcome: {result.outcome}")  # "approved"
    print(f"Received at: {result.timestamp}")
```

## Accessing Feedback History

The `Flow` class provides two attributes for accessing human feedback:

### last_human_feedback

Returns the most recent `HumanFeedbackResult`:

```python Code
@listen(some_method)
def check_feedback(self):
    if self.last_human_feedback:
        print(f"Last feedback: {self.last_human_feedback.feedback}")
```

### human_feedback_history

A list of all `HumanFeedbackResult` objects collected during the flow:

```python Code
@listen(final_step)
def summarize(self):
    print(f"Total feedback collected: {len(self.human_feedback_history)}")
    for i, fb in enumerate(self.human_feedback_history, 1):
        print(f"{i}. {fb.method_name}: {fb.outcome or 'no routing'}")
```

<Warning>
Each `HumanFeedbackResult` is appended to `human_feedback_history`, so multiple feedback steps won't overwrite each other. Use this list to access all feedback collected during the flow.
</Warning>

## Complete Example: Content Approval Workflow

Here's a full example implementing a content review and approval workflow:

<CodeGroup>

```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
from pydantic import BaseModel


class ContentState(BaseModel):
    topic: str = ""
    draft: str = ""
    final_content: str = ""
    revision_count: int = 0


class ContentApprovalFlow(Flow[ContentState]):
    """A flow that generates content and gets human approval."""

    @start()
    def get_topic(self):
        self.state.topic = input("What topic should I write about? ")
        return self.state.topic

    @listen(get_topic)
    def generate_draft(self, topic):
        # In real use, this would call an LLM
        self.state.draft = f"# {topic}\n\nThis is a draft about {topic}..."
        return self.state.draft

    @listen(generate_draft)
    @human_feedback(
        message="Please review this draft. Reply 'approved', 'rejected', or provide revision feedback:",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def review_draft(self, draft):
        return draft

    @listen("approved")
    def publish_content(self, result: HumanFeedbackResult):
        self.state.final_content = result.output
        print("\n✅ Content approved and published!")
        print(f"Reviewer comment: {result.feedback}")
        return "published"

    @listen("rejected")
    def handle_rejection(self, result: HumanFeedbackResult):
        print("\n❌ Content rejected")
        print(f"Reason: {result.feedback}")
        return "rejected"

    @listen("needs_revision")
    def revise_content(self, result: HumanFeedbackResult):
        self.state.revision_count += 1
        print(f"\n📝 Revision #{self.state.revision_count} requested")
        print(f"Feedback: {result.feedback}")

        # In a real flow, you might loop back to generate_draft
        # For this example, we just acknowledge
        return "revision_requested"


# Run the flow
flow = ContentApprovalFlow()
result = flow.kickoff()
print(f"\nFlow completed. Revisions requested: {flow.state.revision_count}")
```

```text Output
What topic should I write about? AI Safety

==================================================
OUTPUT FOR REVIEW:
==================================================
# AI Safety

This is a draft about AI Safety...
==================================================

Please review this draft. Reply 'approved', 'rejected', or provide revision feedback:
(Press Enter to skip, or type your feedback)

Your feedback: Looks good, approved!

✅ Content approved and published!
Reviewer comment: Looks good, approved!

Flow completed. Revisions requested: 0
```

</CodeGroup>

## Combining with Other Decorators

The `@human_feedback` decorator works with other flow decorators. Place it as the innermost decorator (closest to the function):

```python Code
# Correct: @human_feedback is innermost (closest to the function)
@start()
@human_feedback(message="Review this:")
def my_start_method(self):
    return "content"

@listen(other_method)
@human_feedback(message="Review this too:")
def my_listener(self, data):
    return f"processed: {data}"
```

<Tip>
Place `@human_feedback` as the innermost decorator (last/closest to the function) so it wraps the method directly and can capture the return value before passing it to the flow system.
</Tip>

## Best Practices

### 1. Write Clear Request Messages

The `message` parameter is what the human sees. Make it actionable:

```python Code
# ✅ Good - clear and actionable
@human_feedback(message="Does this summary accurately capture the key points? Reply 'yes' or explain what's missing:")

# ❌ Bad - vague
@human_feedback(message="Review this:")
```

### 2. Choose Meaningful Outcomes

When using `emit`, pick outcomes that map naturally to human responses:

```python Code
# ✅ Good - natural language outcomes
emit=["approved", "rejected", "needs_more_detail"]

# ❌ Bad - technical or unclear
emit=["state_1", "state_2", "state_3"]
```

### 3. Always Provide a Default Outcome

Use `default_outcome` to handle cases where users press Enter without typing:

```python Code
@human_feedback(
    message="Approve? (press Enter to request revision)",
    emit=["approved", "needs_revision"],
    llm="gpt-4o-mini",
    default_outcome="needs_revision",  # Safe default
)
```

### 4. Use Feedback History for Audit Trails

Access `human_feedback_history` to create audit logs:

```python Code
@listen(final_step)
def create_audit_log(self):
    log = []
    for fb in self.human_feedback_history:
        log.append({
            "step": fb.method_name,
            "outcome": fb.outcome,
            "feedback": fb.feedback,
            "timestamp": fb.timestamp.isoformat(),
        })
    return log
```

### 5. Handle Both Routed and Non-Routed Feedback

When designing flows, consider whether you need routing:

| Scenario | Use |
|----------|-----|
| Simple review, just need the feedback text | No `emit` |
| Need to branch to different paths based on response | Use `emit` |
| Approval gates with approve/reject/revise | Use `emit` |
| Collecting comments for logging only | No `emit` |
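
In the non-routed case, `outcome` on the result is simply `None`, so the listener can use the raw feedback text however it likes. A minimal sketch (`collect_comments` is a hypothetical upstream method):

```python Code
@listen(collect_comments)
def log_comments(self, result):
    # No emit was specified, so no routing happened
    assert result.outcome is None
    print(f"Comment recorded: {result.feedback}")
```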

## Async Human Feedback (Non-Blocking)

By default, `@human_feedback` blocks execution waiting for console input. For production applications, you may need **async/non-blocking** feedback that integrates with external systems like Slack, email, webhooks, or APIs.

### The Provider Abstraction

Use the `provider` parameter to specify a custom feedback collection strategy:

```python Code
from crewai.flow import Flow, start, human_feedback, HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext


class WebhookProvider(HumanFeedbackProvider):
    """Provider that pauses the flow and waits for a webhook callback."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # Notify the external system (e.g., send a Slack message, create a ticket)
        self.send_notification(context)

        # Pause execution - the framework handles persistence automatically
        raise HumanFeedbackPending(
            context=context,
            callback_info={"webhook_url": f"{self.webhook_url}/{context.flow_id}"},
        )


class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Review this content:",
        emit=["approved", "rejected"],
        llm="gpt-4o-mini",
        provider=WebhookProvider("https://myapp.com/api"),
    )
    def generate_content(self):
        return "AI-generated content..."

    @listen("approved")
    def publish(self, result):
        return "Published!"
```

<Tip>
The flow framework **automatically persists state** when `HumanFeedbackPending` is raised. Your provider only needs to notify the external system and raise the exception; no manual persistence calls are required.
</Tip>

### Handling Paused Flows

When using an async provider, the framework catches the `HumanFeedbackPending` raised by the provider, and `kickoff()` returns it rather than propagating the exception:

```python Code
flow = ReviewFlow()
result = flow.kickoff()

if isinstance(result, HumanFeedbackPending):
    # Flow is paused, state is automatically persisted
    print(f"Waiting for feedback at: {result.callback_info['webhook_url']}")
    print(f"Flow ID: {result.context.flow_id}")
else:
    # Normal completion
    print(f"Flow completed: {result}")
```

### Resuming a Paused Flow

When feedback arrives (e.g., via webhook), resume the flow:

```python Code
# Sync handler:
def handle_feedback_webhook(flow_id: str, feedback: str):
    flow = ReviewFlow.from_pending(flow_id)
    result = flow.resume(feedback)
    return result


# Async handler (FastAPI, aiohttp, etc.):
async def handle_feedback_webhook(flow_id: str, feedback: str):
    flow = ReviewFlow.from_pending(flow_id)
    result = await flow.resume_async(feedback)
    return result
```

### Key Types

| Type | Description |
|------|-------------|
| `HumanFeedbackProvider` | Protocol for custom feedback providers |
| `PendingFeedbackContext` | Contains all info needed to resume a paused flow |
| `HumanFeedbackPending` | Raised by a provider to pause the flow; returned by `kickoff()` when paused |
| `ConsoleProvider` | Default blocking console input provider |
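
The protocol boils down to the single `request_feedback` method used in the examples on this page, so a minimal custom provider skeleton looks like this (a sketch: either return the feedback text synchronously, as the blocking `ConsoleProvider` does, or raise `HumanFeedbackPending` to pause):

```python Code
from crewai.flow import Flow, HumanFeedbackPending, HumanFeedbackProvider, PendingFeedbackContext


class MyProvider(HumanFeedbackProvider):
    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # Blocking providers return the feedback text directly;
        # non-blocking providers raise HumanFeedbackPending instead.
        raise HumanFeedbackPending(context=context, callback_info={})
```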

### PendingFeedbackContext

The context contains everything needed to resume:

```python Code
@dataclass
class PendingFeedbackContext:
    flow_id: str                 # Unique identifier for this flow execution
    flow_class: str              # Fully qualified class name
    method_name: str             # Method that triggered feedback
    method_output: Any           # Output shown to the human
    message: str                 # The request message
    emit: list[str] | None       # Possible outcomes for routing
    default_outcome: str | None  # Fallback outcome if no feedback is given
    metadata: dict               # Custom metadata
    llm: str | None              # LLM for outcome collapsing
    requested_at: datetime       # When the feedback was requested
```

### Complete Async Flow Example

```python Code
from crewai.flow import (
    Flow, start, listen, human_feedback,
    HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
)


class SlackNotificationProvider(HumanFeedbackProvider):
    """Provider that sends Slack notifications and pauses for async feedback."""

    def __init__(self, channel: str):
        self.channel = channel

    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
        # Send Slack notification (implement your own)
        slack_thread_id = self.post_to_slack(
            channel=self.channel,
            message=f"Review needed:\n\n{context.method_output}\n\n{context.message}",
        )

        # Pause execution - the framework handles persistence automatically
        raise HumanFeedbackPending(
            context=context,
            callback_info={
                "slack_channel": self.channel,
                "thread_id": slack_thread_id,
            },
        )


class ContentPipeline(Flow):
    @start()
    @human_feedback(
        message="Approve this content for publication?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
        provider=SlackNotificationProvider("#content-reviews"),
    )
    def generate_content(self):
        return "AI-generated blog post content..."

    @listen("approved")
    def publish(self, result):
        print(f"Publishing! Reviewer said: {result.feedback}")
        return {"status": "published"}

    @listen("rejected")
    def archive(self, result):
        print(f"Archived. Reason: {result.feedback}")
        return {"status": "archived"}

    @listen("needs_revision")
    def queue_revision(self, result):
        print(f"Queued for revision: {result.feedback}")
        return {"status": "revision_needed"}


# Starting the flow (will pause and wait for Slack response)
def start_content_pipeline():
    flow = ContentPipeline()
    result = flow.kickoff()

    if isinstance(result, HumanFeedbackPending):
        return {"status": "pending", "flow_id": result.context.flow_id}

    return result


# Resuming when the Slack webhook fires (sync handler)
def on_slack_feedback(flow_id: str, slack_message: str):
    flow = ContentPipeline.from_pending(flow_id)
    result = flow.resume(slack_message)
    return result


# If your handler is async (FastAPI, aiohttp, Slack Bolt async, etc.)
async def on_slack_feedback_async(flow_id: str, slack_message: str):
    flow = ContentPipeline.from_pending(flow_id)
    result = await flow.resume_async(slack_message)
    return result
```

<Warning>
If you're using an async web framework (FastAPI, aiohttp, Slack Bolt async mode), use `await flow.resume_async()` instead of `flow.resume()`. Calling `resume()` from within a running event loop will raise a `RuntimeError`.
</Warning>

### Best Practices for Async Feedback

1. **Check the return type**: `kickoff()` returns `HumanFeedbackPending` when paused, so no try/except is needed
2. **Use the right resume method**: Use `resume()` in sync code and `await resume_async()` in async code
3. **Store callback info**: Use `callback_info` to store webhook URLs, ticket IDs, etc.
4. **Implement idempotency**: Your resume handler should be idempotent for safety (see the sketch after this list)
5. **Automatic persistence**: State is automatically saved when `HumanFeedbackPending` is raised, using `SQLiteFlowPersistence` by default
6. **Custom persistence**: Pass a custom persistence instance to `from_pending()` if needed
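
Here is a minimal sketch of the idempotency guard from item 4, reusing the `ReviewFlow` defined earlier. The in-memory set is a stand-in for whatever durable store your application actually uses (a database table, a Redis key, etc.):

```python Code
processed_flow_ids: set[str] = set()  # stand-in for a durable store


def on_feedback(flow_id: str, feedback: str):
    # Webhooks can be delivered more than once; only resume the first time
    if flow_id in processed_flow_ids:
        return {"status": "already_processed"}
    processed_flow_ids.add(flow_id)

    flow = ReviewFlow.from_pending(flow_id)
    return flow.resume(feedback)
```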

## Related Documentation

- [Flows Overview](/en/concepts/flows) - Learn about CrewAI Flows
- [Flow State Management](/en/guides/flows/mastering-flow-state) - Managing state in flows
- [Flow Persistence](/en/concepts/flows#persistence) - Persisting flow state
- [Routing with @router](/en/concepts/flows#router) - More about conditional routing
- [Human Input on Execution](/en/learn/human-input-on-execution) - Task-level human input