mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 07:38:29 +00:00
initial log improvements
This commit is contained in:
@@ -150,9 +150,12 @@ class EventListener(BaseEventListener):
|
||||
def on_task_started(source, event: TaskStartedEvent):
|
||||
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
|
||||
self.execution_spans[source] = span
|
||||
self.formatter.create_task_branch(
|
||||
task_branch = self.formatter.create_task_branch(
|
||||
self.formatter.current_crew_tree, source.id
|
||||
)
|
||||
# Also update current_agent_branch since reasoning happens before agent execution starts
|
||||
# and the agent branch is the same as the task branch anyway
|
||||
self.formatter.current_agent_branch = task_branch
|
||||
|
||||
@crewai_event_bus.on(TaskCompletedEvent)
|
||||
def on_task_completed(source, event: TaskCompletedEvent):
|
||||
@@ -169,6 +172,12 @@ class EventListener(BaseEventListener):
|
||||
"completed",
|
||||
)
|
||||
|
||||
# Reset branch pointers for clean state management
|
||||
self.formatter.current_task_branch = None
|
||||
self.formatter.current_agent_branch = None
|
||||
self.formatter.current_tool_branch = None
|
||||
self.formatter.current_reasoning_branch = None
|
||||
|
||||
@crewai_event_bus.on(TaskFailedEvent)
|
||||
def on_task_failed(source, event: TaskFailedEvent):
|
||||
span = self.execution_spans.get(source)
|
||||
@@ -184,6 +193,12 @@ class EventListener(BaseEventListener):
|
||||
"failed",
|
||||
)
|
||||
|
||||
# Reset branch pointers for clean state management
|
||||
self.formatter.current_task_branch = None
|
||||
self.formatter.current_agent_branch = None
|
||||
self.formatter.current_tool_branch = None
|
||||
self.formatter.current_reasoning_branch = None
|
||||
|
||||
# ----------- AGENT EVENTS -----------
|
||||
|
||||
@crewai_event_bus.on(AgentExecutionStartedEvent)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from rich.console import Console
|
||||
@@ -7,6 +9,51 @@ from rich.tree import Tree
|
||||
from rich.live import Live
|
||||
|
||||
|
||||
class SimpleSpinner:
|
||||
"""A simple spinner using regular string characters."""
|
||||
|
||||
def __init__(self):
|
||||
self.frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
|
||||
self.current_frame = 0
|
||||
self.is_spinning = False
|
||||
self._stop_event = threading.Event()
|
||||
self._thread = None
|
||||
|
||||
def start(self, text: str = "") -> None:
|
||||
"""Start the spinner with optional text."""
|
||||
if self.is_spinning:
|
||||
return
|
||||
|
||||
self.is_spinning = True
|
||||
self._stop_event.clear()
|
||||
self._thread = threading.Thread(target=self._spin, args=(text,))
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the spinner."""
|
||||
if not self.is_spinning:
|
||||
return
|
||||
|
||||
self.is_spinning = False
|
||||
self._stop_event.set()
|
||||
if self._thread:
|
||||
self._thread.join(timeout=0.1)
|
||||
|
||||
def _spin(self, text: str) -> None:
|
||||
"""Internal spinning method."""
|
||||
while not self._stop_event.is_set():
|
||||
frame = self.frames[self.current_frame]
|
||||
print(f"\r{frame} {text}", end="", flush=True)
|
||||
self.current_frame = (self.current_frame + 1) % len(self.frames)
|
||||
time.sleep(0.1)
|
||||
print("\r" + " " * (len(text) + 10) + "\r", end="", flush=True) # Clear the line
|
||||
|
||||
def get_current_frame(self) -> str:
|
||||
"""Get the current spinner frame."""
|
||||
return self.frames[self.current_frame]
|
||||
|
||||
|
||||
class ConsoleFormatter:
|
||||
current_crew_tree: Optional[Tree] = None
|
||||
current_task_branch: Optional[Tree] = None
|
||||
@@ -22,6 +69,9 @@ class ConsoleFormatter:
|
||||
def __init__(self, verbose: bool = False):
|
||||
self.console = Console(width=None)
|
||||
self.verbose = verbose
|
||||
self.spinner = SimpleSpinner()
|
||||
self._spinning_branches = {} # Track which branches are spinning
|
||||
self._spinner_timer = None
|
||||
# Live instance to dynamically update a Tree renderable (e.g. the Crew tree)
|
||||
# When multiple Tree objects are printed sequentially we reuse this Live
|
||||
# instance so the previous render is replaced instead of writing a new one.
|
||||
@@ -428,6 +478,10 @@ class ConsoleFormatter:
|
||||
return method_branch
|
||||
|
||||
def get_llm_tree(self, tool_name: str):
|
||||
# Skip displaying reasoning-related function calls as they have their own handlers
|
||||
if tool_name == "create_reasoning_plan":
|
||||
return None
|
||||
|
||||
text = Text()
|
||||
text.append(f"🔧 Using {tool_name} from LLM available_function", style="yellow")
|
||||
|
||||
@@ -442,7 +496,14 @@ class ConsoleFormatter:
|
||||
self,
|
||||
tool_name: str,
|
||||
):
|
||||
# Skip reasoning functions as they have dedicated handlers
|
||||
if tool_name == "create_reasoning_plan":
|
||||
return None
|
||||
|
||||
tree = self.get_llm_tree(tool_name)
|
||||
if tree is None:
|
||||
return None
|
||||
|
||||
self.add_tree_node(tree, "🔄 Tool Usage Started", "green")
|
||||
self.print(tree)
|
||||
self.print()
|
||||
@@ -452,7 +513,14 @@ class ConsoleFormatter:
|
||||
self,
|
||||
tool_name: str,
|
||||
):
|
||||
# Skip reasoning functions as they have dedicated handlers
|
||||
if tool_name == "create_reasoning_plan":
|
||||
return
|
||||
|
||||
tree = self.get_llm_tree(tool_name)
|
||||
if tree is None:
|
||||
return
|
||||
|
||||
self.add_tree_node(tree, "✅ Tool Usage Completed", "green")
|
||||
self.print(tree)
|
||||
self.print()
|
||||
@@ -462,7 +530,14 @@ class ConsoleFormatter:
|
||||
tool_name: str,
|
||||
error: str,
|
||||
):
|
||||
# Skip reasoning functions as they have dedicated handlers
|
||||
if tool_name == "create_reasoning_plan":
|
||||
return
|
||||
|
||||
tree = self.get_llm_tree(tool_name)
|
||||
if tree is None:
|
||||
return
|
||||
|
||||
self.add_tree_node(tree, "❌ Tool Usage Failed", "red")
|
||||
self.print(tree)
|
||||
self.print()
|
||||
@@ -508,17 +583,9 @@ class ConsoleFormatter:
|
||||
tool_branch = branch_to_use.add("")
|
||||
self.current_tool_branch = tool_branch
|
||||
|
||||
# Update label with current count
|
||||
self.update_tree_label(
|
||||
tool_branch,
|
||||
"🔧",
|
||||
f"Using {tool_name} ({self.tool_usage_counts[tool_name]})",
|
||||
"yellow",
|
||||
)
|
||||
|
||||
# Print updated tree immediately
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
# Start animated spinner for tool usage
|
||||
tool_text = f"Using {tool_name} ({self.tool_usage_counts[tool_name]})"
|
||||
self._start_spinner_animation(tool_branch, "🔧", tool_text, "yellow")
|
||||
|
||||
return tool_branch
|
||||
|
||||
@@ -532,6 +599,9 @@ class ConsoleFormatter:
|
||||
if not self.verbose or tool_branch is None:
|
||||
return
|
||||
|
||||
# Stop spinner animation
|
||||
self._stop_spinner_animation(tool_branch)
|
||||
|
||||
# Decide which tree to render: prefer full crew tree, else parent branch
|
||||
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
|
||||
if tree_to_use is None:
|
||||
@@ -613,10 +683,9 @@ class ConsoleFormatter:
|
||||
# Only add thinking status if we don't have a current tool branch
|
||||
if self.current_tool_branch is None:
|
||||
tool_branch = branch_to_use.add("")
|
||||
self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue")
|
||||
self.current_tool_branch = tool_branch
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
# Start animated spinner for thinking
|
||||
self._start_spinner_animation(tool_branch, "🧠", "Thinking...", "blue")
|
||||
return tool_branch
|
||||
return None
|
||||
|
||||
@@ -630,6 +699,9 @@ class ConsoleFormatter:
|
||||
if not self.verbose or tool_branch is None:
|
||||
return
|
||||
|
||||
# Stop spinner animation for this branch
|
||||
self._stop_spinner_animation(tool_branch)
|
||||
|
||||
# Decide which tree to render: prefer full crew tree, else parent branch
|
||||
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
|
||||
if tree_to_use is None:
|
||||
@@ -920,12 +992,9 @@ class ConsoleFormatter:
|
||||
return None
|
||||
|
||||
knowledge_branch = branch_to_use.add("")
|
||||
self.update_tree_label(
|
||||
knowledge_branch, "🔍", "Knowledge Retrieval Started", "blue"
|
||||
)
|
||||
# Start animated spinner for knowledge retrieval
|
||||
self._start_spinner_animation(knowledge_branch, "🔍", "Knowledge Retrieval Started", "blue")
|
||||
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
return knowledge_branch
|
||||
|
||||
def handle_knowledge_retrieval_completed(
|
||||
@@ -963,6 +1032,8 @@ class ConsoleFormatter:
|
||||
knowledge_branch_found = False
|
||||
for child in branch_to_use.children:
|
||||
if "Knowledge Retrieval Started" in str(child.label):
|
||||
# Stop spinner and update label
|
||||
self._stop_spinner_animation(child)
|
||||
self.update_tree_label(
|
||||
child, "✅", "Knowledge Retrieval Completed", "green"
|
||||
)
|
||||
@@ -976,6 +1047,7 @@ class ConsoleFormatter:
|
||||
and "Started" not in str(child.label)
|
||||
and "Completed" not in str(child.label)
|
||||
):
|
||||
self._stop_spinner_animation(child)
|
||||
self.update_tree_label(
|
||||
child, "✅", "Knowledge Retrieval Completed", "green"
|
||||
)
|
||||
@@ -1132,12 +1204,13 @@ class ConsoleFormatter:
|
||||
reasoning_branch = branch_to_use.add("")
|
||||
self.current_reasoning_branch = reasoning_branch
|
||||
|
||||
# Build label text depending on attempt
|
||||
# Build label text depending on attempt and start spinner
|
||||
status_text = (
|
||||
f"Reasoning (Attempt {attempt})" if attempt > 1 else "Reasoning..."
|
||||
)
|
||||
self.update_tree_label(reasoning_branch, "🧠", status_text, "blue")
|
||||
self._start_spinner_animation(reasoning_branch, "🧠", status_text, "blue")
|
||||
|
||||
# Show initial display
|
||||
self.print(tree_to_use)
|
||||
self.print()
|
||||
|
||||
@@ -1161,10 +1234,12 @@ class ConsoleFormatter:
|
||||
or crew_tree
|
||||
)
|
||||
|
||||
style = "green" if ready else "yellow"
|
||||
status_text = "Reasoning Completed" if ready else "Reasoning Completed (Not Ready)"
|
||||
|
||||
if reasoning_branch is not None:
|
||||
# Stop spinner animation
|
||||
self._stop_spinner_animation(reasoning_branch)
|
||||
|
||||
style = "green" if ready else "yellow"
|
||||
status_text = "Reasoning Completed" if ready else "Reasoning Completed (Not Ready)"
|
||||
self.update_tree_label(reasoning_branch, "✅", status_text, style)
|
||||
|
||||
if tree_to_use is not None:
|
||||
@@ -1175,7 +1250,7 @@ class ConsoleFormatter:
|
||||
plan_panel = Panel(
|
||||
Text(plan, style="white"),
|
||||
title="🧠 Reasoning Plan",
|
||||
border_style=style,
|
||||
border_style="green" if ready else "yellow",
|
||||
padding=(1, 2),
|
||||
)
|
||||
self.print(plan_panel)
|
||||
@@ -1219,3 +1294,87 @@ class ConsoleFormatter:
|
||||
|
||||
# Clear stored branch after failure
|
||||
self.current_reasoning_branch = None
|
||||
|
||||
def _start_spinner_animation(self, branch: Tree, prefix: str, text: str, style: str) -> None:
|
||||
"""Start animating a spinner for a specific branch."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
self._spinning_branches[id(branch)] = {
|
||||
'branch': branch,
|
||||
'prefix': prefix,
|
||||
'text': text,
|
||||
'style': style,
|
||||
'frame_index': 0
|
||||
}
|
||||
|
||||
# Start the animation timer if not already running
|
||||
if self._spinner_timer is None:
|
||||
self._start_spinner_timer()
|
||||
|
||||
def _stop_spinner_animation(self, branch: Tree) -> None:
|
||||
"""Stop animating a spinner for a specific branch."""
|
||||
if id(branch) in self._spinning_branches:
|
||||
del self._spinning_branches[id(branch)]
|
||||
|
||||
# Stop the timer if no more spinners
|
||||
if not self._spinning_branches and self._spinner_timer:
|
||||
self._spinner_timer.cancel()
|
||||
self._spinner_timer = None
|
||||
|
||||
def _start_spinner_timer(self) -> None:
|
||||
"""Start the spinner animation timer."""
|
||||
if self._spinner_timer is not None:
|
||||
return
|
||||
|
||||
def update_spinners():
|
||||
if not self._spinning_branches:
|
||||
self._spinner_timer = None
|
||||
return
|
||||
|
||||
# Update all spinning branches
|
||||
for branch_data in self._spinning_branches.values():
|
||||
branch = branch_data['branch']
|
||||
prefix = branch_data['prefix']
|
||||
text = branch_data['text']
|
||||
style = branch_data['style']
|
||||
|
||||
# Get next spinner frame
|
||||
frame_index = branch_data['frame_index']
|
||||
spinner_frame = self.spinner.frames[frame_index]
|
||||
branch_data['frame_index'] = (frame_index + 1) % len(self.spinner.frames)
|
||||
|
||||
# Update tree label
|
||||
self.update_tree_label(branch, f"{prefix} {spinner_frame}", text, style)
|
||||
|
||||
# Refresh the display if we have a tree to show
|
||||
tree_to_show = (
|
||||
self.current_crew_tree or
|
||||
self.current_flow_tree or
|
||||
self.current_lite_agent_branch
|
||||
)
|
||||
if tree_to_show:
|
||||
self.print(tree_to_show)
|
||||
|
||||
# Schedule next update
|
||||
self._spinner_timer = threading.Timer(0.2, update_spinners)
|
||||
self._spinner_timer.start()
|
||||
|
||||
self._spinner_timer = threading.Timer(0.2, update_spinners)
|
||||
self._spinner_timer.start()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Clean up resources including stopping any running timers."""
|
||||
if self._spinner_timer:
|
||||
self._spinner_timer.cancel()
|
||||
self._spinner_timer = None
|
||||
self._spinning_branches.clear()
|
||||
if self.spinner.is_spinning:
|
||||
self.spinner.stop()
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""Destructor to ensure cleanup on object deletion."""
|
||||
try:
|
||||
self.cleanup()
|
||||
except:
|
||||
pass # Ignore errors during cleanup
|
||||
|
||||
@@ -48,6 +48,9 @@ from crewai.utilities.events.task_events import (
|
||||
from crewai.utilities.events.tool_usage_events import (
|
||||
ToolUsageErrorEvent,
|
||||
)
|
||||
from crewai.utilities.events.reasoning_events import (
|
||||
AgentReasoningStartedEvent,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@@ -779,3 +782,85 @@ def test_streaming_empty_response_handling():
|
||||
finally:
|
||||
# Restore the original method
|
||||
llm.call = original_call
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_reasoning_events_attach_to_correct_task():
|
||||
"""Test that reasoning events are attached to the correct task branch, not previous task's branch."""
|
||||
received_events = []
|
||||
|
||||
@crewai_event_bus.on(TaskStartedEvent)
|
||||
def handle_task_start(source, event):
|
||||
received_events.append(('task_started', source.id))
|
||||
|
||||
@crewai_event_bus.on(AgentReasoningStartedEvent)
|
||||
def handle_reasoning_start(source, event):
|
||||
received_events.append(('reasoning_started', event.task_id))
|
||||
|
||||
@crewai_event_bus.on(TaskCompletedEvent)
|
||||
def handle_task_complete(source, event):
|
||||
received_events.append(('task_completed', source.id))
|
||||
|
||||
# Create agent with reasoning enabled
|
||||
reasoning_agent = Agent(
|
||||
role="Test Reasoning Agent",
|
||||
goal="Test reasoning",
|
||||
backstory="I test reasoning",
|
||||
llm="gpt-4o-mini",
|
||||
reasoning=True,
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Create two tasks
|
||||
task1 = Task(
|
||||
description="First task",
|
||||
expected_output="First result",
|
||||
agent=reasoning_agent
|
||||
)
|
||||
|
||||
task2 = Task(
|
||||
description="Second task",
|
||||
expected_output="Second result",
|
||||
agent=reasoning_agent
|
||||
)
|
||||
|
||||
crew = Crew(agents=[reasoning_agent], tasks=[task1, task2], name="TestCrew")
|
||||
|
||||
# Mock the LLM to provide reasoning responses
|
||||
def mock_llm_call(messages, *args, **kwargs):
|
||||
if any("create a detailed plan" in str(msg) for msg in messages):
|
||||
return "Test plan\n\nREADY: I'm ready to execute."
|
||||
return "Test execution result"
|
||||
|
||||
reasoning_agent.llm.call = mock_llm_call
|
||||
|
||||
crew.kickoff()
|
||||
|
||||
# Verify events occurred in correct order and with correct task IDs
|
||||
assert len(received_events) >= 4 # At least: task1_start, reasoning1, task1_complete, task2_start, reasoning2, task2_complete
|
||||
|
||||
# Find reasoning events and their associated task IDs
|
||||
reasoning_events = [(event_type, task_id) for event_type, task_id in received_events if event_type == 'reasoning_started']
|
||||
task_started_events = [(event_type, task_id) for event_type, task_id in received_events if event_type == 'task_started']
|
||||
|
||||
# Verify we have reasoning events for both tasks
|
||||
assert len(reasoning_events) >= 2
|
||||
assert len(task_started_events) == 2
|
||||
|
||||
# Verify that each reasoning event has the correct task ID
|
||||
task1_id = str(task1.id)
|
||||
task2_id = str(task2.id)
|
||||
|
||||
reasoning_task_ids = [task_id for _, task_id in reasoning_events]
|
||||
assert task1_id in reasoning_task_ids
|
||||
assert task2_id in reasoning_task_ids
|
||||
|
||||
# Verify that reasoning events match their respective tasks
|
||||
for i, (event_type, task_id) in enumerate(received_events):
|
||||
if event_type == 'reasoning_started':
|
||||
# Find the preceding task_started event
|
||||
preceding_task_events = [e for e in received_events[:i] if e[0] == 'task_started']
|
||||
if preceding_task_events:
|
||||
last_task_started_id = preceding_task_events[-1][1]
|
||||
# The reasoning task_id should match the last started task
|
||||
assert task_id == last_task_started_id, f"Reasoning event task_id {task_id} doesn't match last started task {last_task_started_id}"
|
||||
|
||||
Reference in New Issue
Block a user