From 62a20426a5824996b4ea0da15f30fbd03656210c Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Thu, 13 Feb 2025 10:54:58 -0800 Subject: [PATCH] Refactor Flow and Agent event handling to use event_bus - Remove `event_emitter` from Flow class and replace with `event_bus.emit()` - Update Flow and Agent tests to use event_bus event listeners - Remove redundant event emissions in Flow methods - Add debug print statements in Flow execution - Simplify event tracking in test cases --- src/crewai/flow/flow.py | 32 +----- tests/agent_test.py | 40 ++++--- tests/flow_test.py | 239 +++++++++++++++++++++------------------- 3 files changed, 153 insertions(+), 158 deletions(-) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 9b2a07540..83d76f8a7 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -436,7 +436,6 @@ class Flow(Generic[T], metaclass=FlowMeta): _routers: Set[str] = set() _router_paths: Dict[str, List[str]] = {} initial_state: Union[Type[T], T, None] = None - event_emitter = Signal("event_emitter") def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: class _FlowGeneric(cls): # type: ignore @@ -769,8 +768,10 @@ class Flow(Generic[T], metaclass=FlowMeta): for start_method in self._start_methods ] await asyncio.gather(*tasks) + print(f"All method outputs: {self._method_outputs}") # Debug log final_output = self._method_outputs[-1] if self._method_outputs else None + print("final_output", final_output) event_bus.emit( self, @@ -809,9 +810,9 @@ class Flow(Generic[T], metaclass=FlowMeta): self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) - self.event_emitter.send( + event_bus.emit( self, - event=MethodExecutionStartedEvent( + MethodExecutionStartedEvent( type="method_execution_started", method_name=method_name, flow_name=self.__class__.__name__, @@ -830,9 +831,9 @@ class Flow(Generic[T], metaclass=FlowMeta): self._method_execution_counts.get(method_name, 0) + 1 ) - self.event_emitter.send( + event_bus.emit( self, - event=MethodExecutionFinishedEvent( + MethodExecutionFinishedEvent( type="method_execution_finished", method_name=method_name, flow_name=self.__class__.__name__, @@ -980,16 +981,6 @@ class Flow(Generic[T], metaclass=FlowMeta): try: method = self._methods[listener_name] - event_bus.emit( - self, - MethodExecutionStartedEvent( - type="method_execution_started", - method_name=listener_name, - flow_name=self.__class__.__name__, - state=self._copy_state(), - ), - ) - sig = inspect.signature(method) params = list(sig.parameters.values()) method_params = [p for p in params if p.name != "self"] @@ -1001,17 +992,6 @@ class Flow(Generic[T], metaclass=FlowMeta): else: listener_result = await self._execute_method(listener_name, method) - event_bus.emit( - self, - MethodExecutionFinishedEvent( - type="method_execution_finished", - method_name=listener_name, - flow_name=self.__class__.__name__, - state=self._copy_state(), - result=listener_result, - ), - ) - # Execute listeners (and possibly routers) of this listener await self._execute_listeners(listener_name, listener_result) diff --git a/tests/agent_test.py b/tests/agent_test.py index e67a7454a..2301ad452 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -16,9 +16,9 @@ from crewai.llm import LLM from crewai.tools import tool from crewai.tools.tool_calling import InstructorToolCalling from crewai.tools.tool_usage import ToolUsage -from crewai.tools.tool_usage_events import ToolUsageFinished from crewai.utilities import RPMController -from crewai.utilities.events import Emitter +from crewai.utilities.events import event_bus +from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent def test_agent_llm_creation_with_env_vars(): @@ -154,15 +154,20 @@ def test_agent_execution_with_tools(): agent=agent, expected_output="The result of the multiplication.", ) - with patch.object(Emitter, "emit") as emit: - output = agent.execute_task(task) - assert output == "The result of the multiplication is 12." - assert emit.call_count == 1 - args, _ = emit.call_args - assert isinstance(args[1], ToolUsageFinished) - assert not args[1].from_cache - assert args[1].tool_name == "multiplier" - assert args[1].tool_args == {"first_number": 3, "second_number": 4} + received_events = [] + + @event_bus.on(ToolUsageFinishedEvent) + def handle_tool_end(source, event): + received_events.append(event) + + # with patch.object(EventBus, "emit") as emit: + output = agent.execute_task(task) + assert output == "The result of the multiplication is 12." + + assert len(received_events) == 1 + assert isinstance(received_events[0], ToolUsageFinishedEvent) + assert received_events[0].tool_name == "multiplier" + assert received_events[0].tool_args == {"first_number": 3, "second_number": 4} @pytest.mark.vcr(filter_headers=["authorization"]) @@ -249,10 +254,14 @@ def test_cache_hitting(): "multiplier-{'first_number': 3, 'second_number': 3}": 9, "multiplier-{'first_number': 12, 'second_number': 3}": 36, } + received_events = [] + + @event_bus.on(ToolUsageFinishedEvent) + def handle_tool_end(source, event): + received_events.append(event) with ( patch.object(CacheHandler, "read") as read, - patch.object(Emitter, "emit") as emit, ): read.return_value = "0" task = Task( @@ -265,10 +274,9 @@ def test_cache_hitting(): read.assert_called_with( tool="multiplier", input={"first_number": 2, "second_number": 6} ) - assert emit.call_count == 1 - args, _ = emit.call_args - assert isinstance(args[1], ToolUsageFinished) - assert args[1].from_cache + assert len(received_events) == 1 + assert isinstance(received_events[0], ToolUsageFinishedEvent) + assert received_events[0].from_cache @pytest.mark.vcr(filter_headers=["authorization"]) diff --git a/tests/flow_test.py b/tests/flow_test.py index 9312b25dc..888026ea5 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -7,11 +7,12 @@ import pytest from pydantic import BaseModel from crewai.flow.flow import Flow, and_, listen, or_, router, start -from crewai.flow.flow_events import ( +from crewai.utilities.events import ( FlowFinishedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, + event_bus, ) @@ -434,90 +435,65 @@ def test_unstructured_flow_event_emission(): @listen(finish_poem) def save_poem_to_database(self): # A method without args/kwargs to ensure events are sent correctly - pass - - event_log = [] - - def handle_event(_, event): - event_log.append(event) + return "roses are red\nviolets are blue" flow = PoemFlow() - flow.event_emitter.connect(handle_event) + received_events = [] + + @event_bus.on(FlowStartedEvent) + def handle_flow_start(source, event): + received_events.append(event) + + @event_bus.on(MethodExecutionStartedEvent) + def handle_method_start(source, event): + received_events.append(event) + + @event_bus.on(FlowFinishedEvent) + def handle_flow_end(source, event): + received_events.append(event) + flow.kickoff(inputs={"separator": ", "}) + assert isinstance(received_events[0], FlowStartedEvent) + assert received_events[0].flow_name == "PoemFlow" + assert received_events[0].inputs == {"separator": ", "} + assert isinstance(received_events[0].timestamp, datetime) - assert isinstance(event_log[0], FlowStartedEvent) - assert event_log[0].flow_name == "PoemFlow" - assert event_log[0].inputs == {"separator": ", "} - assert isinstance(event_log[0].timestamp, datetime) - - # Asserting for concurrent start method executions in a for loop as you - # can't guarantee ordering in asynchronous executions - for i in range(1, 5): - event = event_log[i] + # All subsequent events are MethodExecutionStartedEvent + for event in received_events[1:-1]: + assert isinstance(event, MethodExecutionStartedEvent) + assert event.flow_name == "PoemFlow" assert isinstance(event.state, dict) assert isinstance(event.state["id"], str) + assert event.state["separator"] == ", " - if event.method_name == "prepare_flower": - if isinstance(event, MethodExecutionStartedEvent): - assert event.params == {} - assert event.state["separator"] == ", " - elif isinstance(event, MethodExecutionFinishedEvent): - assert event.result == "foo" - assert event.state["flower"] == "roses" - assert event.state["separator"] == ", " - else: - assert False, "Unexpected event type for prepare_flower" - elif event.method_name == "prepare_color": - if isinstance(event, MethodExecutionStartedEvent): - assert event.params == {} - assert event.state["separator"] == ", " - elif isinstance(event, MethodExecutionFinishedEvent): - assert event.result == "bar" - assert event.state["color"] == "red" - assert event.state["separator"] == ", " - else: - assert False, "Unexpected event type for prepare_color" - else: - assert False, f"Unexpected method {event.method_name} in prepare events" + assert received_events[1].method_name == "prepare_flower" + assert received_events[1].params == {} + assert "flower" not in received_events[1].state - assert isinstance(event_log[5], MethodExecutionStartedEvent) - assert event_log[5].method_name == "write_first_sentence" - assert event_log[5].params == {} - assert isinstance(event_log[5].state, dict) - assert event_log[5].state["flower"] == "roses" - assert event_log[5].state["color"] == "red" - assert event_log[5].state["separator"] == ", " + assert received_events[2].method_name == "prepare_color" + assert received_events[2].params == {} + print("received_events[2]", received_events[2]) + assert "flower" in received_events[2].state - assert isinstance(event_log[6], MethodExecutionFinishedEvent) - assert event_log[6].method_name == "write_first_sentence" - assert event_log[6].result == "roses are red" + assert received_events[3].method_name == "write_first_sentence" + assert received_events[3].params == {} + assert received_events[3].state["flower"] == "roses" + assert received_events[3].state["color"] == "red" - assert isinstance(event_log[7], MethodExecutionStartedEvent) - assert event_log[7].method_name == "finish_poem" - assert event_log[7].params == {"_0": "roses are red"} - assert isinstance(event_log[7].state, dict) - assert event_log[7].state["flower"] == "roses" - assert event_log[7].state["color"] == "red" + assert received_events[4].method_name == "finish_poem" + assert received_events[4].params == {"_0": "roses are red"} + assert received_events[4].state["flower"] == "roses" + assert received_events[4].state["color"] == "red" - assert isinstance(event_log[8], MethodExecutionFinishedEvent) - assert event_log[8].method_name == "finish_poem" - assert event_log[8].result == "roses are red, violets are blue" + assert received_events[5].method_name == "save_poem_to_database" + assert received_events[5].params == {} + assert received_events[5].state["flower"] == "roses" + assert received_events[5].state["color"] == "red" - assert isinstance(event_log[9], MethodExecutionStartedEvent) - assert event_log[9].method_name == "save_poem_to_database" - assert event_log[9].params == {} - assert isinstance(event_log[9].state, dict) - assert event_log[9].state["flower"] == "roses" - assert event_log[9].state["color"] == "red" - - assert isinstance(event_log[10], MethodExecutionFinishedEvent) - assert event_log[10].method_name == "save_poem_to_database" - assert event_log[10].result is None - - assert isinstance(event_log[11], FlowFinishedEvent) - assert event_log[11].flow_name == "PoemFlow" - assert event_log[11].result is None - assert isinstance(event_log[11].timestamp, datetime) + assert isinstance(received_events[6], FlowFinishedEvent) + assert received_events[6].flow_name == "PoemFlow" + assert received_events[6].result == "roses are red\nviolets are blue" + assert isinstance(received_events[6].timestamp, datetime) def test_structured_flow_event_emission(): @@ -538,40 +514,54 @@ def test_structured_flow_event_emission(): self.state.sent = True return f"Welcome, {self.state.name}!" - event_log = [] - - def handle_event(_, event): - event_log.append(event) - flow = OnboardingFlow() - flow.event_emitter.connect(handle_event) flow.kickoff(inputs={"name": "Anakin"}) - assert isinstance(event_log[0], FlowStartedEvent) - assert event_log[0].flow_name == "OnboardingFlow" - assert event_log[0].inputs == {"name": "Anakin"} - assert isinstance(event_log[0].timestamp, datetime) + received_events = [] - assert isinstance(event_log[1], MethodExecutionStartedEvent) - assert event_log[1].method_name == "user_signs_up" + @event_bus.on(FlowStartedEvent) + def handle_flow_start(source, event): + received_events.append(event) - assert isinstance(event_log[2], MethodExecutionFinishedEvent) - assert event_log[2].method_name == "user_signs_up" + @event_bus.on(MethodExecutionStartedEvent) + def handle_method_start(source, event): + received_events.append(event) - assert isinstance(event_log[3], MethodExecutionStartedEvent) - assert event_log[3].method_name == "send_welcome_message" - assert event_log[3].params == {} - assert getattr(event_log[3].state, "sent") is False + @event_bus.on(MethodExecutionFinishedEvent) + def handle_method_end(source, event): + received_events.append(event) - assert isinstance(event_log[4], MethodExecutionFinishedEvent) - assert event_log[4].method_name == "send_welcome_message" - assert getattr(event_log[4].state, "sent") is True - assert event_log[4].result == "Welcome, Anakin!" + @event_bus.on(FlowFinishedEvent) + def handle_flow_end(source, event): + received_events.append(event) - assert isinstance(event_log[5], FlowFinishedEvent) - assert event_log[5].flow_name == "OnboardingFlow" - assert event_log[5].result == "Welcome, Anakin!" - assert isinstance(event_log[5].timestamp, datetime) + flow.kickoff(inputs={"name": "Anakin"}) + + assert isinstance(received_events[0], FlowStartedEvent) + assert received_events[0].flow_name == "OnboardingFlow" + assert received_events[0].inputs == {"name": "Anakin"} + assert isinstance(received_events[0].timestamp, datetime) + + assert isinstance(received_events[1], MethodExecutionStartedEvent) + assert received_events[1].method_name == "user_signs_up" + + assert isinstance(received_events[2], MethodExecutionFinishedEvent) + assert received_events[2].method_name == "user_signs_up" + + assert isinstance(received_events[3], MethodExecutionStartedEvent) + assert received_events[3].method_name == "send_welcome_message" + assert received_events[3].params == {} + assert getattr(received_events[3].state, "sent") is False + + assert isinstance(received_events[4], MethodExecutionFinishedEvent) + assert received_events[4].method_name == "send_welcome_message" + assert getattr(received_events[4].state, "sent") is True + assert received_events[4].result == "Welcome, Anakin!" + + assert isinstance(received_events[5], FlowFinishedEvent) + assert received_events[5].flow_name == "OnboardingFlow" + assert received_events[5].result == "Welcome, Anakin!" + assert isinstance(received_events[5].timestamp, datetime) def test_stateless_flow_event_emission(): @@ -593,30 +583,47 @@ def test_stateless_flow_event_emission(): event_log.append(event) flow = StatelessFlow() - flow.event_emitter.connect(handle_event) + received_events = [] + + @event_bus.on(FlowStartedEvent) + def handle_flow_start(source, event): + received_events.append(event) + + @event_bus.on(MethodExecutionStartedEvent) + def handle_method_start(source, event): + received_events.append(event) + + @event_bus.on(MethodExecutionFinishedEvent) + def handle_method_end(source, event): + received_events.append(event) + + @event_bus.on(FlowFinishedEvent) + def handle_flow_end(source, event): + received_events.append(event) + flow.kickoff() - assert isinstance(event_log[0], FlowStartedEvent) - assert event_log[0].flow_name == "StatelessFlow" - assert event_log[0].inputs is None - assert isinstance(event_log[0].timestamp, datetime) + assert isinstance(received_events[0], FlowStartedEvent) + assert received_events[0].flow_name == "StatelessFlow" + assert received_events[0].inputs is None + assert isinstance(received_events[0].timestamp, datetime) - assert isinstance(event_log[1], MethodExecutionStartedEvent) - assert event_log[1].method_name == "init" + assert isinstance(received_events[1], MethodExecutionStartedEvent) + assert received_events[1].method_name == "init" - assert isinstance(event_log[2], MethodExecutionFinishedEvent) - assert event_log[2].method_name == "init" + assert isinstance(received_events[2], MethodExecutionFinishedEvent) + assert received_events[2].method_name == "init" - assert isinstance(event_log[3], MethodExecutionStartedEvent) - assert event_log[3].method_name == "process" + assert isinstance(received_events[3], MethodExecutionStartedEvent) + assert received_events[3].method_name == "process" - assert isinstance(event_log[4], MethodExecutionFinishedEvent) - assert event_log[4].method_name == "process" + assert isinstance(received_events[4], MethodExecutionFinishedEvent) + assert received_events[4].method_name == "process" - assert isinstance(event_log[5], FlowFinishedEvent) - assert event_log[5].flow_name == "StatelessFlow" + assert isinstance(received_events[5], FlowFinishedEvent) + assert received_events[5].flow_name == "StatelessFlow" assert ( - event_log[5].result + received_events[5].result == "Deeds will not be less valiant because they are unpraised." ) - assert isinstance(event_log[5].timestamp, datetime) + assert isinstance(received_events[5].timestamp, datetime)