refactor: Move events module to crewai.events (#3425)

refactor(events): relocate events module & update imports

- Move events from utilities/ to top-level events/ with types/, listeners/, utils/ structure
- Update all source/tests/docs to new import paths
- Add backwards compatibility stubs in crewai.utilities.events with deprecation warnings
- Restore test mocks and fix related test imports
This commit is contained in:
Greyson LaLonde
2025-09-02 10:06:42 -04:00
committed by GitHub
parent 1b1a8fdbf4
commit 878c1a649a
81 changed files with 1094 additions and 751 deletions

View File

@@ -0,0 +1,56 @@
"""CrewAI events system for monitoring and extending agent behavior.
This module provides the event infrastructure that allows users to:
- Monitor agent, task, and crew execution
- Track memory operations and performance
- Build custom logging and analytics
- Extend CrewAI with custom event handlers
"""
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryFailedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
)
from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
)
__all__ = [
"BaseEventListener",
"crewai_event_bus",
"MemoryQueryCompletedEvent",
"MemorySaveCompletedEvent",
"MemorySaveStartedEvent",
"MemoryQueryStartedEvent",
"MemoryRetrievalCompletedEvent",
"MemorySaveFailedEvent",
"MemoryQueryFailedEvent",
"KnowledgeRetrievalStartedEvent",
"KnowledgeRetrievalCompletedEvent",
"CrewKickoffStartedEvent",
"CrewKickoffCompletedEvent",
"AgentExecutionCompletedEvent",
"LLMStreamChunkEvent",
]

View File

@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus
class BaseEventListener(ABC):
verbose: bool = False
def __init__(self):
super().__init__()
self.setup_listeners(crewai_event_bus)
@abstractmethod
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus):
pass

View File

@@ -0,0 +1,46 @@
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = (
None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
)
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):
"""
Converts the event to a JSON-serializable dictionary.
Args:
exclude (set[str], optional): Set of keys to exclude from the result. Defaults to None.
Returns:
dict: A JSON-serializable dictionary.
"""
return to_serializable(self, exclude=exclude)
def _set_task_params(self, data: Dict[str, Any]):
if "from_task" in data and (task := data["from_task"]):
self.task_id = task.id
self.task_name = task.name or task.description
self.from_task = None
def _set_agent_params(self, data: Dict[str, Any]):
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)
if not agent:
return
self.agent_id = agent.id
self.agent_role = agent.role
self.from_agent = None

View File

@@ -0,0 +1,117 @@
from __future__ import annotations
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
from crewai.events.base_events import BaseEvent
from crewai.events.event_types import EventTypes
EventT = TypeVar("EventT", bound=BaseEvent)
class CrewAIEventsBus:
"""
A singleton event bus that uses blinker signals for event handling.
Allows both internal (Flow/Crew) and external event handling.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None: # prevent race condition
cls._instance = super(CrewAIEventsBus, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
def on(
self, event_type: Type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""
Decorator to register an event handler for a specific event type.
Usage:
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
):
print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}")
"""
def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventT], None], handler)
)
return handler
return decorator
def emit(self, source: Any, event: BaseEvent) -> None:
"""
Emit an event to all registered handlers
Args:
source: The object emitting the event
event: The event instance to emit
"""
for event_type, handlers in self._handlers.items():
if isinstance(event, event_type):
for handler in handlers:
try:
handler(source, event)
except Exception as e:
print(
f"[EventBus Error] Handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
)
self._signal.send(source, event=event)
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventTypes], None], handler)
)
@contextmanager
def scoped_handlers(self):
"""
Context manager for temporary event handling scope.
Useful for testing or temporary event handling.
Usage:
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStarted)
def temp_handler(source, event):
print("Temporary handler")
# Do stuff...
# Handlers are cleared after the context
"""
previous_handlers = self._handlers.copy()
self._handlers.clear()
try:
yield
finally:
self._handlers = previous_handlers
# Global instance
crewai_event_bus = CrewAIEventsBus()

View File

