From da9220fa81121ff8ffa5c7aa53123c7be03f9851 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 11 Dec 2024 09:13:57 -0600 Subject: [PATCH 1/2] Remove manager_callbacks reference (#1741) --- docs/concepts/crews.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/concepts/crews.mdx b/docs/concepts/crews.mdx index 3225b48b6..58511b07c 100644 --- a/docs/concepts/crews.mdx +++ b/docs/concepts/crews.mdx @@ -32,7 +32,6 @@ A crew in crewAI represents a collaborative group of agents working together to | **Share Crew** _(optional)_ | `share_crew` | Whether you want to share the complete crew information and execution with the crewAI team to make the library better, and allow us to train models. | | **Output Log File** _(optional)_ | `output_log_file` | Whether you want to have a file with the complete crew output and execution. You can set it using True and it will default to the folder you are currently in and it will be called logs.txt or passing a string with the full path and name of the file. | | **Manager Agent** _(optional)_ | `manager_agent` | `manager` sets a custom agent that will be used as a manager. | -| **Manager Callbacks** _(optional)_ | `manager_callbacks` | `manager_callbacks` takes a list of callback handlers to be executed by the manager agent when a hierarchical process is used. | | **Prompt File** _(optional)_ | `prompt_file` | Path to the prompt JSON file to be used for the crew. | | **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated before each Crew iteration, all Crew data is sent to an AgentPlanner that will plan the tasks and this plan will be added to each task description. | | **Planning LLM** *(optional)* | `planning_llm` | The language model used by the AgentPlanner in a planning process. | From 1df61aba4c4e24060ce6d4049f7848d9caa41d3b Mon Sep 17 00:00:00 2001 From: "Brandon Hancock (bhancock_ai)" <109994880+bhancockio@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:16:05 -0500 Subject: [PATCH 2/2] include event emitter in flows (#1740) * include event emitter in flows * Clean up * Fix linter --- pyproject.toml | 1 + src/crewai/agents/crew_agent_executor.py | 3 +- src/crewai/flow/flow.py | 55 +++++++++++++++++++++--- src/crewai/flow/flow_events.py | 33 ++++++++++++++ uv.lock | 11 +++++ 5 files changed, 94 insertions(+), 9 deletions(-) create mode 100644 src/crewai/flow/flow_events.py diff --git a/pyproject.toml b/pyproject.toml index ea684cec6..f90110e85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "chromadb>=0.5.18", "pdfplumber>=0.11.4", "openpyxl>=3.1.5", + "blinker>=1.9.0", ] [project.urls] diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 83ca06e41..eb0ff7c5a 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -144,7 +144,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer ) if self.step_callback: - self.step_callback(tool_result) + self.step_callback(tool_result) formatted_answer.text += f"\nObservation: {tool_result.result}" formatted_answer.result = tool_result.result @@ -413,7 +413,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ while self.ask_for_human_input: human_feedback = self._ask_human_input(formatted_answer.output) - print("Human feedback: ", human_feedback) if self.crew and self.crew._train: self._handle_crew_training_output(formatted_answer, human_feedback) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index fa0902594..ccc76dc95 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -14,8 +14,15 @@ from typing import ( cast, ) +from blinker import Signal from pydantic import BaseModel, ValidationError +from crewai.flow.flow_events import ( + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) from crewai.flow.flow_visualizer import plot_flow from crewai.flow.utils import get_possible_return_constants from crewai.telemetry import Telemetry @@ -159,6 +166,7 @@ class Flow(Generic[T], metaclass=FlowMeta): _routers: Dict[str, str] = {} _router_paths: Dict[str, List[str]] = {} initial_state: Union[Type[T], T, None] = None + event_emitter = Signal("event_emitter") def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: class _FlowGeneric(cls): # type: ignore @@ -253,6 +261,14 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns: The final output from the flow execution. """ + self.event_emitter.send( + self, + event=FlowStartedEvent( + type="flow_started", + flow_name=self.__class__.__name__, + ), + ) + if inputs is not None: self._initialize_state(inputs) return asyncio.run(self.kickoff_async()) @@ -267,8 +283,6 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns: The final output from the flow execution. """ - if inputs is not None: - self._initialize_state(inputs) if not self._start_methods: raise ValueError("No start method defined") @@ -285,11 +299,19 @@ class Flow(Generic[T], metaclass=FlowMeta): # Run all start methods concurrently await asyncio.gather(*tasks) - # Return the final output (from the last executed method) - if self._method_outputs: - return self._method_outputs[-1] - else: - return None # Or raise an exception if no methods were executed + # Determine the final output (from the last executed method) + final_output = self._method_outputs[-1] if self._method_outputs else None + + self.event_emitter.send( + self, + event=FlowFinishedEvent( + type="flow_finished", + flow_name=self.__class__.__name__, + result=final_output, + ), + ) + + return final_output async def _execute_start_method(self, start_method_name: str) -> None: result = await self._execute_method( @@ -352,6 +374,16 @@ class Flow(Generic[T], metaclass=FlowMeta): async def _execute_single_listener(self, listener_name: str, result: Any) -> None: try: method = self._methods[listener_name] + + self.event_emitter.send( + self, + event=MethodExecutionStartedEvent( + type="method_execution_started", + method_name=listener_name, + flow_name=self.__class__.__name__, + ), + ) + sig = inspect.signature(method) params = list(sig.parameters.values()) @@ -367,6 +399,15 @@ class Flow(Generic[T], metaclass=FlowMeta): # If listener does not expect parameters, call without arguments listener_result = await self._execute_method(listener_name, method) + self.event_emitter.send( + self, + event=MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=listener_name, + flow_name=self.__class__.__name__, + ), + ) + # Execute listeners of this listener await self._execute_listeners(listener_name, listener_result) except Exception as e: diff --git a/src/crewai/flow/flow_events.py b/src/crewai/flow/flow_events.py new file mode 100644 index 000000000..068005ebe --- /dev/null +++ b/src/crewai/flow/flow_events.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + + +@dataclass +class Event: + type: str + flow_name: str + timestamp: datetime = field(init=False) + + def __post_init__(self): + self.timestamp = datetime.now() + + +@dataclass +class FlowStartedEvent(Event): + pass + + +@dataclass +class MethodExecutionStartedEvent(Event): + method_name: str + + +@dataclass +class MethodExecutionFinishedEvent(Event): + method_name: str + + +@dataclass +class FlowFinishedEvent(Event): + result: Optional[Any] = None diff --git a/uv.lock b/uv.lock index 26be5064d..9fa1a274c 100644 --- a/uv.lock +++ b/uv.lock @@ -272,6 +272,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 }, ] +[[package]] +name = "blinker" +version = "1.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/21/28/9b3f50ce0e048515135495f198351908d99540d69bfdc8c1d15b73dc55ce/blinker-1.9.0.tar.gz", hash = "sha256:b4ce2265a7abece45e7cc896e98dbebe6cead56bcf805a3d23136d145f5445bf", size = 22460 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/10/cb/f2ad4230dc2eb1a74edf38f1a38b9b52277f75bef262d8908e60d957e13c/blinker-1.9.0-py3-none-any.whl", hash = "sha256:ba0efaa9080b619ff2f3459d1d500c57bddea4a6b424b60a91141db6fd2f08bc", size = 8458 }, +] + [[package]] name = "build" version = "1.2.2.post1" @@ -568,6 +577,7 @@ source = { editable = "." } dependencies = [ { name = "appdirs" }, { name = "auth0-python" }, + { name = "blinker" }, { name = "chromadb" }, { name = "click" }, { name = "instructor" }, @@ -637,6 +647,7 @@ requires-dist = [ { name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" }, { name = "appdirs", specifier = ">=1.4.4" }, { name = "auth0-python", specifier = ">=4.7.1" }, + { name = "blinker", specifier = ">=1.9.0" }, { name = "chromadb", specifier = ">=0.5.18" }, { name = "click", specifier = ">=8.1.7" }, { name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.14.0" },