diff --git a/src/crewai/crew.py b/src/crewai/crew.py index a7ccd7de0..d3a6870dc 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -251,7 +251,9 @@ class Crew(BaseModel): """Set private attributes.""" self._cache_handler = CacheHandler() - EventListener().verbose = self.verbose + event_listener = EventListener() + event_listener.verbose = self.verbose + event_listener.formatter.verbose = self.verbose self._logger = Logger(verbose=self.verbose) if self.output_log_file: self._file_handler = FileHandler(self.output_log_file) diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index b673485f9..921594de1 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -1,11 +1,7 @@ -from datetime import datetime from io import StringIO from typing import Any, Dict, Optional from pydantic import Field, PrivateAttr -from rich.console import Console -from rich.panel import Panel -from rich.text import Text from rich.tree import Tree from crewai.task import Task @@ -19,6 +15,7 @@ from crewai.utilities.events.llm_events import ( LLMCallStartedEvent, LLMStreamChunkEvent, ) +from crewai.utilities.events.utils.console_formatter import ConsoleFormatter from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent from .crew_events import ( @@ -55,13 +52,6 @@ class EventListener(BaseEventListener): execution_spans: Dict[Task, Any] = Field(default_factory=dict) next_chunk = 0 text_stream = StringIO() - current_crew_tree: Optional[Tree] = None - current_task_branch: Optional[Tree] = None - current_agent_branch: Optional[Tree] = None - current_tool_branch: Optional[Tree] = None - current_flow_tree: Optional[Tree] = None - current_method_branch: Optional[Tree] = None - tool_usage_counts: Dict[str, int] = {} def __new__(cls): if cls._instance is None: @@ -76,177 +66,81 @@ class EventListener(BaseEventListener): self._telemetry.set_tracer() self.execution_spans = {} self._initialized = True - self.console = Console(width=None) - self.tool_usage_counts = {} + self.formatter = ConsoleFormatter() - def _create_panel(self, content: Text, title: str, style: str = "blue") -> Panel: - """Create a standardized panel with consistent styling.""" - return Panel( - content, - title=title, - border_style=style, - padding=(1, 2), - ) + @property + def current_crew_tree(self) -> Optional[Tree]: + return self.formatter.current_crew_tree - def _create_status_content( - self, title: str, name: str, status_style: str = "blue", **fields - ) -> Text: - """Create standardized status content with consistent formatting.""" - content = Text() - content.append(f"{title}\n", style=f"{status_style} bold") - content.append("Name: ", style="white") - content.append(f"{name}\n", style=status_style) + @property + def current_task_branch(self) -> Optional[Tree]: + return self.formatter.current_task_branch - for label, value in fields.items(): - content.append(f"{label}: ", style="white") - content.append( - f"{value}\n", style=fields.get(f"{label}_style", status_style) - ) + @property + def current_agent_branch(self) -> Optional[Tree]: + return self.formatter.current_agent_branch - return content + @property + def current_tool_branch(self) -> Optional[Tree]: + return self.formatter.current_tool_branch - def _update_tree_label( - self, - tree: Tree, - prefix: str, - name: str, - style: str = "blue", - status: Optional[str] = None, - ) -> None: - """Update tree label with consistent formatting.""" - label = Text() - label.append(f"{prefix} ", style=f"{style} bold") - label.append(name, style=style) - if status: - label.append("\n Status: ", style="white") - label.append(status, style=f"{style} bold") - tree.label = label + @property + def current_flow_tree(self) -> Optional[Tree]: + return self.formatter.current_flow_tree - def _add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree: - """Add a node to the tree with consistent styling.""" - return parent.add(Text(text, style=style)) - - # ----------- METHODS ----------- - - def on_crew_start(self, source: Any, event: Any) -> None: - if self.verbose: - self.current_crew_tree = Tree( - Text("๐Ÿš€ Crew: ", style="cyan bold") - + Text(event.crew_name, style="cyan") - ) - - content = self._create_status_content( - "Crew Execution Started", - event.crew_name, - "cyan", - ID=source.id, - ) - - panel = self._create_panel(content, "Crew Execution Started", "cyan") - self.console.print(panel) - self.console.print() - - self._telemetry.crew_execution_span(source, event.inputs) + @property + def current_method_branch(self) -> Optional[Tree]: + return self.formatter.current_method_branch # ----------- CREW EVENTS ----------- def setup_listeners(self, crewai_event_bus): @crewai_event_bus.on(CrewKickoffStartedEvent) def on_crew_started(source, event: CrewKickoffStartedEvent): - self.on_crew_start(source, event) + self.formatter.current_crew_tree = self.formatter.create_crew_tree( + event.crew_name or "Crew", source.id + ) + self._telemetry.crew_execution_span(source, event.inputs) @crewai_event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event: CrewKickoffCompletedEvent): # Handle telemetry final_string_output = event.output.raw self._telemetry.end_crew(source, final_string_output) - if self.verbose: - if self.current_crew_tree: - self._update_tree_label( - self.current_crew_tree, - "โœ… Crew:", - event.crew_name or "Crew", - "green", - ) - completion_content = self._create_status_content( - "Crew Execution Completed", - event.crew_name or "Crew", - "green", - ID=source.id, - ) - - self.console.print(self.current_crew_tree) - self.console.print() - panel = self._create_panel( - completion_content, "Crew Completion", "green" - ) - self.console.print(panel) - self.console.print() + if self.current_crew_tree: + self.formatter.update_crew_tree( + self.current_crew_tree, + event.crew_name or "Crew", + source.id, + "completed", + ) @crewai_event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event: CrewKickoffFailedEvent): - if self.verbose: - if self.current_crew_tree: - # Update crew tree label to show failure - crew_content = Text() - crew_content.append("โŒ Crew: ", style="red bold") - crew_content.append(event.crew_name or "Crew", style="red") - self.current_crew_tree.label = crew_content - - # Create failure panel - failure_content = Text() - failure_content.append("Crew Execution Failed\n", style="red bold") - failure_content.append("Name: ", style="white") - failure_content.append(f"{event.crew_name}\n", style="red") - failure_content.append("ID: ", style="white") - failure_content.append(str(source.id), style="blue") - - # Show final tree and failure panel - self.console.print(self.current_crew_tree) - self.console.print() - - panel = self._create_panel(failure_content, "Crew Failure", "red") - self.console.print(panel) - self.console.print() - - @crewai_event_bus.on(CrewTestFailedEvent) - def on_crew_test_failed(source, event: CrewTestFailedEvent): - if self.verbose: - failure_content = Text() - failure_content.append("โŒ Crew Test Failed\n", style="red bold") - failure_content.append("Crew: ", style="white") - failure_content.append(event.crew_name or "Crew", style="red") - - panel = self._create_panel(failure_content, "Test Failure", "red") - self.console.print(panel) - self.console.print() + if self.current_crew_tree: + self.formatter.update_crew_tree( + self.current_crew_tree, + event.crew_name or "Crew", + source.id, + "failed", + ) @crewai_event_bus.on(CrewTrainStartedEvent) def on_crew_train_started(source, event: CrewTrainStartedEvent): - self.logger.log( - f"๐Ÿ“‹ Crew '{event.crew_name}' started train", - event.timestamp, + self.formatter.handle_crew_train_started( + event.crew_name or "Crew", str(event.timestamp) ) @crewai_event_bus.on(CrewTrainCompletedEvent) def on_crew_train_completed(source, event: CrewTrainCompletedEvent): - self.logger.log( - f"โœ… Crew '{event.crew_name}' completed train", - event.timestamp, + self.formatter.handle_crew_train_completed( + event.crew_name or "Crew", str(event.timestamp) ) @crewai_event_bus.on(CrewTrainFailedEvent) def on_crew_train_failed(source, event: CrewTrainFailedEvent): - if self.verbose: - failure_content = Text() - failure_content.append("โŒ Crew Training Failed\n", style="red bold") - failure_content.append("Crew: ", style="white") - failure_content.append(event.crew_name or "Crew", style="red") - - panel = self._create_panel(failure_content, "Training Failure", "red") - self.console.print(panel) - self.console.print() + self.formatter.handle_crew_train_failed(event.crew_name or "Crew") # ----------- TASK EVENTS ----------- @@ -254,22 +148,9 @@ class EventListener(BaseEventListener): def on_task_started(source, event: TaskStartedEvent): span = self._telemetry.task_started(crew=source.agent.crew, task=source) self.execution_spans[source] = span - - if self.verbose: - task_content = Text() - task_content.append(f"๐Ÿ“‹ Task: {source.id}", style="yellow bold") - task_content.append("\n Status: ", style="white") - task_content.append("Executing Task...", style="yellow dim") - - # Add task to the crew tree - if self.current_crew_tree: - self.current_task_branch = self.current_crew_tree.add(task_content) - self.console.print(self.current_crew_tree) - else: - panel = self._create_panel(task_content, "Task Started", "yellow") - self.console.print(panel) - - self.console.print() + self.formatter.current_task_branch = self.formatter.create_task_branch( + self.formatter.current_crew_tree, source.id + ) @crewai_event_bus.on(TaskCompletedEvent) def on_task_completed(source, event: TaskCompletedEvent): @@ -279,32 +160,11 @@ class EventListener(BaseEventListener): self._telemetry.task_ended(span, source, source.agent.crew) self.execution_spans[source] = None - if self.verbose: - if self.current_crew_tree: - for branch in self.current_crew_tree.children: - if str(source.id) in str(branch.label): - task_content = Text() - task_content.append( - f"๐Ÿ“‹ Task: {source.id}", style="green bold" - ) - task_content.append("\n Assigned to: ", style="white") - task_content.append(source.agent.role, style="green") - task_content.append("\n Status: ", style="white") - task_content.append("โœ… Completed", style="green bold") - branch.label = task_content - self.console.print(self.current_crew_tree) - break - - completion_content = self._create_status_content( - "Task Completed", str(source.id), "green", Agent=source.agent.role + if self.current_crew_tree: + self.formatter.update_task_status( + self.current_crew_tree, source.id, source.agent.role, "completed" ) - panel = self._create_panel( - completion_content, "Task Completion", "green" - ) - self.console.print(panel) - self.console.print() - @crewai_event_bus.on(TaskFailedEvent) def on_task_failed(source, event: TaskFailedEvent): span = self.execution_spans.get(source) @@ -313,284 +173,121 @@ class EventListener(BaseEventListener): self._telemetry.task_ended(span, source, source.agent.crew) self.execution_spans[source] = None - if self.verbose: - failure_content = Text() - failure_content.append("โŒ Task Failed\n", style="red bold") - failure_content.append(f"Task: {source.id}", style="white") - failure_content.append(source.description, style="red") - if source.agent: - failure_content.append("\nAgent: ", style="white") - failure_content.append(source.agent.role, style="red") - - # Update the tree if it exists - if self.current_crew_tree: - # Find the task branch and update it with failure status - for branch in self.current_crew_tree.children: - if source.description in branch.label: - branch.label = Text("โŒ ", style="red bold") + branch.label - self.console.print(self.current_crew_tree) - break - - # Show failure panel - panel = self._create_panel(failure_content, "Task Failure", "red") - self.console.print(panel) - self.console.print() + if self.current_crew_tree: + self.formatter.update_task_status( + self.current_crew_tree, source.id, source.agent.role, "failed" + ) # ----------- AGENT EVENTS ----------- @crewai_event_bus.on(AgentExecutionStartedEvent) def on_agent_execution_started(source, event: AgentExecutionStartedEvent): - if self.verbose: - if self.current_task_branch: - # Create agent execution branch with empty label - self.current_agent_branch = self.current_task_branch.add("") - self._update_tree_label( - self.current_agent_branch, - "๐Ÿค– Agent:", - event.agent.role, - "green", - "In Progress", - ) - - self.console.print(self.current_crew_tree) - self.console.print() + self.formatter.current_agent_branch = self.formatter.create_agent_branch( + self.formatter.current_task_branch, + event.agent.role, + self.formatter.current_crew_tree, + ) @crewai_event_bus.on(AgentExecutionCompletedEvent) def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): - if self.verbose: - if self.current_agent_branch: - self._update_tree_label( - self.current_agent_branch, - "๐Ÿค– Agent:", - event.agent.role, - "green", - "โœ… Completed", - ) - - self.console.print(self.current_crew_tree) - self.console.print() + if self.current_agent_branch and self.current_crew_tree: + self.formatter.update_agent_status( + self.current_agent_branch, event.agent.role, self.current_crew_tree + ) # ----------- FLOW EVENTS ----------- @crewai_event_bus.on(FlowCreatedEvent) def on_flow_created(source, event: FlowCreatedEvent): self._telemetry.flow_creation_span(event.flow_name) - - content = self._create_status_content( - "Starting Flow Execution", event.flow_name, "blue" - ) - - panel = self._create_panel(content, "Flow Execution", "blue") - - self.console.print() - self.console.print(panel) - self.console.print() - - # Create and display the initial tree - flow_label = Text() - flow_label.append("๐ŸŒŠ Flow: ", style="blue bold") - flow_label.append(event.flow_name, style="blue") - - self.current_flow_tree = Tree(flow_label) - - self._add_tree_node(self.current_flow_tree, "โœจ Created", "blue") - self._add_tree_node( - self.current_flow_tree, "โœ… Initialization Complete", "green" - ) - - self.console.print(self.current_flow_tree) - self.console.print() + self.formatter.create_flow_tree(event.flow_name) @crewai_event_bus.on(FlowStartedEvent) def on_flow_started(source, event: FlowStartedEvent): self._telemetry.flow_execution_span( event.flow_name, list(source._methods.keys()) ) - self.current_flow_tree = Tree("") - self._update_tree_label( - self.current_flow_tree, - "๐ŸŒŠ Flow:", - event.flow_name, - "blue", - "In Progress", - ) - - self._add_tree_node(self.current_flow_tree, "๐Ÿง  Initializing...", "yellow") - - self.console.print() - self.console.print(self.current_flow_tree) - self.console.print() + self.formatter.start_flow(event.flow_name) @crewai_event_bus.on(FlowFinishedEvent) def on_flow_finished(source, event: FlowFinishedEvent): if self.current_flow_tree: - self._update_tree_label( - self.current_flow_tree, - "โœ… Flow Finished:", - event.flow_name, - "green", + self.formatter.update_flow_status( + self.current_flow_tree, event.flow_name, source.flow_id ) - content = self._create_status_content( - "Flow Execution Completed", - event.flow_name, - "green", - ID=source.flow_id, - ) - - panel = self._create_panel(content, "Flow Completion", "green") - self.console.print(panel) - self.console.print() - @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started(source, event: MethodExecutionStartedEvent): - if self.current_flow_tree: - # Find and update the method branch - for branch in self.current_flow_tree.children: - if event.method_name in branch.label: - self.current_method_branch = branch - branch.label = Text("๐Ÿ”„ Running: ", style="yellow bold") + Text( - event.method_name, style="yellow" - ) - break - - self.console.print(self.current_flow_tree) - self.console.print() + self.formatter.update_method_status( + self.current_method_branch, + self.current_flow_tree, + event.method_name, + "running", + ) @crewai_event_bus.on(MethodExecutionFinishedEvent) def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): - if self.current_method_branch: - # Update method status - self.current_method_branch.label = Text( - "โœ… Completed: ", style="green bold" - ) + Text(event.method_name, style="green") - self.console.print(self.current_flow_tree) - self.console.print() + self.formatter.update_method_status( + self.current_method_branch, + self.current_flow_tree, + event.method_name, + "completed", + ) @crewai_event_bus.on(MethodExecutionFailedEvent) def on_method_execution_failed(source, event: MethodExecutionFailedEvent): - if self.current_method_branch: - self.current_method_branch.label = Text( - "โŒ Failed: ", style="red bold" - ) + Text(event.method_name, style="red") - self.console.print(self.current_flow_tree) - self.console.print() + if self.current_method_branch and self.current_flow_tree: + self.formatter.update_method_status( + self.current_method_branch, + self.current_flow_tree, + event.method_name, + "failed", + ) # ----------- TOOL USAGE EVENTS ----------- @crewai_event_bus.on(ToolUsageStartedEvent) def on_tool_usage_started(source, event: ToolUsageStartedEvent): - if self.verbose and self.current_agent_branch: - # Update tool usage count - self.tool_usage_counts[event.tool_name] = ( - self.tool_usage_counts.get(event.tool_name, 0) + 1 - ) - - # Find existing tool node or create new one - tool_node = None - for child in self.current_agent_branch.children: - if event.tool_name in child.label.plain: - tool_node = child - break - - if not tool_node: - # Create new tool node - self.current_tool_branch = self.current_agent_branch.add("") - else: - self.current_tool_branch = tool_node - - # Update label with current count - self._update_tree_label( - self.current_tool_branch, - "๐Ÿ”ง", - f"Using {event.tool_name} ({self.tool_usage_counts[event.tool_name]})", - "yellow", - ) - - self.console.print(self.current_crew_tree) - self.console.print() + self.formatter.handle_tool_usage_started( + self.current_agent_branch, event.tool_name, self.current_crew_tree + ) @crewai_event_bus.on(ToolUsageFinishedEvent) def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): - if self.verbose and self.current_tool_branch: - self._update_tree_label( - self.current_tool_branch, - "๐Ÿ”ง", - f"Used {event.tool_name} ({self.tool_usage_counts[event.tool_name]})", - "green", - ) - self.console.print(self.current_crew_tree) - self.console.print() + self.formatter.handle_tool_usage_finished( + self.current_tool_branch, event.tool_name, self.current_crew_tree + ) @crewai_event_bus.on(ToolUsageErrorEvent) def on_tool_usage_error(source, event: ToolUsageErrorEvent): - if self.verbose: - if self.current_tool_branch: - self._update_tree_label( - self.current_tool_branch, - "๐Ÿ”ง Failed", - f"{event.tool_name} ({self.tool_usage_counts[event.tool_name]})", - "red", - ) - self.console.print(self.current_crew_tree) - self.console.print() - - # Show error panel - error_content = self._create_status_content( - "Tool Usage Failed", event.tool_name, "red", Error=event.error - ) - panel = self._create_panel(error_content, "Tool Error", "red") - self.console.print(panel) - self.console.print() + self.formatter.handle_tool_usage_error( + self.current_tool_branch, + event.tool_name, + event.error, + self.current_crew_tree, + ) # ----------- LLM EVENTS ----------- @crewai_event_bus.on(LLMCallStartedEvent) def on_llm_call_started(source, event: LLMCallStartedEvent): - if self.verbose and self.current_agent_branch: - if not any( - "Thinking" in str(child.label) - for child in self.current_agent_branch.children - ): - self.current_tool_branch = self.current_agent_branch.add("") - self._update_tree_label( - self.current_tool_branch, "๐Ÿง ", "Thinking...", "blue" - ) - self.console.print(self.current_crew_tree) - self.console.print() + self.formatter.handle_llm_call_started( + self.current_agent_branch, self.current_crew_tree + ) @crewai_event_bus.on(LLMCallCompletedEvent) def on_llm_call_completed(source, event: LLMCallCompletedEvent): - if self.verbose and self.current_tool_branch: - # Remove the thinking status node when complete - if "Thinking" in str(self.current_tool_branch.label): - if self.current_agent_branch: - self.current_agent_branch.children.remove( - self.current_tool_branch - ) - self.console.print(self.current_crew_tree) - self.console.print() + self.formatter.handle_llm_call_completed( + self.current_tool_branch, + self.current_agent_branch, + self.current_crew_tree, + ) @crewai_event_bus.on(LLMCallFailedEvent) def on_llm_call_failed(source, event: LLMCallFailedEvent): - if self.verbose: - error_content = Text() - error_content.append("โŒ LLM Call Failed\n", style="red bold") - error_content.append("Error: ", style="white") - error_content.append(str(event.error), style="red") - - # Update under the agent branch if it exists - if self.current_tool_branch: - self.current_tool_branch.label = Text( - "โŒ LLM Failed", style="red bold" - ) - self.console.print(self.current_crew_tree) - self.console.print() - - # Show error panel - panel = self._create_panel(error_content, "LLM Error", "red") - self.console.print(panel) - self.console.print() + self.formatter.handle_llm_call_failed( + self.current_tool_branch, event.error, self.current_crew_tree + ) @crewai_event_bus.on(LLMStreamChunkEvent) def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): @@ -613,74 +310,20 @@ class EventListener(BaseEventListener): event.eval_llm or "", ) - if self.verbose: - content = Text() - content.append("๐Ÿงช Starting Crew Test\n\n", style="blue bold") - content.append("Crew: ", style="white") - content.append(f"{event.crew_name}\n", style="blue") - content.append("ID: ", style="white") - content.append(str(source.id), style="blue") - content.append("\nIterations: ", style="white") - content.append(str(event.n_iterations), style="yellow") - - panel = self._create_panel(content, "Test Execution", "blue") - - self.console.print() - self.console.print(panel) - self.console.print() - - # Create and display the test tree - test_label = Text() - test_label.append("๐Ÿงช Test: ", style="blue bold") - test_label.append(event.crew_name or "Crew", style="blue") - test_label.append("\n Status: ", style="white") - test_label.append("In Progress", style="yellow") - - self.current_flow_tree = Tree(test_label) - self._add_tree_node( - self.current_flow_tree, "๐Ÿ”„ Running tests...", "yellow" - ) - - self.console.print(self.current_flow_tree) - self.console.print() + self.formatter.handle_crew_test_started( + event.crew_name or "Crew", source.id, event.n_iterations + ) @crewai_event_bus.on(CrewTestCompletedEvent) def on_crew_test_completed(source, event: CrewTestCompletedEvent): - if self.verbose: - if self.current_flow_tree: - # Update test tree label to show completion - test_label = Text() - test_label.append("โœ… Test: ", style="green bold") - test_label.append(event.crew_name or "Crew", style="green") - test_label.append("\n Status: ", style="white") - test_label.append("Completed", style="green bold") - self.current_flow_tree.label = test_label + self.formatter.handle_crew_test_completed( + self.current_flow_tree, + event.crew_name or "Crew", + ) - # Update the running tests node - for child in self.current_flow_tree.children: - if "Running tests" in str(child.label): - child.label = Text( - "โœ… Tests completed successfully", style="green" - ) - - self.console.print(self.current_flow_tree) - self.console.print() - - # Create completion panel - completion_content = Text() - completion_content.append( - "Test Execution Completed\n", style="green bold" - ) - completion_content.append("Crew: ", style="white") - completion_content.append(f"{event.crew_name}\n", style="green") - completion_content.append("Status: ", style="white") - completion_content.append("All tests passed", style="green") - - panel = self._create_panel( - completion_content, "Test Completion", "green" - ) - self.console.print(panel) - self.console.print() + @crewai_event_bus.on(CrewTestFailedEvent) + def on_crew_test_failed(source, event: CrewTestFailedEvent): + self.formatter.handle_crew_test_failed(event.crew_name or "Crew") event_listener = EventListener() diff --git a/src/crewai/utilities/events/utils/console_formatter.py b/src/crewai/utilities/events/utils/console_formatter.py new file mode 100644 index 000000000..a279d3ff6 --- /dev/null +++ b/src/crewai/utilities/events/utils/console_formatter.py @@ -0,0 +1,589 @@ +from typing import Dict, Optional + +from rich.console import Console +from rich.panel import Panel +from rich.text import Text +from rich.tree import Tree + + +class ConsoleFormatter: + current_crew_tree: Optional[Tree] = None + current_task_branch: Optional[Tree] = None + current_agent_branch: Optional[Tree] = None + current_tool_branch: Optional[Tree] = None + current_flow_tree: Optional[Tree] = None + current_method_branch: Optional[Tree] = None + tool_usage_counts: Dict[str, int] = {} + + def __init__(self, verbose: bool = False): + self.console = Console(width=None) + self.verbose = verbose + + def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel: + """Create a standardized panel with consistent styling.""" + return Panel( + content, + title=title, + border_style=style, + padding=(1, 2), + ) + + def create_status_content( + self, title: str, name: str, status_style: str = "blue", **fields + ) -> Text: + """Create standardized status content with consistent formatting.""" + content = Text() + content.append(f"{title}\n", style=f"{status_style} bold") + content.append("Name: ", style="white") + content.append(f"{name}\n", style=status_style) + + for label, value in fields.items(): + content.append(f"{label}: ", style="white") + content.append( + f"{value}\n", style=fields.get(f"{label}_style", status_style) + ) + + return content + + def update_tree_label( + self, + tree: Tree, + prefix: str, + name: str, + style: str = "blue", + status: Optional[str] = None, + ) -> None: + """Update tree label with consistent formatting.""" + label = Text() + label.append(f"{prefix} ", style=f"{style} bold") + label.append(name, style=style) + if status: + label.append("\n Status: ", style="white") + label.append(status, style=f"{style} bold") + tree.label = label + + def add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree: + """Add a node to the tree with consistent styling.""" + return parent.add(Text(text, style=style)) + + def print(self, *args, **kwargs) -> None: + """Print to console with consistent formatting if verbose is enabled.""" + if self.verbose: + self.console.print(*args, **kwargs) + + def print_panel( + self, content: Text, title: str, style: str = "blue", is_flow: bool = False + ) -> None: + """Print a panel with consistent formatting if verbose is enabled.""" + panel = self.create_panel(content, title, style) + if is_flow: + self.print(panel) + self.print() + else: + if self.verbose: + self.print(panel) + self.print() + + def update_crew_tree( + self, + tree: Tree, + crew_name: str, + source_id: str, + status: str = "completed", + ) -> None: + """Handle crew tree updates with consistent formatting.""" + if not self.verbose: + return + + if status == "completed": + prefix, style = "โœ… Crew:", "green" + title = "Crew Completion" + content_title = "Crew Execution Completed" + elif status == "failed": + prefix, style = "โŒ Crew:", "red" + title = "Crew Failure" + content_title = "Crew Execution Failed" + else: + prefix, style = "๐Ÿš€ Crew:", "cyan" + title = "Crew Execution" + content_title = "Crew Execution Started" + + self.update_tree_label( + tree, + prefix, + crew_name or "Crew", + style, + ) + + content = self.create_status_content( + content_title, + crew_name or "Crew", + style, + ID=source_id, + ) + + self.print(tree) + self.print() + self.print_panel(content, title, style) + + def create_crew_tree(self, crew_name: str, source_id: str) -> Optional[Tree]: + """Create and initialize a new crew tree with initial status.""" + if not self.verbose: + return None + + tree = Tree( + Text("๐Ÿš€ Crew: ", style="cyan bold") + Text(crew_name, style="cyan") + ) + + content = self.create_status_content( + "Crew Execution Started", + crew_name, + "cyan", + ID=source_id, + ) + + self.print_panel(content, "Crew Execution Started", "cyan") + return tree + + def create_task_branch( + self, crew_tree: Optional[Tree], task_id: str + ) -> Optional[Tree]: + """Create and initialize a task branch.""" + if not self.verbose: + return None + + task_content = Text() + task_content.append(f"๐Ÿ“‹ Task: {task_id}", style="yellow bold") + task_content.append("\n Status: ", style="white") + task_content.append("Executing Task...", style="yellow dim") + + if crew_tree: + task_branch = crew_tree.add(task_content) + self.print(crew_tree) + else: + self.print_panel(task_content, "Task Started", "yellow") + + self.print() + return task_branch if crew_tree else None + + def update_task_status( + self, crew_tree: Tree, task_id: str, agent_role: str, status: str = "completed" + ) -> None: + """Update task status in the tree.""" + if not self.verbose: + return + + if status == "completed": + style = "green" + status_text = "โœ… Completed" + panel_title = "Task Completion" + else: + style = "red" + status_text = "โŒ Failed" + panel_title = "Task Failure" + + # Update tree label + for branch in crew_tree.children: + if str(task_id) in str(branch.label): + task_content = Text() + task_content.append(f"๐Ÿ“‹ Task: {task_id}", style=f"{style} bold") + task_content.append("\n Assigned to: ", style="white") + task_content.append(agent_role, style=style) + task_content.append("\n Status: ", style="white") + task_content.append(status_text, style=f"{style} bold") + branch.label = task_content + self.print(crew_tree) + break + + # Show status panel + content = self.create_status_content( + f"Task {status.title()}", str(task_id), style, Agent=agent_role + ) + self.print_panel(content, panel_title, style) + + def create_agent_branch( + self, task_branch: Optional[Tree], agent_role: str, crew_tree: Optional[Tree] + ) -> Optional[Tree]: + """Create and initialize an agent branch.""" + if not self.verbose or not task_branch or not crew_tree: + return None + + agent_branch = task_branch.add("") + self.update_tree_label( + agent_branch, "๐Ÿค– Agent:", agent_role, "green", "In Progress" + ) + + self.print(crew_tree) + self.print() + return agent_branch + + def update_agent_status( + self, + agent_branch: Tree, + agent_role: str, + crew_tree: Tree, + status: str = "completed", + ) -> None: + """Update agent status in the tree.""" + if not self.verbose: + return + + self.update_tree_label( + agent_branch, + "๐Ÿค– Agent:", + agent_role, + "green", + "โœ… Completed" if status == "completed" else "โŒ Failed", + ) + + self.print(crew_tree) + self.print() + + def create_flow_tree(self, flow_name: str) -> Optional[Tree]: + """Create and initialize a flow tree.""" + # if not self.verbose: + # return None + + content = self.create_status_content( + "Starting Flow Execution", flow_name, "blue" + ) + self.print_panel(content, "Flow Execution", "blue", is_flow=True) + + # Create initial tree + flow_label = Text() + flow_label.append("๐ŸŒŠ Flow: ", style="blue bold") + flow_label.append(flow_name, style="blue") + + flow_tree = Tree(flow_label) + self.add_tree_node(flow_tree, "โœจ Created", "blue") + self.add_tree_node(flow_tree, "โœ… Initialization Complete", "green") + + self.print(flow_tree) + self.print() + return flow_tree + + def start_flow(self, flow_name: str) -> Optional[Tree]: + """Initialize a flow execution tree.""" + # if not self.verbose: + # return None + + flow_tree = Tree("") + self.update_tree_label(flow_tree, "๐ŸŒŠ Flow:", flow_name, "blue", "In Progress") + self.add_tree_node(flow_tree, "๐Ÿง  Initializing...", "yellow") + + self.print(flow_tree) + self.print() + return flow_tree + + def update_flow_status( + self, flow_tree: Tree, flow_name: str, flow_id: str, status: str = "completed" + ) -> None: + """Update flow status in the tree.""" + # if not self.verbose: + # return + + self.update_tree_label( + flow_tree, + "โœ… Flow Finished:" if status == "completed" else "โŒ Flow Failed:", + flow_name, + "green" if status == "completed" else "red", + ) + + content = self.create_status_content( + ( + "Flow Execution Completed" + if status == "completed" + else "Flow Execution Failed" + ), + flow_name, + "green" if status == "completed" else "red", + ID=flow_id, + ) + self.print_panel( + content, "Flow Completion", "green" if status == "completed" else "red" + ) + + def update_method_status( + self, + method_branch: Optional[Tree], + flow_tree: Tree, + method_name: str, + status: str = "running", + ) -> Optional[Tree]: + """Update method status in the flow tree.""" + # if not flow_tree: + # return None + + if status == "running": + prefix, style = "๐Ÿ”„ Running:", "yellow" + elif status == "completed": + prefix, style = "โœ… Completed:", "green" + else: + prefix, style = "โŒ Failed:", "red" + + if not method_branch: + # Find or create method branch + for branch in flow_tree.children: + if method_name in str(branch.label): + method_branch = branch + break + if not method_branch: + method_branch = flow_tree.add("") + + method_branch.label = Text(prefix, style=f"{style} bold") + Text( + f" {method_name}", style=style + ) + + self.print(flow_tree) + self.print() + return method_branch + + def handle_tool_usage_started( + self, agent_branch: Tree, tool_name: str, crew_tree: Tree + ) -> Optional[Tree]: + """Handle tool usage started event.""" + if not self.verbose or not agent_branch: + return None + + # Update tool usage count + self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1 + + # Find existing tool node or create new one + tool_branch = None + for child in agent_branch.children: + if tool_name in str(child.label): + tool_branch = child + break + + if not tool_branch: + tool_branch = agent_branch.add("") + + # Update label with current count + self.update_tree_label( + tool_branch, + "๐Ÿ”ง", + f"Using {tool_name} ({self.tool_usage_counts[tool_name]})", + "yellow", + ) + + self.print(crew_tree) + self.print() + return tool_branch + + def handle_tool_usage_finished( + self, tool_branch: Tree, tool_name: str, crew_tree: Tree + ) -> None: + """Handle tool usage finished event.""" + if not self.verbose or not tool_branch: + return + + self.update_tree_label( + tool_branch, + "๐Ÿ”ง", + f"Used {tool_name} ({self.tool_usage_counts[tool_name]})", + "green", + ) + self.print(crew_tree) + self.print() + + def handle_tool_usage_error( + self, + tool_branch: Optional[Tree], + tool_name: str, + error: str, + crew_tree: Optional[Tree], + ) -> None: + """Handle tool usage error event.""" + if not self.verbose: + return + + if tool_branch: + self.update_tree_label( + tool_branch, + "๐Ÿ”ง Failed", + f"{tool_name} ({self.tool_usage_counts[tool_name]})", + "red", + ) + self.print(crew_tree) + self.print() + + # Show error panel + error_content = self.create_status_content( + "Tool Usage Failed", tool_name, "red", Error=error + ) + self.print_panel(error_content, "Tool Error", "red") + + def handle_llm_call_started( + self, agent_branch: Tree, crew_tree: Tree + ) -> Optional[Tree]: + """Handle LLM call started event.""" + if not self.verbose or not agent_branch: + return None + + # Only add thinking status if it doesn't exist + if not any("Thinking" in str(child.label) for child in agent_branch.children): + tool_branch = agent_branch.add("") + self.update_tree_label(tool_branch, "๐Ÿง ", "Thinking...", "blue") + self.print(crew_tree) + self.print() + return tool_branch + return None + + def handle_llm_call_completed( + self, tool_branch: Tree, agent_branch: Tree, crew_tree: Tree + ) -> None: + """Handle LLM call completed event.""" + if not self.verbose or not tool_branch: + return + + # Remove the thinking status node when complete + if "Thinking" in str(tool_branch.label) and agent_branch: + agent_branch.children.remove(tool_branch) + self.print(crew_tree) + self.print() + + def handle_llm_call_failed( + self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree] + ) -> None: + """Handle LLM call failed event.""" + if not self.verbose: + return + + # Update tool branch if it exists + if tool_branch: + tool_branch.label = Text("โŒ LLM Failed", style="red bold") + self.print(crew_tree) + self.print() + + # Show error panel + error_content = Text() + error_content.append("โŒ LLM Call Failed\n", style="red bold") + error_content.append("Error: ", style="white") + error_content.append(str(error), style="red") + + self.print_panel(error_content, "LLM Error", "red") + + def handle_crew_test_started( + self, crew_name: str, source_id: str, n_iterations: int + ) -> Optional[Tree]: + """Handle crew test started event.""" + if not self.verbose: + return None + + # Create initial panel + content = Text() + content.append("๐Ÿงช Starting Crew Test\n\n", style="blue bold") + content.append("Crew: ", style="white") + content.append(f"{crew_name}\n", style="blue") + content.append("ID: ", style="white") + content.append(str(source_id), style="blue") + content.append("\nIterations: ", style="white") + content.append(str(n_iterations), style="yellow") + + self.print() + self.print_panel(content, "Test Execution", "blue") + self.print() + + # Create and display the test tree + test_label = Text() + test_label.append("๐Ÿงช Test: ", style="blue bold") + test_label.append(crew_name or "Crew", style="blue") + test_label.append("\n Status: ", style="white") + test_label.append("In Progress", style="yellow") + + test_tree = Tree(test_label) + self.add_tree_node(test_tree, "๐Ÿ”„ Running tests...", "yellow") + + self.print(test_tree) + self.print() + return test_tree + + def handle_crew_test_completed( + self, flow_tree: Optional[Tree], crew_name: str + ) -> None: + """Handle crew test completed event.""" + if not self.verbose: + return + + if flow_tree: + # Update test tree label to show completion + test_label = Text() + test_label.append("โœ… Test: ", style="green bold") + test_label.append(crew_name or "Crew", style="green") + test_label.append("\n Status: ", style="white") + test_label.append("Completed", style="green bold") + flow_tree.label = test_label + + # Update the running tests node + for child in flow_tree.children: + if "Running tests" in str(child.label): + child.label = Text("โœ… Tests completed successfully", style="green") + + self.print(flow_tree) + self.print() + + # Create completion panel + completion_content = Text() + completion_content.append("Test Execution Completed\n", style="green bold") + completion_content.append("Crew: ", style="white") + completion_content.append(f"{crew_name}\n", style="green") + completion_content.append("Status: ", style="white") + completion_content.append("Completed", style="green") + + self.print_panel(completion_content, "Test Completion", "green") + + def handle_crew_train_started(self, crew_name: str, timestamp: str) -> None: + """Handle crew train started event.""" + if not self.verbose: + return + + content = Text() + content.append("๐Ÿ“‹ Crew Training Started\n", style="blue bold") + content.append("Crew: ", style="white") + content.append(f"{crew_name}\n", style="blue") + content.append("Time: ", style="white") + content.append(timestamp, style="blue") + + self.print_panel(content, "Training Started", "blue") + self.print() + + def handle_crew_train_completed(self, crew_name: str, timestamp: str) -> None: + """Handle crew train completed event.""" + if not self.verbose: + return + + content = Text() + content.append("โœ… Crew Training Completed\n", style="green bold") + content.append("Crew: ", style="white") + content.append(f"{crew_name}\n", style="green") + content.append("Time: ", style="white") + content.append(timestamp, style="green") + + self.print_panel(content, "Training Completed", "green") + self.print() + + def handle_crew_train_failed(self, crew_name: str) -> None: + """Handle crew train failed event.""" + if not self.verbose: + return + + failure_content = Text() + failure_content.append("โŒ Crew Training Failed\n", style="red bold") + failure_content.append("Crew: ", style="white") + failure_content.append(crew_name or "Crew", style="red") + + self.print_panel(failure_content, "Training Failure", "red") + self.print() + + def handle_crew_test_failed(self, crew_name: str) -> None: + """Handle crew test failed event.""" + if not self.verbose: + return + + failure_content = Text() + failure_content.append("โŒ Crew Test Failed\n", style="red bold") + failure_content.append("Crew: ", style="white") + failure_content.append(crew_name or "Crew", style="red") + + self.print_panel(failure_content, "Test Failure", "red") + self.print()