@@ -0,0 +1,529 @@
from __future__ import annotations
from io import StringIO
from typing import Any, Dict
from pydantic import Field, PrivateAttr
from crewai.llm import LLM
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.logging_events import (
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestResultEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .types.task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from .listeners.memory_listener import MemoryListener
class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0
text_stream = StringIO()
knowledge_retrieval_in_progress = False
knowledge_query_in_progress = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized") or not self._initialized:
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
self.formatter = ConsoleFormatter(verbose=True)
MemoryListener(formatter=self.formatter)
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.formatter.create_crew_tree(event.crew_name or "Crew", source.id)
self._telemetry.crew_execution_span(source, event.inputs)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent):
# Handle telemetry
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
event.crew_name or "Crew",
source.id,
"completed",
final_string_output,
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
event.crew_name or "Crew",
source.id,
"failed",
)
@crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.formatter.handle_crew_train_started(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.formatter.handle_crew_train_completed(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.formatter.handle_crew_train_failed(event.crew_name or "Crew")
@crewai_event_bus.on(CrewTestResultEvent)
def on_crew_test_result(source, event: CrewTestResultEvent):
self._telemetry.individual_test_result_span(
source.crew,
event.quality,
int(event.execution_duration),
event.model,
)
# ----------- TASK EVENTS -----------
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
# Pass both task ID and task name (if set)
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.create_task_branch(
self.formatter.current_crew_tree, source.id, task_name
)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
# Handle telemetry
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"completed",
task_name,
)
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"failed",
task_name,
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
self.formatter.create_agent_branch(
self.formatter.current_task_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.formatter.update_agent_status(
self.formatter.current_agent_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
# ----------- LITE AGENT EVENTS -----------
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_execution_started(
source, event: LiteAgentExecutionStartedEvent
):
"""Handle LiteAgent execution started event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="started", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_execution_completed(
source, event: LiteAgentExecutionCompletedEvent
):
"""Handle LiteAgent execution completed event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="completed", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent):
"""Handle LiteAgent execution error event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"],
status="failed",
error=event.error,
**event.agent_info,
)
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(event.flow_name)
self.formatter.create_flow_tree(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.formatter.start_flow(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.formatter.update_flow_status(
self.formatter.current_flow_tree, event.flow_name, source.flow_id
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
event.method_name,
"running",
)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
event.method_name,
"completed",
)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
event.method_name,
"failed",
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_started(
event.tool_name,
event.tool_args,
)
else:
self.formatter.handle_tool_usage_started(
self.formatter.current_agent_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
self.formatter.current_tool_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_error(
event.tool_name,
event.error,
)
else:
self.formatter.handle_tool_usage_error(
self.formatter.current_tool_branch,
event.tool_name,
event.error,
self.formatter.current_crew_tree,
)
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
# Capture the returned tool branch and update the current_tool_branch reference
thinking_branch = self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
# Update the formatter's current_tool_branch to ensure proper cleanup
if thinking_branch is not None:
self.formatter.current_tool_branch = thinking_branch
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.formatter.handle_llm_call_completed(
self.formatter.current_tool_branch,
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
self.formatter.handle_llm_call_failed(
self.formatter.current_tool_branch,
event.error,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
self.text_stream.write(event.chunk)
self.text_stream.seek(self.next_chunk)
# Read from the in-memory stream
content = self.text_stream.read()
print(content, end="", flush=True)
self.next_chunk = self.text_stream.tell()
# ----------- LLM GUARDRAIL EVENTS -----------
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def on_llm_guardrail_started(source, event: LLMGuardrailStartedEvent):
guardrail_str = str(event.guardrail)
guardrail_name = (
guardrail_str[:50] + "..." if len(guardrail_str) > 50 else guardrail_str
)
self.formatter.handle_guardrail_started(guardrail_name, event.retry_count)
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def on_llm_guardrail_completed(source, event: LLMGuardrailCompletedEvent):
self.formatter.handle_guardrail_completed(
event.success, event.error, event.retry_count
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
self._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm or "",
)
self.formatter.handle_crew_test_started(
event.crew_name or "Crew", source.id, event.n_iterations
)
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
self.formatter.handle_crew_test_completed(
self.formatter.current_flow_tree,
event.crew_name or "Crew",
)
@crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.formatter.handle_crew_test_failed(event.crew_name or "Crew")
@crewai_event_bus.on(KnowledgeRetrievalStartedEvent)
def on_knowledge_retrieval_started(
source, event: KnowledgeRetrievalStartedEvent
):
if self.knowledge_retrieval_in_progress:
return
self.knowledge_retrieval_in_progress = True
self.formatter.handle_knowledge_retrieval_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
def on_knowledge_retrieval_completed(
source, event: KnowledgeRetrievalCompletedEvent
):
if not self.knowledge_retrieval_in_progress:
return
self.knowledge_retrieval_in_progress = False
self.formatter.handle_knowledge_retrieval_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.retrieved_knowledge,
)
@crewai_event_bus.on(KnowledgeQueryStartedEvent)
def on_knowledge_query_started(source, event: KnowledgeQueryStartedEvent):
pass
@crewai_event_bus.on(KnowledgeQueryFailedEvent)
def on_knowledge_query_failed(source, event: KnowledgeQueryFailedEvent):
self.formatter.handle_knowledge_query_failed(
self.formatter.current_agent_branch,
event.error,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(KnowledgeQueryCompletedEvent)
def on_knowledge_query_completed(source, event: KnowledgeQueryCompletedEvent):
pass
@crewai_event_bus.on(KnowledgeSearchQueryFailedEvent)
def on_knowledge_search_query_failed(
source, event: KnowledgeSearchQueryFailedEvent
):
self.formatter.handle_knowledge_search_query_failed(
self.formatter.current_agent_branch,
event.error,
self.formatter.current_crew_tree,
)
# ----------- REASONING EVENTS -----------
@crewai_event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event: AgentReasoningStartedEvent):
self.formatter.handle_reasoning_started(
self.formatter.current_agent_branch,
event.attempt,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event: AgentReasoningCompletedEvent):
self.formatter.handle_reasoning_completed(
event.plan,
event.ready,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event: AgentReasoningFailedEvent):
self.formatter.handle_reasoning_failed(
event.error,
self.formatter.current_crew_tree,
)
# ----------- AGENT LOGGING EVENTS -----------
@crewai_event_bus.on(AgentLogsStartedEvent)
def on_agent_logs_started(source, event: AgentLogsStartedEvent):
self.formatter.handle_agent_logs_started(
event.agent_role,
event.task_description,
event.verbose,
)
@crewai_event_bus.on(AgentLogsExecutionEvent)
def on_agent_logs_execution(source, event: AgentLogsExecutionEvent):
self.formatter.handle_agent_logs_execution(
event.agent_role,
event.formatted_answer,
event.verbose,
)
event_listener = EventListener()

View File

@@ -0,0 +1,120 @@
from typing import Union
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from .types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from .types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from .types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from .types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from .types.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeQueryStartedEvent,
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeSearchQueryFailedEvent,
)
from .types.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
EventTypes = Union[
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewTestStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTrainStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionCompletedEvent,
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
AgentExecutionErrorEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
LLMCallStartedEvent,
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMStreamChunkEvent,
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeQueryStartedEvent,
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeSearchQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
]

View File

@@ -0,0 +1,5 @@
"""Event listener implementations for CrewAI.
This module contains various event listener implementations
for handling memory, tracing, and other event-driven functionality.
"""

View File

@@ -0,0 +1,106 @@
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
MemoryQueryFailedEvent,
MemoryQueryCompletedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
class MemoryListener(BaseEventListener):
def __init__(self, formatter):
super().__init__()
self.formatter = formatter
self.memory_retrieval_in_progress = False
self.memory_save_in_progress = False
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def on_memory_retrieval_started(source, event: MemoryRetrievalStartedEvent):
if self.memory_retrieval_in_progress:
return
self.memory_retrieval_in_progress = True
self.formatter.handle_memory_retrieval_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
def on_memory_retrieval_completed(source, event: MemoryRetrievalCompletedEvent):
if not self.memory_retrieval_in_progress:
return
self.memory_retrieval_in_progress = False
self.formatter.handle_memory_retrieval_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.memory_content,
event.retrieval_time_ms,
)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
if not self.memory_retrieval_in_progress:
return
self.formatter.handle_memory_query_completed(
self.formatter.current_agent_branch,
event.source_type,
event.query_time_ms,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event: MemoryQueryFailedEvent):
if not self.memory_retrieval_in_progress:
return
self.formatter.handle_memory_query_failed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.error,
event.source_type,
)
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event: MemorySaveStartedEvent):
if self.memory_save_in_progress:
return
self.memory_save_in_progress = True
self.formatter.handle_memory_save_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
if not self.memory_save_in_progress:
return
self.memory_save_in_progress = False
self.formatter.handle_memory_save_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.save_time_ms,
event.source_type,
)
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event: MemorySaveFailedEvent):
if not self.memory_save_in_progress:
return
self.formatter.handle_memory_save_failed(
self.formatter.current_agent_branch,
event.error,
event.source_type,
self.formatter.current_crew_tree,
)

View File

@@ -0,0 +1,313 @@
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
from rich.console import Console
from rich.panel import Panel
from crewai.events.listeners.tracing.types import TraceEvent
from logging import getLogger
logger = getLogger(__name__)
@dataclass
class TraceBatch:
"""Batch of events to send to backend"""
version: str = field(default_factory=get_crewai_version)
batch_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_context: Dict[str, str] = field(default_factory=dict)
execution_metadata: Dict[str, Any] = field(default_factory=dict)
events: List[TraceEvent] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return {
"version": self.version,
"batch_id": self.batch_id,
"user_context": self.user_context,
"execution_metadata": self.execution_metadata,
"events": [event.to_dict() for event in self.events],
}
class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: List[TraceEvent] = []
execution_start_times: Dict[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
def __init__(self):
try:
self.plus_api = PlusAPI(
api_key=get_auth_token(),
)
except AuthError:
self.plus_api = PlusAPI(api_key="")
def initialize_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
) -> TraceBatch:
"""Initialize a new trace batch"""
self.current_batch = TraceBatch(
user_context=user_context, execution_metadata=execution_metadata
)
self.event_buffer.clear()
self.is_current_batch_ephemeral = use_ephemeral
self.record_start_time("execution")
self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral)
return self.current_batch
def _initialize_backend_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
):
"""Send batch initialization to backend"""
if not self.plus_api or not self.current_batch:
return
try:
payload = {
"trace_id": self.current_batch.batch_id,
"execution_type": execution_metadata.get("execution_type", "crew"),
"user_identifier": execution_metadata.get("user_context", None),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", None),
"flow_name": execution_metadata.get("flow_name", None),
"crewai_version": self.current_batch.version,
"privacy_level": user_context.get("privacy_level", "standard"),
},
"execution_metadata": {
"expected_duration_estimate": execution_metadata.get(
"expected_duration_estimate", 300
),
"agent_count": execution_metadata.get("agent_count", 0),
"task_count": execution_metadata.get("task_count", 0),
"flow_method_count": execution_metadata.get("flow_method_count", 0),
"execution_started_at": datetime.now(timezone.utc).isoformat(),
},
}
if use_ephemeral:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
return
if response.status_code in [201, 200]:
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
if not use_ephemeral
else response_data["ephemeral_trace_id"]
)
console = Console()
panel = Panel(
f"✅ Trace batch initialized with session ID: {self.trace_batch_id}",
title="Trace Batch Initialization",
border_style="green",
)
console.print(panel)
else:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
except Exception as e:
logger.warning(
f"Error initializing trace batch: {str(e)}. Continuing without tracing."
)
def add_event(self, trace_event: TraceEvent):
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self) -> int:
"""Send buffered events to backend with graceful failure handling"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return 500
try:
payload = {
"events": [event.to_dict() for event in self.event_buffer],
"batch_metadata": {
"events_count": len(self.event_buffer),
"batch_sequence": 1,
"is_final_batch": False,
},
}
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if self.is_current_batch_ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
if response is None:
logger.warning("Failed to send trace events. Events will be lost.")
return 500
if response.status_code in [200, 201]:
self.event_buffer.clear()
return 200
else:
logger.warning(
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
except Exception as e:
logger.warning(
f"Error sending events to backend: {str(e)}. Events will be lost."
)
return 500
def finalize_batch(self) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
self.current_batch.events = self.event_buffer.copy()
if self.event_buffer:
events_sent_to_backend_status = self._send_events_to_backend()
if events_sent_to_backend_status == 500:
return None
self._finalize_backend_batch()
finalized_batch = self.current_batch
self.batch_owner_type = None
self.batch_owner_id = None
self.current_batch = None
self.event_buffer.clear()
self.trace_batch_id = None
self.is_current_batch_ephemeral = False
self._cleanup_batch_data()
return finalized_batch
def _finalize_backend_batch(self):
"""Send batch finalization to backend"""
if not self.plus_api or not self.trace_batch_id:
return
try:
total_events = len(self.current_batch.events) if self.current_batch else 0
payload = {
"status": "completed",
"duration_ms": self.calculate_duration("execution"),
"final_event_count": total_events,
}
response = (
self.plus_api.finalize_ephemeral_trace_batch(
self.trace_batch_id, payload
)
if self.is_current_batch_ephemeral
else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
)
if response.status_code == 200:
access_code = response.json().get("access_code", None)
console = Console()
return_link = (
f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}"
if not self.is_current_batch_ephemeral and access_code is None
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)
console.print(panel)
else:
logger.error(
f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(f"❌ Error finalizing trace batch: {str(e)}")
# TODO: send error to app
def _cleanup_batch_data(self):
"""Clean up batch data after successful finalization to free memory"""
try:
if hasattr(self, "event_buffer") and self.event_buffer:
self.event_buffer.clear()
if hasattr(self, "current_batch") and self.current_batch:
if hasattr(self.current_batch, "events") and self.current_batch.events:
self.current_batch.events.clear()
self.current_batch = None
if hasattr(self, "batch_sequence"):
self.batch_sequence = 0
except Exception as e:
logger.error(f"Warning: Error during cleanup: {str(e)}")
def has_events(self) -> bool:
"""Check if there are events in the buffer"""
return len(self.event_buffer) > 0
def get_event_count(self) -> int:
"""Get number of events in buffer"""
return len(self.event_buffer)
def is_batch_initialized(self) -> bool:
"""Check if batch is initialized"""
return self.current_batch is not None
def record_start_time(self, key: str):
"""Record start time for duration calculation"""
self.execution_start_times[key] = datetime.now(timezone.utc)
def calculate_duration(self, key: str) -> int:
"""Calculate duration in milliseconds from recorded start time"""
start_time = self.execution_start_times.get(key)
if start_time:
duration_ms = int(
(datetime.now(timezone.utc) - start_time).total_seconds() * 1000
)
del self.execution_start_times[key]
return duration_ms
return 0
def get_trace_id(self) -> Optional[str]:
"""Get current trace ID"""
if self.current_batch:
return self.current_batch.user_context.get("trace_id")
return None

View File

@@ -0,0 +1,459 @@
import os
import uuid
from typing import Dict, Any, Optional
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
AgentExecutionErrorEvent,
)
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
FlowPlotEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.utilities.serialization import to_serializable
from .trace_batch_manager import TraceBatchManager
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
class TraceCollectionListener(BaseEventListener):
"""
Trace collection listener that orchestrates trace collection
"""
complex_events = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance = None
_initialized = False
_listeners_setup = False
def __new__(cls, batch_manager=None):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
):
if self._initialized:
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
self._initialized = True
def _check_authenticated(self) -> bool:
"""Check if tracing should be enabled"""
try:
res = bool(get_auth_token())
return res
except AuthError:
return False
def _get_user_context(self) -> Dict[str, str]:
"""Extract user context for tracing"""
return {
"user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
"organization_id": os.getenv("CREWAI_ORG_ID", ""),
"session_id": str(uuid.uuid4()),
"trace_id": str(uuid.uuid4()),
}
def setup_listeners(self, crewai_event_bus):
"""Setup event listeners - delegates to specific handlers"""
if self._listeners_setup:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
self._listeners_setup = True
def _register_flow_event_handlers(self, event_bus):
"""Register handlers for flow events"""
@event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event):
pass
@event_bus.on(FlowStartedEvent)
def on_flow_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_flow_batch(source, event)
self._handle_trace_event("flow_started", source, event)
@event_bus.on(MethodExecutionStartedEvent)
def on_method_started(source, event):
self._handle_trace_event("method_execution_started", source, event)
@event_bus.on(MethodExecutionFinishedEvent)
def on_method_finished(source, event):
self._handle_trace_event("method_execution_finished", source, event)
@event_bus.on(MethodExecutionFailedEvent)
def on_method_failed(source, event):
self._handle_trace_event("method_execution_failed", source, event)
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event):
self._handle_trace_event("flow_finished", source, event)
if self.batch_manager.batch_owner_type == "flow":
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source, event):
self._handle_action_event("flow_plot", source, event)
def _register_context_event_handlers(self, event_bus):
"""Register handlers for context events (start/end)"""
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.batch_owner_type == "crew":
self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
self._handle_trace_event("crew_kickoff_failed", source, event)
self.batch_manager.finalize_batch()
@event_bus.on(TaskStartedEvent)
def on_task_started(source, event):
self._handle_trace_event("task_started", source, event)
@event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event):
self._handle_trace_event("task_completed", source, event)
@event_bus.on(TaskFailedEvent)
def on_task_failed(source, event):
self._handle_trace_event("task_failed", source, event)
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event):
self._handle_trace_event("agent_execution_started", source, event)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event):
self._handle_trace_event("agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event):
self._handle_trace_event("lite_agent_execution_started", source, event)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event):
self._handle_trace_event("lite_agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_error(source, event):
self._handle_trace_event("lite_agent_execution_error", source, event)
@event_bus.on(AgentExecutionErrorEvent)
def on_agent_error(source, event):
self._handle_trace_event("agent_execution_error", source, event)
@event_bus.on(LLMGuardrailStartedEvent)
def on_guardrail_started(source, event):
self._handle_trace_event("llm_guardrail_started", source, event)
@event_bus.on(LLMGuardrailCompletedEvent)
def on_guardrail_completed(source, event):
self._handle_trace_event("llm_guardrail_completed", source, event)
def _register_action_event_handlers(self, event_bus):
"""Register handlers for action events (LLM calls, tool usage)"""
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event):
self._handle_action_event("llm_call_started", source, event)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event):
self._handle_action_event("llm_call_completed", source, event)
@event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event):
self._handle_action_event("llm_call_failed", source, event)
@event_bus.on(ToolUsageStartedEvent)
def on_tool_started(source, event):
self._handle_action_event("tool_usage_started", source, event)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_finished(source, event):
self._handle_action_event("tool_usage_finished", source, event)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_error(source, event):
self._handle_action_event("tool_usage_error", source, event)
@event_bus.on(MemoryQueryStartedEvent)
def on_memory_query_started(source, event):
self._handle_action_event("memory_query_started", source, event)
@event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event):
self._handle_action_event("memory_query_completed", source, event)
@event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event):
self._handle_action_event("memory_query_failed", source, event)
@event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event):
self._handle_action_event("memory_save_started", source, event)
@event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event):
self._handle_action_event("memory_save_completed", source, event)
@event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event):
self._handle_action_event("memory_save_failed", source, event)
@event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event):
self._handle_action_event("agent_reasoning_started", source, event)
@event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event):
self._handle_action_event("agent_reasoning_completed", source, event)
@event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event):
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_crew_batch(self, source: Any, event: Any):
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(event, "crew_name", "Unknown Crew"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"crewai_version": get_crewai_version(),
}
self.batch_manager.batch_owner_type = "crew"
self.batch_manager.batch_owner_id = getattr(source, "id", str(uuid.uuid4()))
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
"""Initialize trace batch for Flow execution"""
user_context = self._get_user_context()
execution_metadata = {
"flow_name": getattr(event, "flow_name", "Unknown Flow"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"crewai_version": get_crewai_version(),
"execution_type": "flow",
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = getattr(source, "id", str(uuid.uuid4()))
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=True
)
else:
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=False
)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for context end events"""
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
def _handle_action_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for action events (LLM calls, tool usage)"""
if not self.batch_manager.is_batch_initialized():
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
def _create_trace_event(
self, event_type: str, source: Any, event: Any
) -> TraceEvent:
"""Create a trace event"""
trace_event = TraceEvent(
type=event_type,
)
trace_event.event_data = self._build_event_data(event_type, event, source)
return trace_event
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> Dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
elif event_type == "task_started":
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
"task_name": event.task.name or event.task.description,
"context": event.context,
"agent_role": source.agent.role,
"task_id": str(event.task.id),
}
elif event_type == "task_completed":
return {
"task_description": event.task.description if event.task else None,
"task_name": event.task.name or event.task.description
if event.task
else None,
"task_id": str(event.task.id) if event.task else None,
"output_raw": event.output.raw if event.output else None,
"output_format": str(event.output.output_format)
if event.output
else None,
"agent_role": event.output.agent if event.output else None,
}
elif event_type == "agent_execution_started":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "agent_execution_completed":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "llm_call_started":
event_data = self._safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
if hasattr(event, "task_name") and event.task_name
else None
)
return event_data
elif event_type == "llm_call_completed":
return self._safe_serialize_to_dict(event)
else:
return {
"event_type": event_type,
"event": self._safe_serialize_to_dict(event),
"source": source,
}
# TODO: move to utils
def _safe_serialize_to_dict(
self, obj, exclude: set[str] | None = None
) -> Dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
try:
serialized = to_serializable(obj, exclude)
if isinstance(serialized, dict):
return serialized
else:
return {"serialized_data": serialized}
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages
# Limit number of messages
limited_messages = messages[:max_messages]
# Truncate each message content
for msg in limited_messages:
if isinstance(msg, dict) and "content" in msg:
content = msg["content"]
if len(content) > max_content_length:
msg["content"] = content[:max_content_length] + "..."
return limited_messages

