diff --git a/pyproject.toml b/pyproject.toml index ea684cec6..f90110e85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "chromadb>=0.5.18", "pdfplumber>=0.11.4", "openpyxl>=3.1.5", + "blinker>=1.9.0", ] [project.urls] diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 83ca06e41..eb0ff7c5a 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -144,7 +144,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer ) if self.step_callback: - self.step_callback(tool_result) + self.step_callback(tool_result) formatted_answer.text += f"\nObservation: {tool_result.result}" formatted_answer.result = tool_result.result @@ -413,7 +413,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ while self.ask_for_human_input: human_feedback = self._ask_human_input(formatted_answer.output) - print("Human feedback: ", human_feedback) if self.crew and self.crew._train: self._handle_crew_training_output(formatted_answer, human_feedback) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index fa0902594..ccc76dc95 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -14,8 +14,15 @@ from typing import ( cast, ) +from blinker import Signal from pydantic import BaseModel, ValidationError +from crewai.flow.flow_events import ( + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) from crewai.flow.flow_visualizer import plot_flow from crewai.flow.utils import get_possible_return_constants from crewai.telemetry import Telemetry @@ -159,6 +166,7 @@ class Flow(Generic[T], metaclass=FlowMeta): _routers: Dict[str, str] = {} _router_paths: Dict[str, List[str]] = {} initial_state: Union[Type[T], T, None] = None + event_emitter = Signal("event_emitter") def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: class _FlowGeneric(cls): # type: ignore @@ -253,6 +261,14 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns: The final output from the flow execution. """ + self.event_emitter.send( + self, + event=FlowStartedEvent( + type="flow_started", + flow_name=self.__class__.__name__, + ), + ) + if inputs is not None: self._initialize_state(inputs) return asyncio.run(self.kickoff_async()) @@ -267,8 +283,6 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns: The final output from the flow execution. """ - if inputs is not None: - self._initialize_state(inputs) if not self._start_methods: raise ValueError("No start method defined") @@ -285,11 +299,19 @@ class Flow(Generic[T], metaclass=FlowMeta): # Run all start methods concurrently await asyncio.gather(*tasks) - # Return the final output (from the last executed method) - if self._method_outputs: - return self._method_outputs[-1] - else: - return None # Or raise an exception if no methods were executed + # Determine the final output (from the last executed method) + final_output = self._method_outputs[-1] if self._method_outputs else None + + self.event_emitter.send( + self, + event=FlowFinishedEvent( + type="flow_finished", + flow_name=self.__class__.__name__, + result=final_output, + ), + ) + + return final_output async def _execute_start_method(self, start_method_name: str) -> None: result = await self._execute_method( @@ -352,6 +374,16 @@ class Flow(Generic[T], metaclass=FlowMeta): async def _execute_single_listener(self, listener_name: str, result: Any) -> None: try: method = self._methods[listener_name] + + self.event_emitter.send( + self, + event=MethodExecutionStartedEvent( + type="method_execution_started", + method_name=listener_name, + flow_name=self.__class__.__name__, + ), + ) + sig = inspect.signature(method) params = list(sig.parameters.values()) @@ -367,6 +399,15 @@ class Flow(Generic[T], metaclass=FlowMeta): # If listener does not expect parameters, call without arguments listener_result = await self._execute_method(listener_name, method) + self.event_emitter.send( + self, + event=MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=listener_name, + flow_name=self.__class__.__name__, + ), + ) + # Execute listeners of this listener await self._execute_listeners(listener_name, listener_result) except Exception as e: diff --git a/src/crewai/flow/flow_events.py b/src/crewai/flow/flow_events.py new file mode 100644 index 000000000..068005ebe --- /dev/null +++ b/src/crewai/flow/flow_events.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + + +@dataclass +class Event: + type: str + flow_name: str + timestamp: datetime = field(init=False) + + def __post_init__(self): + self.timestamp = datetime.now() + + +@dataclass +class FlowStartedEvent(Event): + pass + + +@dataclass +class MethodExecutionStartedEvent(Event): + method_name: str + + +@dataclass +class MethodExecutionFinishedEvent(Event): + method_name: str + + +@dataclass +class FlowFinishedEvent(Event): + result: Optional[Any] = None diff --git a/uv.lock b/uv.lock index 26be5064d..9fa1a274c 100644 --- a/uv.lock +++ b/uv.lock @@ -272,6 +272,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 }, ] +[[package]] +name = "blinker" +version = "1.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/21/28/9b3f50ce0e048515135495f198351908d99540d69bfdc8c1d15b73dc55ce/blinker-1.9.0.tar.gz", hash = "sha256:b4ce2265a7abece45e7cc896e98dbebe6cead56bcf805a3d23136d145f5445bf", size = 22460 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/10/cb/f2ad4230dc2eb1a74edf38f1a38b9b52277f75bef262d8908e60d957e13c/blinker-1.9.0-py3-none-any.whl", hash = "sha256:ba0efaa9080b619ff2f3459d1d500c57bddea4a6b424b60a91141db6fd2f08bc", size = 8458 }, +] + [[package]] name = "build" version = "1.2.2.post1" @@ -568,6 +577,7 @@ source = { editable = "." } dependencies = [ { name = "appdirs" }, { name = "auth0-python" }, + { name = "blinker" }, { name = "chromadb" }, { name = "click" }, { name = "instructor" }, @@ -637,6 +647,7 @@ requires-dist = [ { name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" }, { name = "appdirs", specifier = ">=1.4.4" }, { name = "auth0-python", specifier = ">=4.7.1" }, + { name = "blinker", specifier = ">=1.9.0" }, { name = "chromadb", specifier = ">=0.5.18" }, { name = "click", specifier = ">=8.1.7" }, { name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.14.0" },