diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index c5c049bc6..b455c65a0 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -1,7 +1,12 @@ +from datetime import datetime from io import StringIO from typing import Any, Dict from pydantic import Field, PrivateAttr +from rich.console import Console +from rich.panel import Panel +from rich.text import Text +from rich.tree import Tree from crewai.task import Task from crewai.telemetry.telemetry import Telemetry @@ -50,6 +55,12 @@ class EventListener(BaseEventListener): execution_spans: Dict[Task, Any] = Field(default_factory=dict) next_chunk = 0 text_stream = StringIO() + current_crew_tree = None + current_task_branch = None + current_agent_branch = None + current_tool_branch = None + current_flow_tree = None + current_method_branch = None def __new__(cls): if cls._instance is None: @@ -64,61 +75,152 @@ class EventListener(BaseEventListener): self._telemetry.set_tracer() self.execution_spans = {} self._initialized = True + self.console = Console(width=None) + + def _format_timestamp(self, timestamp: float) -> str: + return datetime.fromtimestamp(timestamp).strftime("%H:%M:%S") + + def _create_panel(self, content: Text, title: str, style: str = "blue") -> Panel: + """Create a standardized panel with consistent styling.""" + return Panel( + content, + title=title, + border_style=style, + padding=(1, 2), + ) + + def _create_status_content( + self, title: str, name: str, status_style: str = "blue", **fields + ) -> Text: + """Create standardized status content with consistent formatting.""" + content = Text() + content.append(f"{title}\n", style=f"{status_style} bold") + content.append("Name: ", style="white") + content.append(f"{name}\n", style=status_style) + + for label, value in fields.items(): + content.append(f"{label}: ", style="white") + content.append( + f"{value}\n", style=fields.get(f"{label}_style", status_style) + ) + + return content + + def _update_tree_label( + self, + tree: Tree, + prefix: str, + name: str, + style: str = "blue", + status: str = None, + ) -> None: + """Update tree label with consistent formatting.""" + label = Text() + label.append(f"{prefix} ", style=f"{style} bold") + label.append(name, style=style) + if status: + label.append("\n Status: ", style="white") + label.append(status, style=f"{style} bold") + tree.label = label + + def _add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree: + """Add a node to the tree with consistent styling.""" + return parent.add(Text(text, style=style)) + + # ----------- METHODS ----------- + + def on_crew_start(self, source: Any, event: Any) -> None: + # Create the crew tree that will hold tasks + self.current_crew_tree = Tree( + Text("๐Ÿš€ Crew: ", style="cyan bold") + Text(event.crew_name, style="cyan") + ) + + content = Text() + content.append(f"NAME: {event.crew_name}", style="white") + content.append(f"\nID: {source.id}", style="blue") + + panel = self._create_panel(content, "Crew Execution Started", "cyan") + + self.console.print(panel) + self.console.print() # Add spacing + self._telemetry.crew_execution_span(source, event.inputs) # ----------- CREW EVENTS ----------- def setup_listeners(self, crewai_event_bus): @crewai_event_bus.on(CrewKickoffStartedEvent) def on_crew_started(source, event: CrewKickoffStartedEvent): - self.logger.log( - f"๐Ÿš€ Crew '{event.crew_name}' started, {source.id}", - event.timestamp, - ) - self._telemetry.crew_execution_span(source, event.inputs) + self.on_crew_start(source, event) @crewai_event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event: CrewKickoffCompletedEvent): + # Handle telemetry final_string_output = event.output.raw self._telemetry.end_crew(source, final_string_output) - self.logger.log( - f"โœ… Crew '{event.crew_name}' completed, {source.id}", - event.timestamp, - ) + + if self.current_crew_tree: + # Update crew tree label to show completion + crew_content = Text() + crew_content.append("โœ… Crew: ", style="green bold") + crew_content.append(event.crew_name or "Crew", style="green") + self.current_crew_tree.label = crew_content + + # Create completion panel + completion_content = Text() + completion_content.append( + "Crew Execution Completed\n", style="green bold" + ) + completion_content.append("Name: ", style="white") + completion_content.append(f"{event.crew_name}\n", style="green") + completion_content.append("ID: ", style="white") + completion_content.append(str(source.id), style="blue") + + # Show final tree and completion panel + self.console.print(self.current_crew_tree) + self.console.print() + + panel = self._create_panel( + completion_content, "Crew Completion", "green" + ) + self.console.print(panel) + self.console.print() @crewai_event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event: CrewKickoffFailedEvent): - self.logger.log( - f"โŒ Crew '{event.crew_name}' failed, {source.id}", - event.timestamp, - ) + if self.current_crew_tree: + # Update crew tree label to show failure + crew_content = Text() + crew_content.append("โŒ Crew: ", style="red bold") + crew_content.append(event.crew_name or "Crew", style="red") + self.current_crew_tree.label = crew_content - @crewai_event_bus.on(CrewTestStartedEvent) - def on_crew_test_started(source, event: CrewTestStartedEvent): - cloned_crew = source.copy() - self._telemetry.test_execution_span( - cloned_crew, - event.n_iterations, - event.inputs, - event.eval_llm or "", - ) - self.logger.log( - f"๐Ÿš€ Crew '{event.crew_name}' started test, {source.id}", - event.timestamp, - ) + # Create failure panel + failure_content = Text() + failure_content.append("Crew Execution Failed\n", style="red bold") + failure_content.append("Name: ", style="white") + failure_content.append(f"{event.crew_name}\n", style="red") + failure_content.append("ID: ", style="white") + failure_content.append(str(source.id), style="blue") - @crewai_event_bus.on(CrewTestCompletedEvent) - def on_crew_test_completed(source, event: CrewTestCompletedEvent): - self.logger.log( - f"โœ… Crew '{event.crew_name}' completed test", - event.timestamp, - ) + # Show final tree and failure panel + self.console.print(self.current_crew_tree) + self.console.print() + + panel = self._create_panel(failure_content, "Crew Failure", "red") + self.console.print(panel) + self.console.print() @crewai_event_bus.on(CrewTestFailedEvent) def on_crew_test_failed(source, event: CrewTestFailedEvent): - self.logger.log( - f"โŒ Crew '{event.crew_name}' failed test", - event.timestamp, - ) + # Create test failure content + failure_content = Text() + failure_content.append("โŒ Crew Test Failed\n", style="red bold") + failure_content.append("Crew: ", style="white") + failure_content.append(event.crew_name or "Crew", style="red") + + panel = self._create_panel(failure_content, "Test Failure", "red") + self.console.print(panel) + self.console.print() @crewai_event_bus.on(CrewTrainStartedEvent) def on_crew_train_started(source, event: CrewTrainStartedEvent): @@ -136,10 +238,15 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(CrewTrainFailedEvent) def on_crew_train_failed(source, event: CrewTrainFailedEvent): - self.logger.log( - f"โŒ Crew '{event.crew_name}' failed train", - event.timestamp, - ) + # Create training failure content + failure_content = Text() + failure_content.append("โŒ Crew Training Failed\n", style="red bold") + failure_content.append("Crew: ", style="white") + failure_content.append(event.crew_name or "Crew", style="red") + + panel = self._create_panel(failure_content, "Training Failure", "red") + self.console.print(panel) + self.console.print() # ----------- TASK EVENTS ----------- @@ -148,22 +255,54 @@ class EventListener(BaseEventListener): span = self._telemetry.task_started(crew=source.agent.crew, task=source) self.execution_spans[source] = span - self.logger.log( - f"๐Ÿ“‹ Task started: {source.description}", - event.timestamp, - ) + # Create task content + task_content = Text() + task_content.append("๐Ÿ“‹ Task: ", style="yellow bold") + task_content.append("\n Assigned to: ", style="white") + task_content.append(source.agent.role, style="green") + task_content.append("\n Status: ", style="white") + task_content.append("Waiting for agent...", style="yellow dim") + + # Add task to the crew tree + if self.current_crew_tree: + self.current_task_branch = self.current_crew_tree.add(task_content) + self.console.print(self.current_crew_tree) + else: + panel = self._create_panel(task_content, "Task Started", "yellow") + self.console.print(panel) + + self.console.print() @crewai_event_bus.on(TaskCompletedEvent) def on_task_completed(source, event: TaskCompletedEvent): + # Handle telemetry span = self.execution_spans.get(source) if span: self._telemetry.task_ended(span, source, source.agent.crew) - self.logger.log( - f"โœ… Task completed: {source.description}", - event.timestamp, - ) self.execution_spans[source] = None + # Create completion content + completion_content = Text() + completion_content.append("โœ… Task Completed\n", style="green bold") + completion_content.append(f"Task: {str(source.id)}", style="white") + completion_content.append("\nAgent: ", style="white") + completion_content.append(source.agent.role, style="green") + + # Update the tree if it exists + if self.current_crew_tree: + # Find the task branch and update it with completion status + for branch in self.current_crew_tree.children: + if source.description in branch.label: + branch.label = Text("โœ… ", style="green bold") + branch.label + self.console.print(self.current_crew_tree) + break + + # Always show completion panel + panel = self._create_panel(completion_content, "Task Completion", "green") + + self.console.print(panel) + self.console.print() # Add spacing + @crewai_event_bus.on(TaskFailedEvent) def on_task_failed(source, event: TaskFailedEvent): span = self.execution_spans.get(source) @@ -171,122 +310,269 @@ class EventListener(BaseEventListener): if source.agent and source.agent.crew: self._telemetry.task_ended(span, source, source.agent.crew) self.execution_spans[source] = None - self.logger.log( - f"โŒ Task failed: {source.description}", - event.timestamp, - ) + + # Create failure content + failure_content = Text() + failure_content.append("โŒ Task Failed\n", style="red bold") + failure_content.append("Task: ", style="white") + failure_content.append(source.description, style="red") + if source.agent: + failure_content.append("\nAgent: ", style="white") + failure_content.append(source.agent.role, style="red") + + # Update the tree if it exists + if self.current_crew_tree: + # Find the task branch and update it with failure status + for branch in self.current_crew_tree.children: + if source.description in branch.label: + branch.label = Text("โŒ ", style="red bold") + branch.label + self.console.print(self.current_crew_tree) + break + + # Show failure panel + panel = self._create_panel(failure_content, "Task Failure", "red") + self.console.print(panel) + self.console.print() # ----------- AGENT EVENTS ----------- @crewai_event_bus.on(AgentExecutionStartedEvent) def on_agent_execution_started(source, event: AgentExecutionStartedEvent): - self.logger.log( - f"๐Ÿค– Agent '{event.agent.role}' started task", - event.timestamp, - ) + if self.current_task_branch: + # Create agent execution branch + agent_content = Text() + agent_content.append("๐Ÿค– Agent: ", style="green bold") + agent_content.append(event.agent.role, style="green") + agent_content.append("\n Status: ", style="white") + agent_content.append("In Progress", style="blue bold") + + # Create a branch for the agent's activities + self.current_agent_branch = self.current_task_branch.add(agent_content) + self.console.print(self.current_crew_tree) + self.console.print() @crewai_event_bus.on(AgentExecutionCompletedEvent) def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): - self.logger.log( - f"โœ… Agent '{event.agent.role}' completed task", - event.timestamp, - ) + if self.current_agent_branch: + # Update agent branch to show completion + agent_content = Text() + agent_content.append("๐Ÿค– Agent: ", style="green bold") + agent_content.append(event.agent.role, style="green") + agent_content.append("\n Status: ", style="white") + agent_content.append("โœ… Completed", style="green bold") + + # Update the agent branch label + self.current_agent_branch.label = agent_content + self.console.print(self.current_crew_tree) + self.console.print() # ----------- FLOW EVENTS ----------- @crewai_event_bus.on(FlowCreatedEvent) def on_flow_created(source, event: FlowCreatedEvent): self._telemetry.flow_creation_span(event.flow_name) - self.logger.log( - f"๐ŸŒŠ Flow Created: '{event.flow_name}'", - event.timestamp, + + # Create flow content for panel + content = Text() + content.append("๐ŸŒŠ Starting Flow Execution\n\n", style="blue bold") + content.append("Name: ", style="white") + content.append(event.flow_name, style="blue") + + panel = self._create_panel(content, "Flow Execution", "blue") + + self.console.print() + self.console.print(panel) + self.console.print() + + # Create and display the initial tree + flow_label = Text() + flow_label.append("๐ŸŒŠ Flow: ", style="blue bold") + flow_label.append(event.flow_name, style="blue") + + self.current_flow_tree = Tree(flow_label) + + # Add both creation steps to show progression + self.current_flow_tree.add(Text("โœจ Created", style="blue")) + self.current_flow_tree.add( + Text("โœ… Initialization Complete", style="green") ) + self.console.print(self.current_flow_tree) + self.console.print() + @crewai_event_bus.on(FlowStartedEvent) def on_flow_started(source, event: FlowStartedEvent): self._telemetry.flow_execution_span( event.flow_name, list(source._methods.keys()) ) - self.logger.log( - f"๐Ÿค– Flow Started: '{event.flow_name}', {source.flow_id}", - event.timestamp, - ) + + # Create flow tree with status + flow_label = Text() + flow_label.append("๐ŸŒŠ Flow: ", style="blue bold") + flow_label.append(event.flow_name, style="blue") + flow_label.append("\n Status: ", style="white") + flow_label.append("In Progress", style="yellow") + + self.current_flow_tree = Tree(flow_label) + + # Add initial thinking state + self.current_flow_tree.add(Text("๐Ÿง  Initializing...", style="yellow")) + + self.console.print() + self.console.print(self.current_flow_tree) + self.console.print() @crewai_event_bus.on(FlowFinishedEvent) def on_flow_finished(source, event: FlowFinishedEvent): - self.logger.log( - f"๐Ÿ‘ Flow Finished: '{event.flow_name}', {source.flow_id}", - event.timestamp, - ) + if self.current_flow_tree: + # Update flow tree label to show completion + self.current_flow_tree.label = Text( + "โœ… Flow Finished: ", style="green bold" + ) + Text(event.flow_name, style="green") + + # Create completion content + content = Text() + content.append("Flow Execution Completed\n", style="green bold") + content.append("Name: ", style="white") + content.append(f"{event.flow_name}\n", style="green") + content.append("ID: ", style="white") + content.append(source.flow_id, style="blue") + + panel = self._create_panel(content, "Flow Completion", "green") + self.console.print(panel) + self.console.print() @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started(source, event: MethodExecutionStartedEvent): - self.logger.log( - f"๐Ÿค– Flow Method Started: '{event.method_name}'", - event.timestamp, - ) + if self.current_flow_tree: + # Find and update the method branch + for branch in self.current_flow_tree.children: + if event.method_name in branch.label: + self.current_method_branch = branch + branch.label = Text("๐Ÿ”„ Running: ", style="yellow bold") + Text( + event.method_name, style="yellow" + ) + break - @crewai_event_bus.on(MethodExecutionFailedEvent) - def on_method_execution_failed(source, event: MethodExecutionFailedEvent): - self.logger.log( - f"โŒ Flow Method Failed: '{event.method_name}'", - event.timestamp, - ) + self.console.print(self.current_flow_tree) + self.console.print() @crewai_event_bus.on(MethodExecutionFinishedEvent) def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): - self.logger.log( - f"๐Ÿ‘ Flow Method Finished: '{event.method_name}'", - event.timestamp, - ) + if self.current_method_branch: + # Update method status + self.current_method_branch.label = Text( + "โœ… Completed: ", style="green bold" + ) + Text(event.method_name, style="green") + self.console.print(self.current_flow_tree) + self.console.print() + + @crewai_event_bus.on(MethodExecutionFailedEvent) + def on_method_execution_failed(source, event: MethodExecutionFailedEvent): + if self.current_method_branch: + # Update method status to show failure + self.current_method_branch.label = Text( + "โŒ Failed: ", style="red bold" + ) + Text(event.method_name, style="red") + self.console.print(self.current_flow_tree) + self.console.print() # ----------- TOOL USAGE EVENTS ----------- @crewai_event_bus.on(ToolUsageStartedEvent) def on_tool_usage_started(source, event: ToolUsageStartedEvent): - self.logger.log( - f"๐Ÿค– Tool Usage Started: '{event.tool_name}'", - event.timestamp, - ) + # Create tool usage content + tool_content = Text() + tool_content.append("๐Ÿ”ง Using ", style="yellow bold") + tool_content.append(event.tool_name, style="yellow") + + # Add to tree under the agent branch + if self.current_agent_branch: + self.current_tool_branch = self.current_agent_branch.add(tool_content) + self.console.print(self.current_crew_tree) + self.console.print() @crewai_event_bus.on(ToolUsageFinishedEvent) def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): - self.logger.log( - f"โœ… Tool Usage Finished: '{event.tool_name}'", - event.timestamp, - # - ) + # Create completion content + completion_content = Text() + completion_content.append("๐Ÿ”ง Used ", style="green bold") + completion_content.append(event.tool_name, style="green") + + # Update under the agent branch + if self.current_tool_branch: + self.current_tool_branch.label = completion_content + self.console.print(self.current_crew_tree) + self.console.print() @crewai_event_bus.on(ToolUsageErrorEvent) def on_tool_usage_error(source, event: ToolUsageErrorEvent): - self.logger.log( - f"โŒ Tool Usage Error: '{event.tool_name}'", - event.timestamp, - # + # Create tool error content + error_content = Text() + error_content.append("๐Ÿ”ง Tool Failed: ", style="red bold") + error_content.append(event.tool_name, style="red") + + # Update under the agent branch + if self.current_tool_branch: + self.current_tool_branch.label = error_content + self.console.print(self.current_crew_tree) + self.console.print() + + # Show error panel + panel = self._create_panel( + Text( + f"Tool usage failed: {event.tool_name}: {event.error}", style="red" + ), + "Tool Error", + "red", ) + self.console.print(panel) + self.console.print() # ----------- LLM EVENTS ----------- @crewai_event_bus.on(LLMCallStartedEvent) def on_llm_call_started(source, event: LLMCallStartedEvent): - self.logger.log( - f"๐Ÿค– LLM Call Started", - event.timestamp, - ) + # Create simple LLM call content + llm_content = Text() + llm_content.append("๐Ÿง  Thinking...", style="blue bold") + + # Add to tree under the agent branch + if self.current_agent_branch: + self.current_tool_branch = self.current_agent_branch.add(llm_content) + self.console.print(self.current_crew_tree) + self.console.print() @crewai_event_bus.on(LLMCallCompletedEvent) def on_llm_call_completed(source, event: LLMCallCompletedEvent): - self.logger.log( - f"โœ… LLM Call Completed", - event.timestamp, - ) + # Create simple completion content + completion_content = Text() + completion_content.append("โœจ Done", style="green bold") + + # Update under the agent branch + if self.current_tool_branch: + self.current_tool_branch.label = completion_content + self.console.print(self.current_crew_tree) + self.console.print() @crewai_event_bus.on(LLMCallFailedEvent) def on_llm_call_failed(source, event: LLMCallFailedEvent): - self.logger.log( - f"โŒ LLM call failed: {event.error}", - event.timestamp, - ) + # Create LLM error content + error_content = Text() + error_content.append("โŒ LLM Call Failed\n", style="red bold") + error_content.append("Error: ", style="white") + error_content.append(str(event.error), style="red") + + # Update under the agent branch if it exists + if self.current_tool_branch: + self.current_tool_branch.label = Text("โŒ LLM Failed", style="red bold") + self.console.print(self.current_crew_tree) + self.console.print() + + # Show error panel + panel = self._create_panel(error_content, "LLM Error", "red") + self.console.print(panel) + self.console.print() @crewai_event_bus.on(LLMStreamChunkEvent) def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): @@ -299,5 +585,77 @@ class EventListener(BaseEventListener): print(content, end="", flush=True) self.next_chunk = self.text_stream.tell() + @crewai_event_bus.on(CrewTestStartedEvent) + def on_crew_test_started(source, event: CrewTestStartedEvent): + cloned_crew = source.copy() + self._telemetry.test_execution_span( + cloned_crew, + event.n_iterations, + event.inputs, + event.eval_llm or "", + ) + + # Create test content for panel + content = Text() + content.append("๐Ÿงช Starting Crew Test\n\n", style="blue bold") + content.append("Crew: ", style="white") + content.append(f"{event.crew_name}\n", style="blue") + content.append("ID: ", style="white") + content.append(str(source.id), style="blue") + content.append("\nIterations: ", style="white") + content.append(str(event.n_iterations), style="yellow") + + panel = self._create_panel(content, "Test Execution", "blue") + + self.console.print() + self.console.print(panel) + self.console.print() + + # Create and display the test tree + test_label = Text() + test_label.append("๐Ÿงช Test: ", style="blue bold") + test_label.append(event.crew_name or "Crew", style="blue") + test_label.append("\n Status: ", style="white") + test_label.append("In Progress", style="yellow") + + self.current_flow_tree = Tree(test_label) + self.current_flow_tree.add(Text("๐Ÿ”„ Running tests...", style="yellow")) + + self.console.print(self.current_flow_tree) + self.console.print() + + @crewai_event_bus.on(CrewTestCompletedEvent) + def on_crew_test_completed(source, event: CrewTestCompletedEvent): + if self.current_flow_tree: + # Update test tree label to show completion + test_label = Text() + test_label.append("โœ… Test: ", style="green bold") + test_label.append(event.crew_name or "Crew", style="green") + test_label.append("\n Status: ", style="white") + test_label.append("Completed", style="green bold") + self.current_flow_tree.label = test_label + + # Update the running tests node + for child in self.current_flow_tree.children: + if "Running tests" in str(child.label): + child.label = Text( + "โœ… Tests completed successfully", style="green" + ) + + self.console.print(self.current_flow_tree) + self.console.print() + + # Create completion panel + completion_content = Text() + completion_content.append("Test Execution Completed\n", style="green bold") + completion_content.append("Crew: ", style="white") + completion_content.append(f"{event.crew_name}\n", style="green") + completion_content.append("Status: ", style="white") + completion_content.append("All tests passed", style="green") + + panel = self._create_panel(completion_content, "Test Completion", "green") + self.console.print(panel) + self.console.print() + event_listener = EventListener()