View File

@@ -0,0 +1,19 @@
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Dict, Any
import uuid
@dataclass
class TraceEvent:
"""Individual trace event payload"""
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
type: str = ""
event_data: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)

View File

@@ -0,0 +1,153 @@
import os
import platform
import uuid
import hashlib
import subprocess
import getpass
from pathlib import Path
from datetime import datetime
import re
import json
import click
from crewai.utilities.paths import db_storage_path
def is_tracing_enabled() -> bool:
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
def on_first_execution_tracing_confirmation() -> bool:
if _is_test_environment():
return False
if is_first_execution():
mark_first_execution_done()
return click.confirm(
"This is the first execution of CrewAI. Do you want to enable tracing?",
default=True,
show_default=True,
)
return False
def _is_test_environment() -> bool:
"""Detect if we're running in a test environment."""
return os.environ.get("CREWAI_TESTING", "").lower() == "true"
def _get_machine_id() -> str:
"""Stable, privacy-preserving machine fingerprint (cross-platform)."""
parts = []
try:
mac = ":".join(
["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][
::-1
]
)
parts.append(mac)
except Exception:
pass
sysname = platform.system()
parts.append(sysname)
try:
if sysname == "Darwin":
res = subprocess.run(
["system_profiler", "SPHardwareDataType"],
capture_output=True,
text=True,
timeout=2,
)
m = re.search(r"Hardware UUID:\s*([A-Fa-f0-9\-]+)", res.stdout)
if m:
parts.append(m.group(1))
elif sysname == "Linux":
try:
parts.append(Path("/etc/machine-id").read_text().strip())
except Exception:
parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip())
elif sysname == "Windows":
res = subprocess.run(
["wmic", "csproduct", "get", "UUID"],
capture_output=True,
text=True,
timeout=2,
)
lines = [line.strip() for line in res.stdout.splitlines() if line.strip()]
if len(lines) >= 2:
parts.append(lines[1])
except Exception:
pass
return hashlib.sha256("".join(parts).encode()).hexdigest()
def _user_data_file() -> Path:
base = Path(db_storage_path())
base.mkdir(parents=True, exist_ok=True)
return base / ".crewai_user.json"
def _load_user_data() -> dict:
p = _user_data_file()
if p.exists():
try:
return json.loads(p.read_text())
except Exception:
pass
return {}
def _save_user_data(data: dict) -> None:
try:
p = _user_data_file()
p.write_text(json.dumps(data, indent=2))
except Exception:
pass
def get_user_id() -> str:
"""Stable, anonymized user identifier with caching."""
data = _load_user_data()
if "user_id" in data:
return data["user_id"]
try:
username = getpass.getuser()
except Exception:
username = "unknown"
seed = f"{username}|{_get_machine_id()}"
uid = hashlib.sha256(seed.encode()).hexdigest()
data["user_id"] = uid
_save_user_data(data)
return uid
def is_first_execution() -> bool:
"""True if this is the first execution for this user."""
data = _load_user_data()
return not data.get("first_execution_done", False)
def mark_first_execution_done() -> None:
"""Mark that the first execution has been completed."""
data = _load_user_data()
if data.get("first_execution_done", False):
return
data.update(
{
"first_execution_done": True,
"first_execution_at": datetime.now().timestamp(),
"user_id": get_user_id(),
"machine_id": _get_machine_id(),
}
)
_save_user_data(data)

