Refactor Flow and Agent event handling to use event_bus

- Remove `event_emitter` from Flow class and replace with `event_bus.emit()`
- Update Flow and Agent tests to use event_bus event listeners
- Remove redundant event emissions in Flow methods
- Add debug print statements in Flow execution
- Simplify event tracking in test cases
This commit is contained in:
Lorenze Jay
2025-02-13 10:54:58 -08:00
parent 097ed1f0df
commit 62a20426a5
3 changed files with 153 additions and 158 deletions

View File

@@ -436,7 +436,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set() _routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {} _router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None initial_state: Union[Type[T], T, None] = None
event_emitter = Signal("event_emitter")
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore class _FlowGeneric(cls): # type: ignore
@@ -769,8 +768,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
for start_method in self._start_methods for start_method in self._start_methods
] ]
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
print(f"All method outputs: {self._method_outputs}") # Debug log
final_output = self._method_outputs[-1] if self._method_outputs else None final_output = self._method_outputs[-1] if self._method_outputs else None
print("final_output", final_output)
event_bus.emit( event_bus.emit(
self, self,
@@ -809,9 +810,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
self, method_name: str, method: Callable, *args: Any, **kwargs: Any self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any: ) -> Any:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
self.event_emitter.send( event_bus.emit(
self, self,
event=MethodExecutionStartedEvent( MethodExecutionStartedEvent(
type="method_execution_started", type="method_execution_started",
method_name=method_name, method_name=method_name,
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
@@ -830,9 +831,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_execution_counts.get(method_name, 0) + 1 self._method_execution_counts.get(method_name, 0) + 1
) )
self.event_emitter.send( event_bus.emit(
self, self,
event=MethodExecutionFinishedEvent( MethodExecutionFinishedEvent(
type="method_execution_finished", type="method_execution_finished",
method_name=method_name, method_name=method_name,
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
@@ -980,16 +981,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
try: try:
method = self._methods[listener_name] method = self._methods[listener_name]
event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=listener_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
),
)
sig = inspect.signature(method) sig = inspect.signature(method)
params = list(sig.parameters.values()) params = list(sig.parameters.values())
method_params = [p for p in params if p.name != "self"] method_params = [p for p in params if p.name != "self"]
@@ -1001,17 +992,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
else: else:
listener_result = await self._execute_method(listener_name, method) listener_result = await self._execute_method(listener_name, method)
event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=listener_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=listener_result,
),
)
# Execute listeners (and possibly routers) of this listener # Execute listeners (and possibly routers) of this listener
await self._execute_listeners(listener_name, listener_result) await self._execute_listeners(listener_name, listener_result)

View File

@@ -16,9 +16,9 @@ from crewai.llm import LLM
from crewai.tools import tool from crewai.tools import tool
from crewai.tools.tool_calling import InstructorToolCalling from crewai.tools.tool_calling import InstructorToolCalling
from crewai.tools.tool_usage import ToolUsage from crewai.tools.tool_usage import ToolUsage
from crewai.tools.tool_usage_events import ToolUsageFinished
from crewai.utilities import RPMController from crewai.utilities import RPMController
from crewai.utilities.events import Emitter from crewai.utilities.events import event_bus
from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent
def test_agent_llm_creation_with_env_vars(): def test_agent_llm_creation_with_env_vars():
@@ -154,15 +154,20 @@ def test_agent_execution_with_tools():
agent=agent, agent=agent,
expected_output="The result of the multiplication.", expected_output="The result of the multiplication.",
) )
with patch.object(Emitter, "emit") as emit: received_events = []
output = agent.execute_task(task)
assert output == "The result of the multiplication is 12." @event_bus.on(ToolUsageFinishedEvent)
assert emit.call_count == 1 def handle_tool_end(source, event):
args, _ = emit.call_args received_events.append(event)
assert isinstance(args[1], ToolUsageFinished)
assert not args[1].from_cache # with patch.object(EventBus, "emit") as emit:
assert args[1].tool_name == "multiplier" output = agent.execute_task(task)
assert args[1].tool_args == {"first_number": 3, "second_number": 4} assert output == "The result of the multiplication is 12."
assert len(received_events) == 1
assert isinstance(received_events[0], ToolUsageFinishedEvent)
assert received_events[0].tool_name == "multiplier"
assert received_events[0].tool_args == {"first_number": 3, "second_number": 4}
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])
@@ -249,10 +254,14 @@ def test_cache_hitting():
"multiplier-{'first_number': 3, 'second_number': 3}": 9, "multiplier-{'first_number': 3, 'second_number': 3}": 9,
"multiplier-{'first_number': 12, 'second_number': 3}": 36, "multiplier-{'first_number': 12, 'second_number': 3}": 36,
} }
received_events = []
@event_bus.on(ToolUsageFinishedEvent)
def handle_tool_end(source, event):
received_events.append(event)
with ( with (
patch.object(CacheHandler, "read") as read, patch.object(CacheHandler, "read") as read,
patch.object(Emitter, "emit") as emit,
): ):
read.return_value = "0" read.return_value = "0"
task = Task( task = Task(
@@ -265,10 +274,9 @@ def test_cache_hitting():
read.assert_called_with( read.assert_called_with(
tool="multiplier", input={"first_number": 2, "second_number": 6} tool="multiplier", input={"first_number": 2, "second_number": 6}
) )
assert emit.call_count == 1 assert len(received_events) == 1
args, _ = emit.call_args assert isinstance(received_events[0], ToolUsageFinishedEvent)
assert isinstance(args[1], ToolUsageFinished) assert received_events[0].from_cache
assert args[1].from_cache
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])

