mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-01 07:13:00 +00:00
feat: enhance event listener with new telemetry spans for skill and memory events (#5240)
- Added telemetry spans for various skill events: discovery, loading, activation, and load failure. - Introduced telemetry spans for memory events: save, query, and retrieval completion. - Updated event listener to include new MCP tool execution and connection events with telemetry tracking.
This commit is contained in:
@@ -78,9 +78,15 @@ from crewai.events.types.mcp_events import (
|
||||
MCPConnectionCompletedEvent,
|
||||
MCPConnectionFailedEvent,
|
||||
MCPConnectionStartedEvent,
|
||||
MCPToolExecutionCompletedEvent,
|
||||
MCPToolExecutionFailedEvent,
|
||||
MCPToolExecutionStartedEvent,
|
||||
)
|
||||
from crewai.events.types.memory_events import (
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
)
|
||||
from crewai.events.types.observation_events import (
|
||||
GoalAchievedEarlyEvent,
|
||||
PlanRefinementEvent,
|
||||
@@ -94,6 +100,12 @@ from crewai.events.types.reasoning_events import (
|
||||
AgentReasoningFailedEvent,
|
||||
AgentReasoningStartedEvent,
|
||||
)
|
||||
from crewai.events.types.skill_events import (
|
||||
SkillActivatedEvent,
|
||||
SkillDiscoveryCompletedEvent,
|
||||
SkillLoadFailedEvent,
|
||||
SkillLoadedEvent,
|
||||
)
|
||||
from crewai.events.types.task_events import (
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
@@ -478,6 +490,7 @@ class EventListener(BaseEventListener):
|
||||
self.formatter.handle_guardrail_completed(
|
||||
event.success, event.error, event.retry_count
|
||||
)
|
||||
self._telemetry.feature_usage_span("guardrail:execution")
|
||||
|
||||
@crewai_event_bus.on(CrewTestStartedEvent)
|
||||
def on_crew_test_started(source: Any, event: CrewTestStartedEvent) -> None:
|
||||
@@ -559,6 +572,7 @@ class EventListener(BaseEventListener):
|
||||
event.plan,
|
||||
event.ready,
|
||||
)
|
||||
self._telemetry.feature_usage_span("planning:creation")
|
||||
|
||||
@crewai_event_bus.on(AgentReasoningFailedEvent)
|
||||
def on_agent_reasoning_failed(_: Any, event: AgentReasoningFailedEvent) -> None:
|
||||
@@ -616,6 +630,7 @@ class EventListener(BaseEventListener):
|
||||
event.replan_count,
|
||||
event.completed_steps_preserved,
|
||||
)
|
||||
self._telemetry.feature_usage_span("planning:replan")
|
||||
|
||||
@crewai_event_bus.on(GoalAchievedEarlyEvent)
|
||||
def on_goal_achieved_early(_: Any, event: GoalAchievedEarlyEvent) -> None:
|
||||
@@ -623,6 +638,25 @@ class EventListener(BaseEventListener):
|
||||
event.steps_completed,
|
||||
event.steps_remaining,
|
||||
)
|
||||
self._telemetry.feature_usage_span("planning:goal_achieved_early")
|
||||
|
||||
# ----------- SKILL EVENTS -----------
|
||||
|
||||
@crewai_event_bus.on(SkillDiscoveryCompletedEvent)
|
||||
def on_skill_discovery(_: Any, event: SkillDiscoveryCompletedEvent) -> None:
|
||||
self._telemetry.feature_usage_span("skill:discovery")
|
||||
|
||||
@crewai_event_bus.on(SkillLoadedEvent)
|
||||
def on_skill_loaded(_: Any, event: SkillLoadedEvent) -> None:
|
||||
self._telemetry.feature_usage_span("skill:loaded")
|
||||
|
||||
@crewai_event_bus.on(SkillLoadFailedEvent)
|
||||
def on_skill_load_failed(_: Any, event: SkillLoadFailedEvent) -> None:
|
||||
self._telemetry.feature_usage_span("skill:load_failed")
|
||||
|
||||
@crewai_event_bus.on(SkillActivatedEvent)
|
||||
def on_skill_activated(_: Any, event: SkillActivatedEvent) -> None:
|
||||
self._telemetry.feature_usage_span("skill:activated")
|
||||
|
||||
# ----------- AGENT LOGGING EVENTS -----------
|
||||
|
||||
@@ -662,6 +696,7 @@ class EventListener(BaseEventListener):
|
||||
event.error,
|
||||
event.is_multiturn,
|
||||
)
|
||||
self._telemetry.feature_usage_span("a2a:delegation")
|
||||
|
||||
@crewai_event_bus.on(A2AConversationStartedEvent)
|
||||
def on_a2a_conversation_started(
|
||||
@@ -703,6 +738,7 @@ class EventListener(BaseEventListener):
|
||||
event.error,
|
||||
event.total_turns,
|
||||
)
|
||||
self._telemetry.feature_usage_span("a2a:conversation")
|
||||
|
||||
@crewai_event_bus.on(A2APollingStartedEvent)
|
||||
def on_a2a_polling_started(_: Any, event: A2APollingStartedEvent) -> None:
|
||||
@@ -744,6 +780,7 @@ class EventListener(BaseEventListener):
|
||||
event.connection_duration_ms,
|
||||
event.is_reconnect,
|
||||
)
|
||||
self._telemetry.feature_usage_span("mcp:connection")
|
||||
|
||||
@crewai_event_bus.on(MCPConnectionFailedEvent)
|
||||
def on_mcp_connection_failed(_: Any, event: MCPConnectionFailedEvent) -> None:
|
||||
@@ -754,6 +791,7 @@ class EventListener(BaseEventListener):
|
||||
event.error,
|
||||
event.error_type,
|
||||
)
|
||||
self._telemetry.feature_usage_span("mcp:connection_failed")
|
||||
|
||||
@crewai_event_bus.on(MCPConfigFetchFailedEvent)
|
||||
def on_mcp_config_fetch_failed(
|
||||
@@ -764,6 +802,7 @@ class EventListener(BaseEventListener):
|
||||
event.error,
|
||||
event.error_type,
|
||||
)
|
||||
self._telemetry.feature_usage_span("mcp:config_fetch_failed")
|
||||
|
||||
@crewai_event_bus.on(MCPToolExecutionStartedEvent)
|
||||
def on_mcp_tool_execution_started(
|
||||
@@ -775,6 +814,12 @@ class EventListener(BaseEventListener):
|
||||
event.tool_args,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(MCPToolExecutionCompletedEvent)
|
||||
def on_mcp_tool_execution_completed(
|
||||
_: Any, event: MCPToolExecutionCompletedEvent
|
||||
) -> None:
|
||||
self._telemetry.feature_usage_span("mcp:tool_execution")
|
||||
|
||||
@crewai_event_bus.on(MCPToolExecutionFailedEvent)
|
||||
def on_mcp_tool_execution_failed(
|
||||
_: Any, event: MCPToolExecutionFailedEvent
|
||||
@@ -786,6 +831,45 @@ class EventListener(BaseEventListener):
|
||||
event.error,
|
||||
event.error_type,
|
||||
)
|
||||
self._telemetry.feature_usage_span("mcp:tool_execution_failed")
|
||||
|
||||
# ----------- MEMORY TELEMETRY -----------
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_memory_save_completed(_: Any, event: MemorySaveCompletedEvent) -> None:
|
||||
self._telemetry.feature_usage_span("memory:save")
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_memory_query_completed(_: Any, event: MemoryQueryCompletedEvent) -> None:
|
||||
self._telemetry.feature_usage_span("memory:query")
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
|
||||
def on_memory_retrieval_completed_telemetry(
|
||||
_: Any, event: MemoryRetrievalCompletedEvent
|
||||
) -> None:
|
||||
self._telemetry.feature_usage_span("memory:retrieval")
|
||||
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def on_crew_kickoff_hooks(_: Any, event: CrewKickoffStartedEvent) -> None:
|
||||
from crewai.hooks.llm_hooks import (
|
||||
get_after_llm_call_hooks,
|
||||
get_before_llm_call_hooks,
|
||||
)
|
||||
from crewai.hooks.tool_hooks import (
|
||||
get_after_tool_call_hooks,
|
||||
get_before_tool_call_hooks,
|
||||
)
|
||||
|
||||
has_hooks = any(
|
||||
[
|
||||
get_before_llm_call_hooks(),
|
||||
get_after_llm_call_hooks(),
|
||||
get_before_tool_call_hooks(),
|
||||
get_after_tool_call_hooks(),
|
||||
]
|
||||
)
|
||||
if has_hooks:
|
||||
self._telemetry.feature_usage_span("hooks:registered")
|
||||
|
||||
|
||||
event_listener = EventListener()
|
||||
|
||||
@@ -1040,3 +1040,20 @@ class Telemetry:
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def feature_usage_span(self, feature: str) -> None:
|
||||
"""Records that a feature was used. One span = one count.
|
||||
|
||||
Args:
|
||||
feature: Feature identifier, e.g. "planning:creation",
|
||||
"mcp:connection", "a2a:delegation".
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Feature Usage")
|
||||
self._add_attribute(span, "crewai_version", version("crewai"))
|
||||
self._add_attribute(span, "feature", feature)
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
Reference in New Issue
Block a user