View File

@@ -0,0 +1,5 @@
"""Event type definitions for CrewAI.
This module contains all event types used throughout the CrewAI system
for monitoring and extending agent, crew, task, and tool execution.
"""

View File

@@ -0,0 +1,141 @@
"""Agent-related events moved to break circular dependencies."""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Union
from pydantic import model_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.events.base_events import BaseEvent
class AgentExecutionStartedEvent(BaseEvent):
"""Event emitted when an agent starts executing a task"""
agent: BaseAgent
task: Any
tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]]
task_prompt: str
type: str = "agent_execution_started"
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
class AgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when an agent completes executing a task"""
agent: BaseAgent
task: Any
output: str
type: str = "agent_execution_completed"
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
class AgentExecutionErrorEvent(BaseEvent):
"""Event emitted when an agent encounters an error during execution"""
agent: BaseAgent
task: Any
error: str
type: str = "agent_execution_error"
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
# New event classes for LiteAgent
class LiteAgentExecutionStartedEvent(BaseEvent):
"""Event emitted when a LiteAgent starts executing"""
agent_info: Dict[str, Any]
tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]]
messages: Union[str, List[Dict[str, str]]]
type: str = "lite_agent_execution_started"
model_config = {"arbitrary_types_allowed": True}
class LiteAgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when a LiteAgent completes execution"""
agent_info: Dict[str, Any]
output: str
type: str = "lite_agent_execution_completed"
class LiteAgentExecutionErrorEvent(BaseEvent):
"""Event emitted when a LiteAgent encounters an error during execution"""
agent_info: Dict[str, Any]
error: str
type: str = "lite_agent_execution_error"
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
metric_category: Any
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
error: str
type: str = "agent_evaluation_failed"

