fix(agent): improve token tracking and logging functionality

- Add proper debug, info, warning, and error methods to Logger class
- Ensure warnings and errors are always shown regardless of verbose mode
- Fix token process initialization and tracking in Agent class
- Update TokenProcess import to use correct class from agent_builder utilities

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-01-01 20:59:39 +00:00
parent 39bdc7e4d4
commit 8ec2eb7d72
3 changed files with 98 additions and 33 deletions

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import os
import shutil
import subprocess
@@ -21,11 +23,11 @@ from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.logger import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.token_process import TokenProcess
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
agentops = None
@@ -132,7 +134,6 @@ class Agent(BaseAgent):
"verbose": verbose,
"max_rpm": max_rpm,
"tools": processed_tools,
"llm": processed_llm,
"max_iter": max_iter if max_iter is not None else 25,
"function_calling_llm": function_calling_llm,
"step_callback": step_callback,
@@ -148,11 +149,14 @@ class Agent(BaseAgent):
self._original_goal = goal
self._original_backstory = backstory
# Set LLM after base initialization to ensure proper model handling
self.llm = processed_llm
# Initialize private attributes
self._logger = Logger(verbose=self.verbose)
if self.max_rpm:
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._token_process = TokenProcess() # type: ignore # Known type mismatch between utilities and agent_builder
self._token_process = TokenProcess()
_times_executed: int = PrivateAttr(default=0)
max_execution_time: Optional[int] = Field(
@@ -530,6 +534,32 @@ class Agent(BaseAgent):
self.response_template.split("{{ .Response }}")[1].strip()
)
# Ensure LLM is initialized with proper error handling
try:
if not self.llm:
self.llm = LLM(model="gpt-4")
if hasattr(self, '_logger'):
self._logger.debug("Initialized default LLM with gpt-4 model")
except Exception as e:
if hasattr(self, '_logger'):
self._logger.error(f"Failed to initialize LLM: {str(e)}")
raise
# Create token callback with proper error handling
try:
token_callback = None
if hasattr(self, '_token_process'):
token_callback = TokenCalcHandler(self._token_process)
except Exception as e:
if hasattr(self, '_logger'):
self._logger.warning(f"Failed to create token callback: {str(e)}")
token_callback = None
# Initialize callbacks list
executor_callbacks = []
if token_callback:
executor_callbacks.append(token_callback)
self.agent_executor = CrewAgentExecutor(
llm=self.llm,
task=task,
@@ -547,9 +577,9 @@ class Agent(BaseAgent):
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
self._rpm_controller.check_or_wait if (hasattr(self, '_rpm_controller') and self._rpm_controller is not None) else None
),
callbacks=[TokenCalcHandler(self._token_process)],
callbacks=executor_callbacks,
)
def get_delegation_tools(self, agents: List[BaseAgent]):

View File

@@ -10,8 +10,24 @@ class Logger(BaseModel):
_printer: Printer = PrivateAttr(default_factory=Printer)
def log(self, level, message, color="bold_yellow"):
if self.verbose:
if self.verbose or level.upper() in ["WARNING", "ERROR"]:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._printer.print(
f"\n[{timestamp}][{level.upper()}]: {message}", color=color
)
def debug(self, message: str) -> None:
"""Log a debug message if verbose is enabled."""
self.log("debug", message, color="bold_blue")
def info(self, message: str) -> None:
"""Log an info message if verbose is enabled."""
self.log("info", message, color="bold_green")
def warning(self, message: str) -> None:
"""Log a warning message."""
self.log("warning", message, color="bold_yellow")
def error(self, message: str) -> None:
"""Log an error message."""
self.log("error", message, color="bold_red")

View File

@@ -1,44 +1,63 @@
"""Token processing utility for tracking and managing token usage."""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Union
from crewai.types.usage_metrics import UsageMetrics
class TokenProcess:
"""Handles token processing and tracking for agents."""
def __init__(self):
"""Initialize the token processor."""
self._token_count = 0
self._last_tokens = 0
self._total_tokens = 0
self._prompt_tokens = 0
self._completion_tokens = 0
self._cached_prompt_tokens = 0
self._successful_requests = 0
def update_token_count(self, count: int) -> None:
"""Update the token count.
def sum_prompt_tokens(self, count: int) -> None:
"""Add to prompt token count.
Args:
count (int): Number of tokens to add to the count
count (int): Number of prompt tokens to add
"""
self._token_count += count
self._last_tokens = count
self._prompt_tokens += count
self._total_tokens += count
def get_token_count(self) -> int:
"""Get the total token count.
def sum_completion_tokens(self, count: int) -> None:
"""Add to completion token count.
Returns:
int: Total number of tokens processed
Args:
count (int): Number of completion tokens to add
"""
return self._token_count
self._completion_tokens += count
self._total_tokens += count
def get_last_tokens(self) -> int:
"""Get the number of tokens from the last update.
def sum_cached_prompt_tokens(self, count: int) -> None:
"""Add to cached prompt token count.
Returns:
int: Number of tokens from last update
Args:
count (int): Number of cached prompt tokens to add
"""
return self._last_tokens
self._cached_prompt_tokens += count
def sum_successful_requests(self, count: int) -> None:
"""Add to successful requests count.
Args:
count (int): Number of successful requests to add
"""
self._successful_requests += count
def reset(self) -> None:
"""Reset the token counts to zero."""
self._token_count = 0
self._last_tokens = 0
"""Reset all token counts to zero."""
self._total_tokens = 0
self._prompt_tokens = 0
self._completion_tokens = 0
self._cached_prompt_tokens = 0
self._successful_requests = 0
def get_summary(self) -> UsageMetrics:
"""Get a summary of token usage.
@@ -47,9 +66,9 @@ class TokenProcess:
UsageMetrics: Object containing token usage metrics
"""
return UsageMetrics(
total_tokens=self._token_count,
prompt_tokens=0, # These will be set by the LLM handler
cached_prompt_tokens=0,
completion_tokens=self._last_tokens,
successful_requests=1 if self._token_count > 0 else 0
total_tokens=self._total_tokens,
prompt_tokens=self._prompt_tokens,
cached_prompt_tokens=self._cached_prompt_tokens,
completion_tokens=self._completion_tokens,
successful_requests=self._successful_requests
)