Refactor EventListener logging and visualization with improved tool usage tracking

This commit is contained in:
Lorenze Jay
2025-03-11 11:37:05 -07:00
parent 51599cc36e
commit e077ffcbbe

View File

@@ -61,6 +61,7 @@ class EventListener(BaseEventListener):
current_tool_branch = None
current_flow_tree = None
current_method_branch = None
tool_usage_counts = {}
def __new__(cls):
if cls._instance is None:
@@ -76,6 +77,7 @@ class EventListener(BaseEventListener):
self.execution_spans = {}
self._initialized = True
self.console = Console(width=None)
self.tool_usage_counts = {}
def _format_timestamp(self, timestamp: float) -> str:
return datetime.fromtimestamp(timestamp).strftime("%H:%M:%S")
@@ -258,11 +260,9 @@ class EventListener(BaseEventListener):
if self.verbose:
task_content = Text()
task_content.append("📋 Task: ", style="yellow bold")
task_content.append("\n Assigned to: ", style="white")
task_content.append(source.agent.role, style="green")
task_content.append(f"📋 Task: {source.id}", style="yellow bold")
task_content.append("\n Status: ", style="white")
task_content.append("Waiting for agent...", style="yellow dim")
task_content.append("Executing Task...", style="yellow dim")
# Add task to the crew tree
if self.current_crew_tree:
@@ -283,26 +283,26 @@ class EventListener(BaseEventListener):
self.execution_spans[source] = None
if self.verbose:
completion_content = Text()
completion_content.append("✅ Task Completed\n", style="green bold")
completion_content.append(f"Task: {str(source.id)}", style="white")
completion_content.append("\nAgent: ", style="white")
completion_content.append(source.agent.role, style="green")
# Update the tree if it exists
if self.current_crew_tree:
for branch in self.current_crew_tree.children:
if source.description in branch.label:
branch.label = (
Text(" ", style="green bold") + branch.label
)
if str(source.id) in str(branch.label):
task_content = Text()
task_content.append("📋 Task: ", style="green bold")
task_content.append("\n Assigned to: ", style="white")
task_content.append(source.agent.role, style="green")
task_content.append("\n Status: ", style="white")
task_content.append("✅ Completed", style="green bold")
branch.label = task_content
self.console.print(self.current_crew_tree)
break
completion_content = self._create_status_content(
"Task Completed", str(source.id), "green", Agent=source.agent.role
)
panel = self._create_panel(
completion_content, "Task Completion", "green"
)
self.console.print(panel)
self.console.print()
@@ -377,168 +377,172 @@ class EventListener(BaseEventListener):
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(event.flow_name)
if self.verbose:
content = self._create_status_content(
"Starting Flow Execution", event.flow_name, "blue"
)
content = self._create_status_content(
"Starting Flow Execution", event.flow_name, "blue"
)
panel = self._create_panel(content, "Flow Execution", "blue")
panel = self._create_panel(content, "Flow Execution", "blue")
self.console.print()
self.console.print(panel)
self.console.print()
self.console.print()
self.console.print(panel)
self.console.print()
# Create and display the initial tree
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(event.flow_name, style="blue")
# Create and display the initial tree
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(event.flow_name, style="blue")
self.current_flow_tree = Tree(flow_label)
self.current_flow_tree = Tree(flow_label)
# Add both creation steps to show progression
self.current_flow_tree.add(Text("✨ Created", style="blue"))
self.current_flow_tree.add(
Text("✅ Initialization Complete", style="green")
)
# Add both creation steps to show progression
self.current_flow_tree.add(Text("✨ Created", style="blue"))
self.current_flow_tree.add(
Text("✅ Initialization Complete", style="green")
)
self.console.print(self.current_flow_tree)
self.console.print()
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.current_flow_tree = Tree("")
self._update_tree_label(
self.current_flow_tree,
"🌊 Flow:",
event.flow_name,
"blue",
"In Progress",
)
if self.verbose:
self.current_flow_tree = Tree("")
self._update_tree_label(
self.current_flow_tree,
"🌊 Flow:",
event.flow_name,
"blue",
"In Progress",
)
# Add initial thinking state
self.current_flow_tree.add(Text("🧠 Initializing...", style="yellow"))
# Add initial thinking state
self.current_flow_tree.add(Text("🧠 Initializing...", style="yellow"))
self.console.print()
self.console.print(self.current_flow_tree)
self.console.print()
self.console.print()
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
if self.verbose:
if self.current_flow_tree:
self._update_tree_label(
self.current_flow_tree,
"✅ Flow Finished:",
event.flow_name,
"green",
)
if self.current_flow_tree:
self._update_tree_label(
self.current_flow_tree,
"✅ Flow Finished:",
event.flow_name,
"green",
)
content = self._create_status_content(
"Flow Execution Completed",
event.flow_name,
"green",
ID=source.flow_id,
)
content = self._create_status_content(
"Flow Execution Completed",
event.flow_name,
"green",
ID=source.flow_id,
)
panel = self._create_panel(content, "Flow Completion", "green")
self.console.print(panel)
self.console.print()
panel = self._create_panel(content, "Flow Completion", "green")
self.console.print(panel)
self.console.print()
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
if self.verbose:
if self.current_flow_tree:
# Find and update the method branch
for branch in self.current_flow_tree.children:
if event.method_name in branch.label:
self.current_method_branch = branch
branch.label = Text(
"🔄 Running: ", style="yellow bold"
) + Text(event.method_name, style="yellow")
break
if self.current_flow_tree:
# Find and update the method branch
for branch in self.current_flow_tree.children:
if event.method_name in branch.label:
self.current_method_branch = branch
branch.label = Text("🔄 Running: ", style="yellow bold") + Text(
event.method_name, style="yellow"
)
break
self.console.print(self.current_flow_tree)
self.console.print()
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
if self.verbose:
if self.current_method_branch:
# Update method status
self.current_method_branch.label = Text(
"✅ Completed: ", style="green bold"
) + Text(event.method_name, style="green")
self.console.print(self.current_flow_tree)
self.console.print()
if self.current_method_branch:
# Update method status
self.current_method_branch.label = Text(
"✅ Completed: ", style="green bold"
) + Text(event.method_name, style="green")
self.console.print(self.current_flow_tree)
self.console.print()
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
if self.verbose:
if self.current_method_branch:
self.current_method_branch.label = Text(
"❌ Failed: ", style="red bold"
) + Text(event.method_name, style="red")
self.console.print(self.current_flow_tree)
self.console.print()
if self.current_method_branch:
self.current_method_branch.label = Text(
"❌ Failed: ", style="red bold"
) + Text(event.method_name, style="red")
self.console.print(self.current_flow_tree)
self.console.print()
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
if self.verbose:
# Create tool usage content
tool_content = Text()
tool_content.append("🔧 Using ", style="yellow bold")
tool_content.append(event.tool_name, style="yellow")
if self.verbose and self.current_agent_branch:
# Update tool usage count
self.tool_usage_counts[event.tool_name] = (
self.tool_usage_counts.get(event.tool_name, 0) + 1
)
# Add to tree under the agent branch
if self.current_agent_branch:
self.current_tool_branch = self.current_agent_branch.add(
tool_content
)
self.console.print(self.current_crew_tree)
self.console.print()
# Find existing tool node or create new one
tool_node = None
for child in self.current_agent_branch.children:
if event.tool_name in child.label.plain:
tool_node = child
break
if not tool_node:
# Create new tool node
self.current_tool_branch = self.current_agent_branch.add("")
else:
self.current_tool_branch = tool_node
# Update label with current count
self._update_tree_label(
self.current_tool_branch,
"🔧",
f"Using {event.tool_name} ({self.tool_usage_counts[event.tool_name]})",
"yellow",
)
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
if self.verbose:
completion_content = Text()
completion_content.append("🔧 Used ", style="green bold")
completion_content.append(event.tool_name, style="green")
# Update under the agent branch
if self.current_tool_branch:
self.current_tool_branch.label = completion_content
self.console.print(self.current_crew_tree)
self.console.print()
if self.verbose and self.current_tool_branch:
self._update_tree_label(
self.current_tool_branch,
"🔧",
f"Used {event.tool_name} ({self.tool_usage_counts[event.tool_name]})",
"green",
)
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
if self.verbose:
error_content = Text()
error_content.append("🔧 Tool Failed: ", style="red bold")
error_content.append(event.tool_name, style="red")
# Update under the agent branch
if self.current_tool_branch:
self.current_tool_branch.label = error_content
self._update_tree_label(
self.current_tool_branch,
"🔧 Failed",
f"{event.tool_name} ({self.tool_usage_counts[event.tool_name]})",
"red",
)
self.console.print(self.current_crew_tree)
self.console.print()
# Show error panel
panel = self._create_panel(
Text(
f"Tool usage failed: {event.tool_name}: {event.error}",
style="red",
),
"Tool Error",
"red",
error_content = self._create_status_content(
"Tool Usage Failed", event.tool_name, "red", Error=event.error
)
panel = self._create_panel(error_content, "Tool Error", "red")
self.console.print(panel)
self.console.print()
@@ -546,27 +550,25 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
if self.verbose:
llm_content = Text()
llm_content.append("🧠 Thinking...", style="blue bold")
# Add to tree under the agent branch
if self.current_agent_branch:
self.current_tool_branch = self.current_agent_branch.add(
llm_content
if self.verbose and self.current_agent_branch:
# Only show thinking state if no previous LLM calls are shown
if not any(
"Thinking" in str(child.label)
for child in self.current_agent_branch.children
):
self.current_tool_branch = self.current_agent_branch.add("")
self._update_tree_label(
self.current_tool_branch, "🧠", "Thinking...", "blue"
)
self.console.print(self.current_crew_tree)
self.console.print()
self.console.print(self.current_crew_tree)
self.console.print()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
if self.verbose:
completion_content = Text()
completion_content.append("✨ Done", style="green bold")
# Update under the agent branch
if self.current_tool_branch:
self.current_tool_branch.label = completion_content
if self.verbose and self.current_tool_branch:
# Remove the thinking status node when complete
if "Thinking" in str(self.current_tool_branch.label):
self.current_agent_branch.children.remove(self.current_tool_branch)
self.console.print(self.current_crew_tree)
self.console.print()