diff --git a/src/crewai/crew.py b/src/crewai/crew.py index cae8cbfa5..55a24640c 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -574,8 +574,7 @@ class Crew(BaseModel): CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs), ) - """Starts the crew to work on its assigned tasks.""" - # self._execution_span = self._telemetry.crew_execution_span(self, inputs) # TODO: drop this + # Starts the crew to work on its assigned tasks. self._task_output_handler.reset() self._logging_color = "bold_purple" diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index d4c574d76..396b8f8c5 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -22,16 +22,16 @@ from pydantic import BaseModel, Field, ValidationError from crewai.flow.flow_visualizer import plot_flow from crewai.flow.persistence.base import FlowPersistence from crewai.flow.utils import get_possible_return_constants -from crewai.telemetry import Telemetry -from crewai.utilities.events import ( +from crewai.utilities.events.crewai_event_bus import crewai_event_bus +from crewai.utilities.events.flow_events import ( FlowCreatedEvent, FlowFinishedEvent, + FlowPlotEvent, FlowStartedEvent, + MethodExecutionFailedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, ) -from crewai.utilities.events.crewai_event_bus import crewai_event_bus -from crewai.utilities.events.flow_events import MethodExecutionFailedEvent from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) @@ -1045,7 +1045,11 @@ class Flow(Generic[T], metaclass=FlowMeta): logger.warning(message) def plot(self, filename: str = "crewai_flow") -> None: - Telemetry().flow_plotting_span( - self.__class__.__name__, list(self._methods.keys()) + crewai_event_bus.emit( + self, + FlowPlotEvent( + type="flow_plot", + flow_name=self.__class__.__name__, + ), ) plot_flow(self, filename) diff --git a/src/crewai/utilities/events/__init__.py b/src/crewai/utilities/events/__init__.py index 24ad3121c..b8b3b9cfb 100644 --- a/src/crewai/utilities/events/__init__.py +++ b/src/crewai/utilities/events/__init__.py @@ -19,6 +19,7 @@ from .flow_events import ( FlowCreatedEvent, FlowStartedEvent, FlowFinishedEvent, + FlowPlotEvent, MethodExecutionStartedEvent, MethodExecutionFinishedEvent, MethodExecutionFailedEvent, diff --git a/src/crewai/utilities/events/crewai_event_bus.py b/src/crewai/utilities/events/crewai_event_bus.py new file mode 100644 index 000000000..e3cafa0e1 --- /dev/null +++ b/src/crewai/utilities/events/crewai_event_bus.py @@ -0,0 +1,114 @@ +import threading +from contextlib import contextmanager +from typing import Any, Callable, Dict, List, Type, TypeVar, cast + +from blinker import Signal + +from crewai.utilities.events.crew_events import CrewEvent + +EventT = TypeVar("EventT", bound=CrewEvent) + + +class CrewAIEventsBus: + """ + A singleton event bus that uses blinker signals for event handling. + Allows both internal (Flow/Crew) and external event handling. + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: # prevent race condition + cls._instance = super(CrewAIEventsBus, cls).__new__(cls) + cls._instance._initialize() + return cls._instance + + def _initialize(self) -> None: + """Initialize the event bus internal state""" + self._signal = Signal("crewai_event_bus") + self._handlers: Dict[ + Type[CrewEvent], List[Callable[[Any, CrewEvent], None]] + ] = {} + + def on( + self, event_type: Type[EventT] + ) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]: + """ + Decorator to register an event handler for a specific event type. + + Usage: + @crewai_event_bus.on(AgentExecutionCompletedEvent) + def on_agent_execution_completed( + source: Any, event: AgentExecutionCompletedEvent + ): + print(f"👍 Agent '{event.agent}' completed task") + print(f" Output: {event.output}") + """ + + def decorator( + handler: Callable[[Any, EventT], None], + ) -> Callable[[Any, EventT], None]: + if event_type not in self._handlers: + self._handlers[event_type] = [] + self._handlers[event_type].append( + cast(Callable[[Any, CrewEvent], None], handler) + ) + return handler + + return decorator + + def emit(self, source: Any, event: CrewEvent) -> None: + """ + Emit an event to all registered handlers + + Args: + source: The object emitting the event + event: The event instance to emit + """ + event_type = type(event) + if event_type in self._handlers: + for handler in self._handlers[event_type]: + handler(source, event) + self._signal.send(source, event=event) + + def clear_handlers(self) -> None: + """Clear all registered event handlers - useful for testing""" + self._handlers.clear() + + def register_handler( + self, event_type: Type[EventT], handler: Callable[[Any, EventT], None] + ) -> None: + """Register an event handler for a specific event type""" + if event_type not in self._handlers: + self._handlers[event_type] = [] + self._handlers[event_type].append( + cast(Callable[[Any, CrewEvent], None], handler) + ) + + @contextmanager + def scoped_handlers(self): + """ + Context manager for temporary event handling scope. + Useful for testing or temporary event handling. + + Usage: + with crewai_event_bus.scoped_handlers(): + @crewai_event_bus.on(CrewKickoffStarted) + def temp_handler(source, event): + print("Temporary handler") + # Do stuff... + # Handlers are cleared after the context + """ + previous_handlers = self._handlers.copy() + self._handlers.clear() + try: + yield + finally: + self._handlers = previous_handlers + + +# Global instance +crewai_event_bus = CrewAIEventsBus() diff --git a/src/crewai/utilities/events/flow_events.py b/src/crewai/utilities/events/flow_events.py index e6a272a1d..b3daf889f 100644 --- a/src/crewai/utilities/events/flow_events.py +++ b/src/crewai/utilities/events/flow_events.py @@ -20,13 +20,13 @@ class FlowStartedEvent(FlowEvent): type: str = "flow_started" - class FlowCreatedEvent(FlowEvent): """Event emitted when a flow is created""" flow_name: str type: str = "flow_created" + class MethodExecutionStartedEvent(FlowEvent): """Event emitted when a flow method starts execution""" @@ -45,7 +45,7 @@ class MethodExecutionFinishedEvent(FlowEvent): result: Any = None state: Union[Dict[str, Any], BaseModel] type: str = "method_execution_finished" - + class MethodExecutionFailedEvent(FlowEvent): """Event emitted when a flow method fails execution""" @@ -62,3 +62,10 @@ class FlowFinishedEvent(FlowEvent): flow_name: str result: Optional[Any] = None type: str = "flow_finished" + + +class FlowPlotEvent(FlowEvent): + """Event emitted when a flow plot is created""" + + flow_name: str + type: str = "flow_plot" diff --git a/tests/flow_test.py b/tests/flow_test.py index 7e32a9c4c..b2edcfa5a 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -14,6 +14,7 @@ from crewai.utilities.events import ( MethodExecutionStartedEvent, crewai_event_bus, ) +from crewai.utilities.events.flow_events import FlowPlotEvent def test_simple_sequential_flow(): @@ -627,3 +628,29 @@ def test_stateless_flow_event_emission(): == "Deeds will not be less valiant because they are unpraised." ) assert isinstance(received_events[5].timestamp, datetime) + + +def test_flow_plotting(): + class StatelessFlow(Flow): + @start() + def init(self): + return "Initializing flow..." + + @listen(init) + def process(self): + return "Deeds will not be less valiant because they are unpraised." + + flow = StatelessFlow() + flow.kickoff() + received_events = [] + + @crewai_event_bus.on(FlowPlotEvent) + def handle_flow_plot(source, event): + received_events.append(event) + + flow.plot("test_flow") + + assert len(received_events) == 1 + assert isinstance(received_events[0], FlowPlotEvent) + assert received_events[0].flow_name == "StatelessFlow" + assert isinstance(received_events[0].timestamp, datetime)