From 7d168d6d61319a2d2d284da8fb31b0bcc0d80797 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Fri, 14 Feb 2025 09:00:16 -0800 Subject: [PATCH] Add MethodExecutionFailedEvent to handle flow method execution failures - Introduce new MethodExecutionFailedEvent in flow_events module - Update Flow class to catch and emit method execution failures - Add event listener for method execution failure events - Update event-related imports to include new event type - Enhance test coverage for method execution failure handling --- src/crewai/flow/flow.py | 76 +++++++++++-------- src/crewai/utilities/events/__init__.py | 1 + src/crewai/utilities/events/event_listener.py | 17 ++++- src/crewai/utilities/events/event_types.py | 2 + src/crewai/utilities/events/flow_events.py | 9 +++ tests/utilities/test_events.py | 25 ++++++ 6 files changed, 95 insertions(+), 35 deletions(-) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index c89dcc49c..ac89fa088 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -31,6 +31,7 @@ from crewai.utilities.events import ( MethodExecutionStartedEvent, ) from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.flow_events import MethodExecutionFailedEvent from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) @@ -808,40 +809,53 @@ class Flow(Generic[T], metaclass=FlowMeta): async def _execute_method( self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: - dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) - event_bus.emit( - self, - MethodExecutionStartedEvent( - type="method_execution_started", - method_name=method_name, - flow_name=self.__class__.__name__, - params=dumped_params, - state=self._copy_state(), - ), - ) + try: + dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) + event_bus.emit( + self, + MethodExecutionStartedEvent( + type="method_execution_started", + method_name=method_name, + flow_name=self.__class__.__name__, + params=dumped_params, + state=self._copy_state(), + ), + ) - result = ( - await method(*args, **kwargs) - if asyncio.iscoroutinefunction(method) - else method(*args, **kwargs) - ) - self._method_outputs.append(result) - self._method_execution_counts[method_name] = ( - self._method_execution_counts.get(method_name, 0) + 1 - ) + + result = ( + await method(*args, **kwargs) + if asyncio.iscoroutinefunction(method) + else method(*args, **kwargs) + ) + + self._method_outputs.append(result) + self._method_execution_counts[method_name] = ( + self._method_execution_counts.get(method_name, 0) + 1 + ) - event_bus.emit( - self, - MethodExecutionFinishedEvent( - type="method_execution_finished", - method_name=method_name, - flow_name=self.__class__.__name__, - state=self._copy_state(), - result=result, - ), - ) + event_bus.emit( + self, + MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=method_name, + flow_name=self.__class__.__name__, + state=self._copy_state(), + result=result, + ), + ) - return result + return result + except Exception as e: + event_bus.emit( + self, + MethodExecutionFailedEvent( + type="method_execution_failed", + method_name=method_name, + flow_name=self.__class__.__name__, + error=e, + ), + ) async def _execute_listeners(self, trigger_method: str, result: Any) -> None: """ diff --git a/src/crewai/utilities/events/__init__.py b/src/crewai/utilities/events/__init__.py index b7e49bcd3..36ae36c9d 100644 --- a/src/crewai/utilities/events/__init__.py +++ b/src/crewai/utilities/events/__init__.py @@ -21,6 +21,7 @@ from .flow_events import ( FlowFinishedEvent, MethodExecutionStartedEvent, MethodExecutionFinishedEvent, + MethodExecutionFailedEvent, ) from .event_bus import EventBus, event_bus from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index d796889e1..80d265105 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -20,6 +20,7 @@ from .flow_events import ( FlowCreatedEvent, FlowFinishedEvent, FlowStartedEvent, + MethodExecutionFailedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, ) @@ -125,9 +126,8 @@ class EventListener(BaseEventListener): source._execution_span = self._telemetry.task_started( crew=source.agent.crew, task=source ) - context = event.context self.logger.log( - f"📋 Task started: {source.description} Context: {context}", + f"📋 Task started: {source.description}", event.timestamp, color=self.color, ) @@ -139,7 +139,7 @@ class EventListener(BaseEventListener): if source._execution_span: self._telemetry.task_ended(source._execution_span, source, source.agent.crew) self.logger.log( - f"📋 Task completed: {source.description}", + f"✅ Task completed: {source.description}", event.timestamp, color=self.color, ) @@ -169,7 +169,7 @@ class EventListener(BaseEventListener): @event_bus.on(AgentExecutionCompletedEvent) def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): self.logger.log( - f"👍 Agent '{event.agent.role}' completed task", + f"✅ Agent '{event.agent.role}' completed task", event.timestamp, color=self.color, ) @@ -212,6 +212,15 @@ class EventListener(BaseEventListener): color=self.color, ) + @event_bus.on(MethodExecutionFailedEvent) + def on_method_execution_failed(source, event: MethodExecutionFailedEvent): + self.logger.log( + f"❌ Flow Method Failed: '{event.method_name}'", + event.timestamp, + color=self.color, + ) + + @event_bus.on(MethodExecutionFinishedEvent) def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): self.logger.log( diff --git a/src/crewai/utilities/events/event_types.py b/src/crewai/utilities/events/event_types.py index b4fa0f929..774dc8b0e 100644 --- a/src/crewai/utilities/events/event_types.py +++ b/src/crewai/utilities/events/event_types.py @@ -21,6 +21,7 @@ from .flow_events import ( FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, + MethodExecutionFailedEvent, ) from .task_events import ( TaskCompletedEvent, @@ -48,6 +49,7 @@ EventTypes = Union[ FlowFinishedEvent, MethodExecutionStartedEvent, MethodExecutionFinishedEvent, + MethodExecutionFailedEvent, AgentExecutionErrorEvent, ToolUsageFinishedEvent, ToolUsageErrorEvent, diff --git a/src/crewai/utilities/events/flow_events.py b/src/crewai/utilities/events/flow_events.py index ac0fbeea1..e6a272a1d 100644 --- a/src/crewai/utilities/events/flow_events.py +++ b/src/crewai/utilities/events/flow_events.py @@ -45,6 +45,15 @@ class MethodExecutionFinishedEvent(FlowEvent): result: Any = None state: Union[Dict[str, Any], BaseModel] type: str = "method_execution_finished" + + +class MethodExecutionFailedEvent(FlowEvent): + """Event emitted when a flow method fails execution""" + + flow_name: str + method_name: str + error: Any + type: str = "method_execution_failed" class FlowFinishedEvent(FlowEvent): diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index 4efa4377c..33aebc962 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -26,6 +26,7 @@ from crewai.utilities.events.flow_events import ( FlowCreatedEvent, FlowFinishedEvent, FlowStartedEvent, + MethodExecutionFailedEvent, MethodExecutionStartedEvent, ) from crewai.utilities.events.task_events import ( @@ -468,3 +469,27 @@ def test_flow_emits_created_event(): assert len(received_events) == 1 assert received_events[0].flow_name == "TestFlow" assert received_events[0].type == "flow_created" + + +def test_flow_emits_method_execution_failed_event(): + received_events = [] + error = Exception("Simulated method failure") + + @event_bus.on(MethodExecutionFailedEvent) + def handle_method_failed(source, event): + received_events.append(event) + + class TestFlow(Flow[dict]): + @start() + def begin(self): + raise error + + flow = TestFlow() + # with pytest.raises(Exception): + flow.kickoff() + + assert len(received_events) == 1 + assert received_events[0].method_name == "begin" + assert received_events[0].flow_name == "TestFlow" + assert received_events[0].type == "method_execution_failed" + assert received_events[0].error == error