From 8945457883721b526fa51226560618471a78f7f2 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Tue, 6 Jan 2026 16:12:34 -0800 Subject: [PATCH] Lorenze/metrics for human feedback flows (#4188) * measuring human feedback feat * add some tests --- lib/crewai/src/crewai/events/__init__.py | 8 +- .../src/crewai/events/event_listener.py | 30 +++- lib/crewai/src/crewai/telemetry/telemetry.py | 32 +++++ lib/crewai/tests/utilities/test_events.py | 135 ++++++++++++++++++ 4 files changed, 202 insertions(+), 3 deletions(-) diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 4147965e1..efbb479cd 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -10,7 +10,7 @@ This module provides the event infrastructure that allows users to: from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from crewai.events.base_event_listener import BaseEventListener from crewai.events.depends import Depends @@ -34,6 +34,8 @@ from crewai.events.types.flow_events import ( FlowFinishedEvent, FlowPlotEvent, FlowStartedEvent, + HumanFeedbackReceivedEvent, + HumanFeedbackRequestedEvent, MethodExecutionFailedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -145,6 +147,8 @@ __all__ = [ "FlowFinishedEvent", "FlowPlotEvent", "FlowStartedEvent", + "HumanFeedbackReceivedEvent", + "HumanFeedbackRequestedEvent", "KnowledgeQueryCompletedEvent", "KnowledgeQueryFailedEvent", "KnowledgeQueryStartedEvent", @@ -205,7 +209,7 @@ _AGENT_EVENT_MAPPING = { } -def __getattr__(name: str): +def __getattr__(name: str) -> Any: """Lazy import for agent events to avoid circular imports.""" if name in _AGENT_EVENT_MAPPING: import importlib diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 36c37e9c9..68a158642 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -37,6 +37,8 @@ from crewai.events.types.flow_events import ( FlowFinishedEvent, FlowPausedEvent, FlowStartedEvent, + HumanFeedbackReceivedEvent, + HumanFeedbackRequestedEvent, MethodExecutionFailedEvent, MethodExecutionFinishedEvent, MethodExecutionPausedEvent, @@ -67,7 +69,6 @@ from crewai.events.types.mcp_events import ( MCPConnectionCompletedEvent, MCPConnectionFailedEvent, MCPConnectionStartedEvent, - MCPToolExecutionCompletedEvent, MCPToolExecutionFailedEvent, MCPToolExecutionStartedEvent, ) @@ -329,6 +330,33 @@ class EventListener(BaseEventListener): "paused", ) + # ----------- HUMAN FEEDBACK EVENTS ----------- + @crewai_event_bus.on(HumanFeedbackRequestedEvent) + def on_human_feedback_requested( + _: Any, event: HumanFeedbackRequestedEvent + ) -> None: + """Handle human feedback requested event.""" + has_routing = event.emit is not None and len(event.emit) > 0 + self._telemetry.human_feedback_span( + event_type="requested", + has_routing=has_routing, + num_outcomes=len(event.emit) if event.emit else 0, + ) + + @crewai_event_bus.on(HumanFeedbackReceivedEvent) + def on_human_feedback_received( + _: Any, event: HumanFeedbackReceivedEvent + ) -> None: + """Handle human feedback received event.""" + has_routing = event.outcome is not None + self._telemetry.human_feedback_span( + event_type="received", + has_routing=has_routing, + num_outcomes=0, + feedback_provided=bool(event.feedback and event.feedback.strip()), + outcome=event.outcome, + ) + # 
----------- TOOL USAGE EVENTS ----------- @crewai_event_bus.on(ToolUsageStartedEvent) def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None: diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index 016bda10e..66a080894 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -969,3 +969,35 @@ class Telemetry: close_span(span) self._safe_telemetry_operation(_operation) + + def human_feedback_span( + self, + event_type: str, + has_routing: bool, + num_outcomes: int = 0, + feedback_provided: bool | None = None, + outcome: str | None = None, + ) -> None: + """Records human feedback feature usage. + + Args: + event_type: Type of event - "requested" or "received". + has_routing: Whether emit options were configured for routing. + num_outcomes: Number of possible outcomes if routing is used. + feedback_provided: Whether user provided feedback or skipped (None if requested). + outcome: The collapsed outcome string if routing was used. + """ + + def _operation() -> None: + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Human Feedback") + self._add_attribute(span, "event_type", event_type) + self._add_attribute(span, "has_routing", has_routing) + self._add_attribute(span, "num_outcomes", num_outcomes) + if feedback_provided is not None: + self._add_attribute(span, "feedback_provided", feedback_provided) + if outcome is not None: + self._add_attribute(span, "outcome", outcome) + close_span(span) + + self._safe_telemetry_operation(_operation) diff --git a/lib/crewai/tests/utilities/test_events.py b/lib/crewai/tests/utilities/test_events.py index e655c0c5a..f637b6c3d 100644 --- a/lib/crewai/tests/utilities/test_events.py +++ b/lib/crewai/tests/utilities/test_events.py @@ -25,6 +25,8 @@ from crewai.events.types.flow_events import ( FlowCreatedEvent, FlowFinishedEvent, FlowStartedEvent, + HumanFeedbackReceivedEvent, + HumanFeedbackRequestedEvent, MethodExecutionFailedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -45,6 +47,7 @@ from crewai.events.types.tool_usage_events import ( ToolUsageFinishedEvent, ) from crewai.flow.flow import Flow, listen, start +from crewai.flow.human_feedback import human_feedback from crewai.llm import LLM from crewai.task import Task from crewai.tools.base_tool import BaseTool @@ -1273,3 +1276,135 @@ def test_llm_emits_event_with_lite_agent(): assert set(all_agent_roles) == {agent.role} assert set(all_agent_id) == {str(agent.id)} + + +# ----------- HUMAN FEEDBACK EVENTS ----------- + + +@patch("builtins.input", return_value="looks good") +@patch("builtins.print") +def test_human_feedback_emits_requested_and_received_events(mock_print, mock_input): + """Test that @human_feedback decorator emits HumanFeedbackRequested and Received events.""" + requested_events = [] + received_events = [] + events_received = threading.Event() + + @crewai_event_bus.on(HumanFeedbackRequestedEvent) + def handle_requested(source, event): + requested_events.append(event) + + @crewai_event_bus.on(HumanFeedbackReceivedEvent) + def handle_received(source, event): + received_events.append(event) + events_received.set() + + class TestFlow(Flow): + @start() + @human_feedback( + message="Review:", + emit=["approved", "rejected"], + llm="gpt-4o-mini", + ) + def review(self): + return "test content" + + flow = TestFlow() + + with patch.object(flow, "_collapse_to_outcome", return_value="approved"): + flow.kickoff() + + assert 
events_received.wait(timeout=5), ( + "Timeout waiting for human feedback events" + ) + + assert len(requested_events) == 1 + assert requested_events[0].type == "human_feedback_requested" + assert requested_events[0].emit == ["approved", "rejected"] + assert requested_events[0].message == "Review:" + assert requested_events[0].output == "test content" + + assert len(received_events) == 1 + assert received_events[0].type == "human_feedback_received" + assert received_events[0].feedback == "looks good" + assert received_events[0].outcome is None + + assert flow.last_human_feedback is not None + assert flow.last_human_feedback.outcome == "approved" + + +@patch("builtins.input", return_value="feedback text") +@patch("builtins.print") +def test_human_feedback_without_routing_emits_events(mock_print, mock_input): + """Test that @human_feedback without emit still emits events.""" + requested_events = [] + received_events = [] + events_received = threading.Event() + + @crewai_event_bus.on(HumanFeedbackRequestedEvent) + def handle_requested(source, event): + requested_events.append(event) + + @crewai_event_bus.on(HumanFeedbackReceivedEvent) + def handle_received(source, event): + received_events.append(event) + events_received.set() + + class SimpleFlow(Flow): + @start() + @human_feedback(message="Please review:") + def review(self): + return "content to review" + + flow = SimpleFlow() + flow.kickoff() + + assert events_received.wait(timeout=5), ( + "Timeout waiting for human feedback events" + ) + + assert len(requested_events) == 1 + assert requested_events[0].emit is None + + assert len(received_events) == 1 + assert received_events[0].feedback == "feedback text" + assert received_events[0].outcome is None + + +@patch("builtins.input", return_value="") +@patch("builtins.print") +def test_human_feedback_empty_feedback_emits_events(mock_print, mock_input): + """Test that empty feedback (skipped) still emits events correctly.""" + received_events = [] + events_received = threading.Event() + + @crewai_event_bus.on(HumanFeedbackReceivedEvent) + def handle_received(source, event): + received_events.append(event) + events_received.set() + + class SkipFlow(Flow): + @start() + @human_feedback( + message="Review:", + emit=["approved", "rejected"], + llm="gpt-4o-mini", + default_outcome="rejected", + ) + def review(self): + return "content" + + flow = SkipFlow() + flow.kickoff() + + assert events_received.wait(timeout=5), ( + "Timeout waiting for human feedback events" + ) + + + assert len(received_events) == 1 + assert received_events[0].feedback == "" + assert received_events[0].outcome is None + + + assert flow.last_human_feedback is not None + assert flow.last_human_feedback.outcome == "rejected"
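
Usage notes (editor's sketches, not part of the patch):

The patch exports HumanFeedbackRequestedEvent and HumanFeedbackReceivedEvent from crewai.events and wires them into the built-in EventListener, which also makes the feedback loop observable from user code. Below is a minimal sketch of a custom listener, assuming the usual BaseEventListener.setup_listeners(crewai_event_bus) hook and using only the event fields exercised by the tests (message, emit, feedback, outcome); the class name FeedbackAuditListener is illustrative.

# Sketch only: a user-defined listener for the new human feedback events.
# Assumes BaseEventListener exposes setup_listeners(self, crewai_event_bus),
# as in crewai's custom-listener pattern.
from typing import Any

from crewai.events import (
    BaseEventListener,
    HumanFeedbackReceivedEvent,
    HumanFeedbackRequestedEvent,
)


class FeedbackAuditListener(BaseEventListener):
    """Hypothetical listener that logs every human feedback round-trip."""

    def setup_listeners(self, crewai_event_bus: Any) -> None:
        @crewai_event_bus.on(HumanFeedbackRequestedEvent)
        def on_requested(source: Any, event: HumanFeedbackRequestedEvent) -> None:
            # event.emit carries the routing outcomes, if any were configured.
            print(f"feedback requested: {event.message!r}, outcomes={event.emit}")

        @crewai_event_bus.on(HumanFeedbackReceivedEvent)
        def on_received(source: Any, event: HumanFeedbackReceivedEvent) -> None:
            # event.outcome is None when no routing outcome has been collapsed yet.
            print(f"feedback received: {event.feedback!r}, outcome={event.outcome}")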
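For reference, the two new EventListener handlers reduce to the following attribute payloads on the "Human Feedback" span added to Telemetry. This is a sketch under two assumptions: that the Telemetry class is importable from crewai.telemetry, and that telemetry has not been disabled (if it has, _safe_telemetry_operation turns these calls into no-ops).

# Sketch only: attribute payloads produced by the new human_feedback_span.
from crewai.telemetry import Telemetry  # assumed import path for the Telemetry class

telemetry = Telemetry()

# "requested" with routing configured via emit=["approved", "rejected"]:
# span attributes -> event_type="requested", has_routing=True, num_outcomes=2
telemetry.human_feedback_span(event_type="requested", has_routing=True, num_outcomes=2)

# "received" after the user typed feedback, before any outcome collapse:
# span attributes -> event_type="received", has_routing=False, num_outcomes=0,
#                    feedback_provided=True (outcome omitted because it is None)
telemetry.human_feedback_span(
    event_type="received",
    has_routing=False,
    num_outcomes=0,
    feedback_provided=True,
    outcome=None,
)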
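The tests drive @human_feedback through patched builtins.input; in a real flow the same wiring looks roughly like the sketch below. The decorator kwargs (message, emit, llm, default_outcome) and flow.last_human_feedback are taken from the tests; the model name and prompt text are placeholders, and the @listen("approved") hookup assumes emit outcomes can be listened to like router labels.

# Sketch only: the flow shape exercised by the tests, without mocks.
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback


class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Review the draft:",    # prompt shown when feedback is requested
        emit=["approved", "rejected"],  # routing outcomes collapsed via the LLM
        llm="gpt-4o-mini",              # placeholder model name
        default_outcome="rejected",     # used when the reviewer skips feedback
    )
    def draft(self):
        return "draft content"

    @listen("approved")
    def publish(self):
        # last_human_feedback.outcome holds the collapsed routing outcome.
        return f"published after: {self.last_human_feedback.outcome}"


# flow = ReviewFlow()
# flow.kickoff()  # blocks on input() for the reviewer's feedback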