Refactor AgentOps event listener for crew-level tracking

- Modify AgentOpsListener to handle crew-level events
- Initialize and end AgentOps session at crew kickoff and completion
- Create agents for each crew member during session initialization
- Improve session management and event recording
- Clean up and simplify event handling logic
This commit is contained in:
Lorenze Jay
2025-02-14 15:49:42 -08:00
parent 18791eadd3
commit ec048cf6fe
6 changed files with 43 additions and 27 deletions

View File

@@ -28,6 +28,7 @@ from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
class Agent(BaseAgent):
"""Represents an agent in a system.

View File

@@ -17,12 +17,12 @@ from crewai.tools import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.event_bus import event_bus
OPENAI_BIGGER_MODELS = [
"gpt-4",

View File

@@ -3,10 +3,10 @@ from typing import List
from pydantic import BaseModel, Field
from crewai.utilities import Converter
from crewai.utilities.events import event_bus
from crewai.utilities.events import TaskEvaluationEvent
from crewai.utilities.events import TaskEvaluationEvent, event_bus
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
class Entity(BaseModel):
name: str = Field(description="The name of the entity.")
type: str = Field(description="The type of the entity.")

View File

@@ -25,7 +25,11 @@ from .flow_events import (
MethodExecutionStartedEvent,
)
from .task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent, ToolUsageStartedEvent
from .tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
class EventListener(BaseEventListener):

View File

@@ -28,7 +28,11 @@ from .task_events import (
TaskFailedEvent,
TaskStartedEvent,
)
from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent, ToolUsageStartedEvent
from .tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
EventTypes = Union[
CrewKickoffStartedEvent,

View File

@@ -1,11 +1,12 @@
from typing import Optional
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import ToolUsageErrorEvent, ToolUsageStartedEvent, CrewKickoffCompletedEvent
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
@@ -14,39 +15,44 @@ try:
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
def track_agent():
def noop(f):
return f
return noop
class AgentOpsListener(BaseEventListener):
tool_event: Optional[agentops.ToolEvent] = None
session: Optional[agentops.Session] = None
def __init__(self):
super().__init__()
if AGENTOPS_INSTALLED:
self.session =agentops.init()
def setup_listeners(self, event_bus):
if not AGENTOPS_INSTALLED:
return
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event: AgentExecutionStartedEvent):
agent = event.agent
Client().create_agent(name=agent.role, agent_id=str(agent.id))
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@event_bus.on(CrewKickoffCompletedEvent)
def on_agent_error(source, event: CrewKickoffCompletedEvent):
agentops.end_session(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True
)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
@@ -54,6 +60,7 @@ class AgentOpsListener(BaseEventListener):
@event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event: TaskEvaluationEvent):
Client().create_agent(name="Task Evaluator", agent_id=str(source.original_agent.id))
if self.session:
self.session.create_agent(name="Task Evaluator", agent_id=str(source.original_agent.id))
agentops_listener = AgentOpsListener()