View File

@@ -0,0 +1,112 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from crewai.events.base_events import BaseEvent
if TYPE_CHECKING:
from crewai.crew import Crew
else:
Crew = Any
class CrewBaseEvent(BaseEvent):
"""Base class for crew events with fingerprint handling"""
crew_name: Optional[str]
crew: Optional[Crew] = None
def __init__(self, **data):
super().__init__(**data)
self.set_crew_fingerprint()
def set_crew_fingerprint(self) -> None:
if self.crew and hasattr(self.crew, "fingerprint") and self.crew.fingerprint:
self.source_fingerprint = self.crew.fingerprint.uuid_str
self.source_type = "crew"
if (
hasattr(self.crew.fingerprint, "metadata")
and self.crew.fingerprint.metadata
):
self.fingerprint_metadata = self.crew.fingerprint.metadata
def to_json(self, exclude: set[str] | None = None):
if exclude is None:
exclude = set()
exclude.add("crew")
return super().to_json(exclude=exclude)
class CrewKickoffStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts execution"""
inputs: Optional[Dict[str, Any]]
type: str = "crew_kickoff_started"
class CrewKickoffCompletedEvent(CrewBaseEvent):
"""Event emitted when a crew completes execution"""
output: Any
type: str = "crew_kickoff_completed"
total_tokens: int = 0
class CrewKickoffFailedEvent(CrewBaseEvent):
"""Event emitted when a crew fails to complete execution"""
error: str
type: str = "crew_kickoff_failed"
class CrewTrainStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts training"""
n_iterations: int
filename: str
inputs: Optional[Dict[str, Any]]
type: str = "crew_train_started"
class CrewTrainCompletedEvent(CrewBaseEvent):
"""Event emitted when a crew completes training"""
n_iterations: int
filename: str
type: str = "crew_train_completed"
class CrewTrainFailedEvent(CrewBaseEvent):
"""Event emitted when a crew fails to complete training"""
error: str
type: str = "crew_train_failed"
class CrewTestStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts testing"""
n_iterations: int
eval_llm: Optional[Union[str, Any]]
inputs: Optional[Dict[str, Any]]
type: str = "crew_test_started"
class CrewTestCompletedEvent(CrewBaseEvent):
"""Event emitted when a crew completes testing"""
type: str = "crew_test_completed"
class CrewTestFailedEvent(CrewBaseEvent):
"""Event emitted when a crew fails to complete testing"""
error: str
type: str = "crew_test_failed"
class CrewTestResultEvent(CrewBaseEvent):
"""Event emitted when a crew test result is available"""
quality: float
execution_duration: float
model: str
type: str = "crew_test_result"

View File

@@ -0,0 +1,73 @@
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, ConfigDict
from crewai.events.base_events import BaseEvent
class FlowEvent(BaseEvent):
"""Base class for all flow events"""
type: str
flow_name: str
class FlowStartedEvent(FlowEvent):
"""Event emitted when a flow starts execution"""
flow_name: str
inputs: Optional[Dict[str, Any]] = None
type: str = "flow_started"
class FlowCreatedEvent(FlowEvent):
"""Event emitted when a flow is created"""
flow_name: str
type: str = "flow_created"
class MethodExecutionStartedEvent(FlowEvent):
"""Event emitted when a flow method starts execution"""
flow_name: str
method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
type: str = "method_execution_started"
class MethodExecutionFinishedEvent(FlowEvent):
"""Event emitted when a flow method completes execution"""
flow_name: str
method_name: str
result: Any = None
state: Union[Dict[str, Any], BaseModel]
type: str = "method_execution_finished"
class MethodExecutionFailedEvent(FlowEvent):
"""Event emitted when a flow method fails execution"""
flow_name: str
method_name: str
error: Exception
type: str = "method_execution_failed"
model_config = ConfigDict(arbitrary_types_allowed=True)
class FlowFinishedEvent(FlowEvent):
"""Event emitted when a flow completes execution"""
flow_name: str
result: Optional[Any] = None
type: str = "flow_finished"
class FlowPlotEvent(FlowEvent):
"""Event emitted when a flow plot is created"""
flow_name: str
type: str = "flow_plot"

View File

@@ -0,0 +1,52 @@
from crewai.events.base_events import BaseEvent
from crewai.agents.agent_builder.base_agent import BaseAgent
class KnowledgeRetrievalStartedEvent(BaseEvent):
"""Event emitted when a knowledge retrieval is started."""
type: str = "knowledge_search_query_started"
agent: BaseAgent
class KnowledgeRetrievalCompletedEvent(BaseEvent):
"""Event emitted when a knowledge retrieval is completed."""
query: str
type: str = "knowledge_search_query_completed"
agent: BaseAgent
retrieved_knowledge: str
class KnowledgeQueryStartedEvent(BaseEvent):
"""Event emitted when a knowledge query is started."""
task_prompt: str
type: str = "knowledge_query_started"
agent: BaseAgent
class KnowledgeQueryFailedEvent(BaseEvent):
"""Event emitted when a knowledge query fails."""
type: str = "knowledge_query_failed"
agent: BaseAgent
error: str
class KnowledgeQueryCompletedEvent(BaseEvent):
"""Event emitted when a knowledge query is completed."""
query: str
type: str = "knowledge_query_completed"
agent: BaseAgent
class KnowledgeSearchQueryFailedEvent(BaseEvent):
"""Event emitted when a knowledge search query fails."""
query: str
type: str = "knowledge_search_query_failed"
agent: BaseAgent
error: str

View File

@@ -0,0 +1,82 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from crewai.events.base_events import BaseEvent
class LLMEventBase(BaseEvent):
task_name: Optional[str] = None
task_id: Optional[str] = None
agent_id: Optional[str] = None
agent_role: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
class LLMCallType(Enum):
"""Type of LLM call being made"""
TOOL_CALL = "tool_call"
LLM_CALL = "llm_call"
class LLMCallStartedEvent(LLMEventBase):
"""Event emitted when a LLM call starts
Attributes:
messages: Content can be either a string or a list of dictionaries that support
multimodal content (text, images, etc.)
"""
type: str = "llm_call_started"
model: Optional[str] = None
messages: Optional[Union[str, List[Dict[str, Any]]]] = None
tools: Optional[List[dict[str, Any]]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
class LLMCallCompletedEvent(LLMEventBase):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
messages: str | list[dict[str, Any]] | None = None
response: Any
call_type: LLMCallType
model: Optional[str] = None
class LLMCallFailedEvent(LLMEventBase):
"""Event emitted when a LLM call fails"""
error: str
type: str = "llm_call_failed"
class FunctionCall(BaseModel):
arguments: str
name: Optional[str] = None
class ToolCall(BaseModel):
id: Optional[str] = None
function: FunctionCall
type: Optional[str] = None
index: int
class LLMStreamChunkEvent(LLMEventBase):
"""Event emitted when a streaming chunk is received"""
type: str = "llm_stream_chunk"
chunk: str
tool_call: Optional[ToolCall] = None

View File

@@ -0,0 +1,45 @@
from inspect import getsource
from typing import Any, Callable, Optional, Union
from crewai.events.base_events import BaseEvent
class LLMGuardrailStartedEvent(BaseEvent):
"""Event emitted when a guardrail task starts
Attributes:
guardrail: The guardrail callable or LLMGuardrail instance
retry_count: The number of times the guardrail has been retried
"""
type: str = "llm_guardrail_started"
guardrail: Union[str, Callable]
retry_count: int
def __init__(self, **data):
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.hallucination_guardrail import HallucinationGuardrail
super().__init__(**data)
if isinstance(self.guardrail, (LLMGuardrail, HallucinationGuardrail)):
self.guardrail = self.guardrail.description.strip()
elif isinstance(self.guardrail, Callable):
self.guardrail = getsource(self.guardrail).strip()
class LLMGuardrailCompletedEvent(BaseEvent):
"""Event emitted when a guardrail task completes
Attributes:
success: Whether the guardrail validation passed
result: The validation result
error: Error message if validation failed
retry_count: The number of times the guardrail has been retried
"""
type: str = "llm_guardrail_completed"
success: bool
result: Any
error: Optional[str] = None
retry_count: int

View File

@@ -0,0 +1,25 @@
"""Agent logging events that don't reference BaseAgent to avoid circular imports."""
from typing import Any, Optional
from crewai.events.base_events import BaseEvent
class AgentLogsStartedEvent(BaseEvent):
"""Event emitted when agent logs should be shown at start"""
agent_role: str
task_description: Optional[str] = None
verbose: bool = False
type: str = "agent_logs_started"
class AgentLogsExecutionEvent(BaseEvent):
"""Event emitted when agent logs should be shown during execution"""
agent_role: str
formatted_answer: Any
verbose: bool = False
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}

