Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter

This commit is contained in:
Lorenze Jay
2025-02-12 15:47:50 -08:00
3 changed files with 259 additions and 2 deletions

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import copy
import inspect import inspect
import logging import logging
from typing import ( from typing import (
@@ -569,6 +570,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}" f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
) )
def _copy_state(self) -> T:
return copy.deepcopy(self._state)
@property @property
def state(self) -> T: def state(self) -> T:
return self._state return self._state
@@ -740,6 +744,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
FlowStarted( FlowStarted(
type="flow_started", type="flow_started",
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
inputs=inputs,
), ),
) )
self._log_flow_event( self._log_flow_event(
@@ -803,6 +808,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def _execute_method( async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any: ) -> Any:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
self.event_emitter.send(
self,
event=MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
)
result = ( result = (
await method(*args, **kwargs) await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method) if asyncio.iscoroutinefunction(method)
@@ -812,6 +829,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_execution_counts[method_name] = ( self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1 self._method_execution_counts.get(method_name, 0) + 1
) )
self.event_emitter.send(
self,
event=MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=result,
),
)
return result return result
async def _execute_listeners(self, trigger_method: str, result: Any) -> None: async def _execute_listeners(self, trigger_method: str, result: Any) -> None:

View File

@@ -1,6 +1,8 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import Any, Optional from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
@dataclass @dataclass
@@ -15,17 +17,21 @@ class Event:
@dataclass @dataclass
class FlowStartedEvent(Event): class FlowStartedEvent(Event):
pass inputs: Optional[Dict[str, Any]] = None
@dataclass @dataclass
class MethodExecutionStartedEvent(Event): class MethodExecutionStartedEvent(Event):
method_name: str method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
@dataclass @dataclass
class MethodExecutionFinishedEvent(Event): class MethodExecutionFinishedEvent(Event):
method_name: str method_name: str
state: Union[Dict[str, Any], BaseModel]
result: Any = None
@dataclass @dataclass

View File

@@ -1,11 +1,18 @@
"""Test Flow creation and execution basic functionality.""" """Test Flow creation and execution basic functionality."""
import asyncio import asyncio
from datetime import datetime
import pytest import pytest
from pydantic import BaseModel from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
def test_simple_sequential_flow(): def test_simple_sequential_flow():
@@ -398,3 +405,218 @@ def test_router_with_multiple_conditions():
# final_step should run after router_and # final_step should run after router_and
assert execution_order.index("log_final_step") > execution_order.index("router_and") assert execution_order.index("log_final_step") > execution_order.index("router_and")
def test_unstructured_flow_event_emission():
"""Test that the correct events are emitted during unstructured flow
execution with all fields validated."""
class PoemFlow(Flow):
@start()
def prepare_flower(self):
self.state["flower"] = "roses"
return "foo"
@start()
def prepare_color(self):
self.state["color"] = "red"
return "bar"
@listen(prepare_color)
def write_first_sentence(self):
return f"{self.state["flower"]} are {self.state["color"]}"
@listen(write_first_sentence)
def finish_poem(self, first_sentence):
separator = self.state.get("separator", "\n")
return separator.join([first_sentence, "violets are blue"])
@listen(finish_poem)
def save_poem_to_database(self):
# A method without args/kwargs to ensure events are sent correctly
pass
event_log = []
def handle_event(_, event):
event_log.append(event)
flow = PoemFlow()
flow.event_emitter.connect(handle_event)
flow.kickoff(inputs={"separator": ", "})
assert isinstance(event_log[0], FlowStartedEvent)
assert event_log[0].flow_name == "PoemFlow"
assert event_log[0].inputs == {"separator": ", "}
assert isinstance(event_log[0].timestamp, datetime)
# Asserting for concurrent start method executions in a for loop as you
# can't guarantee ordering in asynchronous executions
for i in range(1, 5):
event = event_log[i]
assert isinstance(event.state, dict)
assert isinstance(event.state["id"], str)
if event.method_name == "prepare_flower":
if isinstance(event, MethodExecutionStartedEvent):
assert event.params == {}
assert event.state["separator"] == ", "
elif isinstance(event, MethodExecutionFinishedEvent):
assert event.result == "foo"
assert event.state["flower"] == "roses"
assert event.state["separator"] == ", "
else:
assert False, "Unexpected event type for prepare_flower"
elif event.method_name == "prepare_color":
if isinstance(event, MethodExecutionStartedEvent):
assert event.params == {}
assert event.state["separator"] == ", "
elif isinstance(event, MethodExecutionFinishedEvent):
assert event.result == "bar"
assert event.state["color"] == "red"
assert event.state["separator"] == ", "
else:
assert False, "Unexpected event type for prepare_color"
else:
assert False, f"Unexpected method {event.method_name} in prepare events"
assert isinstance(event_log[5], MethodExecutionStartedEvent)
assert event_log[5].method_name == "write_first_sentence"
assert event_log[5].params == {}
assert isinstance(event_log[5].state, dict)
assert event_log[5].state["flower"] == "roses"
assert event_log[5].state["color"] == "red"
assert event_log[5].state["separator"] == ", "
assert isinstance(event_log[6], MethodExecutionFinishedEvent)
assert event_log[6].method_name == "write_first_sentence"
assert event_log[6].result == "roses are red"
assert isinstance(event_log[7], MethodExecutionStartedEvent)
assert event_log[7].method_name == "finish_poem"
assert event_log[7].params == {"_0": "roses are red"}
assert isinstance(event_log[7].state, dict)
assert event_log[7].state["flower"] == "roses"
assert event_log[7].state["color"] == "red"
assert isinstance(event_log[8], MethodExecutionFinishedEvent)
assert event_log[8].method_name == "finish_poem"
assert event_log[8].result == "roses are red, violets are blue"
assert isinstance(event_log[9], MethodExecutionStartedEvent)
assert event_log[9].method_name == "save_poem_to_database"
assert event_log[9].params == {}
assert isinstance(event_log[9].state, dict)
assert event_log[9].state["flower"] == "roses"
assert event_log[9].state["color"] == "red"
assert isinstance(event_log[10], MethodExecutionFinishedEvent)
assert event_log[10].method_name == "save_poem_to_database"
assert event_log[10].result is None
assert isinstance(event_log[11], FlowFinishedEvent)
assert event_log[11].flow_name == "PoemFlow"
assert event_log[11].result is None
assert isinstance(event_log[11].timestamp, datetime)
def test_structured_flow_event_emission():
"""Test that the correct events are emitted during structured flow
execution with all fields validated."""
class OnboardingState(BaseModel):
name: str = ""
sent: bool = False
class OnboardingFlow(Flow[OnboardingState]):
@start()
def user_signs_up(self):
self.state.sent = False
@listen(user_signs_up)
def send_welcome_message(self):
self.state.sent = True
return f"Welcome, {self.state.name}!"
event_log = []
def handle_event(_, event):
event_log.append(event)
flow = OnboardingFlow()
flow.event_emitter.connect(handle_event)
flow.kickoff(inputs={"name": "Anakin"})
assert isinstance(event_log[0], FlowStartedEvent)
assert event_log[0].flow_name == "OnboardingFlow"
assert event_log[0].inputs == {"name": "Anakin"}
assert isinstance(event_log[0].timestamp, datetime)
assert isinstance(event_log[1], MethodExecutionStartedEvent)
assert event_log[1].method_name == "user_signs_up"
assert isinstance(event_log[2], MethodExecutionFinishedEvent)
assert event_log[2].method_name == "user_signs_up"
assert isinstance(event_log[3], MethodExecutionStartedEvent)
assert event_log[3].method_name == "send_welcome_message"
assert event_log[3].params == {}
assert getattr(event_log[3].state, "sent") is False
assert isinstance(event_log[4], MethodExecutionFinishedEvent)
assert event_log[4].method_name == "send_welcome_message"
assert getattr(event_log[4].state, "sent") is True
assert event_log[4].result == "Welcome, Anakin!"
assert isinstance(event_log[5], FlowFinishedEvent)
assert event_log[5].flow_name == "OnboardingFlow"
assert event_log[5].result == "Welcome, Anakin!"
assert isinstance(event_log[5].timestamp, datetime)
def test_stateless_flow_event_emission():
"""Test that the correct events are emitted stateless during flow execution
with all fields validated."""
class StatelessFlow(Flow):
@start()
def init(self):
pass
@listen(init)
def process(self):
return "Deeds will not be less valiant because they are unpraised."
event_log = []
def handle_event(_, event):
event_log.append(event)
flow = StatelessFlow()
flow.event_emitter.connect(handle_event)
flow.kickoff()
assert isinstance(event_log[0], FlowStartedEvent)
assert event_log[0].flow_name == "StatelessFlow"
assert event_log[0].inputs is None
assert isinstance(event_log[0].timestamp, datetime)
assert isinstance(event_log[1], MethodExecutionStartedEvent)
assert event_log[1].method_name == "init"
assert isinstance(event_log[2], MethodExecutionFinishedEvent)
assert event_log[2].method_name == "init"
assert isinstance(event_log[3], MethodExecutionStartedEvent)
assert event_log[3].method_name == "process"
assert isinstance(event_log[4], MethodExecutionFinishedEvent)
assert event_log[4].method_name == "process"
assert isinstance(event_log[5], FlowFinishedEvent)
assert event_log[5].flow_name == "StatelessFlow"
assert (
event_log[5].result
== "Deeds will not be less valiant because they are unpraised."
)
assert isinstance(event_log[5].timestamp, datetime)