From bc3fd789d90bd149e088117c01dc4519adf63aef Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Tue, 10 Dec 2024 16:22:01 -0500 Subject: [PATCH] include event emitter in flows --- pyproject.toml | 1 + src/crewai/agents/crew_agent_executor.py | 14 +++++- src/crewai/flow/flow.py | 62 +++++++++++++++++++++--- src/crewai/flow/flow_events.py | 33 +++++++++++++ uv.lock | 11 +++++ 5 files changed, 113 insertions(+), 8 deletions(-) create mode 100644 src/crewai/flow/flow_events.py diff --git a/pyproject.toml b/pyproject.toml index ea684cec6..f90110e85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "chromadb>=0.5.18", "pdfplumber>=0.11.4", "openpyxl>=3.1.5", + "blinker>=1.9.0", ] [project.urls] diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 83ca06e41..b59466b17 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -87,15 +87,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.llm.stop = self.stop def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: + print("prompt: ", self.prompt) + print("inputs: ", inputs) if "system" in self.prompt: system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs) + print("system_prompt: ", system_prompt) user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs) + print("user_prompt: ", user_prompt) self.messages.append(self._format_msg(system_prompt, role="system")) self.messages.append(self._format_msg(user_prompt)) else: user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) self.messages.append(self._format_msg(user_prompt)) + print("total messages at invoke: ", len(self.messages)) self._show_start_logs() self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False)) @@ -144,7 +149,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer ) if self.step_callback: - self.step_callback(tool_result) + self.step_callback(tool_result) formatted_answer.text += f"\nObservation: {tool_result.result}" formatted_answer.result = tool_result.result @@ -412,6 +417,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): AgentFinish: The final output after incorporating human feedback. """ while self.ask_for_human_input: + print("Messages at human feedback:") + for idx, message in enumerate(self.messages, start=1): + print(f"Message {idx}: {message}") + print("Total messages at human feedback: ", len(self.messages)) human_feedback = self._ask_human_input(formatted_answer.output) print("Human feedback: ", human_feedback) @@ -464,6 +473,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.ask_for_human_input = True # Add human feedback to messages self.messages.append(self._format_msg(f"Feedback: {human_feedback}")) + print("Messages after human feedback:") + for idx, message in enumerate(self.messages, start=1): + print(f"Message {idx}: {message}") # Invoke the loop again with updated messages formatted_answer = self._invoke_loop() diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index fa0902594..4b17c1f11 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -1,5 +1,6 @@ import asyncio import inspect +from datetime import datetime from typing import ( Any, Callable, @@ -14,8 +15,15 @@ from typing import ( cast, ) +from blinker import Signal from pydantic import BaseModel, ValidationError +from crewai.flow.flow_events import ( + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) from crewai.flow.flow_visualizer import plot_flow from crewai.flow.utils import get_possible_return_constants from crewai.telemetry import Telemetry @@ -160,6 +168,9 @@ class Flow(Generic[T], metaclass=FlowMeta): _router_paths: Dict[str, List[str]] = {} initial_state: Union[Type[T], T, None] = None + # Define a single event emitter signal + event_emitter = Signal("event_emitter") + def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: class _FlowGeneric(cls): # type: ignore _initial_state_T = item # type: ignore @@ -253,6 +264,15 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns: The final output from the flow execution. """ + # Emit flow_started event + self.event_emitter.send( + self, + event=FlowStartedEvent( + type="flow_started", + flow_name=self.__class__.__name__, + ), + ) + if inputs is not None: self._initialize_state(inputs) return asyncio.run(self.kickoff_async()) @@ -267,8 +287,6 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns: The final output from the flow execution. """ - if inputs is not None: - self._initialize_state(inputs) if not self._start_methods: raise ValueError("No start method defined") @@ -285,11 +303,20 @@ class Flow(Generic[T], metaclass=FlowMeta): # Run all start methods concurrently await asyncio.gather(*tasks) - # Return the final output (from the last executed method) - if self._method_outputs: - return self._method_outputs[-1] - else: - return None # Or raise an exception if no methods were executed + # Determine the final output (from the last executed method) + final_output = self._method_outputs[-1] if self._method_outputs else None + + # Emit flow_finished event + self.event_emitter.send( + self, + event=FlowFinishedEvent( + type="flow_finished", + flow_name=self.__class__.__name__, + result=final_output, + ), + ) + + return final_output async def _execute_start_method(self, start_method_name: str) -> None: result = await self._execute_method( @@ -352,6 +379,17 @@ class Flow(Generic[T], metaclass=FlowMeta): async def _execute_single_listener(self, listener_name: str, result: Any) -> None: try: method = self._methods[listener_name] + + # Emit method_execution_started event + self.event_emitter.send( + self, + event=MethodExecutionStartedEvent( + type="method_execution_started", + method_name=listener_name, + flow_name=self.__class__.__name__, + ), + ) + sig = inspect.signature(method) params = list(sig.parameters.values()) @@ -367,6 +405,16 @@ class Flow(Generic[T], metaclass=FlowMeta): # If listener does not expect parameters, call without arguments listener_result = await self._execute_method(listener_name, method) + # Emit method_execution_finished event + self.event_emitter.send( + self, + event=MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=listener_name, + flow_name=self.__class__.__name__, + ), + ) + # Execute listeners of this listener await self._execute_listeners(listener_name, listener_result) except Exception as e: diff --git a/src/crewai/flow/flow_events.py b/src/crewai/flow/flow_events.py new file mode 100644 index 000000000..068005ebe --- /dev/null +++ b/src/crewai/flow/flow_events.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + + +@dataclass +class Event: + type: str + flow_name: str + timestamp: datetime = field(init=False) + + def __post_init__(self): + self.timestamp = datetime.now() + + +@dataclass +class FlowStartedEvent(Event): + pass + + +@dataclass +class MethodExecutionStartedEvent(Event): + method_name: str + + +@dataclass +class MethodExecutionFinishedEvent(Event): + method_name: str + + +@dataclass +class FlowFinishedEvent(Event): + result: Optional[Any] = None diff --git a/uv.lock b/uv.lock index 26be5064d..9fa1a274c 100644 --- a/uv.lock +++ b/uv.lock @@ -272,6 +272,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 }, ] +[[package]] +name = "blinker" +version = "1.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/21/28/9b3f50ce0e048515135495f198351908d99540d69bfdc8c1d15b73dc55ce/blinker-1.9.0.tar.gz", hash = "sha256:b4ce2265a7abece45e7cc896e98dbebe6cead56bcf805a3d23136d145f5445bf", size = 22460 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/10/cb/f2ad4230dc2eb1a74edf38f1a38b9b52277f75bef262d8908e60d957e13c/blinker-1.9.0-py3-none-any.whl", hash = "sha256:ba0efaa9080b619ff2f3459d1d500c57bddea4a6b424b60a91141db6fd2f08bc", size = 8458 }, +] + [[package]] name = "build" version = "1.2.2.post1" @@ -568,6 +577,7 @@ source = { editable = "." } dependencies = [ { name = "appdirs" }, { name = "auth0-python" }, + { name = "blinker" }, { name = "chromadb" }, { name = "click" }, { name = "instructor" }, @@ -637,6 +647,7 @@ requires-dist = [ { name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" }, { name = "appdirs", specifier = ">=1.4.4" }, { name = "auth0-python", specifier = ">=4.7.1" }, + { name = "blinker", specifier = ">=1.9.0" }, { name = "chromadb", specifier = ">=0.5.18" }, { name = "click", specifier = ">=8.1.7" }, { name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.14.0" },