Update logger

This commit is contained in:
Brandon Hancock
2025-03-27 16:21:31 -04:00
parent 0fca721b11
commit 30aa5cc3b9
3 changed files with 308 additions and 73 deletions

View File

@@ -2,6 +2,7 @@ import asyncio
import json
import re
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Type, Union, cast
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
@@ -37,11 +38,20 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.converter import convert_to_model, generate_model_description
from crewai.utilities.events.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -268,6 +278,15 @@ class LiteAgent(BaseModel):
Raises:
Exception: If agent execution fails
"""
# Create agent info for event emission
agent_info = {
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": self._parsed_tools,
"verbose": self.verbose,
}
try:
# Reset state for this run
self._iterations = 0
@@ -276,15 +295,6 @@ class LiteAgent(BaseModel):
# Format messages for the LLM
self._messages = self._format_messages(messages)
# Create agent info for event emission
agent_info = {
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": self._parsed_tools,
"verbose": self.verbose,
}
# Emit event for agent execution start
crewai_event_bus.emit(
self,
@@ -304,6 +314,14 @@ class LiteAgent(BaseModel):
color="red",
)
handle_unknown_error(self._printer, e)
# Emit error event
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise e
formatted_result: Optional[BaseModel] = None
@@ -324,15 +342,35 @@ class LiteAgent(BaseModel):
# Calculate token usage metrics
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
# Create output
output = LiteAgentOutput(
raw=agent_finish.output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
)
# Emit completion event
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=agent_finish.output,
),
)
return output
except Exception as e:
handle_unknown_error(self._printer, e)
# Emit error event
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise e
async def _invoke_loop(self) -> AgentFinish:
@@ -359,22 +397,90 @@ class LiteAgent(BaseModel):
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=cast(LLM, self.llm),
messages=self._messages,
callbacks=self._callbacks,
printer=self._printer,
# Emit LLM call started event
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=self._messages,
tools=None,
callbacks=self._callbacks,
),
)
try:
answer = get_llm_response(
llm=cast(LLM, self.llm),
messages=self._messages,
callbacks=self._callbacks,
printer=self._printer,
)
# Emit LLM call completed event
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
response=answer,
call_type=LLMCallType.LLM_CALL,
),
)
except Exception as e:
# Emit LLM call failed event
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e)),
)
raise e
formatted_answer = process_llm_response(answer, self.use_stop_words)
if isinstance(formatted_answer, AgentAction):
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
tools=self._parsed_tools,
i18n=self.i18n,
agent_key=self.key,
agent_role=self.role,
# Emit tool usage started event
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
agent_key=self.key,
agent_role=self.role,
tool_name=formatted_answer.tool,
tool_args=formatted_answer.tool_input,
tool_class=formatted_answer.tool,
),
)
try:
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
tools=self._parsed_tools,
i18n=self.i18n,
agent_key=self.key,
agent_role=self.role,
)
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
agent_key=self.key,
agent_role=self.role,
tool_name=formatted_answer.tool,
tool_args=formatted_answer.tool_input,
tool_class=formatted_answer.tool,
started_at=datetime.now(),
finished_at=datetime.now(),
),
)
except Exception as e:
# Emit tool usage error event
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
agent_key=self.key,
agent_role=self.role,
tool_name=formatted_answer.tool,
tool_args=formatted_answer.tool_input,
tool_class=formatted_answer.tool,
error=str(e),
),
)
raise e
formatted_answer = handle_agent_action_core(
formatted_answer=formatted_answer,
tool_result=tool_result,

View File

@@ -71,7 +71,7 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
self.formatter = ConsoleFormatter()
self.formatter = ConsoleFormatter(verbose=True)
# ----------- CREW EVENTS -----------
@@ -183,25 +183,28 @@ class EventListener(BaseEventListener):
def on_lite_agent_execution_started(
source, event: LiteAgentExecutionStartedEvent
):
self.logger.log(
f"🤖 LiteAgent '{event.agent_info['role']}' started execution",
event.timestamp,
"""Handle LiteAgent execution started event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="started", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_execution_completed(
source, event: LiteAgentExecutionCompletedEvent
):
self.logger.log(
f"✅ LiteAgent '{event.agent_info['role']}' completed execution",
event.timestamp,
"""Handle LiteAgent execution completed event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="completed", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent):
self.logger.log(
f"❌ LiteAgent '{event.agent_info['role']}' execution error: {event.error}",
event.timestamp,
"""Handle LiteAgent execution error event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"],
status="failed",
error=event.error,
**event.agent_info,
)
# ----------- FLOW EVENTS -----------

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Any, Dict, Optional
from rich.console import Console
from rich.panel import Panel
@@ -13,6 +13,7 @@ class ConsoleFormatter:
current_tool_branch: Optional[Tree] = None
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
def __init__(self, verbose: bool = False):
@@ -390,21 +391,24 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle tool usage started event."""
if not self.verbose or agent_branch is None or crew_tree is None:
if not self.verbose:
return None
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
# Update tool usage count
self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1
# Find existing tool node or create new one
tool_branch = None
for child in agent_branch.children:
if tool_name in str(child.label):
tool_branch = child
break
if not tool_branch:
tool_branch = agent_branch.add("")
# Find or create tool node
tool_branch = self.current_tool_branch
if tool_branch is None:
tool_branch = branch_to_use.add("")
self.current_tool_branch = tool_branch
# Update label with current count
self.update_tree_label(
@@ -414,11 +418,10 @@ class ConsoleFormatter:
"yellow",
)
self.print(crew_tree)
self.print()
# Set the current_tool_branch attribute directly
self.current_tool_branch = tool_branch
# Only print if this is a new tool usage
if tool_branch not in branch_to_use.children:
self.print(tree_to_use)
self.print()
return tool_branch
@@ -429,17 +432,29 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage finished event."""
if not self.verbose or tool_branch is None or crew_tree is None:
if not self.verbose or tool_branch is None:
return
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
if tree_to_use is None:
return
# Update the existing tool node's label
self.update_tree_label(
tool_branch,
"🔧",
f"Used {tool_name} ({self.tool_usage_counts[tool_name]})",
"green",
)
self.print(crew_tree)
self.print()
# Clear the current tool branch as we're done with it
self.current_tool_branch = None
# Only print if we have a valid tree and the tool node is still in it
if isinstance(tree_to_use, Tree) and tool_branch in tree_to_use.children:
self.print(tree_to_use)
self.print()
def handle_tool_usage_error(
self,
@@ -452,6 +467,9 @@ class ConsoleFormatter:
if not self.verbose:
return
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
if tool_branch:
self.update_tree_label(
tool_branch,
@@ -459,8 +477,9 @@ class ConsoleFormatter:
f"{tool_name} ({self.tool_usage_counts[tool_name]})",
"red",
)
self.print(crew_tree)
self.print()
if tree_to_use:
self.print(tree_to_use)
self.print()
# Show error panel
error_content = self.create_status_content(
@@ -474,19 +493,23 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle LLM call started event."""
if not self.verbose or agent_branch is None or crew_tree is None:
if not self.verbose:
return None
# Only add thinking status if it doesn't exist
if not any("Thinking" in str(child.label) for child in agent_branch.children):
tool_branch = agent_branch.add("")
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
# Only add thinking status if we don't have a current tool branch
if self.current_tool_branch is None:
tool_branch = branch_to_use.add("")
self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue")
self.print(crew_tree)
self.print()
# Set the current_tool_branch attribute directly
self.current_tool_branch = tool_branch
self.print(tree_to_use)
self.print()
return tool_branch
return None
@@ -497,19 +520,27 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> None:
"""Handle LLM call completed event."""
if (
not self.verbose
or tool_branch is None
or agent_branch is None
or crew_tree is None
):
if not self.verbose or tool_branch is None:
return
# Remove the thinking status node when complete
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return
# Remove the thinking status node when complete, but only if it exists
if "Thinking" in str(tool_branch.label):
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
try:
# Check if the node is actually in the children list
if tool_branch in branch_to_use.children:
branch_to_use.children.remove(tool_branch)
self.print(tree_to_use)
self.print()
except Exception:
# If any error occurs during removal, just continue without removing
pass
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
@@ -518,11 +549,15 @@ class ConsoleFormatter:
if not self.verbose:
return
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
# Update tool branch if it exists
if tool_branch:
tool_branch.label = Text("❌ LLM Failed", style="red bold")
self.print(crew_tree)
self.print()
if tree_to_use:
self.print(tree_to_use)
self.print()
# Show error panel
error_content = Text()
@@ -656,3 +691,94 @@ class ConsoleFormatter:
self.print_panel(failure_content, "Test Failure", "red")
self.print()
def create_lite_agent_branch(self, lite_agent_role: str) -> Optional[Tree]:
"""Create and initialize a lite agent branch."""
if not self.verbose:
return None
# Create initial tree for LiteAgent if it doesn't exist
if not self.current_lite_agent_branch:
lite_agent_label = Text()
lite_agent_label.append("🤖 LiteAgent: ", style="cyan bold")
lite_agent_label.append(lite_agent_role, style="cyan")
lite_agent_label.append("\n Status: ", style="white")
lite_agent_label.append("In Progress", style="yellow")
lite_agent_tree = Tree(lite_agent_label)
self.current_lite_agent_branch = lite_agent_tree
self.print(lite_agent_tree)
self.print()
return self.current_lite_agent_branch
def update_lite_agent_status(
self,
lite_agent_branch: Optional[Tree],
lite_agent_role: str,
status: str = "completed",
**fields: Dict[str, Any],
) -> None:
"""Update lite agent status in the tree."""
if not self.verbose or lite_agent_branch is None:
return
# Determine style based on status
if status == "completed":
prefix, style = "✅ LiteAgent:", "green"
status_text = "Completed"
title = "LiteAgent Completion"
elif status == "failed":
prefix, style = "❌ LiteAgent:", "red"
status_text = "Failed"
title = "LiteAgent Error"
else:
prefix, style = "🤖 LiteAgent:", "yellow"
status_text = "In Progress"
title = "LiteAgent Status"
# Update the tree label
lite_agent_label = Text()
lite_agent_label.append(f"{prefix} ", style=f"{style} bold")
lite_agent_label.append(lite_agent_role, style=style)
lite_agent_label.append("\n Status: ", style="white")
lite_agent_label.append(status_text, style=f"{style} bold")
lite_agent_branch.label = lite_agent_label
self.print(lite_agent_branch)
self.print()
# Show status panel if additional fields are provided
if fields:
content = self.create_status_content(
f"LiteAgent {status.title()}", lite_agent_role, style, **fields
)
self.print_panel(content, title, style)
def handle_lite_agent_execution(
self,
lite_agent_role: str,
status: str = "started",
error: Any = None,
**fields: Dict[str, Any],
) -> None:
"""Handle lite agent execution events with consistent formatting."""
if not self.verbose:
return
if status == "started":
# Create or get the LiteAgent branch
lite_agent_branch = self.create_lite_agent_branch(lite_agent_role)
if lite_agent_branch and fields:
# Show initial status panel
content = self.create_status_content(
"LiteAgent Session Started", lite_agent_role, "cyan", **fields
)
self.print_panel(content, "LiteAgent Started", "cyan")
else:
# Update existing LiteAgent branch
if error:
fields["Error"] = error
self.update_lite_agent_status(
self.current_lite_agent_branch, lite_agent_role, status, **fields
)