mirror of https://github.com/crewAIInc/crewAI.git, synced 2025-12-16 04:18:35 +00:00
telemetry initialization and enhance event handling (#2853)
* Refactor Crew class memory initialization and enhance event handling
  - Simplified the initialization of the external memory attribute in the Crew class.
  - Updated memory system retrieval logic for consistency in key usage.
  - Introduced a singleton pattern for the Telemetry class to ensure a single instance.
  - Replaced telemetry usage in CrewEvaluator with event bus emissions for test results.
  - Added new CrewTestResultEvent to handle crew test results more effectively.
  - Updated event listener to process CrewTestResultEvent and log telemetry data accordingly.
  - Enhanced tests to validate the singleton pattern in Telemetry and the new event handling logic.

* linted

* Remove unused telemetry attribute from Crew class memory initialization

* fix ordering of test

* Implement thread-safe singleton pattern in Telemetry class
  - Introduced a threading lock to ensure safe instantiation of the Telemetry singleton.
  - Updated the __new__ method to utilize double-checked locking for instance creation.
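The last note refers to double-checked locking in Telemetry.__new__. As a rough, generic Python sketch of that pattern (illustration only, not the crewAI code; the actual change appears in the Telemetry hunk below):

    import threading


    class SingletonExample:
        """Generic double-checked locking singleton (illustration only)."""

        _instance = None
        _lock = threading.Lock()

        def __new__(cls):
            # Fast path: once the instance exists, skip the lock entirely.
            if cls._instance is None:
                with cls._lock:
                    # Re-check inside the lock in case another thread
                    # created the instance between the first check and here.
                    if cls._instance is None:
                        cls._instance = super().__new__(cls)
            return cls._instance


    assert SingletonExample() is SingletonExample()

The outer check keeps the common path lock-free; the inner check prevents two threads that both saw None from each creating an instance.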
@@ -315,9 +315,7 @@ class Crew(FlowTrackable, BaseModel):
         """Initialize private memory attributes."""
         self._external_memory = (
             # External memory doesn’t support a default value since it was designed to be managed entirely externally
-            self.external_memory.set_crew(self)
-            if self.external_memory
-            else None
+            self.external_memory.set_crew(self) if self.external_memory else None
         )

         self._long_term_memory = self.long_term_memory
@@ -1204,7 +1202,6 @@ class Crew(FlowTrackable, BaseModel):
             "_long_term_memory",
             "_entity_memory",
             "_external_memory",
-            "_telemetry",
             "agents",
             "tasks",
             "knowledge_sources",
@@ -1397,10 +1394,10 @@ class Crew(FlowTrackable, BaseModel):
         memory_systems = self._get_memory_systems()

         for memory_type, config in memory_systems.items():
-            if (system := config.get('system')) is not None:
-                name = config.get('name')
+            if (system := config.get("system")) is not None:
+                name = config.get("name")
                 try:
-                    reset_fn: Callable = cast(Callable, config.get('reset'))
+                    reset_fn: Callable = cast(Callable, config.get("reset"))
                     reset_fn(system)
                     self._logger.log(
                         "info",
@@ -1422,14 +1419,14 @@ class Crew(FlowTrackable, BaseModel):
         """
        memory_systems = self._get_memory_systems()
        config = memory_systems[memory_type]
-        system = config.get('system')
-        name = config.get('name')
+        system = config.get("system")
+        name = config.get("name")

        if system is None:
            raise RuntimeError(f"{name} memory system is not initialized")

        try:
-            reset_fn: Callable = cast(Callable, config.get('reset'))
+            reset_fn: Callable = cast(Callable, config.get("reset"))
            reset_fn(system)
            self._logger.log(
                "info",
@@ -1442,58 +1439,67 @@ class Crew(FlowTrackable, BaseModel):

     def _get_memory_systems(self):
         """Get all available memory systems with their configuration.

         Returns:
             Dict containing all memory systems with their reset functions and display names.
         """

         def default_reset(memory):
             return memory.reset()

         def knowledge_reset(memory):
             return self.reset_knowledge(memory)

-        # Get knowledge for agents
-        agent_knowledges = [getattr(agent, "knowledge", None) for agent in self.agents
-                            if getattr(agent, "knowledge", None) is not None]
+        # Get knowledge for agents
+        agent_knowledges = [
+            getattr(agent, "knowledge", None)
+            for agent in self.agents
+            if getattr(agent, "knowledge", None) is not None
+        ]
         # Get knowledge for crew and agents
         crew_knowledge = getattr(self, "knowledge", None)
-        crew_and_agent_knowledges = ([crew_knowledge] if crew_knowledge is not None else []) + agent_knowledges
+        crew_and_agent_knowledges = (
+            [crew_knowledge] if crew_knowledge is not None else []
+        ) + agent_knowledges

         return {
-            'short': {
-                'system': getattr(self, "_short_term_memory", None),
-                'reset': default_reset,
-                'name': 'Short Term'
+            "short": {
+                "system": getattr(self, "_short_term_memory", None),
+                "reset": default_reset,
+                "name": "Short Term",
             },
-            'entity': {
-                'system': getattr(self, "_entity_memory", None),
-                'reset': default_reset,
-                'name': 'Entity'
+            "entity": {
+                "system": getattr(self, "_entity_memory", None),
+                "reset": default_reset,
+                "name": "Entity",
             },
-            'external': {
-                'system': getattr(self, "_external_memory", None),
-                'reset': default_reset,
-                'name': 'External'
+            "external": {
+                "system": getattr(self, "_external_memory", None),
+                "reset": default_reset,
+                "name": "External",
             },
-            'long': {
-                'system': getattr(self, "_long_term_memory", None),
-                'reset': default_reset,
-                'name': 'Long Term'
+            "long": {
+                "system": getattr(self, "_long_term_memory", None),
+                "reset": default_reset,
+                "name": "Long Term",
             },
-            'kickoff_outputs': {
-                'system': getattr(self, "_task_output_handler", None),
-                'reset': default_reset,
-                'name': 'Task Output'
+            "kickoff_outputs": {
+                "system": getattr(self, "_task_output_handler", None),
+                "reset": default_reset,
+                "name": "Task Output",
             },
-            'knowledge': {
-                'system': crew_and_agent_knowledges if crew_and_agent_knowledges else None,
-                'reset': knowledge_reset,
-                'name': 'Crew Knowledge and Agent Knowledge'
+            "knowledge": {
+                "system": crew_and_agent_knowledges
+                if crew_and_agent_knowledges
+                else None,
+                "reset": knowledge_reset,
+                "name": "Crew Knowledge and Agent Knowledge",
             },
-            'agent_knowledge': {
-                'system': agent_knowledges if agent_knowledges else None,
-                'reset': knowledge_reset,
-                'name': 'Agent Knowledge'
-            }
+            "agent_knowledge": {
+                "system": agent_knowledges if agent_knowledges else None,
+                "reset": knowledge_reset,
+                "name": "Agent Knowledge",
+            },
         }

     def reset_knowledge(self, knowledges: List[Knowledge]) -> None:
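As a usage illustration only, here is a minimal sketch of how the registry returned by _get_memory_systems might be consumed. The crew variable is a hypothetical Crew instance, and the error handling is stripped down relative to the reset hunks shown above.

    # Hypothetical caller; simplified from the reset logic in the hunks above.
    memory_systems = crew._get_memory_systems()

    config = memory_systems["short"]    # {"system": ..., "reset": ..., "name": "Short Term"}
    if (system := config.get("system")) is not None:
        reset_fn = config.get("reset")  # default_reset, or knowledge_reset for knowledge entries
        reset_fn(system)                # for most systems this ends up calling system.reset()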
@@ -9,6 +9,7 @@ import warnings
 from contextlib import contextmanager
 from importlib.metadata import version
 from typing import TYPE_CHECKING, Any, Optional
+import threading

 from opentelemetry import trace
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
@@ -64,6 +65,16 @@ class Telemetry:
     attribute in the Crew class.
     """

+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls):
+        if cls._instance is None:
+            with cls._lock:
+                if cls._instance is None:
+                    cls._instance = super(Telemetry, cls).__new__(cls)
+        return cls._instance
+
     def __init__(self) -> None:
         self.ready: bool = False
         self.trace_set: bool = False
@@ -9,7 +9,8 @@ from crewai.agent import Agent
 from crewai.llm import BaseLLM
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
-from crewai.telemetry import Telemetry
+from crewai.utilities.events import crewai_event_bus
+from crewai.utilities.events.crew_events import CrewTestResultEvent


 class TaskEvaluationPydanticOutput(BaseModel):
@@ -36,7 +37,6 @@ class CrewEvaluator:
     def __init__(self, crew, eval_llm: InstanceOf[BaseLLM]):
         self.crew = crew
         self.llm = eval_llm
-        self._telemetry = Telemetry()
         self._setup_for_evaluating()

     def _setup_for_evaluating(self) -> None:
@@ -178,11 +178,15 @@ class CrewEvaluator:
            evaluation_result = evaluation_task.execute_sync()

            if isinstance(evaluation_result.pydantic, TaskEvaluationPydanticOutput):
-                self._test_result_span = self._telemetry.individual_test_result_span(
+                crewai_event_bus.emit(
                    self.crew,
-                    evaluation_result.pydantic.quality,
-                    current_task.execution_duration,
-                    self.llm.model,
+                    CrewTestResultEvent(
+                        quality=evaluation_result.pydantic.quality,
+                        execution_duration=current_task.execution_duration,
+                        model=self.llm.model,
+                        crew_name=self.crew.name,
+                        crew=self.crew,
+                    ),
                )
                self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
                self.run_execution_times[self.iteration].append(
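For context, a hedged sketch of an external subscriber for the event the evaluator now emits; it reuses only the crewai_event_bus decorator and the CrewTestResultEvent fields that appear elsewhere in this diff, and the handler name is hypothetical.

    from crewai.utilities.events import crewai_event_bus
    from crewai.utilities.events.crew_events import CrewTestResultEvent


    # Hypothetical external subscriber; the internal EventListener registered
    # later in this diff forwards the same fields to telemetry.
    @crewai_event_bus.on(CrewTestResultEvent)
    def handle_test_result(source, event: CrewTestResultEvent):
        # "source" is the Crew passed to crewai_event_bus.emit(); the event
        # carries the evaluation score, task duration, and model name.
        print(event.quality, event.execution_duration, event.model)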
@@ -100,3 +100,12 @@ class CrewTestFailedEvent(CrewBaseEvent):

     error: str
     type: str = "crew_test_failed"
+
+
+class CrewTestResultEvent(CrewBaseEvent):
+    """Event emitted when a crew test result is available"""
+
+    quality: float
+    execution_duration: float
+    model: str
+    type: str = "crew_test_result"
@@ -37,6 +37,7 @@ from .crew_events import (
     CrewKickoffStartedEvent,
     CrewTestCompletedEvent,
     CrewTestFailedEvent,
+    CrewTestResultEvent,
     CrewTestStartedEvent,
     CrewTrainCompletedEvent,
     CrewTrainFailedEvent,
@@ -134,6 +135,15 @@ class EventListener(BaseEventListener):
         def on_crew_train_failed(source, event: CrewTrainFailedEvent):
             self.formatter.handle_crew_train_failed(event.crew_name or "Crew")

+        @crewai_event_bus.on(CrewTestResultEvent)
+        def on_crew_test_result(source, event: CrewTestResultEvent):
+            self._telemetry.individual_test_result_span(
+                source.crew,
+                event.quality,
+                int(event.execution_duration),
+                event.model,
+            )
+
         # ----------- TASK EVENTS -----------

         @crewai_event_bus.on(TaskStartedEvent)
@@ -6,6 +6,8 @@ import pytest
 from crewai import Agent, Crew, Task
 from crewai.telemetry import Telemetry

+from opentelemetry import trace
+

 @pytest.mark.parametrize(
     "env_var,value,expected_ready",
@@ -34,9 +36,6 @@ def test_telemetry_enabled_by_default():
     assert telemetry.ready is True


-from opentelemetry import trace
-
-
 @patch("crewai.telemetry.telemetry.logger.error")
 @patch(
     "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",
@@ -67,3 +66,32 @@ def test_telemetry_fails_due_connect_timeout(export_mock, logger_mock):

     export_mock.assert_called_once()
     logger_mock.assert_called_once_with(error)
+
+
+def test_telemetry_singleton_pattern():
+    """Test that Telemetry uses the singleton pattern correctly."""
+    Telemetry._instance = None
+
+    telemetry1 = Telemetry()
+    telemetry2 = Telemetry()
+
+    assert telemetry1 is telemetry2
+
+    setattr(telemetry1, "test_attribute", "test_value")
+    assert hasattr(telemetry2, "test_attribute")
+    assert getattr(telemetry2, "test_attribute") == "test_value"
+
+    import threading
+
+    instances = []
+
+    def create_instance():
+        instances.append(Telemetry())
+
+    threads = [threading.Thread(target=create_instance) for _ in range(5)]
+    for thread in threads:
+        thread.start()
+    for thread in threads:
+        thread.join()
+
+    assert all(instance is telemetry1 for instance in instances)
File diff suppressed because one or more lines are too long
@@ -1,4 +1,3 @@
 import os
 from datetime import datetime
 from unittest.mock import Mock, patch

@@ -22,6 +21,7 @@ from crewai.utilities.events.crew_events import (
     CrewKickoffFailedEvent,
     CrewKickoffStartedEvent,
     CrewTestCompletedEvent,
+    CrewTestResultEvent,
     CrewTestStartedEvent,
 )
 from crewai.utilities.events.crewai_event_bus import crewai_event_bus
@@ -38,7 +38,6 @@ from crewai.utilities.events.llm_events import (
     LLMCallCompletedEvent,
     LLMCallFailedEvent,
     LLMCallStartedEvent,
     LLMCallType,
     LLMStreamChunkEvent,
 )
 from crewai.utilities.events.task_events import (
@@ -132,6 +131,10 @@ def test_crew_emits_test_kickoff_type_event():
     def handle_crew_test_end(source, event):
         received_events.append(event)

+    @crewai_event_bus.on(CrewTestResultEvent)
+    def handle_crew_test_result(source, event):
+        received_events.append(event)
+
     eval_llm = LLM(model="gpt-4o-mini")
     with (
         patch.object(
@@ -149,13 +152,16 @@ def test_crew_emits_test_kickoff_type_event():
         assert args[2] is None
         assert args[3] == eval_llm

-        assert len(received_events) == 2
+        assert len(received_events) == 3
         assert received_events[0].crew_name == "TestCrew"
         assert isinstance(received_events[0].timestamp, datetime)
         assert received_events[0].type == "crew_test_started"
         assert received_events[1].crew_name == "TestCrew"
         assert isinstance(received_events[1].timestamp, datetime)
-        assert received_events[1].type == "crew_test_completed"
+        assert received_events[1].type == "crew_test_result"
+        assert received_events[2].crew_name == "TestCrew"
+        assert isinstance(received_events[2].timestamp, datetime)
+        assert received_events[2].type == "crew_test_completed"


 @pytest.mark.vcr(filter_headers=["authorization"])
@@ -309,7 +315,7 @@ def test_agent_emits_execution_error_event():
         ) as invoke_mock:
             invoke_mock.side_effect = Exception(error_message)

-            with pytest.raises(Exception) as e:
+            with pytest.raises(Exception):
                 base_agent.execute_task(
                     task=base_task,
                 )