Add MethodExecutionFailedEvent to handle flow method execution failures

- Introduce new MethodExecutionFailedEvent in flow_events module
- Update Flow class to catch and emit method execution failures
- Add event listener for method execution failure events
- Update event-related imports to include new event type
- Enhance test coverage for method execution failure handling
This commit is contained in:
Lorenze Jay
2025-02-14 09:00:16 -08:00
parent 766422dd5e
commit 7d168d6d61
6 changed files with 95 additions and 35 deletions

View File

@@ -31,6 +31,7 @@ from crewai.utilities.events import (
MethodExecutionStartedEvent,
)
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.flow_events import MethodExecutionFailedEvent
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -808,40 +809,53 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
)
try:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=result,
),
)
event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=result,
),
)
return result
return result
except Exception as e:
event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.__class__.__name__,
error=e,
),
)
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
"""

View File

@@ -21,6 +21,7 @@ from .flow_events import (
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
from .event_bus import EventBus, event_bus
from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent

View File

@@ -20,6 +20,7 @@ from .flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
@@ -125,9 +126,8 @@ class EventListener(BaseEventListener):
source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source
)
context = event.context
self.logger.log(
f"📋 Task started: {source.description} Context: {context}",
f"📋 Task started: {source.description}",
event.timestamp,
color=self.color,
)
@@ -139,7 +139,7 @@ class EventListener(BaseEventListener):
if source._execution_span:
self._telemetry.task_ended(source._execution_span, source, source.agent.crew)
self.logger.log(
f"📋 Task completed: {source.description}",
f" Task completed: {source.description}",
event.timestamp,
color=self.color,
)
@@ -169,7 +169,7 @@ class EventListener(BaseEventListener):
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.logger.log(
f"👍 Agent '{event.agent.role}' completed task",
f" Agent '{event.agent.role}' completed task",
event.timestamp,
color=self.color,
)
@@ -212,6 +212,15 @@ class EventListener(BaseEventListener):
color=self.color,
)
@event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
self.logger.log(
f"❌ Flow Method Failed: '{event.method_name}'",
event.timestamp,
color=self.color,
)
@event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.logger.log(

View File

@@ -21,6 +21,7 @@ from .flow_events import (
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFailedEvent,
)
from .task_events import (
TaskCompletedEvent,
@@ -48,6 +49,7 @@ EventTypes = Union[
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
AgentExecutionErrorEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,

View File

@@ -45,6 +45,15 @@ class MethodExecutionFinishedEvent(FlowEvent):
result: Any = None
state: Union[Dict[str, Any], BaseModel]
type: str = "method_execution_finished"
class MethodExecutionFailedEvent(FlowEvent):
"""Event emitted when a flow method fails execution"""
flow_name: str
method_name: str
error: Any
type: str = "method_execution_failed"
class FlowFinishedEvent(FlowEvent):

View File

@@ -26,6 +26,7 @@ from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.task_events import (
@@ -468,3 +469,27 @@ def test_flow_emits_created_event():
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_created"
def test_flow_emits_method_execution_failed_event():
received_events = []
error = Exception("Simulated method failure")
@event_bus.on(MethodExecutionFailedEvent)
def handle_method_failed(source, event):
received_events.append(event)
class TestFlow(Flow[dict]):
@start()
def begin(self):
raise error
flow = TestFlow()
# with pytest.raises(Exception):
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].method_name == "begin"
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "method_execution_failed"
assert received_events[0].error == error