Enhance Event Listener with Rich Visualization and Improved Logging

This commit is contained in:
Lorenze Jay
2025-03-10 16:43:53 -07:00
parent 7122a29a20
commit f2d53350d8

View File

@@ -1,7 +1,12 @@
from datetime import datetime
from io import StringIO from io import StringIO
from typing import Any, Dict from typing import Any, Dict
from pydantic import Field, PrivateAttr from pydantic import Field, PrivateAttr
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
from crewai.task import Task from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry from crewai.telemetry.telemetry import Telemetry
@@ -50,6 +55,12 @@ class EventListener(BaseEventListener):
execution_spans: Dict[Task, Any] = Field(default_factory=dict) execution_spans: Dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0 next_chunk = 0
text_stream = StringIO() text_stream = StringIO()
current_crew_tree = None
current_task_branch = None
current_agent_branch = None
current_tool_branch = None
current_flow_tree = None
current_method_branch = None
def __new__(cls): def __new__(cls):
if cls._instance is None: if cls._instance is None:
@@ -64,61 +75,152 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer() self._telemetry.set_tracer()
self.execution_spans = {} self.execution_spans = {}
self._initialized = True self._initialized = True
self.console = Console(width=None)
def _format_timestamp(self, timestamp: float) -> str:
return datetime.fromtimestamp(timestamp).strftime("%H:%M:%S")
def _create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
return Panel(
content,
title=title,
border_style=style,
padding=(1, 2),
)
def _create_status_content(
self, title: str, name: str, status_style: str = "blue", **fields
) -> Text:
"""Create standardized status content with consistent formatting."""
content = Text()
content.append(f"{title}\n", style=f"{status_style} bold")
content.append("Name: ", style="white")
content.append(f"{name}\n", style=status_style)
for label, value in fields.items():
content.append(f"{label}: ", style="white")
content.append(
f"{value}\n", style=fields.get(f"{label}_style", status_style)
)
return content
def _update_tree_label(
self,
tree: Tree,
prefix: str,
name: str,
style: str = "blue",
status: str = None,
) -> None:
"""Update tree label with consistent formatting."""
label = Text()
label.append(f"{prefix} ", style=f"{style} bold")
label.append(name, style=style)
if status:
label.append("\n Status: ", style="white")
label.append(status, style=f"{style} bold")
tree.label = label
def _add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree:
"""Add a node to the tree with consistent styling."""
return parent.add(Text(text, style=style))
# ----------- METHODS -----------
def on_crew_start(self, source: Any, event: Any) -> None:
# Create the crew tree that will hold tasks
self.current_crew_tree = Tree(
Text("🚀 Crew: ", style="cyan bold") + Text(event.crew_name, style="cyan")
)
content = Text()
content.append(f"NAME: {event.crew_name}", style="white")
content.append(f"\nID: {source.id}", style="blue")
panel = self._create_panel(content, "Crew Execution Started", "cyan")
self.console.print(panel)
self.console.print() # Add spacing
self._telemetry.crew_execution_span(source, event.inputs)
# ----------- CREW EVENTS ----------- # ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus): def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent) @crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent): def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log( self.on_crew_start(source, event)
f"🚀 Crew '{event.crew_name}' started, {source.id}",
event.timestamp,
)
self._telemetry.crew_execution_span(source, event.inputs)
@crewai_event_bus.on(CrewKickoffCompletedEvent) @crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent): def on_crew_completed(source, event: CrewKickoffCompletedEvent):
# Handle telemetry
final_string_output = event.output.raw final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output) self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed, {source.id}", if self.current_crew_tree:
event.timestamp, # Update crew tree label to show completion
) crew_content = Text()
crew_content.append("✅ Crew: ", style="green bold")
crew_content.append(event.crew_name or "Crew", style="green")
self.current_crew_tree.label = crew_content
# Create completion panel
completion_content = Text()
completion_content.append(
"Crew Execution Completed\n", style="green bold"
)
completion_content.append("Name: ", style="white")
completion_content.append(f"{event.crew_name}\n", style="green")
completion_content.append("ID: ", style="white")
completion_content.append(str(source.id), style="blue")
# Show final tree and completion panel
self.console.print(self.current_crew_tree)
self.console.print()
panel = self._create_panel(
completion_content, "Crew Completion", "green"
)
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(CrewKickoffFailedEvent) @crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent): def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log( if self.current_crew_tree:
f"❌ Crew '{event.crew_name}' failed, {source.id}", # Update crew tree label to show failure
event.timestamp, crew_content = Text()
) crew_content.append("❌ Crew: ", style="red bold")
crew_content.append(event.crew_name or "Crew", style="red")
self.current_crew_tree.label = crew_content
@crewai_event_bus.on(CrewTestStartedEvent) # Create failure panel
def on_crew_test_started(source, event: CrewTestStartedEvent): failure_content = Text()
cloned_crew = source.copy() failure_content.append("Crew Execution Failed\n", style="red bold")
self._telemetry.test_execution_span( failure_content.append("Name: ", style="white")
cloned_crew, failure_content.append(f"{event.crew_name}\n", style="red")
event.n_iterations, failure_content.append("ID: ", style="white")
event.inputs, failure_content.append(str(source.id), style="blue")
event.eval_llm or "",
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewTestCompletedEvent) # Show final tree and failure panel
def on_crew_test_completed(source, event: CrewTestCompletedEvent): self.console.print(self.current_crew_tree)
self.logger.log( self.console.print()
f"✅ Crew '{event.crew_name}' completed test",
event.timestamp, panel = self._create_panel(failure_content, "Crew Failure", "red")
) self.console.print(panel)
self.console.print()
@crewai_event_bus.on(CrewTestFailedEvent) @crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent): def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.logger.log( # Create test failure content
f"❌ Crew '{event.crew_name}' failed test", failure_content = Text()
event.timestamp, failure_content.append("❌ Crew Test Failed\n", style="red bold")
) failure_content.append("Crew: ", style="white")
failure_content.append(event.crew_name or "Crew", style="red")
panel = self._create_panel(failure_content, "Test Failure", "red")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(CrewTrainStartedEvent) @crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent): def on_crew_train_started(source, event: CrewTrainStartedEvent):
@@ -136,10 +238,15 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewTrainFailedEvent) @crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent): def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.logger.log( # Create training failure content
f"❌ Crew '{event.crew_name}' failed train", failure_content = Text()
event.timestamp, failure_content.append("❌ Crew Training Failed\n", style="red bold")
) failure_content.append("Crew: ", style="white")
failure_content.append(event.crew_name or "Crew", style="red")
panel = self._create_panel(failure_content, "Training Failure", "red")
self.console.print(panel)
self.console.print()
# ----------- TASK EVENTS ----------- # ----------- TASK EVENTS -----------
@@ -148,22 +255,54 @@ class EventListener(BaseEventListener):
span = self._telemetry.task_started(crew=source.agent.crew, task=source) span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span self.execution_spans[source] = span
self.logger.log( # Create task content
f"📋 Task started: {source.description}", task_content = Text()
event.timestamp, task_content.append("📋 Task: ", style="yellow bold")
) task_content.append("\n Assigned to: ", style="white")
task_content.append(source.agent.role, style="green")
task_content.append("\n Status: ", style="white")
task_content.append("Waiting for agent...", style="yellow dim")
# Add task to the crew tree
if self.current_crew_tree:
self.current_task_branch = self.current_crew_tree.add(task_content)
self.console.print(self.current_crew_tree)
else:
panel = self._create_panel(task_content, "Task Started", "yellow")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(TaskCompletedEvent) @crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent): def on_task_completed(source, event: TaskCompletedEvent):
# Handle telemetry
span = self.execution_spans.get(source) span = self.execution_spans.get(source)
if span: if span:
self._telemetry.task_ended(span, source, source.agent.crew) self._telemetry.task_ended(span, source, source.agent.crew)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
self.execution_spans[source] = None self.execution_spans[source] = None
# Create completion content
completion_content = Text()
completion_content.append("✅ Task Completed\n", style="green bold")
completion_content.append(f"Task: {str(source.id)}", style="white")
completion_content.append("\nAgent: ", style="white")
completion_content.append(source.agent.role, style="green")
# Update the tree if it exists
if self.current_crew_tree:
# Find the task branch and update it with completion status
for branch in self.current_crew_tree.children:
if source.description in branch.label:
branch.label = Text("", style="green bold") + branch.label
self.console.print(self.current_crew_tree)
break
# Always show completion panel
panel = self._create_panel(completion_content, "Task Completion", "green")
self.console.print(panel)
self.console.print() # Add spacing
@crewai_event_bus.on(TaskFailedEvent) @crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent): def on_task_failed(source, event: TaskFailedEvent):
span = self.execution_spans.get(source) span = self.execution_spans.get(source)
@@ -171,122 +310,269 @@ class EventListener(BaseEventListener):
if source.agent and source.agent.crew: if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew) self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None self.execution_spans[source] = None
self.logger.log(
f"❌ Task failed: {source.description}", # Create failure content
event.timestamp, failure_content = Text()
) failure_content.append("❌ Task Failed\n", style="red bold")
failure_content.append("Task: ", style="white")
failure_content.append(source.description, style="red")
if source.agent:
failure_content.append("\nAgent: ", style="white")
failure_content.append(source.agent.role, style="red")
# Update the tree if it exists
if self.current_crew_tree:
# Find the task branch and update it with failure status
for branch in self.current_crew_tree.children:
if source.description in branch.label:
branch.label = Text("", style="red bold") + branch.label
self.console.print(self.current_crew_tree)
break
# Show failure panel
panel = self._create_panel(failure_content, "Task Failure", "red")
self.console.print(panel)
self.console.print()
# ----------- AGENT EVENTS ----------- # ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent) @crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent): def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
self.logger.log( if self.current_task_branch:
f"🤖 Agent '{event.agent.role}' started task", # Create agent execution branch
event.timestamp, agent_content = Text()
) agent_content.append("🤖 Agent: ", style="green bold")
agent_content.append(event.agent.role, style="green")
agent_content.append("\n Status: ", style="white")
agent_content.append("In Progress", style="blue bold")
# Create a branch for the agent's activities
self.current_agent_branch = self.current_task_branch.add(agent_content)
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(AgentExecutionCompletedEvent) @crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.logger.log( if self.current_agent_branch:
f"✅ Agent '{event.agent.role}' completed task", # Update agent branch to show completion
event.timestamp, agent_content = Text()
) agent_content.append("🤖 Agent: ", style="green bold")
agent_content.append(event.agent.role, style="green")
agent_content.append("\n Status: ", style="white")
agent_content.append("✅ Completed", style="green bold")
# Update the agent branch label
self.current_agent_branch.label = agent_content
self.console.print(self.current_crew_tree)
self.console.print()
# ----------- FLOW EVENTS ----------- # ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent) @crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent): def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(event.flow_name) self._telemetry.flow_creation_span(event.flow_name)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'", # Create flow content for panel
event.timestamp, content = Text()
content.append("🌊 Starting Flow Execution\n\n", style="blue bold")
content.append("Name: ", style="white")
content.append(event.flow_name, style="blue")
panel = self._create_panel(content, "Flow Execution", "blue")
self.console.print()
self.console.print(panel)
self.console.print()
# Create and display the initial tree
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(event.flow_name, style="blue")
self.current_flow_tree = Tree(flow_label)
# Add both creation steps to show progression
self.current_flow_tree.add(Text("✨ Created", style="blue"))
self.current_flow_tree.add(
Text("✅ Initialization Complete", style="green")
) )
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent): def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span( self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys()) event.flow_name, list(source._methods.keys())
) )
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}", # Create flow tree with status
event.timestamp, flow_label = Text()
) flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(event.flow_name, style="blue")
flow_label.append("\n Status: ", style="white")
flow_label.append("In Progress", style="yellow")
self.current_flow_tree = Tree(flow_label)
# Add initial thinking state
self.current_flow_tree.add(Text("🧠 Initializing...", style="yellow"))
self.console.print()
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent): def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log( if self.current_flow_tree:
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}", # Update flow tree label to show completion
event.timestamp, self.current_flow_tree.label = Text(
) "✅ Flow Finished: ", style="green bold"
) + Text(event.flow_name, style="green")
# Create completion content
content = Text()
content.append("Flow Execution Completed\n", style="green bold")
content.append("Name: ", style="white")
content.append(f"{event.flow_name}\n", style="green")
content.append("ID: ", style="white")
content.append(source.flow_id, style="blue")
panel = self._create_panel(content, "Flow Completion", "green")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent): def on_method_execution_started(source, event: MethodExecutionStartedEvent):
self.logger.log( if self.current_flow_tree:
f"🤖 Flow Method Started: '{event.method_name}'", # Find and update the method branch
event.timestamp, for branch in self.current_flow_tree.children:
) if event.method_name in branch.label:
self.current_method_branch = branch
branch.label = Text("🔄 Running: ", style="yellow bold") + Text(
event.method_name, style="yellow"
)
break
@crewai_event_bus.on(MethodExecutionFailedEvent) self.console.print(self.current_flow_tree)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent): self.console.print()
self.logger.log(
f"❌ Flow Method Failed: '{event.method_name}'",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionFinishedEvent) @crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.logger.log( if self.current_method_branch:
f"👍 Flow Method Finished: '{event.method_name}'", # Update method status
event.timestamp, self.current_method_branch.label = Text(
) "✅ Completed: ", style="green bold"
) + Text(event.method_name, style="green")
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
if self.current_method_branch:
# Update method status to show failure
self.current_method_branch.label = Text(
"❌ Failed: ", style="red bold"
) + Text(event.method_name, style="red")
self.console.print(self.current_flow_tree)
self.console.print()
# ----------- TOOL USAGE EVENTS ----------- # ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent) @crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent): def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.logger.log( # Create tool usage content
f"🤖 Tool Usage Started: '{event.tool_name}'", tool_content = Text()
event.timestamp, tool_content.append("🔧 Using ", style="yellow bold")
) tool_content.append(event.tool_name, style="yellow")
# Add to tree under the agent branch
if self.current_agent_branch:
self.current_tool_branch = self.current_agent_branch.add(tool_content)
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(ToolUsageFinishedEvent) @crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.logger.log( # Create completion content
f"✅ Tool Usage Finished: '{event.tool_name}'", completion_content = Text()
event.timestamp, completion_content.append("🔧 Used ", style="green bold")
# completion_content.append(event.tool_name, style="green")
)
# Update under the agent branch
if self.current_tool_branch:
self.current_tool_branch.label = completion_content
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(ToolUsageErrorEvent) @crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent): def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.logger.log( # Create tool error content
f"❌ Tool Usage Error: '{event.tool_name}'", error_content = Text()
event.timestamp, error_content.append("🔧 Tool Failed: ", style="red bold")
# error_content.append(event.tool_name, style="red")
# Update under the agent branch
if self.current_tool_branch:
self.current_tool_branch.label = error_content
self.console.print(self.current_crew_tree)
self.console.print()
# Show error panel
panel = self._create_panel(
Text(
f"Tool usage failed: {event.tool_name}: {event.error}", style="red"
),
"Tool Error",
"red",
) )
self.console.print(panel)
self.console.print()
# ----------- LLM EVENTS ----------- # ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent) @crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent): def on_llm_call_started(source, event: LLMCallStartedEvent):
self.logger.log( # Create simple LLM call content
f"🤖 LLM Call Started", llm_content = Text()
event.timestamp, llm_content.append("🧠 Thinking...", style="blue bold")
)
# Add to tree under the agent branch
if self.current_agent_branch:
self.current_tool_branch = self.current_agent_branch.add(llm_content)
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(LLMCallCompletedEvent) @crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent): def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.logger.log( # Create simple completion content
f"✅ LLM Call Completed", completion_content = Text()
event.timestamp, completion_content.append("✨ Done", style="green bold")
)
# Update under the agent branch
if self.current_tool_branch:
self.current_tool_branch.label = completion_content
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(LLMCallFailedEvent) @crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent): def on_llm_call_failed(source, event: LLMCallFailedEvent):
self.logger.log( # Create LLM error content
f"❌ LLM call failed: {event.error}", error_content = Text()
event.timestamp, error_content.append("❌ LLM Call Failed\n", style="red bold")
) error_content.append("Error: ", style="white")
error_content.append(str(event.error), style="red")
# Update under the agent branch if it exists
if self.current_tool_branch:
self.current_tool_branch.label = Text("❌ LLM Failed", style="red bold")
self.console.print(self.current_crew_tree)
self.console.print()
# Show error panel
panel = self._create_panel(error_content, "LLM Error", "red")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(LLMStreamChunkEvent) @crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
@@ -299,5 +585,77 @@ class EventListener(BaseEventListener):
print(content, end="", flush=True) print(content, end="", flush=True)
self.next_chunk = self.text_stream.tell() self.next_chunk = self.text_stream.tell()
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
self._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm or "",
)
# Create test content for panel
content = Text()
content.append("🧪 Starting Crew Test\n\n", style="blue bold")
content.append("Crew: ", style="white")
content.append(f"{event.crew_name}\n", style="blue")
content.append("ID: ", style="white")
content.append(str(source.id), style="blue")
content.append("\nIterations: ", style="white")
content.append(str(event.n_iterations), style="yellow")
panel = self._create_panel(content, "Test Execution", "blue")
self.console.print()
self.console.print(panel)
self.console.print()
# Create and display the test tree
test_label = Text()
test_label.append("🧪 Test: ", style="blue bold")
test_label.append(event.crew_name or "Crew", style="blue")
test_label.append("\n Status: ", style="white")
test_label.append("In Progress", style="yellow")
self.current_flow_tree = Tree(test_label)
self.current_flow_tree.add(Text("🔄 Running tests...", style="yellow"))
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
if self.current_flow_tree:
# Update test tree label to show completion
test_label = Text()
test_label.append("✅ Test: ", style="green bold")
test_label.append(event.crew_name or "Crew", style="green")
test_label.append("\n Status: ", style="white")
test_label.append("Completed", style="green bold")
self.current_flow_tree.label = test_label
# Update the running tests node
for child in self.current_flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text(
"✅ Tests completed successfully", style="green"
)
self.console.print(self.current_flow_tree)
self.console.print()
# Create completion panel
completion_content = Text()
completion_content.append("Test Execution Completed\n", style="green bold")
completion_content.append("Crew: ", style="white")
completion_content.append(f"{event.crew_name}\n", style="green")
completion_content.append("Status: ", style="white")
completion_content.append("All tests passed", style="green")
panel = self._create_panel(completion_content, "Test Completion", "green")
self.console.print(panel)
self.console.print()
event_listener = EventListener() event_listener = EventListener()