Add MLflow listener and update dependencies

- Add MLflow event listener for comprehensive tracing
- Update events __init__.py to include MLflow integration
- Add MLflow dependency to pyproject.toml
- Update lock file with MLflow dependencies

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-06-03 21:30:50 +00:00
parent 39a5a40b41
commit 38a54115a3
4 changed files with 852 additions and 256 deletions

View File

@@ -39,6 +39,7 @@ dependencies = [
"tomli>=2.0.2",
"blinker>=1.9.0",
"json5>=0.10.0",
"mlflow>=2.22.0",
]
[project.urls]

View File

@@ -54,3 +54,4 @@ from .llm_events import (
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener
from .third_party.mlflow_listener import mlflow_listener

View File

@@ -0,0 +1,167 @@
from typing import Optional, Dict, Any
import logging
from crewai.utilities.events.crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
)
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
)
from crewai.utilities.events import (
ToolUsageStartedEvent,
ToolUsageErrorEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
try:
import mlflow
import mlflow.tracing
MLFLOW_INSTALLED = True
except ImportError:
MLFLOW_INSTALLED = False
logger = logging.getLogger(__name__)
class MLflowListener(BaseEventListener):
"""MLflow integration listener for CrewAI events"""
def __init__(self):
super().__init__()
self._active_spans: Dict[str, Any] = {}
self._autolog_enabled = False
def setup_listeners(self, crewai_event_bus):
if not MLFLOW_INSTALLED:
logger.warning("MLflow not installed, skipping listener setup")
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
if not self._autolog_enabled:
return
try:
span = mlflow.tracing.start_span(
name=f"Crew Execution: {event.crew_name or 'Unknown'}",
span_type="CHAIN"
)
span.set_inputs(event.inputs or {})
self._active_spans[f"crew_{event.source_fingerprint or id(source)}"] = span
except Exception as e:
logger.warning(f"Failed to start MLflow span for crew: {e}")
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if not self._autolog_enabled:
return
try:
span_key = f"crew_{event.source_fingerprint or id(source)}"
if span_key in self._active_spans:
span = self._active_spans[span_key]
span.set_outputs({"result": str(event.output)})
span.set_status("OK")
span.end()
del self._active_spans[span_key]
except Exception as e:
logger.warning(f"Failed to end MLflow span for crew: {e}")
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_kickoff_failed(source, event: CrewKickoffFailedEvent):
if not self._autolog_enabled:
return
try:
span_key = f"crew_{event.source_fingerprint or id(source)}"
if span_key in self._active_spans:
span = self._active_spans[span_key]
span.set_status("ERROR")
span.set_attribute("error", event.error)
span.end()
del self._active_spans[span_key]
except Exception as e:
logger.warning(f"Failed to end MLflow span for crew error: {e}")
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
if not self._autolog_enabled:
return
try:
span = mlflow.tracing.start_span(
name=f"Agent: {event.agent.role}",
span_type="AGENT"
)
span.set_inputs({
"task": str(event.task),
"task_prompt": event.task_prompt,
"tools": [tool.name for tool in (event.tools or [])]
})
self._active_spans[f"agent_{event.source_fingerprint or id(event.agent)}"] = span
except Exception as e:
logger.warning(f"Failed to start MLflow span for agent: {e}")
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
if not self._autolog_enabled:
return
try:
span_key = f"agent_{event.source_fingerprint or id(event.agent)}"
if span_key in self._active_spans:
span = self._active_spans[span_key]
span.set_outputs({"output": event.output})
span.set_status("OK")
span.end()
del self._active_spans[span_key]
except Exception as e:
logger.warning(f"Failed to end MLflow span for agent: {e}")
@crewai_event_bus.on(AgentExecutionErrorEvent)
def on_agent_execution_error(source, event: AgentExecutionErrorEvent):
if not self._autolog_enabled:
return
try:
span_key = f"agent_{event.source_fingerprint or id(event.agent)}"
if span_key in self._active_spans:
span = self._active_spans[span_key]
span.set_status("ERROR")
span.set_attribute("error", event.error)
span.end()
del self._active_spans[span_key]
except Exception as e:
logger.warning(f"Failed to end MLflow span for agent error: {e}")
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
if not self._autolog_enabled:
return
try:
span = mlflow.tracing.start_span(
name=f"Tool: {event.tool_name}",
span_type="TOOL"
)
span.set_inputs({"tool_name": event.tool_name})
self._active_spans[f"tool_{id(event)}"] = span
except Exception as e:
logger.warning(f"Failed to start MLflow span for tool: {e}")
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
if not self._autolog_enabled:
return
try:
span_key = f"tool_{id(event)}"
if span_key in self._active_spans:
span = self._active_spans[span_key]
span.set_status("ERROR")
span.set_attribute("error", event.error)
span.end()
del self._active_spans[span_key]
except Exception as e:
logger.warning(f"Failed to end MLflow span for tool error: {e}")
mlflow_listener = MLflowListener()

939
uv.lock generated

File diff suppressed because it is too large Load Diff