View File

@@ -0,0 +1,95 @@
from typing import Any, Dict, Optional
from crewai.events.base_events import BaseEvent
class MemoryBaseEvent(BaseEvent):
"""Base event for memory operations"""
type: str
task_id: Optional[str] = None
task_name: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
agent_role: Optional[str] = None
agent_id: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
class MemoryQueryStartedEvent(MemoryBaseEvent):
"""Event emitted when a memory query is started"""
type: str = "memory_query_started"
query: str
limit: int
score_threshold: Optional[float] = None
class MemoryQueryCompletedEvent(MemoryBaseEvent):
"""Event emitted when a memory query is completed successfully"""
type: str = "memory_query_completed"
query: str
results: Any
limit: int
score_threshold: Optional[float] = None
query_time_ms: float
class MemoryQueryFailedEvent(MemoryBaseEvent):
"""Event emitted when a memory query fails"""
type: str = "memory_query_failed"
query: str
limit: int
score_threshold: Optional[float] = None
error: str
class MemorySaveStartedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation is started"""
type: str = "memory_save_started"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
agent_role: Optional[str] = None
class MemorySaveCompletedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation is completed successfully"""
type: str = "memory_save_completed"
value: str
metadata: Optional[Dict[str, Any]] = None
agent_role: Optional[str] = None
save_time_ms: float
class MemorySaveFailedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation fails"""
type: str = "memory_save_failed"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
agent_role: Optional[str] = None
error: str
class MemoryRetrievalStartedEvent(MemoryBaseEvent):
"""Event emitted when memory retrieval for a task prompt starts"""
type: str = "memory_retrieval_started"
task_id: Optional[str] = None
class MemoryRetrievalCompletedEvent(MemoryBaseEvent):
"""Event emitted when memory retrieval for a task prompt completes successfully"""
type: str = "memory_retrieval_completed"
task_id: Optional[str] = None
memory_content: str
retrieval_time_ms: float

View File

@@ -0,0 +1,47 @@
from crewai.events.base_events import BaseEvent
from typing import Any, Optional
class ReasoningEvent(BaseEvent):
"""Base event for reasoning events."""
type: str
attempt: int = 1
agent_role: str
task_id: str
task_name: Optional[str] = None
from_task: Optional[Any] = None
agent_id: Optional[str] = None
from_agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
self._set_task_params(data)
self._set_agent_params(data)
class AgentReasoningStartedEvent(ReasoningEvent):
"""Event emitted when an agent starts reasoning about a task."""
type: str = "agent_reasoning_started"
agent_role: str
task_id: str
class AgentReasoningCompletedEvent(ReasoningEvent):
"""Event emitted when an agent finishes its reasoning process."""
type: str = "agent_reasoning_completed"
agent_role: str
task_id: str
plan: str
ready: bool
class AgentReasoningFailedEvent(ReasoningEvent):
"""Event emitted when the reasoning process fails."""
type: str = "agent_reasoning_failed"
agent_role: str
task_id: str
error: str

View File

@@ -0,0 +1,84 @@
from typing import Any, Optional
from crewai.tasks.task_output import TaskOutput
from crewai.events.base_events import BaseEvent
class TaskStartedEvent(BaseEvent):
"""Event emitted when a task starts"""
type: str = "task_started"
context: Optional[str]
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskCompletedEvent(BaseEvent):
"""Event emitted when a task completes"""
output: TaskOutput
type: str = "task_completed"
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskFailedEvent(BaseEvent):
"""Event emitted when a task fails"""
error: str
type: str = "task_failed"
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskEvaluationEvent(BaseEvent):
"""Event emitted when a task evaluation is completed"""
type: str = "task_evaluation"
evaluation_type: str
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata

View File

@@ -0,0 +1,98 @@
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from crewai.events.base_events import BaseEvent
class ToolUsageEvent(BaseEvent):
"""Base event for tool usage tracking"""
agent_key: Optional[str] = None
agent_role: Optional[str] = None
agent_id: Optional[str] = None
tool_name: str
tool_args: Dict[str, Any] | str
tool_class: Optional[str] = None
run_attempts: int | None = None
delegations: int | None = None
agent: Optional[Any] = None
task_name: Optional[str] = None
task_id: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class ToolUsageStartedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is started"""
type: str = "tool_usage_started"
class ToolUsageFinishedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is completed"""
started_at: datetime
finished_at: datetime
from_cache: bool = False
output: Any
type: str = "tool_usage_finished"
class ToolUsageErrorEvent(ToolUsageEvent):
"""Event emitted when a tool execution encounters an error"""
error: Any
type: str = "tool_usage_error"
class ToolValidateInputErrorEvent(ToolUsageEvent):
"""Event emitted when a tool input validation encounters an error"""
error: Any
type: str = "tool_validate_input_error"
class ToolSelectionErrorEvent(ToolUsageEvent):
"""Event emitted when a tool selection encounters an error"""
error: Any
type: str = "tool_selection_error"
class ToolExecutionErrorEvent(BaseEvent):
"""Event emitted when a tool execution encounters an error"""
error: Any
type: str = "tool_execution_error"
tool_name: str
tool_args: Dict[str, Any]
tool_class: Callable
agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata

View File

@@ -0,0 +1 @@
"""Event utilities for crewAI."""

File diff suppressed because it is too large Load Diff