fix: ensure flow execution start panel is not shown on plot

This commit is contained in:
Greyson LaLonde
2025-11-24 12:50:18 -05:00
committed by GitHub
parent a559cedbd1
commit 9da1f0c0aa

View File

@@ -101,24 +101,25 @@ if TYPE_CHECKING:
class EventListener(BaseEventListener): class EventListener(BaseEventListener):
_instance = None _instance: EventListener | None = None
_initialized: bool = False
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry()) _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR) logger: Logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: dict[Task, Any] = Field(default_factory=dict) execution_spans: dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0 next_chunk: int = 0
text_stream = StringIO() text_stream: StringIO = StringIO()
knowledge_retrieval_in_progress = False knowledge_retrieval_in_progress: bool = False
knowledge_query_in_progress = False knowledge_query_in_progress: bool = False
method_branches: dict[str, Any] = Field(default_factory=dict) method_branches: dict[str, Any] = Field(default_factory=dict)
def __new__(cls): def __new__(cls) -> EventListener:
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance._initialized = False cls._instance._initialized = False
return cls._instance return cls._instance
def __init__(self): def __init__(self) -> None:
if not hasattr(self, "_initialized") or not self._initialized: if not self._initialized:
super().__init__() super().__init__()
self._telemetry = Telemetry() self._telemetry = Telemetry()
self._telemetry.set_tracer() self._telemetry.set_tracer()
@@ -136,14 +137,14 @@ class EventListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None: def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
@crewai_event_bus.on(CrewKickoffStartedEvent) @crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent) -> None: def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
with self._crew_tree_lock: with self._crew_tree_lock:
self.formatter.create_crew_tree(event.crew_name or "Crew", source.id) self.formatter.create_crew_tree(event.crew_name or "Crew", source.id)
self._telemetry.crew_execution_span(source, event.inputs) self._telemetry.crew_execution_span(source, event.inputs)
self._crew_tree_lock.notify_all() self._crew_tree_lock.notify_all()
@crewai_event_bus.on(CrewKickoffCompletedEvent) @crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent) -> None: def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
# Handle telemetry # Handle telemetry
final_string_output = event.output.raw final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output) self._telemetry.end_crew(source, final_string_output)
@@ -157,7 +158,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(CrewKickoffFailedEvent) @crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent) -> None: def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self.formatter.update_crew_tree( self.formatter.update_crew_tree(
self.formatter.current_crew_tree, self.formatter.current_crew_tree,
event.crew_name or "Crew", event.crew_name or "Crew",
@@ -166,23 +167,23 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(CrewTrainStartedEvent) @crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent) -> None: def on_crew_train_started(_: Any, event: CrewTrainStartedEvent) -> None:
self.formatter.handle_crew_train_started( self.formatter.handle_crew_train_started(
event.crew_name or "Crew", str(event.timestamp) event.crew_name or "Crew", str(event.timestamp)
) )
@crewai_event_bus.on(CrewTrainCompletedEvent) @crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent) -> None: def on_crew_train_completed(_: Any, event: CrewTrainCompletedEvent) -> None:
self.formatter.handle_crew_train_completed( self.formatter.handle_crew_train_completed(
event.crew_name or "Crew", str(event.timestamp) event.crew_name or "Crew", str(event.timestamp)
) )
@crewai_event_bus.on(CrewTrainFailedEvent) @crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent) -> None: def on_crew_train_failed(_: Any, event: CrewTrainFailedEvent) -> None:
self.formatter.handle_crew_train_failed(event.crew_name or "Crew") self.formatter.handle_crew_train_failed(event.crew_name or "Crew")
@crewai_event_bus.on(CrewTestResultEvent) @crewai_event_bus.on(CrewTestResultEvent)
def on_crew_test_result(source, event: CrewTestResultEvent) -> None: def on_crew_test_result(source: Any, event: CrewTestResultEvent) -> None:
self._telemetry.individual_test_result_span( self._telemetry.individual_test_result_span(
source.crew, source.crew,
event.quality, event.quality,
@@ -193,7 +194,7 @@ class EventListener(BaseEventListener):
# ----------- TASK EVENTS ----------- # ----------- TASK EVENTS -----------
@crewai_event_bus.on(TaskStartedEvent) @crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent) -> None: def on_task_started(source: Any, event: TaskStartedEvent) -> None:
span = self._telemetry.task_started(crew=source.agent.crew, task=source) span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span self.execution_spans[source] = span
@@ -211,7 +212,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(TaskCompletedEvent) @crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent): def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
# Handle telemetry # Handle telemetry
span = self.execution_spans.get(source) span = self.execution_spans.get(source)
if span: if span:
@@ -229,7 +230,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(TaskFailedEvent) @crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent): def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
span = self.execution_spans.get(source) span = self.execution_spans.get(source)
if span: if span:
if source.agent and source.agent.crew: if source.agent and source.agent.crew:
@@ -249,7 +250,9 @@ class EventListener(BaseEventListener):
# ----------- AGENT EVENTS ----------- # ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent) @crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent): def on_agent_execution_started(
_: Any, event: AgentExecutionStartedEvent
) -> None:
self.formatter.create_agent_branch( self.formatter.create_agent_branch(
self.formatter.current_task_branch, self.formatter.current_task_branch,
event.agent.role, event.agent.role,
@@ -257,7 +260,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(AgentExecutionCompletedEvent) @crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): def on_agent_execution_completed(
_: Any, event: AgentExecutionCompletedEvent
) -> None:
self.formatter.update_agent_status( self.formatter.update_agent_status(
self.formatter.current_agent_branch, self.formatter.current_agent_branch,
event.agent.role, event.agent.role,
@@ -268,8 +273,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LiteAgentExecutionStartedEvent) @crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_execution_started( def on_lite_agent_execution_started(
source, event: LiteAgentExecutionStartedEvent _: Any, event: LiteAgentExecutionStartedEvent
): ) -> None:
"""Handle LiteAgent execution started event.""" """Handle LiteAgent execution started event."""
self.formatter.handle_lite_agent_execution( self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="started", **event.agent_info event.agent_info["role"], status="started", **event.agent_info
@@ -277,15 +282,17 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LiteAgentExecutionCompletedEvent) @crewai_event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_execution_completed( def on_lite_agent_execution_completed(
source, event: LiteAgentExecutionCompletedEvent _: Any, event: LiteAgentExecutionCompletedEvent
): ) -> None:
"""Handle LiteAgent execution completed event.""" """Handle LiteAgent execution completed event."""
self.formatter.handle_lite_agent_execution( self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="completed", **event.agent_info event.agent_info["role"], status="completed", **event.agent_info
) )
@crewai_event_bus.on(LiteAgentExecutionErrorEvent) @crewai_event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent): def on_lite_agent_execution_error(
_: Any, event: LiteAgentExecutionErrorEvent
) -> None:
"""Handle LiteAgent execution error event.""" """Handle LiteAgent execution error event."""
self.formatter.handle_lite_agent_execution( self.formatter.handle_lite_agent_execution(
event.agent_info["role"], event.agent_info["role"],
@@ -297,26 +304,28 @@ class EventListener(BaseEventListener):
# ----------- FLOW EVENTS ----------- # ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent) @crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent): def on_flow_created(_: Any, event: FlowCreatedEvent) -> None:
self._telemetry.flow_creation_span(event.flow_name) self._telemetry.flow_creation_span(event.flow_name)
tree = self.formatter.create_flow_tree(event.flow_name, str(source.flow_id))
self.formatter.current_flow_tree = tree
@crewai_event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent): def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
self._telemetry.flow_execution_span( self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys()) event.flow_name, list(source._methods.keys())
) )
tree = self.formatter.create_flow_tree(event.flow_name, str(source.flow_id))
self.formatter.current_flow_tree = tree
self.formatter.start_flow(event.flow_name, str(source.flow_id)) self.formatter.start_flow(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent): def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self.formatter.update_flow_status( self.formatter.update_flow_status(
self.formatter.current_flow_tree, event.flow_name, source.flow_id self.formatter.current_flow_tree, event.flow_name, source.flow_id
) )
@crewai_event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent): def on_method_execution_started(
_: Any, event: MethodExecutionStartedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name) method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status( updated_branch = self.formatter.update_method_status(
method_branch, method_branch,
@@ -327,7 +336,9 @@ class EventListener(BaseEventListener):
self.method_branches[event.method_name] = updated_branch self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(MethodExecutionFinishedEvent) @crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): def on_method_execution_finished(
_: Any, event: MethodExecutionFinishedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name) method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status( updated_branch = self.formatter.update_method_status(
method_branch, method_branch,
@@ -338,7 +349,9 @@ class EventListener(BaseEventListener):
self.method_branches[event.method_name] = updated_branch self.method_branches[event.method_name] = updated_branch
@crewai_event_bus.on(MethodExecutionFailedEvent) @crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent): def on_method_execution_failed(
_: Any, event: MethodExecutionFailedEvent
) -> None:
method_branch = self.method_branches.get(event.method_name) method_branch = self.method_branches.get(event.method_name)
updated_branch = self.formatter.update_method_status( updated_branch = self.formatter.update_method_status(
method_branch, method_branch,
@@ -351,7 +364,7 @@ class EventListener(BaseEventListener):
# ----------- TOOL USAGE EVENTS ----------- # ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent) @crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent): def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None:
if isinstance(source, LLM): if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_started( self.formatter.handle_llm_tool_usage_started(
event.tool_name, event.tool_name,
@@ -365,7 +378,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(ToolUsageFinishedEvent) @crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): def on_tool_usage_finished(source: Any, event: ToolUsageFinishedEvent) -> None:
if isinstance(source, LLM): if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_finished( self.formatter.handle_llm_tool_usage_finished(
event.tool_name, event.tool_name,
@@ -378,7 +391,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(ToolUsageErrorEvent) @crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent): def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:
if isinstance(source, LLM): if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_error( self.formatter.handle_llm_tool_usage_error(
event.tool_name, event.tool_name,
@@ -395,7 +408,7 @@ class EventListener(BaseEventListener):
# ----------- LLM EVENTS ----------- # ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent) @crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent): def on_llm_call_started(_: Any, event: LLMCallStartedEvent) -> None:
# Capture the returned tool branch and update the current_tool_branch reference # Capture the returned tool branch and update the current_tool_branch reference
thinking_branch = self.formatter.handle_llm_call_started( thinking_branch = self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch, self.formatter.current_agent_branch,
@@ -406,7 +419,7 @@ class EventListener(BaseEventListener):
self.formatter.current_tool_branch = thinking_branch self.formatter.current_tool_branch = thinking_branch
@crewai_event_bus.on(LLMCallCompletedEvent) @crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent): def on_llm_call_completed(_: Any, event: LLMCallCompletedEvent) -> None:
self.formatter.handle_llm_call_completed( self.formatter.handle_llm_call_completed(
self.formatter.current_tool_branch, self.formatter.current_tool_branch,
self.formatter.current_agent_branch, self.formatter.current_agent_branch,
@@ -414,7 +427,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(LLMCallFailedEvent) @crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent): def on_llm_call_failed(_: Any, event: LLMCallFailedEvent) -> None:
self.formatter.handle_llm_call_failed( self.formatter.handle_llm_call_failed(
self.formatter.current_tool_branch, self.formatter.current_tool_branch,
event.error, event.error,
@@ -422,7 +435,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(LLMStreamChunkEvent) @crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None:
self.text_stream.write(event.chunk) self.text_stream.write(event.chunk)
self.text_stream.seek(self.next_chunk) self.text_stream.seek(self.next_chunk)
self.text_stream.read() self.text_stream.read()
@@ -431,7 +444,7 @@ class EventListener(BaseEventListener):
# ----------- LLM GUARDRAIL EVENTS ----------- # ----------- LLM GUARDRAIL EVENTS -----------
@crewai_event_bus.on(LLMGuardrailStartedEvent) @crewai_event_bus.on(LLMGuardrailStartedEvent)
def on_llm_guardrail_started(source, event: LLMGuardrailStartedEvent): def on_llm_guardrail_started(_: Any, event: LLMGuardrailStartedEvent) -> None:
guardrail_str = str(event.guardrail) guardrail_str = str(event.guardrail)
guardrail_name = ( guardrail_name = (
guardrail_str[:50] + "..." if len(guardrail_str) > 50 else guardrail_str guardrail_str[:50] + "..." if len(guardrail_str) > 50 else guardrail_str
@@ -440,13 +453,15 @@ class EventListener(BaseEventListener):
self.formatter.handle_guardrail_started(guardrail_name, event.retry_count) self.formatter.handle_guardrail_started(guardrail_name, event.retry_count)
@crewai_event_bus.on(LLMGuardrailCompletedEvent) @crewai_event_bus.on(LLMGuardrailCompletedEvent)
def on_llm_guardrail_completed(source, event: LLMGuardrailCompletedEvent): def on_llm_guardrail_completed(
_: Any, event: LLMGuardrailCompletedEvent
) -> None:
self.formatter.handle_guardrail_completed( self.formatter.handle_guardrail_completed(
event.success, event.error, event.retry_count event.success, event.error, event.retry_count
) )
@crewai_event_bus.on(CrewTestStartedEvent) @crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent): def on_crew_test_started(source: Any, event: CrewTestStartedEvent) -> None:
cloned_crew = source.copy() cloned_crew = source.copy()
self._telemetry.test_execution_span( self._telemetry.test_execution_span(
cloned_crew, cloned_crew,
@@ -460,20 +475,20 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(CrewTestCompletedEvent) @crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent): def on_crew_test_completed(_: Any, event: CrewTestCompletedEvent) -> None:
self.formatter.handle_crew_test_completed( self.formatter.handle_crew_test_completed(
self.formatter.current_flow_tree, self.formatter.current_flow_tree,
event.crew_name or "Crew", event.crew_name or "Crew",
) )
@crewai_event_bus.on(CrewTestFailedEvent) @crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent): def on_crew_test_failed(_: Any, event: CrewTestFailedEvent) -> None:
self.formatter.handle_crew_test_failed(event.crew_name or "Crew") self.formatter.handle_crew_test_failed(event.crew_name or "Crew")
@crewai_event_bus.on(KnowledgeRetrievalStartedEvent) @crewai_event_bus.on(KnowledgeRetrievalStartedEvent)
def on_knowledge_retrieval_started( def on_knowledge_retrieval_started(
source, event: KnowledgeRetrievalStartedEvent _: Any, event: KnowledgeRetrievalStartedEvent
): ) -> None:
if self.knowledge_retrieval_in_progress: if self.knowledge_retrieval_in_progress:
return return
@@ -486,8 +501,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(KnowledgeRetrievalCompletedEvent) @crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
def on_knowledge_retrieval_completed( def on_knowledge_retrieval_completed(
source, event: KnowledgeRetrievalCompletedEvent _: Any, event: KnowledgeRetrievalCompletedEvent
): ) -> None:
if not self.knowledge_retrieval_in_progress: if not self.knowledge_retrieval_in_progress:
return return
@@ -499,11 +514,13 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(KnowledgeQueryStartedEvent) @crewai_event_bus.on(KnowledgeQueryStartedEvent)
def on_knowledge_query_started(source, event: KnowledgeQueryStartedEvent): def on_knowledge_query_started(
_: Any, event: KnowledgeQueryStartedEvent
) -> None:
pass pass
@crewai_event_bus.on(KnowledgeQueryFailedEvent) @crewai_event_bus.on(KnowledgeQueryFailedEvent)
def on_knowledge_query_failed(source, event: KnowledgeQueryFailedEvent): def on_knowledge_query_failed(_: Any, event: KnowledgeQueryFailedEvent) -> None:
self.formatter.handle_knowledge_query_failed( self.formatter.handle_knowledge_query_failed(
self.formatter.current_agent_branch, self.formatter.current_agent_branch,
event.error, event.error,
@@ -511,13 +528,15 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(KnowledgeQueryCompletedEvent) @crewai_event_bus.on(KnowledgeQueryCompletedEvent)
def on_knowledge_query_completed(source, event: KnowledgeQueryCompletedEvent): def on_knowledge_query_completed(
_: Any, event: KnowledgeQueryCompletedEvent
) -> None:
pass pass
@crewai_event_bus.on(KnowledgeSearchQueryFailedEvent) @crewai_event_bus.on(KnowledgeSearchQueryFailedEvent)
def on_knowledge_search_query_failed( def on_knowledge_search_query_failed(
source, event: KnowledgeSearchQueryFailedEvent _: Any, event: KnowledgeSearchQueryFailedEvent
): ) -> None:
self.formatter.handle_knowledge_search_query_failed( self.formatter.handle_knowledge_search_query_failed(
self.formatter.current_agent_branch, self.formatter.current_agent_branch,
event.error, event.error,
@@ -527,7 +546,9 @@ class EventListener(BaseEventListener):
# ----------- REASONING EVENTS ----------- # ----------- REASONING EVENTS -----------
@crewai_event_bus.on(AgentReasoningStartedEvent) @crewai_event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event: AgentReasoningStartedEvent): def on_agent_reasoning_started(
_: Any, event: AgentReasoningStartedEvent
) -> None:
self.formatter.handle_reasoning_started( self.formatter.handle_reasoning_started(
self.formatter.current_agent_branch, self.formatter.current_agent_branch,
event.attempt, event.attempt,
@@ -535,7 +556,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(AgentReasoningCompletedEvent) @crewai_event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event: AgentReasoningCompletedEvent): def on_agent_reasoning_completed(
_: Any, event: AgentReasoningCompletedEvent
) -> None:
self.formatter.handle_reasoning_completed( self.formatter.handle_reasoning_completed(
event.plan, event.plan,
event.ready, event.ready,
@@ -543,7 +566,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(AgentReasoningFailedEvent) @crewai_event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event: AgentReasoningFailedEvent): def on_agent_reasoning_failed(_: Any, event: AgentReasoningFailedEvent) -> None:
self.formatter.handle_reasoning_failed( self.formatter.handle_reasoning_failed(
event.error, event.error,
self.formatter.current_crew_tree, self.formatter.current_crew_tree,
@@ -552,7 +575,7 @@ class EventListener(BaseEventListener):
# ----------- AGENT LOGGING EVENTS ----------- # ----------- AGENT LOGGING EVENTS -----------
@crewai_event_bus.on(AgentLogsStartedEvent) @crewai_event_bus.on(AgentLogsStartedEvent)
def on_agent_logs_started(source, event: AgentLogsStartedEvent): def on_agent_logs_started(_: Any, event: AgentLogsStartedEvent) -> None:
self.formatter.handle_agent_logs_started( self.formatter.handle_agent_logs_started(
event.agent_role, event.agent_role,
event.task_description, event.task_description,
@@ -560,7 +583,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(AgentLogsExecutionEvent) @crewai_event_bus.on(AgentLogsExecutionEvent)
def on_agent_logs_execution(source, event: AgentLogsExecutionEvent): def on_agent_logs_execution(_: Any, event: AgentLogsExecutionEvent) -> None:
self.formatter.handle_agent_logs_execution( self.formatter.handle_agent_logs_execution(
event.agent_role, event.agent_role,
event.formatted_answer, event.formatted_answer,
@@ -568,7 +591,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(A2ADelegationStartedEvent) @crewai_event_bus.on(A2ADelegationStartedEvent)
def on_a2a_delegation_started(source, event: A2ADelegationStartedEvent): def on_a2a_delegation_started(_: Any, event: A2ADelegationStartedEvent) -> None:
self.formatter.handle_a2a_delegation_started( self.formatter.handle_a2a_delegation_started(
event.endpoint, event.endpoint,
event.task_description, event.task_description,
@@ -578,7 +601,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(A2ADelegationCompletedEvent) @crewai_event_bus.on(A2ADelegationCompletedEvent)
def on_a2a_delegation_completed(source, event: A2ADelegationCompletedEvent): def on_a2a_delegation_completed(
_: Any, event: A2ADelegationCompletedEvent
) -> None:
self.formatter.handle_a2a_delegation_completed( self.formatter.handle_a2a_delegation_completed(
event.status, event.status,
event.result, event.result,
@@ -587,7 +612,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(A2AConversationStartedEvent) @crewai_event_bus.on(A2AConversationStartedEvent)
def on_a2a_conversation_started(source, event: A2AConversationStartedEvent): def on_a2a_conversation_started(
_: Any, event: A2AConversationStartedEvent
) -> None:
# Store A2A agent name for display in conversation tree # Store A2A agent name for display in conversation tree
if event.a2a_agent_name: if event.a2a_agent_name:
self.formatter._current_a2a_agent_name = event.a2a_agent_name self.formatter._current_a2a_agent_name = event.a2a_agent_name
@@ -598,7 +625,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(A2AMessageSentEvent) @crewai_event_bus.on(A2AMessageSentEvent)
def on_a2a_message_sent(source, event: A2AMessageSentEvent): def on_a2a_message_sent(_: Any, event: A2AMessageSentEvent) -> None:
self.formatter.handle_a2a_message_sent( self.formatter.handle_a2a_message_sent(
event.message, event.message,
event.turn_number, event.turn_number,
@@ -606,7 +633,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(A2AResponseReceivedEvent) @crewai_event_bus.on(A2AResponseReceivedEvent)
def on_a2a_response_received(source, event: A2AResponseReceivedEvent): def on_a2a_response_received(_: Any, event: A2AResponseReceivedEvent) -> None:
self.formatter.handle_a2a_response_received( self.formatter.handle_a2a_response_received(
event.response, event.response,
event.turn_number, event.turn_number,
@@ -615,7 +642,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(A2AConversationCompletedEvent) @crewai_event_bus.on(A2AConversationCompletedEvent)
def on_a2a_conversation_completed(source, event: A2AConversationCompletedEvent): def on_a2a_conversation_completed(
_: Any, event: A2AConversationCompletedEvent
) -> None:
self.formatter.handle_a2a_conversation_completed( self.formatter.handle_a2a_conversation_completed(
event.status, event.status,
event.final_result, event.final_result,
@@ -626,7 +655,7 @@ class EventListener(BaseEventListener):
# ----------- MCP EVENTS ----------- # ----------- MCP EVENTS -----------
@crewai_event_bus.on(MCPConnectionStartedEvent) @crewai_event_bus.on(MCPConnectionStartedEvent)
def on_mcp_connection_started(source, event: MCPConnectionStartedEvent): def on_mcp_connection_started(_: Any, event: MCPConnectionStartedEvent) -> None:
self.formatter.handle_mcp_connection_started( self.formatter.handle_mcp_connection_started(
event.server_name, event.server_name,
event.server_url, event.server_url,
@@ -636,7 +665,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(MCPConnectionCompletedEvent) @crewai_event_bus.on(MCPConnectionCompletedEvent)
def on_mcp_connection_completed(source, event: MCPConnectionCompletedEvent): def on_mcp_connection_completed(
_: Any, event: MCPConnectionCompletedEvent
) -> None:
self.formatter.handle_mcp_connection_completed( self.formatter.handle_mcp_connection_completed(
event.server_name, event.server_name,
event.server_url, event.server_url,
@@ -646,7 +677,7 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(MCPConnectionFailedEvent) @crewai_event_bus.on(MCPConnectionFailedEvent)
def on_mcp_connection_failed(source, event: MCPConnectionFailedEvent): def on_mcp_connection_failed(_: Any, event: MCPConnectionFailedEvent) -> None:
self.formatter.handle_mcp_connection_failed( self.formatter.handle_mcp_connection_failed(
event.server_name, event.server_name,
event.server_url, event.server_url,
@@ -656,7 +687,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(MCPToolExecutionStartedEvent) @crewai_event_bus.on(MCPToolExecutionStartedEvent)
def on_mcp_tool_execution_started(source, event: MCPToolExecutionStartedEvent): def on_mcp_tool_execution_started(
_: Any, event: MCPToolExecutionStartedEvent
) -> None:
self.formatter.handle_mcp_tool_execution_started( self.formatter.handle_mcp_tool_execution_started(
event.server_name, event.server_name,
event.tool_name, event.tool_name,
@@ -665,8 +698,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(MCPToolExecutionCompletedEvent) @crewai_event_bus.on(MCPToolExecutionCompletedEvent)
def on_mcp_tool_execution_completed( def on_mcp_tool_execution_completed(
source, event: MCPToolExecutionCompletedEvent _: Any, event: MCPToolExecutionCompletedEvent
): ) -> None:
self.formatter.handle_mcp_tool_execution_completed( self.formatter.handle_mcp_tool_execution_completed(
event.server_name, event.server_name,
event.tool_name, event.tool_name,
@@ -676,7 +709,9 @@ class EventListener(BaseEventListener):
) )
@crewai_event_bus.on(MCPToolExecutionFailedEvent) @crewai_event_bus.on(MCPToolExecutionFailedEvent)
def on_mcp_tool_execution_failed(source, event: MCPToolExecutionFailedEvent): def on_mcp_tool_execution_failed(
_: Any, event: MCPToolExecutionFailedEvent
) -> None:
self.formatter.handle_mcp_tool_execution_failed( self.formatter.handle_mcp_tool_execution_failed(
event.server_name, event.server_name,
event.tool_name, event.tool_name,