Add FlowPlotEvent and update event bus to support flow plotting

- Introduce FlowPlotEvent to track flow plotting events
- Replace Telemetry method with event bus emission in Flow.plot()
- Update event bus to support new FlowPlotEvent type
- Add test case to validate flow plotting event emission
This commit is contained in:
Lorenze Jay
2025-02-18 09:10:27 -08:00
parent 935da884ed
commit d0f9abaa85
6 changed files with 162 additions and 10 deletions

View File

@@ -14,6 +14,7 @@ from crewai.utilities.events import (
MethodExecutionStartedEvent,
crewai_event_bus,
)
from crewai.utilities.events.flow_events import FlowPlotEvent
def test_simple_sequential_flow():
@@ -627,3 +628,29 @@ def test_stateless_flow_event_emission():
== "Deeds will not be less valiant because they are unpraised."
)
assert isinstance(received_events[5].timestamp, datetime)
def test_flow_plotting():
class StatelessFlow(Flow):
@start()
def init(self):
return "Initializing flow..."
@listen(init)
def process(self):
return "Deeds will not be less valiant because they are unpraised."
flow = StatelessFlow()
flow.kickoff()
received_events = []
@crewai_event_bus.on(FlowPlotEvent)
def handle_flow_plot(source, event):
received_events.append(event)
flow.plot("test_flow")
assert len(received_events) == 1
assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime)