include event emitter in flows (#1740)

* include event emitter in flows

* Clean up

* Fix linter
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-12-11 10:16:05 -05:00
committed by GitHub
parent da9220fa81
commit 1df61aba4c
5 changed files with 94 additions and 9 deletions

View File

@@ -14,8 +14,15 @@ from typing import (
cast,
)
from blinker import Signal
from pydantic import BaseModel, ValidationError
from crewai.flow.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
@@ -159,6 +166,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Dict[str, str] = {}
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
event_emitter = Signal("event_emitter")
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -253,6 +261,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow execution.
"""
self.event_emitter.send(
self,
event=FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
),
)
if inputs is not None:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
@@ -267,8 +283,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
if not self._start_methods:
raise ValueError("No start method defined")
@@ -285,11 +299,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Run all start methods concurrently
await asyncio.gather(*tasks)
# Return the final output (from the last executed method)
if self._method_outputs:
return self._method_outputs[-1]
else:
return None # Or raise an exception if no methods were executed
# Determine the final output (from the last executed method)
final_output = self._method_outputs[-1] if self._method_outputs else None
self.event_emitter.send(
self,
event=FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
result=final_output,
),
)
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
result = await self._execute_method(
@@ -352,6 +374,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
try:
method = self._methods[listener_name]
self.event_emitter.send(
self,
event=MethodExecutionStartedEvent(
type="method_execution_started",
method_name=listener_name,
flow_name=self.__class__.__name__,
),
)
sig = inspect.signature(method)
params = list(sig.parameters.values())
@@ -367,6 +399,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
# If listener does not expect parameters, call without arguments
listener_result = await self._execute_method(listener_name, method)
self.event_emitter.send(
self,
event=MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=listener_name,
flow_name=self.__class__.__name__,
),
)
# Execute listeners of this listener
await self._execute_listeners(listener_name, listener_result)
except Exception as e:

View File

@@ -0,0 +1,33 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
@dataclass
class Event:
type: str
flow_name: str
timestamp: datetime = field(init=False)
def __post_init__(self):
self.timestamp = datetime.now()
@dataclass
class FlowStartedEvent(Event):
pass
@dataclass
class MethodExecutionStartedEvent(Event):
method_name: str
@dataclass
class MethodExecutionFinishedEvent(Event):
method_name: str
@dataclass
class FlowFinishedEvent(Event):
result: Optional[Any] = None