diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index 52c042321..f978918df 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -150,9 +150,12 @@ class EventListener(BaseEventListener): def on_task_started(source, event: TaskStartedEvent): span = self._telemetry.task_started(crew=source.agent.crew, task=source) self.execution_spans[source] = span - self.formatter.create_task_branch( + task_branch = self.formatter.create_task_branch( self.formatter.current_crew_tree, source.id ) + # Also update current_agent_branch since reasoning happens before agent execution starts + # and the agent branch is the same as the task branch anyway + self.formatter.current_agent_branch = task_branch @crewai_event_bus.on(TaskCompletedEvent) def on_task_completed(source, event: TaskCompletedEvent): @@ -169,6 +172,12 @@ class EventListener(BaseEventListener): "completed", ) + # Reset branch pointers for clean state management + self.formatter.current_task_branch = None + self.formatter.current_agent_branch = None + self.formatter.current_tool_branch = None + self.formatter.current_reasoning_branch = None + @crewai_event_bus.on(TaskFailedEvent) def on_task_failed(source, event: TaskFailedEvent): span = self.execution_spans.get(source) @@ -184,6 +193,12 @@ class EventListener(BaseEventListener): "failed", ) + # Reset branch pointers for clean state management + self.formatter.current_task_branch = None + self.formatter.current_agent_branch = None + self.formatter.current_tool_branch = None + self.formatter.current_reasoning_branch = None + # ----------- AGENT EVENTS ----------- @crewai_event_bus.on(AgentExecutionStartedEvent) diff --git a/src/crewai/utilities/events/utils/console_formatter.py b/src/crewai/utilities/events/utils/console_formatter.py index bcf12caaf..4a0006a55 100644 --- a/src/crewai/utilities/events/utils/console_formatter.py +++ b/src/crewai/utilities/events/utils/console_formatter.py @@ -1,3 +1,5 @@ +import threading +import time from typing import Any, Dict, Optional from rich.console import Console @@ -7,6 +9,51 @@ from rich.tree import Tree from rich.live import Live +class SimpleSpinner: + """A simple spinner using regular string characters.""" + + def __init__(self): + self.frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] + self.current_frame = 0 + self.is_spinning = False + self._stop_event = threading.Event() + self._thread = None + + def start(self, text: str = "") -> None: + """Start the spinner with optional text.""" + if self.is_spinning: + return + + self.is_spinning = True + self._stop_event.clear() + self._thread = threading.Thread(target=self._spin, args=(text,)) + self._thread.daemon = True + self._thread.start() + + def stop(self) -> None: + """Stop the spinner.""" + if not self.is_spinning: + return + + self.is_spinning = False + self._stop_event.set() + if self._thread: + self._thread.join(timeout=0.1) + + def _spin(self, text: str) -> None: + """Internal spinning method.""" + while not self._stop_event.is_set(): + frame = self.frames[self.current_frame] + print(f"\r{frame} {text}", end="", flush=True) + self.current_frame = (self.current_frame + 1) % len(self.frames) + time.sleep(0.1) + print("\r" + " " * (len(text) + 10) + "\r", end="", flush=True) # Clear the line + + def get_current_frame(self) -> str: + """Get the current spinner frame.""" + return self.frames[self.current_frame] + + class ConsoleFormatter: current_crew_tree: Optional[Tree] = None current_task_branch: Optional[Tree] = None @@ -22,6 +69,9 @@ class ConsoleFormatter: def __init__(self, verbose: bool = False): self.console = Console(width=None) self.verbose = verbose + self.spinner = SimpleSpinner() + self._spinning_branches = {} # Track which branches are spinning + self._spinner_timer = None # Live instance to dynamically update a Tree renderable (e.g. the Crew tree) # When multiple Tree objects are printed sequentially we reuse this Live # instance so the previous render is replaced instead of writing a new one. @@ -428,6 +478,10 @@ class ConsoleFormatter: return method_branch def get_llm_tree(self, tool_name: str): + # Skip displaying reasoning-related function calls as they have their own handlers + if tool_name == "create_reasoning_plan": + return None + text = Text() text.append(f"🔧 Using {tool_name} from LLM available_function", style="yellow") @@ -442,7 +496,14 @@ class ConsoleFormatter: self, tool_name: str, ): + # Skip reasoning functions as they have dedicated handlers + if tool_name == "create_reasoning_plan": + return None + tree = self.get_llm_tree(tool_name) + if tree is None: + return None + self.add_tree_node(tree, "🔄 Tool Usage Started", "green") self.print(tree) self.print() @@ -452,7 +513,14 @@ class ConsoleFormatter: self, tool_name: str, ): + # Skip reasoning functions as they have dedicated handlers + if tool_name == "create_reasoning_plan": + return + tree = self.get_llm_tree(tool_name) + if tree is None: + return + self.add_tree_node(tree, "✅ Tool Usage Completed", "green") self.print(tree) self.print() @@ -462,7 +530,14 @@ class ConsoleFormatter: tool_name: str, error: str, ): + # Skip reasoning functions as they have dedicated handlers + if tool_name == "create_reasoning_plan": + return + tree = self.get_llm_tree(tool_name) + if tree is None: + return + self.add_tree_node(tree, "❌ Tool Usage Failed", "red") self.print(tree) self.print() @@ -508,17 +583,9 @@ class ConsoleFormatter: tool_branch = branch_to_use.add("") self.current_tool_branch = tool_branch - # Update label with current count - self.update_tree_label( - tool_branch, - "🔧", - f"Using {tool_name} ({self.tool_usage_counts[tool_name]})", - "yellow", - ) - - # Print updated tree immediately - self.print(tree_to_use) - self.print() + # Start animated spinner for tool usage + tool_text = f"Using {tool_name} ({self.tool_usage_counts[tool_name]})" + self._start_spinner_animation(tool_branch, "🔧", tool_text, "yellow") return tool_branch @@ -532,6 +599,9 @@ class ConsoleFormatter: if not self.verbose or tool_branch is None: return + # Stop spinner animation + self._stop_spinner_animation(tool_branch) + # Decide which tree to render: prefer full crew tree, else parent branch tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch if tree_to_use is None: @@ -613,10 +683,9 @@ class ConsoleFormatter: # Only add thinking status if we don't have a current tool branch if self.current_tool_branch is None: tool_branch = branch_to_use.add("") - self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue") self.current_tool_branch = tool_branch - self.print(tree_to_use) - self.print() + # Start animated spinner for thinking + self._start_spinner_animation(tool_branch, "🧠", "Thinking...", "blue") return tool_branch return None @@ -630,6 +699,9 @@ class ConsoleFormatter: if not self.verbose or tool_branch is None: return + # Stop spinner animation for this branch + self._stop_spinner_animation(tool_branch) + # Decide which tree to render: prefer full crew tree, else parent branch tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch if tree_to_use is None: @@ -920,12 +992,9 @@ class ConsoleFormatter: return None knowledge_branch = branch_to_use.add("") - self.update_tree_label( - knowledge_branch, "🔍", "Knowledge Retrieval Started", "blue" - ) + # Start animated spinner for knowledge retrieval + self._start_spinner_animation(knowledge_branch, "🔍", "Knowledge Retrieval Started", "blue") - self.print(tree_to_use) - self.print() return knowledge_branch def handle_knowledge_retrieval_completed( @@ -963,6 +1032,8 @@ class ConsoleFormatter: knowledge_branch_found = False for child in branch_to_use.children: if "Knowledge Retrieval Started" in str(child.label): + # Stop spinner and update label + self._stop_spinner_animation(child) self.update_tree_label( child, "✅", "Knowledge Retrieval Completed", "green" ) @@ -976,6 +1047,7 @@ class ConsoleFormatter: and "Started" not in str(child.label) and "Completed" not in str(child.label) ): + self._stop_spinner_animation(child) self.update_tree_label( child, "✅", "Knowledge Retrieval Completed", "green" ) @@ -1132,12 +1204,13 @@ class ConsoleFormatter: reasoning_branch = branch_to_use.add("") self.current_reasoning_branch = reasoning_branch - # Build label text depending on attempt + # Build label text depending on attempt and start spinner status_text = ( f"Reasoning (Attempt {attempt})" if attempt > 1 else "Reasoning..." ) - self.update_tree_label(reasoning_branch, "🧠", status_text, "blue") + self._start_spinner_animation(reasoning_branch, "🧠", status_text, "blue") + # Show initial display self.print(tree_to_use) self.print() @@ -1161,10 +1234,12 @@ class ConsoleFormatter: or crew_tree ) - style = "green" if ready else "yellow" - status_text = "Reasoning Completed" if ready else "Reasoning Completed (Not Ready)" - if reasoning_branch is not None: + # Stop spinner animation + self._stop_spinner_animation(reasoning_branch) + + style = "green" if ready else "yellow" + status_text = "Reasoning Completed" if ready else "Reasoning Completed (Not Ready)" self.update_tree_label(reasoning_branch, "✅", status_text, style) if tree_to_use is not None: @@ -1175,7 +1250,7 @@ class ConsoleFormatter: plan_panel = Panel( Text(plan, style="white"), title="🧠 Reasoning Plan", - border_style=style, + border_style="green" if ready else "yellow", padding=(1, 2), ) self.print(plan_panel) @@ -1219,3 +1294,87 @@ class ConsoleFormatter: # Clear stored branch after failure self.current_reasoning_branch = None + + def _start_spinner_animation(self, branch: Tree, prefix: str, text: str, style: str) -> None: + """Start animating a spinner for a specific branch.""" + if not self.verbose: + return + + self._spinning_branches[id(branch)] = { + 'branch': branch, + 'prefix': prefix, + 'text': text, + 'style': style, + 'frame_index': 0 + } + + # Start the animation timer if not already running + if self._spinner_timer is None: + self._start_spinner_timer() + + def _stop_spinner_animation(self, branch: Tree) -> None: + """Stop animating a spinner for a specific branch.""" + if id(branch) in self._spinning_branches: + del self._spinning_branches[id(branch)] + + # Stop the timer if no more spinners + if not self._spinning_branches and self._spinner_timer: + self._spinner_timer.cancel() + self._spinner_timer = None + + def _start_spinner_timer(self) -> None: + """Start the spinner animation timer.""" + if self._spinner_timer is not None: + return + + def update_spinners(): + if not self._spinning_branches: + self._spinner_timer = None + return + + # Update all spinning branches + for branch_data in self._spinning_branches.values(): + branch = branch_data['branch'] + prefix = branch_data['prefix'] + text = branch_data['text'] + style = branch_data['style'] + + # Get next spinner frame + frame_index = branch_data['frame_index'] + spinner_frame = self.spinner.frames[frame_index] + branch_data['frame_index'] = (frame_index + 1) % len(self.spinner.frames) + + # Update tree label + self.update_tree_label(branch, f"{prefix} {spinner_frame}", text, style) + + # Refresh the display if we have a tree to show + tree_to_show = ( + self.current_crew_tree or + self.current_flow_tree or + self.current_lite_agent_branch + ) + if tree_to_show: + self.print(tree_to_show) + + # Schedule next update + self._spinner_timer = threading.Timer(0.2, update_spinners) + self._spinner_timer.start() + + self._spinner_timer = threading.Timer(0.2, update_spinners) + self._spinner_timer.start() + + def cleanup(self) -> None: + """Clean up resources including stopping any running timers.""" + if self._spinner_timer: + self._spinner_timer.cancel() + self._spinner_timer = None + self._spinning_branches.clear() + if self.spinner.is_spinning: + self.spinner.stop() + + def __del__(self) -> None: + """Destructor to ensure cleanup on object deletion.""" + try: + self.cleanup() + except: + pass # Ignore errors during cleanup diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index 2f6f11b61..8c934704f 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -48,6 +48,9 @@ from crewai.utilities.events.task_events import ( from crewai.utilities.events.tool_usage_events import ( ToolUsageErrorEvent, ) +from crewai.utilities.events.reasoning_events import ( + AgentReasoningStartedEvent, +) @pytest.fixture(scope="module") @@ -779,3 +782,85 @@ def test_streaming_empty_response_handling(): finally: # Restore the original method llm.call = original_call + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_reasoning_events_attach_to_correct_task(): + """Test that reasoning events are attached to the correct task branch, not previous task's branch.""" + received_events = [] + + @crewai_event_bus.on(TaskStartedEvent) + def handle_task_start(source, event): + received_events.append(('task_started', source.id)) + + @crewai_event_bus.on(AgentReasoningStartedEvent) + def handle_reasoning_start(source, event): + received_events.append(('reasoning_started', event.task_id)) + + @crewai_event_bus.on(TaskCompletedEvent) + def handle_task_complete(source, event): + received_events.append(('task_completed', source.id)) + + # Create agent with reasoning enabled + reasoning_agent = Agent( + role="Test Reasoning Agent", + goal="Test reasoning", + backstory="I test reasoning", + llm="gpt-4o-mini", + reasoning=True, + verbose=True + ) + + # Create two tasks + task1 = Task( + description="First task", + expected_output="First result", + agent=reasoning_agent + ) + + task2 = Task( + description="Second task", + expected_output="Second result", + agent=reasoning_agent + ) + + crew = Crew(agents=[reasoning_agent], tasks=[task1, task2], name="TestCrew") + + # Mock the LLM to provide reasoning responses + def mock_llm_call(messages, *args, **kwargs): + if any("create a detailed plan" in str(msg) for msg in messages): + return "Test plan\n\nREADY: I'm ready to execute." + return "Test execution result" + + reasoning_agent.llm.call = mock_llm_call + + crew.kickoff() + + # Verify events occurred in correct order and with correct task IDs + assert len(received_events) >= 4 # At least: task1_start, reasoning1, task1_complete, task2_start, reasoning2, task2_complete + + # Find reasoning events and their associated task IDs + reasoning_events = [(event_type, task_id) for event_type, task_id in received_events if event_type == 'reasoning_started'] + task_started_events = [(event_type, task_id) for event_type, task_id in received_events if event_type == 'task_started'] + + # Verify we have reasoning events for both tasks + assert len(reasoning_events) >= 2 + assert len(task_started_events) == 2 + + # Verify that each reasoning event has the correct task ID + task1_id = str(task1.id) + task2_id = str(task2.id) + + reasoning_task_ids = [task_id for _, task_id in reasoning_events] + assert task1_id in reasoning_task_ids + assert task2_id in reasoning_task_ids + + # Verify that reasoning events match their respective tasks + for i, (event_type, task_id) in enumerate(received_events): + if event_type == 'reasoning_started': + # Find the preceding task_started event + preceding_task_events = [e for e in received_events[:i] if e[0] == 'task_started'] + if preceding_task_events: + last_task_started_id = preceding_task_events[-1][1] + # The reasoning task_id should match the last started task + assert task_id == last_task_started_id, f"Reasoning event task_id {task_id} doesn't match last started task {last_task_started_id}"