From 9da1f0c0aa0a738af470e79959887b0a569e26c0 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Mon, 24 Nov 2025 12:50:18 -0500 Subject: [PATCH] fix: ensure flow execution start panel is not shown on plot --- .../src/crewai/events/event_listener.py | 181 +++++++++++------- 1 file changed, 108 insertions(+), 73 deletions(-) diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index e07ee193c..5cd190cf6 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -101,24 +101,25 @@ if TYPE_CHECKING: class EventListener(BaseEventListener): - _instance = None + _instance: EventListener | None = None + _initialized: bool = False _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry()) - logger = Logger(verbose=True, default_color=EMITTER_COLOR) + logger: Logger = Logger(verbose=True, default_color=EMITTER_COLOR) execution_spans: dict[Task, Any] = Field(default_factory=dict) - next_chunk = 0 - text_stream = StringIO() - knowledge_retrieval_in_progress = False - knowledge_query_in_progress = False + next_chunk: int = 0 + text_stream: StringIO = StringIO() + knowledge_retrieval_in_progress: bool = False + knowledge_query_in_progress: bool = False method_branches: dict[str, Any] = Field(default_factory=dict) - def __new__(cls): + def __new__(cls) -> EventListener: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance - def __init__(self): - if not hasattr(self, "_initialized") or not self._initialized: + def __init__(self) -> None: + if not self._initialized: super().__init__() self._telemetry = Telemetry() self._telemetry.set_tracer() @@ -136,14 +137,14 @@ class EventListener(BaseEventListener): def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None: @crewai_event_bus.on(CrewKickoffStartedEvent) - def on_crew_started(source, event: CrewKickoffStartedEvent) -> None: + def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None: with self._crew_tree_lock: self.formatter.create_crew_tree(event.crew_name or "Crew", source.id) self._telemetry.crew_execution_span(source, event.inputs) self._crew_tree_lock.notify_all() @crewai_event_bus.on(CrewKickoffCompletedEvent) - def on_crew_completed(source, event: CrewKickoffCompletedEvent) -> None: + def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None: # Handle telemetry final_string_output = event.output.raw self._telemetry.end_crew(source, final_string_output) @@ -157,7 +158,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(CrewKickoffFailedEvent) - def on_crew_failed(source, event: CrewKickoffFailedEvent) -> None: + def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None: self.formatter.update_crew_tree( self.formatter.current_crew_tree, event.crew_name or "Crew", @@ -166,23 +167,23 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(CrewTrainStartedEvent) - def on_crew_train_started(source, event: CrewTrainStartedEvent) -> None: + def on_crew_train_started(_: Any, event: CrewTrainStartedEvent) -> None: self.formatter.handle_crew_train_started( event.crew_name or "Crew", str(event.timestamp) ) @crewai_event_bus.on(CrewTrainCompletedEvent) - def on_crew_train_completed(source, event: CrewTrainCompletedEvent) -> None: + def on_crew_train_completed(_: Any, event: CrewTrainCompletedEvent) -> None: self.formatter.handle_crew_train_completed( event.crew_name or "Crew", str(event.timestamp) ) @crewai_event_bus.on(CrewTrainFailedEvent) - def on_crew_train_failed(source, event: CrewTrainFailedEvent) -> None: + def on_crew_train_failed(_: Any, event: CrewTrainFailedEvent) -> None: self.formatter.handle_crew_train_failed(event.crew_name or "Crew") @crewai_event_bus.on(CrewTestResultEvent) - def on_crew_test_result(source, event: CrewTestResultEvent) -> None: + def on_crew_test_result(source: Any, event: CrewTestResultEvent) -> None: self._telemetry.individual_test_result_span( source.crew, event.quality, @@ -193,7 +194,7 @@ class EventListener(BaseEventListener): # ----------- TASK EVENTS ----------- @crewai_event_bus.on(TaskStartedEvent) - def on_task_started(source, event: TaskStartedEvent) -> None: + def on_task_started(source: Any, event: TaskStartedEvent) -> None: span = self._telemetry.task_started(crew=source.agent.crew, task=source) self.execution_spans[source] = span @@ -211,7 +212,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(TaskCompletedEvent) - def on_task_completed(source, event: TaskCompletedEvent): + def on_task_completed(source: Any, event: TaskCompletedEvent) -> None: # Handle telemetry span = self.execution_spans.get(source) if span: @@ -229,7 +230,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(TaskFailedEvent) - def on_task_failed(source, event: TaskFailedEvent): + def on_task_failed(source: Any, event: TaskFailedEvent) -> None: span = self.execution_spans.get(source) if span: if source.agent and source.agent.crew: @@ -249,7 +250,9 @@ class EventListener(BaseEventListener): # ----------- AGENT EVENTS ----------- @crewai_event_bus.on(AgentExecutionStartedEvent) - def on_agent_execution_started(source, event: AgentExecutionStartedEvent): + def on_agent_execution_started( + _: Any, event: AgentExecutionStartedEvent + ) -> None: self.formatter.create_agent_branch( self.formatter.current_task_branch, event.agent.role, @@ -257,7 +260,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(AgentExecutionCompletedEvent) - def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): + def on_agent_execution_completed( + _: Any, event: AgentExecutionCompletedEvent + ) -> None: self.formatter.update_agent_status( self.formatter.current_agent_branch, event.agent.role, @@ -268,8 +273,8 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(LiteAgentExecutionStartedEvent) def on_lite_agent_execution_started( - source, event: LiteAgentExecutionStartedEvent - ): + _: Any, event: LiteAgentExecutionStartedEvent + ) -> None: """Handle LiteAgent execution started event.""" self.formatter.handle_lite_agent_execution( event.agent_info["role"], status="started", **event.agent_info @@ -277,15 +282,17 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(LiteAgentExecutionCompletedEvent) def on_lite_agent_execution_completed( - source, event: LiteAgentExecutionCompletedEvent - ): + _: Any, event: LiteAgentExecutionCompletedEvent + ) -> None: """Handle LiteAgent execution completed event.""" self.formatter.handle_lite_agent_execution( event.agent_info["role"], status="completed", **event.agent_info ) @crewai_event_bus.on(LiteAgentExecutionErrorEvent) - def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent): + def on_lite_agent_execution_error( + _: Any, event: LiteAgentExecutionErrorEvent + ) -> None: """Handle LiteAgent execution error event.""" self.formatter.handle_lite_agent_execution( event.agent_info["role"], @@ -297,26 +304,28 @@ class EventListener(BaseEventListener): # ----------- FLOW EVENTS ----------- @crewai_event_bus.on(FlowCreatedEvent) - def on_flow_created(source, event: FlowCreatedEvent): + def on_flow_created(_: Any, event: FlowCreatedEvent) -> None: self._telemetry.flow_creation_span(event.flow_name) - tree = self.formatter.create_flow_tree(event.flow_name, str(source.flow_id)) - self.formatter.current_flow_tree = tree @crewai_event_bus.on(FlowStartedEvent) - def on_flow_started(source, event: FlowStartedEvent): + def on_flow_started(source: Any, event: FlowStartedEvent) -> None: self._telemetry.flow_execution_span( event.flow_name, list(source._methods.keys()) ) + tree = self.formatter.create_flow_tree(event.flow_name, str(source.flow_id)) + self.formatter.current_flow_tree = tree self.formatter.start_flow(event.flow_name, str(source.flow_id)) @crewai_event_bus.on(FlowFinishedEvent) - def on_flow_finished(source, event: FlowFinishedEvent): + def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: self.formatter.update_flow_status( self.formatter.current_flow_tree, event.flow_name, source.flow_id ) @crewai_event_bus.on(MethodExecutionStartedEvent) - def on_method_execution_started(source, event: MethodExecutionStartedEvent): + def on_method_execution_started( + _: Any, event: MethodExecutionStartedEvent + ) -> None: method_branch = self.method_branches.get(event.method_name) updated_branch = self.formatter.update_method_status( method_branch, @@ -327,7 +336,9 @@ class EventListener(BaseEventListener): self.method_branches[event.method_name] = updated_branch @crewai_event_bus.on(MethodExecutionFinishedEvent) - def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): + def on_method_execution_finished( + _: Any, event: MethodExecutionFinishedEvent + ) -> None: method_branch = self.method_branches.get(event.method_name) updated_branch = self.formatter.update_method_status( method_branch, @@ -338,7 +349,9 @@ class EventListener(BaseEventListener): self.method_branches[event.method_name] = updated_branch @crewai_event_bus.on(MethodExecutionFailedEvent) - def on_method_execution_failed(source, event: MethodExecutionFailedEvent): + def on_method_execution_failed( + _: Any, event: MethodExecutionFailedEvent + ) -> None: method_branch = self.method_branches.get(event.method_name) updated_branch = self.formatter.update_method_status( method_branch, @@ -351,7 +364,7 @@ class EventListener(BaseEventListener): # ----------- TOOL USAGE EVENTS ----------- @crewai_event_bus.on(ToolUsageStartedEvent) - def on_tool_usage_started(source, event: ToolUsageStartedEvent): + def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None: if isinstance(source, LLM): self.formatter.handle_llm_tool_usage_started( event.tool_name, @@ -365,7 +378,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(ToolUsageFinishedEvent) - def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): + def on_tool_usage_finished(source: Any, event: ToolUsageFinishedEvent) -> None: if isinstance(source, LLM): self.formatter.handle_llm_tool_usage_finished( event.tool_name, @@ -378,7 +391,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(ToolUsageErrorEvent) - def on_tool_usage_error(source, event: ToolUsageErrorEvent): + def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None: if isinstance(source, LLM): self.formatter.handle_llm_tool_usage_error( event.tool_name, @@ -395,7 +408,7 @@ class EventListener(BaseEventListener): # ----------- LLM EVENTS ----------- @crewai_event_bus.on(LLMCallStartedEvent) - def on_llm_call_started(source, event: LLMCallStartedEvent): + def on_llm_call_started(_: Any, event: LLMCallStartedEvent) -> None: # Capture the returned tool branch and update the current_tool_branch reference thinking_branch = self.formatter.handle_llm_call_started( self.formatter.current_agent_branch, @@ -406,7 +419,7 @@ class EventListener(BaseEventListener): self.formatter.current_tool_branch = thinking_branch @crewai_event_bus.on(LLMCallCompletedEvent) - def on_llm_call_completed(source, event: LLMCallCompletedEvent): + def on_llm_call_completed(_: Any, event: LLMCallCompletedEvent) -> None: self.formatter.handle_llm_call_completed( self.formatter.current_tool_branch, self.formatter.current_agent_branch, @@ -414,7 +427,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(LLMCallFailedEvent) - def on_llm_call_failed(source, event: LLMCallFailedEvent): + def on_llm_call_failed(_: Any, event: LLMCallFailedEvent) -> None: self.formatter.handle_llm_call_failed( self.formatter.current_tool_branch, event.error, @@ -422,7 +435,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(LLMStreamChunkEvent) - def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): + def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None: self.text_stream.write(event.chunk) self.text_stream.seek(self.next_chunk) self.text_stream.read() @@ -431,7 +444,7 @@ class EventListener(BaseEventListener): # ----------- LLM GUARDRAIL EVENTS ----------- @crewai_event_bus.on(LLMGuardrailStartedEvent) - def on_llm_guardrail_started(source, event: LLMGuardrailStartedEvent): + def on_llm_guardrail_started(_: Any, event: LLMGuardrailStartedEvent) -> None: guardrail_str = str(event.guardrail) guardrail_name = ( guardrail_str[:50] + "..." if len(guardrail_str) > 50 else guardrail_str @@ -440,13 +453,15 @@ class EventListener(BaseEventListener): self.formatter.handle_guardrail_started(guardrail_name, event.retry_count) @crewai_event_bus.on(LLMGuardrailCompletedEvent) - def on_llm_guardrail_completed(source, event: LLMGuardrailCompletedEvent): + def on_llm_guardrail_completed( + _: Any, event: LLMGuardrailCompletedEvent + ) -> None: self.formatter.handle_guardrail_completed( event.success, event.error, event.retry_count ) @crewai_event_bus.on(CrewTestStartedEvent) - def on_crew_test_started(source, event: CrewTestStartedEvent): + def on_crew_test_started(source: Any, event: CrewTestStartedEvent) -> None: cloned_crew = source.copy() self._telemetry.test_execution_span( cloned_crew, @@ -460,20 +475,20 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(CrewTestCompletedEvent) - def on_crew_test_completed(source, event: CrewTestCompletedEvent): + def on_crew_test_completed(_: Any, event: CrewTestCompletedEvent) -> None: self.formatter.handle_crew_test_completed( self.formatter.current_flow_tree, event.crew_name or "Crew", ) @crewai_event_bus.on(CrewTestFailedEvent) - def on_crew_test_failed(source, event: CrewTestFailedEvent): + def on_crew_test_failed(_: Any, event: CrewTestFailedEvent) -> None: self.formatter.handle_crew_test_failed(event.crew_name or "Crew") @crewai_event_bus.on(KnowledgeRetrievalStartedEvent) def on_knowledge_retrieval_started( - source, event: KnowledgeRetrievalStartedEvent - ): + _: Any, event: KnowledgeRetrievalStartedEvent + ) -> None: if self.knowledge_retrieval_in_progress: return @@ -486,8 +501,8 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(KnowledgeRetrievalCompletedEvent) def on_knowledge_retrieval_completed( - source, event: KnowledgeRetrievalCompletedEvent - ): + _: Any, event: KnowledgeRetrievalCompletedEvent + ) -> None: if not self.knowledge_retrieval_in_progress: return @@ -499,11 +514,13 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(KnowledgeQueryStartedEvent) - def on_knowledge_query_started(source, event: KnowledgeQueryStartedEvent): + def on_knowledge_query_started( + _: Any, event: KnowledgeQueryStartedEvent + ) -> None: pass @crewai_event_bus.on(KnowledgeQueryFailedEvent) - def on_knowledge_query_failed(source, event: KnowledgeQueryFailedEvent): + def on_knowledge_query_failed(_: Any, event: KnowledgeQueryFailedEvent) -> None: self.formatter.handle_knowledge_query_failed( self.formatter.current_agent_branch, event.error, @@ -511,13 +528,15 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(KnowledgeQueryCompletedEvent) - def on_knowledge_query_completed(source, event: KnowledgeQueryCompletedEvent): + def on_knowledge_query_completed( + _: Any, event: KnowledgeQueryCompletedEvent + ) -> None: pass @crewai_event_bus.on(KnowledgeSearchQueryFailedEvent) def on_knowledge_search_query_failed( - source, event: KnowledgeSearchQueryFailedEvent - ): + _: Any, event: KnowledgeSearchQueryFailedEvent + ) -> None: self.formatter.handle_knowledge_search_query_failed( self.formatter.current_agent_branch, event.error, @@ -527,7 +546,9 @@ class EventListener(BaseEventListener): # ----------- REASONING EVENTS ----------- @crewai_event_bus.on(AgentReasoningStartedEvent) - def on_agent_reasoning_started(source, event: AgentReasoningStartedEvent): + def on_agent_reasoning_started( + _: Any, event: AgentReasoningStartedEvent + ) -> None: self.formatter.handle_reasoning_started( self.formatter.current_agent_branch, event.attempt, @@ -535,7 +556,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(AgentReasoningCompletedEvent) - def on_agent_reasoning_completed(source, event: AgentReasoningCompletedEvent): + def on_agent_reasoning_completed( + _: Any, event: AgentReasoningCompletedEvent + ) -> None: self.formatter.handle_reasoning_completed( event.plan, event.ready, @@ -543,7 +566,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(AgentReasoningFailedEvent) - def on_agent_reasoning_failed(source, event: AgentReasoningFailedEvent): + def on_agent_reasoning_failed(_: Any, event: AgentReasoningFailedEvent) -> None: self.formatter.handle_reasoning_failed( event.error, self.formatter.current_crew_tree, @@ -552,7 +575,7 @@ class EventListener(BaseEventListener): # ----------- AGENT LOGGING EVENTS ----------- @crewai_event_bus.on(AgentLogsStartedEvent) - def on_agent_logs_started(source, event: AgentLogsStartedEvent): + def on_agent_logs_started(_: Any, event: AgentLogsStartedEvent) -> None: self.formatter.handle_agent_logs_started( event.agent_role, event.task_description, @@ -560,7 +583,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(AgentLogsExecutionEvent) - def on_agent_logs_execution(source, event: AgentLogsExecutionEvent): + def on_agent_logs_execution(_: Any, event: AgentLogsExecutionEvent) -> None: self.formatter.handle_agent_logs_execution( event.agent_role, event.formatted_answer, @@ -568,7 +591,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(A2ADelegationStartedEvent) - def on_a2a_delegation_started(source, event: A2ADelegationStartedEvent): + def on_a2a_delegation_started(_: Any, event: A2ADelegationStartedEvent) -> None: self.formatter.handle_a2a_delegation_started( event.endpoint, event.task_description, @@ -578,7 +601,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(A2ADelegationCompletedEvent) - def on_a2a_delegation_completed(source, event: A2ADelegationCompletedEvent): + def on_a2a_delegation_completed( + _: Any, event: A2ADelegationCompletedEvent + ) -> None: self.formatter.handle_a2a_delegation_completed( event.status, event.result, @@ -587,7 +612,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(A2AConversationStartedEvent) - def on_a2a_conversation_started(source, event: A2AConversationStartedEvent): + def on_a2a_conversation_started( + _: Any, event: A2AConversationStartedEvent + ) -> None: # Store A2A agent name for display in conversation tree if event.a2a_agent_name: self.formatter._current_a2a_agent_name = event.a2a_agent_name @@ -598,7 +625,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(A2AMessageSentEvent) - def on_a2a_message_sent(source, event: A2AMessageSentEvent): + def on_a2a_message_sent(_: Any, event: A2AMessageSentEvent) -> None: self.formatter.handle_a2a_message_sent( event.message, event.turn_number, @@ -606,7 +633,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(A2AResponseReceivedEvent) - def on_a2a_response_received(source, event: A2AResponseReceivedEvent): + def on_a2a_response_received(_: Any, event: A2AResponseReceivedEvent) -> None: self.formatter.handle_a2a_response_received( event.response, event.turn_number, @@ -615,7 +642,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(A2AConversationCompletedEvent) - def on_a2a_conversation_completed(source, event: A2AConversationCompletedEvent): + def on_a2a_conversation_completed( + _: Any, event: A2AConversationCompletedEvent + ) -> None: self.formatter.handle_a2a_conversation_completed( event.status, event.final_result, @@ -626,7 +655,7 @@ class EventListener(BaseEventListener): # ----------- MCP EVENTS ----------- @crewai_event_bus.on(MCPConnectionStartedEvent) - def on_mcp_connection_started(source, event: MCPConnectionStartedEvent): + def on_mcp_connection_started(_: Any, event: MCPConnectionStartedEvent) -> None: self.formatter.handle_mcp_connection_started( event.server_name, event.server_url, @@ -636,7 +665,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(MCPConnectionCompletedEvent) - def on_mcp_connection_completed(source, event: MCPConnectionCompletedEvent): + def on_mcp_connection_completed( + _: Any, event: MCPConnectionCompletedEvent + ) -> None: self.formatter.handle_mcp_connection_completed( event.server_name, event.server_url, @@ -646,7 +677,7 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(MCPConnectionFailedEvent) - def on_mcp_connection_failed(source, event: MCPConnectionFailedEvent): + def on_mcp_connection_failed(_: Any, event: MCPConnectionFailedEvent) -> None: self.formatter.handle_mcp_connection_failed( event.server_name, event.server_url, @@ -656,7 +687,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(MCPToolExecutionStartedEvent) - def on_mcp_tool_execution_started(source, event: MCPToolExecutionStartedEvent): + def on_mcp_tool_execution_started( + _: Any, event: MCPToolExecutionStartedEvent + ) -> None: self.formatter.handle_mcp_tool_execution_started( event.server_name, event.tool_name, @@ -665,8 +698,8 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(MCPToolExecutionCompletedEvent) def on_mcp_tool_execution_completed( - source, event: MCPToolExecutionCompletedEvent - ): + _: Any, event: MCPToolExecutionCompletedEvent + ) -> None: self.formatter.handle_mcp_tool_execution_completed( event.server_name, event.tool_name, @@ -676,7 +709,9 @@ class EventListener(BaseEventListener): ) @crewai_event_bus.on(MCPToolExecutionFailedEvent) - def on_mcp_tool_execution_failed(source, event: MCPToolExecutionFailedEvent): + def on_mcp_tool_execution_failed( + _: Any, event: MCPToolExecutionFailedEvent + ) -> None: self.formatter.handle_mcp_tool_execution_failed( event.server_name, event.tool_name,