moving to dedicated eventlistener

This commit is contained in:
Lorenze Jay
2025-02-14 14:49:34 -08:00
parent fe7c8b2049
commit 6eab0e3d3b
9 changed files with 82 additions and 57 deletions

View File

@@ -28,21 +28,6 @@ from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler from crewai.utilities.training_handler import CrewTrainingHandler
agentops = None
try:
import agentops # type: ignore # Name "agentops" is already defined
from agentops import track_agent # type: ignore
except ImportError:
def track_agent():
def noop(f):
return f
return noop
@track_agent()
class Agent(BaseAgent): class Agent(BaseAgent):
"""Represents an agent in a system. """Represents an agent in a system.

View File

@@ -18,15 +18,12 @@ from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, ConverterError, Printer from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.event_types import ( from crewai.utilities.events import (
ToolUsageErrorEvent, ToolUsageErrorEvent,
ToolUsageFinishedEvent, ToolUsageFinishedEvent,
ToolUsageStartedEvent,
) )
try:
import agentops # type: ignore
except ImportError:
agentops = None
OPENAI_BIGGER_MODELS = [ OPENAI_BIGGER_MODELS = [
"gpt-4", "gpt-4",
"gpt-4o", "gpt-4o",
@@ -141,7 +138,8 @@ class ToolUsage:
tool: Any, tool: Any,
calling: Union[ToolCalling, InstructorToolCalling], calling: Union[ToolCalling, InstructorToolCalling],
) -> str: # TODO: Fix this return type ) -> str: # TODO: Fix this return type
tool_event = agentops.ToolEvent(name=calling.tool_name) if agentops else None # type: ignore event_data = self._prepare_event_data(tool, calling) # type: ignore
event_bus.emit(self, ToolUsageStartedEvent(**event_data))
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None) if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try: try:
result = self._i18n.errors("task_repeated_usage").format( result = self._i18n.errors("task_repeated_usage").format(
@@ -219,10 +217,6 @@ class ToolUsage:
return error # type: ignore # No return value expected return error # type: ignore # No return value expected
self.task.increment_tools_errors() self.task.increment_tools_errors()
if agentops:
agentops.record(
agentops.ErrorEvent(exception=e, trigger_event=tool_event)
)
return self.use(calling=calling, tool_string=tool_string) # type: ignore # No return value expected return self.use(calling=calling, tool_string=tool_string) # type: ignore # No return value expected
if self.tools_handler: if self.tools_handler:
@@ -238,9 +232,6 @@ class ToolUsage:
self.tools_handler.on_tool_use( self.tools_handler.on_tool_use(
calling=calling, output=result, should_cache=should_cache calling=calling, output=result, should_cache=should_cache
) )
if agentops:
agentops.record(tool_event)
self._telemetry.tool_usage( self._telemetry.tool_usage(
llm=self.function_calling_llm, llm=self.function_calling_llm,
tool_name=tool.name, tool_name=tool.name,
@@ -468,7 +459,7 @@ class ToolUsage:
def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None: def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None:
event_data = self._prepare_event_data(tool, tool_calling) event_data = self._prepare_event_data(tool, tool_calling)
event_bus.emit( event_bus.emit(
self, event=ToolUsageErrorEvent(**{**event_data, "error": str(e)}) self, event=ToolUsageErrorEvent(**{**event_data, "error": e})
) )
def on_tool_use_finished( def on_tool_use_finished(

View File

@@ -3,20 +3,10 @@ from typing import List
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from crewai.utilities import Converter from crewai.utilities import Converter
from crewai.utilities.events import event_bus
from crewai.utilities.events import TaskEvaluationEvent
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
agentops = None
try:
from agentops import track_agent # type: ignore
except ImportError:
def track_agent(name):
def noop(f):
return f
return noop
class Entity(BaseModel): class Entity(BaseModel):
name: str = Field(description="The name of the entity.") name: str = Field(description="The name of the entity.")
type: str = Field(description="The type of the entity.") type: str = Field(description="The type of the entity.")
@@ -48,12 +38,13 @@ class TrainingTaskEvaluation(BaseModel):
) )
@track_agent(name="Task Evaluator")
class TaskEvaluator: class TaskEvaluator:
def __init__(self, original_agent): def __init__(self, original_agent):
self.llm = original_agent.llm self.llm = original_agent.llm
self.original_agent = original_agent
def evaluate(self, task, output) -> TaskEvaluation: def evaluate(self, task, output) -> TaskEvaluation:
event_bus.emit(self, TaskEvaluationEvent(evaluation_type="task_evaluation"))
evaluation_query = ( evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n" f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"
f"Task Description:\n{task.description}\n\n" f"Task Description:\n{task.description}\n\n"
@@ -90,6 +81,7 @@ class TaskEvaluator:
- training_data (dict): The training data to be evaluated. - training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent. - agent_id (str): The ID of the agent.
""" """
event_bus.emit(self, TaskEvaluationEvent(evaluation_type="training_data_evaluation"))
output_training_data = training_data[agent_id] output_training_data = training_data[agent_id]
final_aggregated_data = "" final_aggregated_data = ""

View File

@@ -14,7 +14,7 @@ from .agent_events import (
AgentExecutionCompletedEvent, AgentExecutionCompletedEvent,
AgentExecutionErrorEvent, AgentExecutionErrorEvent,
) )
from .task_events import TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent from .task_events import TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent, TaskEvaluationEvent
from .flow_events import ( from .flow_events import (
FlowCreatedEvent, FlowCreatedEvent,
FlowStartedEvent, FlowStartedEvent,
@@ -24,7 +24,7 @@ from .flow_events import (
MethodExecutionFailedEvent, MethodExecutionFailedEvent,
) )
from .event_bus import EventBus, event_bus from .event_bus import EventBus, event_bus
from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent, ToolUsageStartedEvent
# events # events
from .event_listener import EventListener from .event_listener import EventListener

View File

@@ -25,7 +25,7 @@ from .flow_events import (
MethodExecutionStartedEvent, MethodExecutionStartedEvent,
) )
from .task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent from .task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent, ToolUsageStartedEvent
class EventListener(BaseEventListener): class EventListener(BaseEventListener):
@@ -136,7 +136,7 @@ class EventListener(BaseEventListener):
@event_bus.on(TaskCompletedEvent) @event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent): def on_task_completed(source, event):
if source._execution_span: if source._execution_span:
self._telemetry.task_ended(source._execution_span, source, source.agent.crew) self._telemetry.task_ended(source._execution_span, source, source.agent.crew)
self.logger.log( self.logger.log(
@@ -231,6 +231,14 @@ class EventListener(BaseEventListener):
) )
# Tool Usage Events # Tool Usage Events
@event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.logger.log(
f"🤖 Tool Usage Started: '{event.tool_name}'",
event.timestamp,
color=self.color,
)
@event_bus.on(ToolUsageFinishedEvent) @event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.logger.log( self.logger.log(

View File

@@ -28,7 +28,7 @@ from .task_events import (
TaskFailedEvent, TaskFailedEvent,
TaskStartedEvent, TaskStartedEvent,
) )
from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent, ToolUsageStartedEvent
EventTypes = Union[ EventTypes = Union[
CrewKickoffStartedEvent, CrewKickoffStartedEvent,
@@ -53,4 +53,5 @@ EventTypes = Union[
AgentExecutionErrorEvent, AgentExecutionErrorEvent,
ToolUsageFinishedEvent, ToolUsageFinishedEvent,
ToolUsageErrorEvent, ToolUsageErrorEvent,
ToolUsageStartedEvent,
] ]

View File

@@ -1,5 +1,6 @@
from typing import Any, Optional from typing import Any, Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.crew_events import CrewEvent from crewai.utilities.events.crew_events import CrewEvent
@@ -14,7 +15,7 @@ class TaskStartedEvent(CrewEvent):
class TaskCompletedEvent(CrewEvent): class TaskCompletedEvent(CrewEvent):
"""Event emitted when a task completes""" """Event emitted when a task completes"""
output: Any output: TaskOutput
type: str = "task_completed" type: str = "task_completed"
@@ -25,3 +26,9 @@ class TaskFailedEvent(CrewEvent):
error: str error: str
type: str = "task_failed" type: str = "task_failed"
class TaskEvaluationEvent(CrewEvent):
"""Event emitted when a task evaluation is completed"""
type: str = "task_evaluation"
evaluation_type: str

View File

@@ -1,24 +1,59 @@
from crewai.utilities.events.agent_events import AgentExecutionStartedEvent from typing import Optional
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import ToolUsageErrorEvent, ToolUsageStartedEvent, CrewKickoffCompletedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try: try:
import agentops import agentops
from agentops import Client
AGENTOPS_INSTALLED = True AGENTOPS_INSTALLED = True
except ImportError: except ImportError:
AGENTOPS_INSTALLED = False AGENTOPS_INSTALLED = False
def track_agent():
def noop(f):
return f
return noop
class AgentOpsListener(BaseEventListener): class AgentOpsListener(BaseEventListener):
tool_event: Optional[agentops.ToolEvent] = None
def __init__(self): def __init__(self):
super().__init__() super().__init__()
if AGENTOPS_INSTALLED:
self.session =agentops.init()
def setup_listeners(self, event_bus): def setup_listeners(self, event_bus):
if AGENTOPS_INSTALLED: if not AGENTOPS_INSTALLED:
return
@event_bus.on(AgentExecutionStartedEvent) @event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event: AgentExecutionStartedEvent): def on_agent_started(source, event: AgentExecutionStartedEvent):
print("AGENTOPS WORKS !!!", event.agent) agent = event.agent
Client().create_agent(name=agent.role, agent_id=str(agent.id))
@event_bus.on(CrewKickoffCompletedEvent)
def on_agent_error(source, event: CrewKickoffCompletedEvent):
agentops.end_session(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True
)
@event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
@event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event: TaskEvaluationEvent):
Client().create_agent(name="Task Evaluator", agent_id=str(source.original_agent.id))
agentops_listener = AgentOpsListener() agentops_listener = AgentOpsListener()

View File

@@ -18,6 +18,12 @@ class ToolUsageEvent(CrewEvent):
model_config = {"arbitrary_types_allowed": True} model_config = {"arbitrary_types_allowed": True}
class ToolUsageStartedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is started"""
type: str = "tool_usage_started"
class ToolUsageFinishedEvent(ToolUsageEvent): class ToolUsageFinishedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is completed""" """Event emitted when a tool execution is completed"""
@@ -30,5 +36,5 @@ class ToolUsageFinishedEvent(ToolUsageEvent):
class ToolUsageErrorEvent(ToolUsageEvent): class ToolUsageErrorEvent(ToolUsageEvent):
"""Event emitted when a tool execution encounters an error""" """Event emitted when a tool execution encounters an error"""
error: str error: Any
type: str = "tool_usage_error" type: str = "tool_usage_error"