include event emitter in flows

This commit is contained in:
Brandon Hancock
2024-12-10 16:22:01 -05:00
parent 2f9a2afd9e
commit bc3fd789d9
5 changed files with 113 additions and 8 deletions

View File

@@ -29,6 +29,7 @@ dependencies = [
"chromadb>=0.5.18",
"pdfplumber>=0.11.4",
"openpyxl>=3.1.5",
"blinker>=1.9.0",
]
[project.urls]

View File

@@ -87,15 +87,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.llm.stop = self.stop
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
print("prompt: ", self.prompt)
print("inputs: ", inputs)
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
print("system_prompt: ", system_prompt)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
print("user_prompt: ", user_prompt)
self.messages.append(self._format_msg(system_prompt, role="system"))
self.messages.append(self._format_msg(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(self._format_msg(user_prompt))
print("total messages at invoke: ", len(self.messages))
self._show_start_logs()
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
@@ -144,7 +149,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer
)
if self.step_callback:
self.step_callback(tool_result)
self.step_callback(tool_result)
formatted_answer.text += f"\nObservation: {tool_result.result}"
formatted_answer.result = tool_result.result
@@ -412,6 +417,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
AgentFinish: The final output after incorporating human feedback.
"""
while self.ask_for_human_input:
print("Messages at human feedback:")
for idx, message in enumerate(self.messages, start=1):
print(f"Message {idx}: {message}")
print("Total messages at human feedback: ", len(self.messages))
human_feedback = self._ask_human_input(formatted_answer.output)
print("Human feedback: ", human_feedback)
@@ -464,6 +473,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.ask_for_human_input = True
# Add human feedback to messages
self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
print("Messages after human feedback:")
for idx, message in enumerate(self.messages, start=1):
print(f"Message {idx}: {message}")
# Invoke the loop again with updated messages
formatted_answer = self._invoke_loop()

View File

@@ -1,5 +1,6 @@
import asyncio
import inspect
from datetime import datetime
from typing import (
Any,
Callable,
@@ -14,8 +15,15 @@ from typing import (
cast,
)
from blinker import Signal
from pydantic import BaseModel, ValidationError
from crewai.flow.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
@@ -160,6 +168,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
# Define a single event emitter signal
event_emitter = Signal("event_emitter")
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
_initial_state_T = item # type: ignore
@@ -253,6 +264,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow execution.
"""
# Emit flow_started event
self.event_emitter.send(
self,
event=FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
),
)
if inputs is not None:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
@@ -267,8 +287,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
if not self._start_methods:
raise ValueError("No start method defined")
@@ -285,11 +303,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Run all start methods concurrently
await asyncio.gather(*tasks)
# Return the final output (from the last executed method)
if self._method_outputs:
return self._method_outputs[-1]
else:
return None # Or raise an exception if no methods were executed
# Determine the final output (from the last executed method)
final_output = self._method_outputs[-1] if self._method_outputs else None
# Emit flow_finished event
self.event_emitter.send(
self,
event=FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
result=final_output,
),
)
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
result = await self._execute_method(
@@ -352,6 +379,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
try:
method = self._methods[listener_name]
# Emit method_execution_started event
self.event_emitter.send(
self,
event=MethodExecutionStartedEvent(
type="method_execution_started",
method_name=listener_name,
flow_name=self.__class__.__name__,
),
)
sig = inspect.signature(method)
params = list(sig.parameters.values())
@@ -367,6 +405,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
# If listener does not expect parameters, call without arguments
listener_result = await self._execute_method(listener_name, method)
# Emit method_execution_finished event
self.event_emitter.send(
self,
event=MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=listener_name,
flow_name=self.__class__.__name__,
),
)
# Execute listeners of this listener
await self._execute_listeners(listener_name, listener_result)
except Exception as e:

View File

@@ -0,0 +1,33 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
@dataclass
class Event:
type: str
flow_name: str
timestamp: datetime = field(init=False)
def __post_init__(self):
self.timestamp = datetime.now()
@dataclass
class FlowStartedEvent(Event):
pass
@dataclass
class MethodExecutionStartedEvent(Event):
method_name: str
@dataclass
class MethodExecutionFinishedEvent(Event):
method_name: str
@dataclass
class FlowFinishedEvent(Event):
result: Optional[Any] = None

11
uv.lock generated
View File

@@ -272,6 +272,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 },
]
[[package]]
name = "blinker"
version = "1.9.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/21/28/9b3f50ce0e048515135495f198351908d99540d69bfdc8c1d15b73dc55ce/blinker-1.9.0.tar.gz", hash = "sha256:b4ce2265a7abece45e7cc896e98dbebe6cead56bcf805a3d23136d145f5445bf", size = 22460 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/10/cb/f2ad4230dc2eb1a74edf38f1a38b9b52277f75bef262d8908e60d957e13c/blinker-1.9.0-py3-none-any.whl", hash = "sha256:ba0efaa9080b619ff2f3459d1d500c57bddea4a6b424b60a91141db6fd2f08bc", size = 8458 },
]
[[package]]
name = "build"
version = "1.2.2.post1"
@@ -568,6 +577,7 @@ source = { editable = "." }
dependencies = [
{ name = "appdirs" },
{ name = "auth0-python" },
{ name = "blinker" },
{ name = "chromadb" },
{ name = "click" },
{ name = "instructor" },
@@ -637,6 +647,7 @@ requires-dist = [
{ name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" },
{ name = "appdirs", specifier = ">=1.4.4" },
{ name = "auth0-python", specifier = ">=4.7.1" },
{ name = "blinker", specifier = ">=1.9.0" },
{ name = "chromadb", specifier = ">=0.5.18" },
{ name = "click", specifier = ">=8.1.7" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.14.0" },