Refactor EventListener to utilize ConsoleFormatter for improved logging and visualization

This commit is contained in:
Lorenze Jay
2025-03-12 12:52:38 -07:00
parent 83c5dd1925
commit 0e521fbf2e
3 changed files with 710 additions and 476 deletions

View File

@@ -251,7 +251,9 @@ class Crew(BaseModel):
"""Set private attributes."""
self._cache_handler = CacheHandler()
EventListener().verbose = self.verbose
event_listener = EventListener()
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)

View File

@@ -1,11 +1,7 @@
from datetime import datetime
from io import StringIO
from typing import Any, Dict, Optional
from pydantic import Field, PrivateAttr
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
from crewai.task import Task
@@ -19,6 +15,7 @@ from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .crew_events import (
@@ -55,13 +52,6 @@ class EventListener(BaseEventListener):
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0
text_stream = StringIO()
current_crew_tree: Optional[Tree] = None
current_task_branch: Optional[Tree] = None
current_agent_branch: Optional[Tree] = None
current_tool_branch: Optional[Tree] = None
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
def __new__(cls):
if cls._instance is None:
@@ -76,177 +66,81 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
self.console = Console(width=None)
self.tool_usage_counts = {}
self.formatter = ConsoleFormatter()
def _create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
return Panel(
content,
title=title,
border_style=style,
padding=(1, 2),
)
@property
def current_crew_tree(self) -> Optional[Tree]:
return self.formatter.current_crew_tree
def _create_status_content(
self, title: str, name: str, status_style: str = "blue", **fields
) -> Text:
"""Create standardized status content with consistent formatting."""
content = Text()
content.append(f"{title}\n", style=f"{status_style} bold")
content.append("Name: ", style="white")
content.append(f"{name}\n", style=status_style)
@property
def current_task_branch(self) -> Optional[Tree]:
return self.formatter.current_task_branch
for label, value in fields.items():
content.append(f"{label}: ", style="white")
content.append(
f"{value}\n", style=fields.get(f"{label}_style", status_style)
)
@property
def current_agent_branch(self) -> Optional[Tree]:
return self.formatter.current_agent_branch
return content
@property
def current_tool_branch(self) -> Optional[Tree]:
return self.formatter.current_tool_branch
def _update_tree_label(
self,
tree: Tree,
prefix: str,
name: str,
style: str = "blue",
status: Optional[str] = None,
) -> None:
"""Update tree label with consistent formatting."""
label = Text()
label.append(f"{prefix} ", style=f"{style} bold")
label.append(name, style=style)
if status:
label.append("\n Status: ", style="white")
label.append(status, style=f"{style} bold")
tree.label = label
@property
def current_flow_tree(self) -> Optional[Tree]:
return self.formatter.current_flow_tree
def _add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree:
"""Add a node to the tree with consistent styling."""
return parent.add(Text(text, style=style))
# ----------- METHODS -----------
def on_crew_start(self, source: Any, event: Any) -> None:
if self.verbose:
self.current_crew_tree = Tree(
Text("🚀 Crew: ", style="cyan bold")
+ Text(event.crew_name, style="cyan")
)
content = self._create_status_content(
"Crew Execution Started",
event.crew_name,
"cyan",
ID=source.id,
)
panel = self._create_panel(content, "Crew Execution Started", "cyan")
self.console.print(panel)
self.console.print()
self._telemetry.crew_execution_span(source, event.inputs)
@property
def current_method_branch(self) -> Optional[Tree]:
return self.formatter.current_method_branch
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.on_crew_start(source, event)
self.formatter.current_crew_tree = self.formatter.create_crew_tree(
event.crew_name or "Crew", source.id
)
self._telemetry.crew_execution_span(source, event.inputs)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent):
# Handle telemetry
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
if self.verbose:
if self.current_crew_tree:
self._update_tree_label(
self.current_crew_tree,
"✅ Crew:",
event.crew_name or "Crew",
"green",
)
completion_content = self._create_status_content(
"Crew Execution Completed",
event.crew_name or "Crew",
"green",
ID=source.id,
)
self.console.print(self.current_crew_tree)
self.console.print()
panel = self._create_panel(
completion_content, "Crew Completion", "green"
)
self.console.print(panel)
self.console.print()
if self.current_crew_tree:
self.formatter.update_crew_tree(
self.current_crew_tree,
event.crew_name or "Crew",
source.id,
"completed",
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
if self.verbose:
if self.current_crew_tree:
# Update crew tree label to show failure
crew_content = Text()
crew_content.append("❌ Crew: ", style="red bold")
crew_content.append(event.crew_name or "Crew", style="red")
self.current_crew_tree.label = crew_content
# Create failure panel
failure_content = Text()
failure_content.append("Crew Execution Failed\n", style="red bold")
failure_content.append("Name: ", style="white")
failure_content.append(f"{event.crew_name}\n", style="red")
failure_content.append("ID: ", style="white")
failure_content.append(str(source.id), style="blue")
# Show final tree and failure panel
self.console.print(self.current_crew_tree)
self.console.print()
panel = self._create_panel(failure_content, "Crew Failure", "red")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent):
if self.verbose:
failure_content = Text()
failure_content.append("❌ Crew Test Failed\n", style="red bold")
failure_content.append("Crew: ", style="white")
failure_content.append(event.crew_name or "Crew", style="red")
panel = self._create_panel(failure_content, "Test Failure", "red")
self.console.print(panel)
self.console.print()
if self.current_crew_tree:
self.formatter.update_crew_tree(
self.current_crew_tree,
event.crew_name or "Crew",
source.id,
"failed",
)
@crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.logger.log(
f"📋 Crew '{event.crew_name}' started train",
event.timestamp,
self.formatter.handle_crew_train_started(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.logger.log(
f"✅ Crew '{event.crew_name}' completed train",
event.timestamp,
self.formatter.handle_crew_train_completed(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
if self.verbose:
failure_content = Text()
failure_content.append("❌ Crew Training Failed\n", style="red bold")
failure_content.append("Crew: ", style="white")
failure_content.append(event.crew_name or "Crew", style="red")
panel = self._create_panel(failure_content, "Training Failure", "red")
self.console.print(panel)
self.console.print()
self.formatter.handle_crew_train_failed(event.crew_name or "Crew")
# ----------- TASK EVENTS -----------
@@ -254,22 +148,9 @@ class EventListener(BaseEventListener):
def on_task_started(source, event: TaskStartedEvent):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
if self.verbose:
task_content = Text()
task_content.append(f"📋 Task: {source.id}", style="yellow bold")
task_content.append("\n Status: ", style="white")
task_content.append("Executing Task...", style="yellow dim")
# Add task to the crew tree
if self.current_crew_tree:
self.current_task_branch = self.current_crew_tree.add(task_content)
self.console.print(self.current_crew_tree)
else:
panel = self._create_panel(task_content, "Task Started", "yellow")
self.console.print(panel)
self.console.print()
self.formatter.current_task_branch = self.formatter.create_task_branch(
self.formatter.current_crew_tree, source.id
)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
@@ -279,32 +160,11 @@ class EventListener(BaseEventListener):
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
if self.verbose:
if self.current_crew_tree:
for branch in self.current_crew_tree.children:
if str(source.id) in str(branch.label):
task_content = Text()
task_content.append(
f"📋 Task: {source.id}", style="green bold"
)
task_content.append("\n Assigned to: ", style="white")
task_content.append(source.agent.role, style="green")
task_content.append("\n Status: ", style="white")
task_content.append("✅ Completed", style="green bold")
branch.label = task_content
self.console.print(self.current_crew_tree)
break
completion_content = self._create_status_content(
"Task Completed", str(source.id), "green", Agent=source.agent.role
if self.current_crew_tree:
self.formatter.update_task_status(
self.current_crew_tree, source.id, source.agent.role, "completed"
)
panel = self._create_panel(
completion_content, "Task Completion", "green"
)
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
span = self.execution_spans.get(source)
@@ -313,284 +173,121 @@ class EventListener(BaseEventListener):
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
if self.verbose:
failure_content = Text()
failure_content.append("❌ Task Failed\n", style="red bold")
failure_content.append(f"Task: {source.id}", style="white")
failure_content.append(source.description, style="red")
if source.agent:
failure_content.append("\nAgent: ", style="white")
failure_content.append(source.agent.role, style="red")
# Update the tree if it exists
if self.current_crew_tree:
# Find the task branch and update it with failure status
for branch in self.current_crew_tree.children:
if source.description in branch.label:
branch.label = Text("", style="red bold") + branch.label
self.console.print(self.current_crew_tree)
break
# Show failure panel
panel = self._create_panel(failure_content, "Task Failure", "red")
self.console.print(panel)
self.console.print()
if self.current_crew_tree:
self.formatter.update_task_status(
self.current_crew_tree, source.id, source.agent.role, "failed"
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
if self.verbose:
if self.current_task_branch:
# Create agent execution branch with empty label
self.current_agent_branch = self.current_task_branch.add("")
self._update_tree_label(
self.current_agent_branch,
"🤖 Agent:",
event.agent.role,
"green",
"In Progress",
)
self.console.print(self.current_crew_tree)
self.console.print()
self.formatter.current_agent_branch = self.formatter.create_agent_branch(
self.formatter.current_task_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
if self.verbose:
if self.current_agent_branch:
self._update_tree_label(
self.current_agent_branch,
"🤖 Agent:",
event.agent.role,
"green",
"✅ Completed",
)
self.console.print(self.current_crew_tree)
self.console.print()
if self.current_agent_branch and self.current_crew_tree:
self.formatter.update_agent_status(
self.current_agent_branch, event.agent.role, self.current_crew_tree
)
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(event.flow_name)
content = self._create_status_content(
"Starting Flow Execution", event.flow_name, "blue"
)
panel = self._create_panel(content, "Flow Execution", "blue")
self.console.print()
self.console.print(panel)
self.console.print()
# Create and display the initial tree
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(event.flow_name, style="blue")
self.current_flow_tree = Tree(flow_label)
self._add_tree_node(self.current_flow_tree, "✨ Created", "blue")
self._add_tree_node(
self.current_flow_tree, "✅ Initialization Complete", "green"
)
self.console.print(self.current_flow_tree)
self.console.print()
self.formatter.create_flow_tree(event.flow_name)
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.current_flow_tree = Tree("")
self._update_tree_label(
self.current_flow_tree,
"🌊 Flow:",
event.flow_name,
"blue",
"In Progress",
)
self._add_tree_node(self.current_flow_tree, "🧠 Initializing...", "yellow")
self.console.print()
self.console.print(self.current_flow_tree)
self.console.print()
self.formatter.start_flow(event.flow_name)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
if self.current_flow_tree:
self._update_tree_label(
self.current_flow_tree,
"✅ Flow Finished:",
event.flow_name,
"green",
self.formatter.update_flow_status(
self.current_flow_tree, event.flow_name, source.flow_id
)
content = self._create_status_content(
"Flow Execution Completed",
event.flow_name,
"green",
ID=source.flow_id,
)
panel = self._create_panel(content, "Flow Completion", "green")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
if self.current_flow_tree:
# Find and update the method branch
for branch in self.current_flow_tree.children:
if event.method_name in branch.label:
self.current_method_branch = branch
branch.label = Text("🔄 Running: ", style="yellow bold") + Text(
event.method_name, style="yellow"
)
break
self.console.print(self.current_flow_tree)
self.console.print()
self.formatter.update_method_status(
self.current_method_branch,
self.current_flow_tree,
event.method_name,
"running",
)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
if self.current_method_branch:
# Update method status
self.current_method_branch.label = Text(
"✅ Completed: ", style="green bold"
) + Text(event.method_name, style="green")
self.console.print(self.current_flow_tree)
self.console.print()
self.formatter.update_method_status(
self.current_method_branch,
self.current_flow_tree,
event.method_name,
"completed",
)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
if self.current_method_branch:
self.current_method_branch.label = Text(
"❌ Failed: ", style="red bold"
) + Text(event.method_name, style="red")
self.console.print(self.current_flow_tree)
self.console.print()
if self.current_method_branch and self.current_flow_tree:
self.formatter.update_method_status(
self.current_method_branch,
self.current_flow_tree,
event.method_name,
"failed",
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
if self.verbose and self.current_agent_branch:
# Update tool usage count
self.tool_usage_counts[event.tool_name] = (
self.tool_usage_counts.get(event.tool_name, 0) + 1
)
# Find existing tool node or create new one
tool_node = None
for child in self.current_agent_branch.children:
if event.tool_name in child.label.plain:
tool_node = child
break
if not tool_node:
# Create new tool node
self.current_tool_branch = self.current_agent_branch.add("")
else:
self.current_tool_branch = tool_node
# Update label with current count
self._update_tree_label(
self.current_tool_branch,
"🔧",
f"Using {event.tool_name} ({self.tool_usage_counts[event.tool_name]})",
"yellow",
)
self.console.print(self.current_crew_tree)
self.console.print()
self.formatter.handle_tool_usage_started(
self.current_agent_branch, event.tool_name, self.current_crew_tree
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
if self.verbose and self.current_tool_branch:
self._update_tree_label(
self.current_tool_branch,
"🔧",
f"Used {event.tool_name} ({self.tool_usage_counts[event.tool_name]})",
"green",
)
self.console.print(self.current_crew_tree)
self.console.print()
self.formatter.handle_tool_usage_finished(
self.current_tool_branch, event.tool_name, self.current_crew_tree
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
if self.verbose:
if self.current_tool_branch:
self._update_tree_label(
self.current_tool_branch,
"🔧 Failed",
f"{event.tool_name} ({self.tool_usage_counts[event.tool_name]})",
"red",
)
self.console.print(self.current_crew_tree)
self.console.print()
# Show error panel
error_content = self._create_status_content(
"Tool Usage Failed", event.tool_name, "red", Error=event.error
)
panel = self._create_panel(error_content, "Tool Error", "red")
self.console.print(panel)
self.console.print()
self.formatter.handle_tool_usage_error(
self.current_tool_branch,
event.tool_name,
event.error,
self.current_crew_tree,
)
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
if self.verbose and self.current_agent_branch:
if not any(
"Thinking" in str(child.label)
for child in self.current_agent_branch.children
):
self.current_tool_branch = self.current_agent_branch.add("")
self._update_tree_label(
self.current_tool_branch, "🧠", "Thinking...", "blue"
)
self.console.print(self.current_crew_tree)
self.console.print()
self.formatter.handle_llm_call_started(
self.current_agent_branch, self.current_crew_tree
)
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
if self.verbose and self.current_tool_branch:
# Remove the thinking status node when complete
if "Thinking" in str(self.current_tool_branch.label):
if self.current_agent_branch:
self.current_agent_branch.children.remove(
self.current_tool_branch
)
self.console.print(self.current_crew_tree)
self.console.print()
self.formatter.handle_llm_call_completed(
self.current_tool_branch,
self.current_agent_branch,
self.current_crew_tree,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
if self.verbose:
error_content = Text()
error_content.append("❌ LLM Call Failed\n", style="red bold")
error_content.append("Error: ", style="white")
error_content.append(str(event.error), style="red")
# Update under the agent branch if it exists
if self.current_tool_branch:
self.current_tool_branch.label = Text(
"❌ LLM Failed", style="red bold"
)
self.console.print(self.current_crew_tree)
self.console.print()
# Show error panel
panel = self._create_panel(error_content, "LLM Error", "red")
self.console.print(panel)
self.console.print()
self.formatter.handle_llm_call_failed(
self.current_tool_branch, event.error, self.current_crew_tree
)
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
@@ -613,74 +310,20 @@ class EventListener(BaseEventListener):
event.eval_llm or "",
)
if self.verbose:
content = Text()
content.append("🧪 Starting Crew Test\n\n", style="blue bold")
content.append("Crew: ", style="white")
content.append(f"{event.crew_name}\n", style="blue")
content.append("ID: ", style="white")
content.append(str(source.id), style="blue")
content.append("\nIterations: ", style="white")
content.append(str(event.n_iterations), style="yellow")
panel = self._create_panel(content, "Test Execution", "blue")
self.console.print()
self.console.print(panel)
self.console.print()
# Create and display the test tree
test_label = Text()
test_label.append("🧪 Test: ", style="blue bold")
test_label.append(event.crew_name or "Crew", style="blue")
test_label.append("\n Status: ", style="white")
test_label.append("In Progress", style="yellow")
self.current_flow_tree = Tree(test_label)
self._add_tree_node(
self.current_flow_tree, "🔄 Running tests...", "yellow"
)
self.console.print(self.current_flow_tree)
self.console.print()
self.formatter.handle_crew_test_started(
event.crew_name or "Crew", source.id, event.n_iterations
)
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
if self.verbose:
if self.current_flow_tree:
# Update test tree label to show completion
test_label = Text()
test_label.append("✅ Test: ", style="green bold")
test_label.append(event.crew_name or "Crew", style="green")
test_label.append("\n Status: ", style="white")
test_label.append("Completed", style="green bold")
self.current_flow_tree.label = test_label
self.formatter.handle_crew_test_completed(
self.current_flow_tree,
event.crew_name or "Crew",
)
# Update the running tests node
for child in self.current_flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text(
"✅ Tests completed successfully", style="green"
)
self.console.print(self.current_flow_tree)
self.console.print()
# Create completion panel
completion_content = Text()
completion_content.append(
"Test Execution Completed\n", style="green bold"
)
completion_content.append("Crew: ", style="white")
completion_content.append(f"{event.crew_name}\n", style="green")
completion_content.append("Status: ", style="white")
completion_content.append("All tests passed", style="green")
panel = self._create_panel(
completion_content, "Test Completion", "green"
)
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.formatter.handle_crew_test_failed(event.crew_name or "Crew")
event_listener = EventListener()

View File

@@ -0,0 +1,589 @@
from typing import Dict, Optional
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
class ConsoleFormatter:
current_crew_tree: Optional[Tree] = None
current_task_branch: Optional[Tree] = None
current_agent_branch: Optional[Tree] = None
current_tool_branch: Optional[Tree] = None
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
def __init__(self, verbose: bool = False):
self.console = Console(width=None)
self.verbose = verbose
def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
return Panel(
content,
title=title,
border_style=style,
padding=(1, 2),
)
def create_status_content(
self, title: str, name: str, status_style: str = "blue", **fields
) -> Text:
"""Create standardized status content with consistent formatting."""
content = Text()
content.append(f"{title}\n", style=f"{status_style} bold")
content.append("Name: ", style="white")
content.append(f"{name}\n", style=status_style)
for label, value in fields.items():
content.append(f"{label}: ", style="white")
content.append(
f"{value}\n", style=fields.get(f"{label}_style", status_style)
)
return content
def update_tree_label(
self,
tree: Tree,
prefix: str,
name: str,
style: str = "blue",
status: Optional[str] = None,
) -> None:
"""Update tree label with consistent formatting."""
label = Text()
label.append(f"{prefix} ", style=f"{style} bold")
label.append(name, style=style)
if status:
label.append("\n Status: ", style="white")
label.append(status, style=f"{style} bold")
tree.label = label
def add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree:
"""Add a node to the tree with consistent styling."""
return parent.add(Text(text, style=style))
def print(self, *args, **kwargs) -> None:
"""Print to console with consistent formatting if verbose is enabled."""
if self.verbose:
self.console.print(*args, **kwargs)
def print_panel(
self, content: Text, title: str, style: str = "blue", is_flow: bool = False
) -> None:
"""Print a panel with consistent formatting if verbose is enabled."""
panel = self.create_panel(content, title, style)
if is_flow:
self.print(panel)
self.print()
else:
if self.verbose:
self.print(panel)
self.print()
def update_crew_tree(
self,
tree: Tree,
crew_name: str,
source_id: str,
status: str = "completed",
) -> None:
"""Handle crew tree updates with consistent formatting."""
if not self.verbose:
return
if status == "completed":
prefix, style = "✅ Crew:", "green"
title = "Crew Completion"
content_title = "Crew Execution Completed"
elif status == "failed":
prefix, style = "❌ Crew:", "red"
title = "Crew Failure"
content_title = "Crew Execution Failed"
else:
prefix, style = "🚀 Crew:", "cyan"
title = "Crew Execution"
content_title = "Crew Execution Started"
self.update_tree_label(
tree,
prefix,
crew_name or "Crew",
style,
)
content = self.create_status_content(
content_title,
crew_name or "Crew",
style,
ID=source_id,
)
self.print(tree)
self.print()
self.print_panel(content, title, style)
def create_crew_tree(self, crew_name: str, source_id: str) -> Optional[Tree]:
"""Create and initialize a new crew tree with initial status."""
if not self.verbose:
return None
tree = Tree(
Text("🚀 Crew: ", style="cyan bold") + Text(crew_name, style="cyan")
)
content = self.create_status_content(
"Crew Execution Started",
crew_name,
"cyan",
ID=source_id,
)
self.print_panel(content, "Crew Execution Started", "cyan")
return tree
def create_task_branch(
self, crew_tree: Optional[Tree], task_id: str
) -> Optional[Tree]:
"""Create and initialize a task branch."""
if not self.verbose:
return None
task_content = Text()
task_content.append(f"📋 Task: {task_id}", style="yellow bold")
task_content.append("\n Status: ", style="white")
task_content.append("Executing Task...", style="yellow dim")
if crew_tree:
task_branch = crew_tree.add(task_content)
self.print(crew_tree)
else:
self.print_panel(task_content, "Task Started", "yellow")
self.print()
return task_branch if crew_tree else None
def update_task_status(
self, crew_tree: Tree, task_id: str, agent_role: str, status: str = "completed"
) -> None:
"""Update task status in the tree."""
if not self.verbose:
return
if status == "completed":
style = "green"
status_text = "✅ Completed"
panel_title = "Task Completion"
else:
style = "red"
status_text = "❌ Failed"
panel_title = "Task Failure"
# Update tree label
for branch in crew_tree.children:
if str(task_id) in str(branch.label):
task_content = Text()
task_content.append(f"📋 Task: {task_id}", style=f"{style} bold")
task_content.append("\n Assigned to: ", style="white")
task_content.append(agent_role, style=style)
task_content.append("\n Status: ", style="white")
task_content.append(status_text, style=f"{style} bold")
branch.label = task_content
self.print(crew_tree)
break
# Show status panel
content = self.create_status_content(
f"Task {status.title()}", str(task_id), style, Agent=agent_role
)
self.print_panel(content, panel_title, style)
def create_agent_branch(
self, task_branch: Optional[Tree], agent_role: str, crew_tree: Optional[Tree]
) -> Optional[Tree]:
"""Create and initialize an agent branch."""
if not self.verbose or not task_branch or not crew_tree:
return None
agent_branch = task_branch.add("")
self.update_tree_label(
agent_branch, "🤖 Agent:", agent_role, "green", "In Progress"
)
self.print(crew_tree)
self.print()
return agent_branch
def update_agent_status(
self,
agent_branch: Tree,
agent_role: str,
crew_tree: Tree,
status: str = "completed",
) -> None:
"""Update agent status in the tree."""
if not self.verbose:
return
self.update_tree_label(
agent_branch,
"🤖 Agent:",
agent_role,
"green",
"✅ Completed" if status == "completed" else "❌ Failed",
)
self.print(crew_tree)
self.print()
def create_flow_tree(self, flow_name: str) -> Optional[Tree]:
"""Create and initialize a flow tree."""
# if not self.verbose:
# return None
content = self.create_status_content(
"Starting Flow Execution", flow_name, "blue"
)
self.print_panel(content, "Flow Execution", "blue", is_flow=True)
# Create initial tree
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(flow_name, style="blue")
flow_tree = Tree(flow_label)
self.add_tree_node(flow_tree, "✨ Created", "blue")
self.add_tree_node(flow_tree, "✅ Initialization Complete", "green")
self.print(flow_tree)
self.print()
return flow_tree
def start_flow(self, flow_name: str) -> Optional[Tree]:
"""Initialize a flow execution tree."""
# if not self.verbose:
# return None
flow_tree = Tree("")
self.update_tree_label(flow_tree, "🌊 Flow:", flow_name, "blue", "In Progress")
self.add_tree_node(flow_tree, "🧠 Initializing...", "yellow")
self.print(flow_tree)
self.print()
return flow_tree
def update_flow_status(
self, flow_tree: Tree, flow_name: str, flow_id: str, status: str = "completed"
) -> None:
"""Update flow status in the tree."""
# if not self.verbose:
# return
self.update_tree_label(
flow_tree,
"✅ Flow Finished:" if status == "completed" else "❌ Flow Failed:",
flow_name,
"green" if status == "completed" else "red",
)
content = self.create_status_content(
(
"Flow Execution Completed"
if status == "completed"
else "Flow Execution Failed"
),
flow_name,
"green" if status == "completed" else "red",
ID=flow_id,
)
self.print_panel(
content, "Flow Completion", "green" if status == "completed" else "red"
)
def update_method_status(
self,
method_branch: Optional[Tree],
flow_tree: Tree,
method_name: str,
status: str = "running",
) -> Optional[Tree]:
"""Update method status in the flow tree."""
# if not flow_tree:
# return None
if status == "running":
prefix, style = "🔄 Running:", "yellow"
elif status == "completed":
prefix, style = "✅ Completed:", "green"
else:
prefix, style = "❌ Failed:", "red"
if not method_branch:
# Find or create method branch
for branch in flow_tree.children:
if method_name in str(branch.label):
method_branch = branch
break
if not method_branch:
method_branch = flow_tree.add("")
method_branch.label = Text(prefix, style=f"{style} bold") + Text(
f" {method_name}", style=style
)
self.print(flow_tree)
self.print()
return method_branch
def handle_tool_usage_started(
self, agent_branch: Tree, tool_name: str, crew_tree: Tree
) -> Optional[Tree]:
"""Handle tool usage started event."""
if not self.verbose or not agent_branch:
return None
# Update tool usage count
self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1
# Find existing tool node or create new one
tool_branch = None
for child in agent_branch.children:
if tool_name in str(child.label):
tool_branch = child
break
if not tool_branch:
tool_branch = agent_branch.add("")
# Update label with current count
self.update_tree_label(
tool_branch,
"🔧",
f"Using {tool_name} ({self.tool_usage_counts[tool_name]})",
"yellow",
)
self.print(crew_tree)
self.print()
return tool_branch
def handle_tool_usage_finished(
self, tool_branch: Tree, tool_name: str, crew_tree: Tree
) -> None:
"""Handle tool usage finished event."""
if not self.verbose or not tool_branch:
return
self.update_tree_label(
tool_branch,
"🔧",
f"Used {tool_name} ({self.tool_usage_counts[tool_name]})",
"green",
)
self.print(crew_tree)
self.print()
def handle_tool_usage_error(
self,
tool_branch: Optional[Tree],
tool_name: str,
error: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage error event."""
if not self.verbose:
return
if tool_branch:
self.update_tree_label(
tool_branch,
"🔧 Failed",
f"{tool_name} ({self.tool_usage_counts[tool_name]})",
"red",
)
self.print(crew_tree)
self.print()
# Show error panel
error_content = self.create_status_content(
"Tool Usage Failed", tool_name, "red", Error=error
)
self.print_panel(error_content, "Tool Error", "red")
def handle_llm_call_started(
self, agent_branch: Tree, crew_tree: Tree
) -> Optional[Tree]:
"""Handle LLM call started event."""
if not self.verbose or not agent_branch:
return None
# Only add thinking status if it doesn't exist
if not any("Thinking" in str(child.label) for child in agent_branch.children):
tool_branch = agent_branch.add("")
self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue")
self.print(crew_tree)
self.print()
return tool_branch
return None
def handle_llm_call_completed(
self, tool_branch: Tree, agent_branch: Tree, crew_tree: Tree
) -> None:
"""Handle LLM call completed event."""
if not self.verbose or not tool_branch:
return
# Remove the thinking status node when complete
if "Thinking" in str(tool_branch.label) and agent_branch:
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
) -> None:
"""Handle LLM call failed event."""
if not self.verbose:
return
# Update tool branch if it exists
if tool_branch:
tool_branch.label = Text("❌ LLM Failed", style="red bold")
self.print(crew_tree)
self.print()
# Show error panel
error_content = Text()
error_content.append("❌ LLM Call Failed\n", style="red bold")
error_content.append("Error: ", style="white")
error_content.append(str(error), style="red")
self.print_panel(error_content, "LLM Error", "red")
def handle_crew_test_started(
self, crew_name: str, source_id: str, n_iterations: int
) -> Optional[Tree]:
"""Handle crew test started event."""
if not self.verbose:
return None
# Create initial panel
content = Text()
content.append("🧪 Starting Crew Test\n\n", style="blue bold")
content.append("Crew: ", style="white")
content.append(f"{crew_name}\n", style="blue")
content.append("ID: ", style="white")
content.append(str(source_id), style="blue")
content.append("\nIterations: ", style="white")
content.append(str(n_iterations), style="yellow")
self.print()
self.print_panel(content, "Test Execution", "blue")
self.print()
# Create and display the test tree
test_label = Text()
test_label.append("🧪 Test: ", style="blue bold")
test_label.append(crew_name or "Crew", style="blue")
test_label.append("\n Status: ", style="white")
test_label.append("In Progress", style="yellow")
test_tree = Tree(test_label)
self.add_tree_node(test_tree, "🔄 Running tests...", "yellow")
self.print(test_tree)
self.print()
return test_tree
def handle_crew_test_completed(
self, flow_tree: Optional[Tree], crew_name: str
) -> None:
"""Handle crew test completed event."""
if not self.verbose:
return
if flow_tree:
# Update test tree label to show completion
test_label = Text()
test_label.append("✅ Test: ", style="green bold")
test_label.append(crew_name or "Crew", style="green")
test_label.append("\n Status: ", style="white")
test_label.append("Completed", style="green bold")
flow_tree.label = test_label
# Update the running tests node
for child in flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text("✅ Tests completed successfully", style="green")
self.print(flow_tree)
self.print()
# Create completion panel
completion_content = Text()
completion_content.append("Test Execution Completed\n", style="green bold")
completion_content.append("Crew: ", style="white")
completion_content.append(f"{crew_name}\n", style="green")
completion_content.append("Status: ", style="white")
completion_content.append("Completed", style="green")
self.print_panel(completion_content, "Test Completion", "green")
def handle_crew_train_started(self, crew_name: str, timestamp: str) -> None:
"""Handle crew train started event."""
if not self.verbose:
return
content = Text()
content.append("📋 Crew Training Started\n", style="blue bold")
content.append("Crew: ", style="white")
content.append(f"{crew_name}\n", style="blue")
content.append("Time: ", style="white")
content.append(timestamp, style="blue")
self.print_panel(content, "Training Started", "blue")
self.print()
def handle_crew_train_completed(self, crew_name: str, timestamp: str) -> None:
"""Handle crew train completed event."""
if not self.verbose:
return
content = Text()
content.append("✅ Crew Training Completed\n", style="green bold")
content.append("Crew: ", style="white")
content.append(f"{crew_name}\n", style="green")
content.append("Time: ", style="white")
content.append(timestamp, style="green")
self.print_panel(content, "Training Completed", "green")
self.print()
def handle_crew_train_failed(self, crew_name: str) -> None:
"""Handle crew train failed event."""
if not self.verbose:
return
failure_content = Text()
failure_content.append("❌ Crew Training Failed\n", style="red bold")
failure_content.append("Crew: ", style="white")
failure_content.append(crew_name or "Crew", style="red")
self.print_panel(failure_content, "Training Failure", "red")
self.print()
def handle_crew_test_failed(self, crew_name: str) -> None:
"""Handle crew test failed event."""
if not self.verbose:
return
failure_content = Text()
failure_content.append("❌ Crew Test Failed\n", style="red bold")
failure_content.append("Crew: ", style="white")
failure_content.append(crew_name or "Crew", style="red")
self.print_panel(failure_content, "Test Failure", "red")
self.print()