View File

@@ -7,11 +7,12 @@ import pytest
from pydantic import BaseModel from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_events import ( from crewai.utilities.events import (
FlowFinishedEvent, FlowFinishedEvent,
FlowStartedEvent, FlowStartedEvent,
MethodExecutionFinishedEvent, MethodExecutionFinishedEvent,
MethodExecutionStartedEvent, MethodExecutionStartedEvent,
event_bus,
) )
@@ -434,90 +435,65 @@ def test_unstructured_flow_event_emission():
@listen(finish_poem) @listen(finish_poem)
def save_poem_to_database(self): def save_poem_to_database(self):
# A method without args/kwargs to ensure events are sent correctly # A method without args/kwargs to ensure events are sent correctly
pass return "roses are red\nviolets are blue"
event_log = []
def handle_event(_, event):
event_log.append(event)
flow = PoemFlow() flow = PoemFlow()
flow.event_emitter.connect(handle_event) received_events = []
@event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
@event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event):
received_events.append(event)
@event_bus.on(FlowFinishedEvent)
def handle_flow_end(source, event):
received_events.append(event)
flow.kickoff(inputs={"separator": ", "}) flow.kickoff(inputs={"separator": ", "})
assert isinstance(received_events[0], FlowStartedEvent)
assert received_events[0].flow_name == "PoemFlow"
assert received_events[0].inputs == {"separator": ", "}
assert isinstance(received_events[0].timestamp, datetime)
assert isinstance(event_log[0], FlowStartedEvent) # All subsequent events are MethodExecutionStartedEvent
assert event_log[0].flow_name == "PoemFlow" for event in received_events[1:-1]:
assert event_log[0].inputs == {"separator": ", "} assert isinstance(event, MethodExecutionStartedEvent)
assert isinstance(event_log[0].timestamp, datetime) assert event.flow_name == "PoemFlow"
# Asserting for concurrent start method executions in a for loop as you
# can't guarantee ordering in asynchronous executions
for i in range(1, 5):
event = event_log[i]
assert isinstance(event.state, dict) assert isinstance(event.state, dict)
assert isinstance(event.state["id"], str) assert isinstance(event.state["id"], str)
assert event.state["separator"] == ", "
if event.method_name == "prepare_flower": assert received_events[1].method_name == "prepare_flower"
if isinstance(event, MethodExecutionStartedEvent): assert received_events[1].params == {}
assert event.params == {} assert "flower" not in received_events[1].state
assert event.state["separator"] == ", "
elif isinstance(event, MethodExecutionFinishedEvent):
assert event.result == "foo"
assert event.state["flower"] == "roses"
assert event.state["separator"] == ", "
else:
assert False, "Unexpected event type for prepare_flower"
elif event.method_name == "prepare_color":
if isinstance(event, MethodExecutionStartedEvent):
assert event.params == {}
assert event.state["separator"] == ", "
elif isinstance(event, MethodExecutionFinishedEvent):
assert event.result == "bar"
assert event.state["color"] == "red"
assert event.state["separator"] == ", "
else:
assert False, "Unexpected event type for prepare_color"
else:
assert False, f"Unexpected method {event.method_name} in prepare events"
assert isinstance(event_log[5], MethodExecutionStartedEvent) assert received_events[2].method_name == "prepare_color"
assert event_log[5].method_name == "write_first_sentence" assert received_events[2].params == {}
assert event_log[5].params == {} print("received_events[2]", received_events[2])
assert isinstance(event_log[5].state, dict) assert "flower" in received_events[2].state
assert event_log[5].state["flower"] == "roses"
assert event_log[5].state["color"] == "red"
assert event_log[5].state["separator"] == ", "
assert isinstance(event_log[6], MethodExecutionFinishedEvent) assert received_events[3].method_name == "write_first_sentence"
assert event_log[6].method_name == "write_first_sentence" assert received_events[3].params == {}
assert event_log[6].result == "roses are red" assert received_events[3].state["flower"] == "roses"
assert received_events[3].state["color"] == "red"
assert isinstance(event_log[7], MethodExecutionStartedEvent) assert received_events[4].method_name == "finish_poem"
assert event_log[7].method_name == "finish_poem" assert received_events[4].params == {"_0": "roses are red"}
assert event_log[7].params == {"_0": "roses are red"} assert received_events[4].state["flower"] == "roses"
assert isinstance(event_log[7].state, dict) assert received_events[4].state["color"] == "red"
assert event_log[7].state["flower"] == "roses"
assert event_log[7].state["color"] == "red"
assert isinstance(event_log[8], MethodExecutionFinishedEvent) assert received_events[5].method_name == "save_poem_to_database"
assert event_log[8].method_name == "finish_poem" assert received_events[5].params == {}
assert event_log[8].result == "roses are red, violets are blue" assert received_events[5].state["flower"] == "roses"
assert received_events[5].state["color"] == "red"
assert isinstance(event_log[9], MethodExecutionStartedEvent) assert isinstance(received_events[6], FlowFinishedEvent)
assert event_log[9].method_name == "save_poem_to_database" assert received_events[6].flow_name == "PoemFlow"
assert event_log[9].params == {} assert received_events[6].result == "roses are red\nviolets are blue"
assert isinstance(event_log[9].state, dict) assert isinstance(received_events[6].timestamp, datetime)
assert event_log[9].state["flower"] == "roses"
assert event_log[9].state["color"] == "red"
assert isinstance(event_log[10], MethodExecutionFinishedEvent)
assert event_log[10].method_name == "save_poem_to_database"
assert event_log[10].result is None
assert isinstance(event_log[11], FlowFinishedEvent)
assert event_log[11].flow_name == "PoemFlow"
assert event_log[11].result is None
assert isinstance(event_log[11].timestamp, datetime)
def test_structured_flow_event_emission(): def test_structured_flow_event_emission():
@@ -538,40 +514,54 @@ def test_structured_flow_event_emission():
self.state.sent = True self.state.sent = True
return f"Welcome, {self.state.name}!" return f"Welcome, {self.state.name}!"
event_log = []
def handle_event(_, event):
event_log.append(event)
flow = OnboardingFlow() flow = OnboardingFlow()
flow.event_emitter.connect(handle_event)
flow.kickoff(inputs={"name": "Anakin"}) flow.kickoff(inputs={"name": "Anakin"})
assert isinstance(event_log[0], FlowStartedEvent) received_events = []
assert event_log[0].flow_name == "OnboardingFlow"
assert event_log[0].inputs == {"name": "Anakin"}
assert isinstance(event_log[0].timestamp, datetime)
assert isinstance(event_log[1], MethodExecutionStartedEvent) @event_bus.on(FlowStartedEvent)
assert event_log[1].method_name == "user_signs_up" def handle_flow_start(source, event):
received_events.append(event)
assert isinstance(event_log[2], MethodExecutionFinishedEvent) @event_bus.on(MethodExecutionStartedEvent)
assert event_log[2].method_name == "user_signs_up" def handle_method_start(source, event):
received_events.append(event)
assert isinstance(event_log[3], MethodExecutionStartedEvent) @event_bus.on(MethodExecutionFinishedEvent)
assert event_log[3].method_name == "send_welcome_message" def handle_method_end(source, event):
assert event_log[3].params == {} received_events.append(event)
assert getattr(event_log[3].state, "sent") is False
assert isinstance(event_log[4], MethodExecutionFinishedEvent) @event_bus.on(FlowFinishedEvent)
assert event_log[4].method_name == "send_welcome_message" def handle_flow_end(source, event):
assert getattr(event_log[4].state, "sent") is True received_events.append(event)
assert event_log[4].result == "Welcome, Anakin!"
assert isinstance(event_log[5], FlowFinishedEvent) flow.kickoff(inputs={"name": "Anakin"})
assert event_log[5].flow_name == "OnboardingFlow"
assert event_log[5].result == "Welcome, Anakin!" assert isinstance(received_events[0], FlowStartedEvent)
assert isinstance(event_log[5].timestamp, datetime) assert received_events[0].flow_name == "OnboardingFlow"
assert received_events[0].inputs == {"name": "Anakin"}
assert isinstance(received_events[0].timestamp, datetime)
assert isinstance(received_events[1], MethodExecutionStartedEvent)
assert received_events[1].method_name == "user_signs_up"
assert isinstance(received_events[2], MethodExecutionFinishedEvent)
assert received_events[2].method_name == "user_signs_up"
assert isinstance(received_events[3], MethodExecutionStartedEvent)
assert received_events[3].method_name == "send_welcome_message"
assert received_events[3].params == {}
assert getattr(received_events[3].state, "sent") is False
assert isinstance(received_events[4], MethodExecutionFinishedEvent)
assert received_events[4].method_name == "send_welcome_message"
assert getattr(received_events[4].state, "sent") is True
assert received_events[4].result == "Welcome, Anakin!"
assert isinstance(received_events[5], FlowFinishedEvent)
assert received_events[5].flow_name == "OnboardingFlow"
assert received_events[5].result == "Welcome, Anakin!"
assert isinstance(received_events[5].timestamp, datetime)
def test_stateless_flow_event_emission(): def test_stateless_flow_event_emission():
@@ -593,30 +583,47 @@ def test_stateless_flow_event_emission():
event_log.append(event) event_log.append(event)
flow = StatelessFlow() flow = StatelessFlow()
flow.event_emitter.connect(handle_event) received_events = []
@event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
@event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event):
received_events.append(event)
@event_bus.on(MethodExecutionFinishedEvent)
def handle_method_end(source, event):
received_events.append(event)
@event_bus.on(FlowFinishedEvent)
def handle_flow_end(source, event):
received_events.append(event)
flow.kickoff() flow.kickoff()
assert isinstance(event_log[0], FlowStartedEvent) assert isinstance(received_events[0], FlowStartedEvent)
assert event_log[0].flow_name == "StatelessFlow" assert received_events[0].flow_name == "StatelessFlow"
assert event_log[0].inputs is None assert received_events[0].inputs is None
assert isinstance(event_log[0].timestamp, datetime) assert isinstance(received_events[0].timestamp, datetime)
assert isinstance(event_log[1], MethodExecutionStartedEvent) assert isinstance(received_events[1], MethodExecutionStartedEvent)
assert event_log[1].method_name == "init" assert received_events[1].method_name == "init"
assert isinstance(event_log[2], MethodExecutionFinishedEvent) assert isinstance(received_events[2], MethodExecutionFinishedEvent)
assert event_log[2].method_name == "init" assert received_events[2].method_name == "init"
assert isinstance(event_log[3], MethodExecutionStartedEvent) assert isinstance(received_events[3], MethodExecutionStartedEvent)
assert event_log[3].method_name == "process" assert received_events[3].method_name == "process"
assert isinstance(event_log[4], MethodExecutionFinishedEvent) assert isinstance(received_events[4], MethodExecutionFinishedEvent)
assert event_log[4].method_name == "process" assert received_events[4].method_name == "process"
assert isinstance(event_log[5], FlowFinishedEvent) assert isinstance(received_events[5], FlowFinishedEvent)
assert event_log[5].flow_name == "StatelessFlow" assert received_events[5].flow_name == "StatelessFlow"
assert ( assert (
event_log[5].result received_events[5].result
== "Deeds will not be less valiant because they are unpraised." == "Deeds will not be less valiant because they are unpraised."
) )
assert isinstance(event_log[5].timestamp, datetime) assert isinstance(received_events[5].timestamp, datetime)