Refactor EventListener and TraceCollectionListener for improved event handling

- Removed unused threading and method branches from EventListener to simplify the code.
- Updated event handling methods in EventListener to use new formatter methods for better clarity and consistency.
- Refactored TraceCollectionListener to eliminate unnecessary parameters in formatter calls, enhancing readability.
- Simplified ConsoleFormatter by removing outdated tree management methods and focusing on panel-based output for status updates.
- Enhanced ToolUsage to track run attempts for better tool usage metrics.
This commit is contained in:
lorenzejay
2025-12-29 10:10:34 -08:00
parent c73b36a4c5
commit 305fe4c6b8
4 changed files with 438 additions and 1736 deletions

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from io import StringIO
import threading
from typing import TYPE_CHECKING, Any
from pydantic import Field, PrivateAttr
@@ -17,8 +16,6 @@ from crewai.events.types.a2a_events import (
A2AResponseReceivedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
@@ -48,7 +45,6 @@ from crewai.events.types.flow_events import (
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
@@ -112,7 +108,6 @@ class EventListener(BaseEventListener):
text_stream: StringIO = StringIO()
knowledge_retrieval_in_progress: bool = False
knowledge_query_in_progress: bool = False
method_branches: dict[str, Any] = Field(default_factory=dict)
def __new__(cls) -> EventListener:
if cls._instance is None:
@@ -126,10 +121,8 @@ class EventListener(BaseEventListener):
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self.execution_spans = {}
self.method_branches = {}
self._initialized = True
self.formatter = ConsoleFormatter(verbose=True)
self._crew_tree_lock = threading.Condition()
# Initialize trace listener with formatter for memory event handling
trace_listener = TraceCollectionListener()
@@ -140,12 +133,10 @@ class EventListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
with self._crew_tree_lock:
self.formatter.create_crew_tree(event.crew_name or "Crew", source.id)
source._execution_span = self._telemetry.crew_execution_span(
source, event.inputs
)
self._crew_tree_lock.notify_all()
self.formatter.handle_crew_started(event.crew_name or "Crew", source.id)
source._execution_span = self._telemetry.crew_execution_span(
source, event.inputs
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
@@ -153,8 +144,7 @@ class EventListener(BaseEventListener):
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
self.formatter.handle_crew_status(
event.crew_name or "Crew",
source.id,
"completed",
@@ -163,8 +153,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
self.formatter.handle_crew_status(
event.crew_name or "Crew",
source.id,
"failed",
@@ -202,18 +191,8 @@ class EventListener(BaseEventListener):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
with self._crew_tree_lock:
self._crew_tree_lock.wait_for(
lambda: self.formatter.current_crew_tree is not None, timeout=5.0
)
if self.formatter.current_crew_tree is not None:
task_name = (
source.name if hasattr(source, "name") and source.name else None
)
self.formatter.create_task_branch(
self.formatter.current_crew_tree, source.id, task_name
)
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.handle_task_started(source.id, task_name)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
@@ -225,12 +204,8 @@ class EventListener(BaseEventListener):
# Pass task name if it exists
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"completed",
task_name,
self.formatter.handle_task_status(
source.id, source.agent.role, "completed", task_name
)
@crewai_event_bus.on(TaskFailedEvent)
@@ -243,36 +218,11 @@ class EventListener(BaseEventListener):
# Pass task name if it exists
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"failed",
task_name,
self.formatter.handle_task_status(
source.id, source.agent.role, "failed", task_name
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(
_: Any, event: AgentExecutionStartedEvent
) -> None:
self.formatter.create_agent_branch(
self.formatter.current_task_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
_: Any, event: AgentExecutionCompletedEvent
) -> None:
self.formatter.update_agent_status(
self.formatter.current_agent_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
# ----------- LITE AGENT EVENTS -----------
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
@@ -316,79 +266,61 @@ class EventListener(BaseEventListener):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
tree = self.formatter.create_flow_tree(event.flow_name, str(source.flow_id))
self.formatter.current_flow_tree = tree
self.formatter.start_flow(event.flow_name, str(source.flow_id))
self.formatter.handle_flow_created(event.flow_name, str(source.flow_id))
self.formatter.handle_flow_started(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self.formatter.update_flow_status(
self.formatter.current_flow_tree, event.flow_name, source.flow_id
self.formatter.handle_flow_status(
event.flow_name,
source.flow_id,
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(
_: Any, event: MethodExecutionStartedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status(
method_branch,
self.formatter.current_flow_tree,
self.formatter.handle_method_status(
event.method_name,
"running",
)
self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(
_: Any, event: MethodExecutionFinishedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status(
method_branch,
self.formatter.current_flow_tree,
self.formatter.handle_method_status(
event.method_name,
"completed",
)
self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(
_: Any, event: MethodExecutionFailedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status(
method_branch,
self.formatter.current_flow_tree,
self.formatter.handle_method_status(
event.method_name,
"failed",
)
self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(MethodExecutionPausedEvent)
def on_method_execution_paused(
_: Any, event: MethodExecutionPausedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status(
method_branch,
self.formatter.current_flow_tree,
self.formatter.handle_method_status(
event.method_name,
"paused",
)
self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(FlowPausedEvent)
def on_flow_paused(_: Any, event: FlowPausedEvent) -> None:
self.formatter.update_flow_status(
self.formatter.current_flow_tree,
self.formatter.handle_flow_status(
event.flow_name,
event.flow_id,
"paused",
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None:
if isinstance(source, LLM):
@@ -398,9 +330,9 @@ class EventListener(BaseEventListener):
)
else:
self.formatter.handle_tool_usage_started(
self.formatter.current_agent_branch,
event.tool_name,
self.formatter.current_crew_tree,
event.tool_args,
event.run_attempts,
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
@@ -409,12 +341,6 @@ class EventListener(BaseEventListener):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
self.formatter.current_tool_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:
@@ -425,10 +351,9 @@ class EventListener(BaseEventListener):
)
else:
self.formatter.handle_tool_usage_error(
self.formatter.current_tool_branch,
event.tool_name,
event.error,
self.formatter.current_crew_tree,
event.run_attempts,
)
# ----------- LLM EVENTS -----------
@@ -437,32 +362,15 @@ class EventListener(BaseEventListener):
def on_llm_call_started(_: Any, event: LLMCallStartedEvent) -> None:
self.text_stream = StringIO()
self.next_chunk = 0
# Capture the returned tool branch and update the current_tool_branch reference
thinking_branch = self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
# Update the formatter's current_tool_branch to ensure proper cleanup
if thinking_branch is not None:
self.formatter.current_tool_branch = thinking_branch
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(_: Any, event: LLMCallCompletedEvent) -> None:
self.formatter.handle_llm_stream_completed()
self.formatter.handle_llm_call_completed(
self.formatter.current_tool_branch,
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(_: Any, event: LLMCallFailedEvent) -> None:
self.formatter.handle_llm_stream_completed()
self.formatter.handle_llm_call_failed(
self.formatter.current_tool_branch,
event.error,
self.formatter.current_crew_tree,
)
self.formatter.handle_llm_call_failed(event.error)
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None:
@@ -473,9 +381,7 @@ class EventListener(BaseEventListener):
accumulated_text = self.text_stream.getvalue()
self.formatter.handle_llm_stream_chunk(
event.chunk,
accumulated_text,
self.formatter.current_crew_tree,
event.call_type,
)
@@ -515,7 +421,6 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(_: Any, event: CrewTestCompletedEvent) -> None:
self.formatter.handle_crew_test_completed(
self.formatter.current_flow_tree,
event.crew_name or "Crew",
)
@@ -532,10 +437,7 @@ class EventListener(BaseEventListener):
self.knowledge_retrieval_in_progress = True
self.formatter.handle_knowledge_retrieval_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
self.formatter.handle_knowledge_retrieval_started()
@crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
def on_knowledge_retrieval_completed(
@@ -546,24 +448,12 @@ class EventListener(BaseEventListener):
self.knowledge_retrieval_in_progress = False
self.formatter.handle_knowledge_retrieval_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.retrieved_knowledge,
)
@crewai_event_bus.on(KnowledgeQueryStartedEvent)
def on_knowledge_query_started(
_: Any, event: KnowledgeQueryStartedEvent
) -> None:
pass
@crewai_event_bus.on(KnowledgeQueryFailedEvent)
def on_knowledge_query_failed(_: Any, event: KnowledgeQueryFailedEvent) -> None:
self.formatter.handle_knowledge_query_failed(
self.formatter.current_agent_branch,
event.error,
self.formatter.current_crew_tree,
)
self.formatter.handle_knowledge_query_failed(event.error)
@crewai_event_bus.on(KnowledgeQueryCompletedEvent)
def on_knowledge_query_completed(
@@ -575,11 +465,7 @@ class EventListener(BaseEventListener):
def on_knowledge_search_query_failed(
_: Any, event: KnowledgeSearchQueryFailedEvent
) -> None:
self.formatter.handle_knowledge_search_query_failed(
self.formatter.current_agent_branch,
event.error,
self.formatter.current_crew_tree,
)
self.formatter.handle_knowledge_search_query_failed(event.error)
# ----------- REASONING EVENTS -----------
@@ -587,11 +473,7 @@ class EventListener(BaseEventListener):
def on_agent_reasoning_started(
_: Any, event: AgentReasoningStartedEvent
) -> None:
self.formatter.handle_reasoning_started(
self.formatter.current_agent_branch,
event.attempt,
self.formatter.current_crew_tree,
)
self.formatter.handle_reasoning_started(event.attempt)
@crewai_event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(
@@ -600,14 +482,12 @@ class EventListener(BaseEventListener):
self.formatter.handle_reasoning_completed(
event.plan,
event.ready,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(_: Any, event: AgentReasoningFailedEvent) -> None:
self.formatter.handle_reasoning_failed(
event.error,
self.formatter.current_crew_tree,
)
# ----------- AGENT LOGGING EVENTS -----------

View File

@@ -1,7 +1,7 @@
"""Trace collection listener for orchestrating trace collection."""
import os
from typing import Any, ClassVar
from typing import Any, ClassVar, cast
import uuid
from typing_extensions import Self
@@ -105,7 +105,7 @@ class TraceCollectionListener(BaseEventListener):
"""Create or return singleton instance."""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
return cast(Self, cls._instance)
def __init__(
self,
@@ -319,21 +319,12 @@ class TraceCollectionListener(BaseEventListener):
source: Any, event: MemoryQueryCompletedEvent
) -> None:
self._handle_action_event("memory_query_completed", source, event)
if self.formatter and self.memory_retrieval_in_progress:
self.formatter.handle_memory_query_completed(
self.formatter.current_agent_branch,
event.source_type or "memory",
event.query_time_ms,
self.formatter.current_crew_tree,
)
@event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source: Any, event: MemoryQueryFailedEvent) -> None:
self._handle_action_event("memory_query_failed", source, event)
if self.formatter and self.memory_retrieval_in_progress:
self.formatter.handle_memory_query_failed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.error,
event.source_type or "memory",
)
@@ -347,10 +338,7 @@ class TraceCollectionListener(BaseEventListener):
self.memory_save_in_progress = True
self.formatter.handle_memory_save_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
self.formatter.handle_memory_save_started()
@event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(
@@ -364,8 +352,6 @@ class TraceCollectionListener(BaseEventListener):
self.memory_save_in_progress = False
self.formatter.handle_memory_save_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.save_time_ms,
event.source_type or "memory",
)
@@ -375,10 +361,8 @@ class TraceCollectionListener(BaseEventListener):
self._handle_action_event("memory_save_failed", source, event)
if self.formatter and self.memory_save_in_progress:
self.formatter.handle_memory_save_failed(
self.formatter.current_agent_branch,
event.error,
event.source_type or "memory",
self.formatter.current_crew_tree,
)
@event_bus.on(MemoryRetrievalStartedEvent)
@@ -391,10 +375,7 @@ class TraceCollectionListener(BaseEventListener):
self.memory_retrieval_in_progress = True
self.formatter.handle_memory_retrieval_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
self.formatter.handle_memory_retrieval_started()
@event_bus.on(MemoryRetrievalCompletedEvent)
def on_memory_retrieval_completed(
@@ -406,8 +387,6 @@ class TraceCollectionListener(BaseEventListener):
self.memory_retrieval_in_progress = False
self.formatter.handle_memory_retrieval_completed(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.memory_content,
event.retrieval_time_ms,
)

File diff suppressed because it is too large Load Diff

View File

@@ -148,6 +148,7 @@ class ToolUsage:
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
try:
self._run_attempts += 1
return self._use(tool_string=tool_string, tool=tool, calling=calling)
except Exception as e:
@@ -249,6 +250,7 @@ class ToolUsage:
"tool_args": self.action.tool_input,
"tool_class": self.action.tool,
"agent": self.agent,
"run_attempts": self._run_attempts,
}
if self.agent.fingerprint: # type: ignore
@@ -435,6 +437,7 @@ class ToolUsage:
"tool_args": self.action.tool_input,
"tool_class": self.action.tool,
"agent": self.agent,
"run_attempts": self._run_attempts,
}
# TODO: Investigate fingerprint attribute availability on BaseAgent/LiteAgent