Files
crewAI/src/crewai/utilities/agent_utils.py
Lucas Gomide b7bf15681e feat: add capability to track LLM calls by task and agent (#3087)
* feat: add capability to track LLM calls by task and agent

This makes it possible to filter or scope LLM events by specific agents or tasks, which can be very useful for debugging or analytics in real-time applications.

* feat: add docs about LLM tracking by Agents and Tasks

* fix incompatible BaseLLM.call method signature

* feat: support to filter LLM Events from Lite Agent
2025-07-01 09:30:16 -04:00

498 lines
16 KiB
Python

import json
import re
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from crewai.agents.parser import (
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE,
AgentAction,
AgentFinish,
CrewAgentParser,
OutputParserException,
)
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
from crewai.utilities.errors import AgentRepositoryError
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from rich.console import Console
from crewai.cli.config import Settings
# Module-level rich console; _print_current_organization writes its status lines through it.
console = Console()
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
    """Convert every tool into its structured form.

    Args:
        tools: Tools to convert; each one must be a CrewAITool instance.

    Returns:
        The results of ``to_structured_tool()``, in input order.

    Raises:
        ValueError: If any entry is not a CrewAITool instance.
    """
    structured = []
    for candidate in tools:
        # Reject anything that cannot be converted before touching it.
        if not isinstance(candidate, CrewAITool):
            raise ValueError("Tool is not a CrewStructuredTool or BaseTool")
        structured.append(candidate.to_structured_tool())
    return structured
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool]]) -> str:
    """Return the ``name`` of every tool as one comma-separated string."""
    names = (entry.name for entry in tools)
    return ", ".join(names)
def render_text_description_and_args(
    tools: Sequence[Union[CrewStructuredTool, BaseTool]],
) -> str:
    """Render the tools as plain text, one description per line.

    Each tool contributes its ``description`` attribute verbatim, and the
    descriptions are joined with newlines.

    Example output, assuming each description already embeds the tool's
    name and args:
        search: This tool is used for search, args: {"query": {"type": "string"}}
        calculator: This tool is used for math, args: {"expression": {"type": "string"}}

    Args:
        tools: Tools whose descriptions should be rendered.

    Returns:
        The newline-joined descriptions.
    """
    return "\n".join(entry.description for entry in tools)
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
    """Tell whether the iteration count has hit or passed the allowed maximum.

    Args:
        iterations: Number of iterations performed so far.
        max_iterations: Iteration limit.

    Returns:
        True when ``iterations`` is greater than or equal to ``max_iterations``.
    """
    limit_hit = iterations >= max_iterations
    return limit_hit
def handle_max_iterations_exceeded(
    formatted_answer: Union[AgentAction, AgentFinish, None],
    printer: Printer,
    i18n: I18N,
    messages: List[Dict[str, str]],
    llm: Union[LLM, BaseLLM],
    callbacks: List[Any],
) -> Union[AgentAction, AgentFinish]:
    """Force a final answer once the iteration budget is spent.

    Appends an assistant message carrying the "force_final_answer" prompt
    (prefixed with the previous answer's text when one is available) to
    ``messages``, performs one more LLM call and parses the reply.

    Args:
        formatted_answer: The last formatted answer from the agent, if any.
        printer: Printer used for status and error output.
        i18n: Source of the "force_final_answer" error prompt.
        messages: Conversation history; mutated in place.
        llm: LLM used for the extra call.
        callbacks: Callbacks forwarded to the LLM call.

    Returns:
        The parsed reply, whatever its type.

    Raises:
        ValueError: If the LLM returns None or an empty string.
    """
    printer.print(
        content="Maximum iterations reached. Requesting final answer.",
        color="yellow",
    )

    force_prompt = i18n.errors("force_final_answer")
    if formatted_answer and hasattr(formatted_answer, "text"):
        assistant_message = formatted_answer.text + f"\n{force_prompt}"
    else:
        assistant_message = force_prompt
    messages.append(format_message_for_llm(assistant_message, role="assistant"))

    # One last LLM call to obtain the final answer.
    answer = llm.call(messages, callbacks=callbacks)

    if answer is None or answer == "":
        printer.print(
            content="Received None or empty response from LLM call.",
            color="red",
        )
        raise ValueError("Invalid response from LLM call - None or empty.")

    # The parsed result is returned as-is, regardless of its type.
    return format_answer(answer)
def format_message_for_llm(prompt: str, role: str = "user") -> Dict[str, str]:
    """Build a chat message dict for the LLM.

    Args:
        prompt: Message text; trailing whitespace is stripped.
        role: Message role, "user" by default.

    Returns:
        A dict with the keys "role" and "content".
    """
    return {"role": role, "content": prompt.rstrip()}
def format_answer(answer: str) -> Union[AgentAction, AgentFinish]:
    """Parse an LLM response into an AgentAction or AgentFinish.

    Args:
        answer: Raw text returned by the LLM.

    Returns:
        The result of ``CrewAgentParser.parse_text``. If parsing raises any
        exception, a fallback AgentFinish whose output and text are the raw
        answer is returned instead.
    """
    try:
        parsed = CrewAgentParser.parse_text(answer)
    except Exception:
        # Parsing failed: wrap the raw answer in a default AgentFinish.
        return AgentFinish(
            thought="Failed to parse LLM response",
            output=answer,
            text=answer,
        )
    return parsed
def enforce_rpm_limit(
    request_within_rpm_limit: Optional[Callable[[], bool]] = None,
) -> None:
    """Invoke the requests-per-minute (RPM) limit callback when one is given.

    Args:
        request_within_rpm_limit: Optional zero-argument callable; its return
            value is ignored.
    """
    if not request_within_rpm_limit:
        return
    request_within_rpm_limit()
def get_llm_response(
    llm: Union[LLM, BaseLLM],
    messages: List[Dict[str, str]],
    callbacks: List[Any],
    printer: Printer,
    from_task: Optional[Any] = None,
    from_agent: Optional[Any] = None,
) -> str:
    """Call the LLM and return its answer, rejecting invalid responses.

    Args:
        llm: LLM to call.
        messages: Messages passed to the LLM.
        callbacks: Callbacks forwarded to the LLM call.
        printer: Printer used to report errors.
        from_task: Optional task forwarded to the LLM call.
        from_agent: Optional agent forwarded to the LLM call.

    Returns:
        The answer returned by the LLM.

    Raises:
        Exception: Whatever the LLM call raised, after it has been printed.
        ValueError: If the answer is falsy (None or empty).
    """
    try:
        answer = llm.call(
            messages,
            callbacks=callbacks,
            from_task=from_task,
            from_agent=from_agent,
        )
    except Exception as e:
        printer.print(content=f"Error during LLM call: {e}", color="red")
        raise e

    if answer:
        return answer

    printer.print(
        content="Received None or empty response from LLM call.",
        color="red",
    )
    raise ValueError("Invalid response from LLM call - None or empty.")
def process_llm_response(
    answer: str, use_stop_words: bool
) -> Union[AgentAction, AgentFinish]:
    """Process the LLM response and format it into an AgentAction or AgentFinish.

    When stop words are not in use, the raw answer is first parsed directly
    with ``CrewAgentParser.parse_text``. If that raises the "final answer and
    parsable action" error, everything from the first "Observation:" marker
    onwards is dropped before the final parse.

    Args:
        answer: Raw text returned by the LLM.
        use_stop_words: Whether stop words are in use; the preliminary check
            runs only when this is False.

    Returns:
        The result of ``format_answer`` on the (possibly truncated) answer.
    """
    if not use_stop_words:
        try:
            # Preliminary parsing to check for errors. Call the parser
            # directly: format_answer() catches every exception and returns
            # a fallback AgentFinish, so routing the check through it would
            # never reach the handler below.
            CrewAgentParser.parse_text(answer)
        except OutputParserException as e:
            if FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE in e.error:
                answer = answer.split("Observation:")[0].strip()
        except Exception:
            # Any other parser failure is left to format_answer() below,
            # which applies its own fallback; this check must not raise.
            pass
    return format_answer(answer)
def handle_agent_action_core(
    formatted_answer: AgentAction,
    tool_result: ToolResult,
    messages: Optional[List[Dict[str, str]]] = None,
    step_callback: Optional[Callable] = None,
    show_logs: Optional[Callable] = None,
) -> Union[AgentAction, AgentFinish]:
    """Fold a tool result into the agent's action and decide what to return.

    Args:
        formatted_answer: The agent's action; its ``text`` and ``result``
            are updated in place with the tool result.
        tool_result: The result of executing the tool.
        messages: Optional list of messages (not used by this function).
        step_callback: Optional callback invoked first, with ``tool_result``.
        show_logs: Optional callback invoked with the updated action; skipped
            when the tool result is returned as the final answer.

    Returns:
        An AgentFinish when ``tool_result.result_as_answer`` is truthy,
        otherwise the updated ``formatted_answer``.
    """
    if step_callback:
        step_callback(tool_result)

    observation = tool_result.result
    formatted_answer.text += f"\nObservation: {observation}"
    formatted_answer.result = observation

    if tool_result.result_as_answer:
        # The tool output itself is the final answer.
        return AgentFinish(
            thought="",
            output=observation,
            text=formatted_answer.text,
        )

    if show_logs:
        show_logs(formatted_answer)
    return formatted_answer
def handle_unknown_error(printer: Any, exception: Exception) -> None:
    """Report an unknown error to the user in red.

    Args:
        printer: Printer instance for output.
        exception: The exception that occurred.
    """
    lines = (
        "An unknown error occurred. Please check the details below.",
        f"Error details: {exception}",
    )
    for line in lines:
        printer.print(content=line, color="red")
def handle_output_parser_exception(
    e: OutputParserException,
    messages: List[Dict[str, str]],
    iterations: int,
    log_error_after: int = 3,
    printer: Optional[Any] = None,
) -> AgentAction:
    """Turn an OutputParserException into a retry message and a placeholder action.

    Args:
        e: The OutputParserException that occurred.
        messages: Message list; a user message with the error is appended.
        iterations: Current iteration count.
        log_error_after: The error is printed only once ``iterations``
            exceeds this value.
        printer: Optional printer instance for logging.

    Returns:
        AgentAction: An action whose text is the error and whose tool,
        tool input and thought are empty strings.
    """
    error_text = e.error
    messages.append({"role": "user", "content": error_text})

    error_action = AgentAction(
        text=error_text,
        tool="",
        tool_input="",
        thought="",
    )

    if iterations > log_error_after and printer:
        printer.print(
            content=f"Error parsing LLM output, agent will retry: {error_text}",
            color="red",
        )
    return error_action
def is_context_length_exceeded(exception: Exception) -> bool:
    """Tell whether an exception reports an exceeded context length.

    The exception's message is wrapped in an
    LLMContextLengthExceededException, whose ``_is_context_limit_error``
    check is applied to that same message.

    Args:
        exception: The exception to check.

    Returns:
        bool: True if the exception is due to context length exceeding.
    """
    error_message = str(exception)
    probe = LLMContextLengthExceededException(error_message)
    return probe._is_context_limit_error(error_message)
def handle_context_length(
    respect_context_window: bool,
    printer: Any,
    messages: List[Dict[str, str]],
    llm: Any,
    callbacks: List[Any],
    i18n: Any,
) -> None:
    """React to an exceeded context length by summarizing or aborting.

    Args:
        respect_context_window: When truthy, the messages are summarized in
            place; otherwise execution is aborted.
        printer: Printer instance for output.
        messages: List of messages to summarize.
        llm: LLM instance for summarization.
        callbacks: List of callbacks for LLM.
        i18n: I18N instance for messages.

    Raises:
        SystemExit: If ``respect_context_window`` is falsy.
    """
    if not respect_context_window:
        printer.print(
            content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
            color="red",
        )
        raise SystemExit(
            "Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
        )

    printer.print(
        content="Context length exceeded. Summarizing content to fit the model context window. Might take a while...",
        color="yellow",
    )
    summarize_messages(messages, llm, callbacks, i18n)
def summarize_messages(
    messages: List[Dict[str, str]],
    llm: Any,
    callbacks: List[Any],
    i18n: Any,
) -> None:
    """Replace the conversation with a single summary message.

    The message contents are joined with spaces and sliced into pieces of
    at most ``llm.get_context_window_size()`` characters. Each piece is
    summarized by its own LLM call, and ``messages`` is then cleared and
    refilled with one message built from the merged summaries.

    Args:
        messages: List of messages to summarize; mutated in place.
        llm: LLM instance for summarization.
        callbacks: List of callbacks for LLM.
        i18n: I18N instance providing the "summarizer_system_message",
            "summarize_instruction" and "summary" slices.
    """
    combined_text = " ".join([message["content"] for message in messages])
    window = llm.get_context_window_size()

    chunks = [
        combined_text[start : start + window]
        for start in range(0, len(combined_text), window)
    ]
    chunk_count = len(chunks)

    summaries = []
    for position, chunk in enumerate(chunks, 1):
        Printer().print(
            content=f"Summarizing {position}/{chunk_count}...",
            color="yellow",
        )
        system_message = format_message_for_llm(
            i18n.slice("summarizer_system_message"), role="system"
        )
        instruction_message = format_message_for_llm(
            i18n.slice("summarize_instruction").format(group=chunk),
        )
        summary = llm.call(
            [system_message, instruction_message],
            callbacks=callbacks,
        )
        summaries.append(str(summary))

    merged_summary = " ".join(summaries)

    # Swap the whole history for the single summary message.
    messages.clear()
    messages.append(
        format_message_for_llm(
            i18n.slice("summary").format(merged_summary=merged_summary)
        )
    )
def show_agent_logs(
    printer: Printer,
    agent_role: str,
    formatted_answer: Optional[Union[AgentAction, AgentFinish]] = None,
    task_description: Optional[str] = None,
    verbose: bool = False,
) -> None:
    """Print agent logs for either the start or the execution state.

    Without a ``formatted_answer`` the start banner (agent role and, if
    given, the task) is printed. With one, the execution banner is printed,
    followed by the action details or the final answer.

    Args:
        printer: Printer instance for output.
        agent_role: Role of the agent; only its first line is shown.
        formatted_answer: Optional AgentAction or AgentFinish for execution logs.
        task_description: Optional task description for start logs.
        verbose: Nothing is printed unless this is truthy.
    """
    if not verbose:
        return

    role_line = agent_role.split("\n")[0]

    if formatted_answer is None:
        # Start logs
        printer.print(
            content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{role_line}\033[00m"
        )
        if task_description:
            printer.print(
                content=f"\033[95m## Task:\033[00m \033[92m{task_description}\033[00m"
            )
        return

    # Execution logs
    printer.print(
        content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{role_line}\033[00m"
    )

    if isinstance(formatted_answer, AgentAction):
        # Collapse runs of newlines in the thought to a single newline.
        thought = re.sub(r"\n+", "\n", formatted_answer.thought)
        formatted_json = json.dumps(
            formatted_answer.tool_input,
            indent=2,
            ensure_ascii=False,
        )
        if thought:
            printer.print(
                content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
            )
        printer.print(
            content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
        )
        printer.print(
            content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
        )
        printer.print(
            content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
        )
        return

    if isinstance(formatted_answer, AgentFinish):
        printer.print(
            content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
        )
def _print_current_organization():
    """Print the organization from the CLI settings, or a hint to set one."""
    settings = Settings()
    if not settings.org_uuid:
        console.print("No organization currently set. We recommend setting one before using: `crewai org switch <org_id>` command.", style="yellow")
        return
    console.print(f"Fetching agent from organization: {settings.org_name} ({settings.org_uuid})", style="bold blue")
def load_agent_from_repository(from_repository: str) -> Dict[str, Any]:
    """Fetch an agent definition from the repository and build its attributes.

    Every key of the fetched agent is copied as-is, except "tools": each
    tool entry is imported from its "module", looked up by "name",
    instantiated with its "init_params", and collected into a list (a list
    result is flattened into it).

    Args:
        from_repository: Name of the agent to fetch; a falsy value skips the
            lookup entirely.

    Returns:
        The agent attributes, or an empty dict when ``from_repository`` is falsy.

    Raises:
        AgentRepositoryError: If the response status is 404 or anything other
            than 200, or if a tool cannot be loaded.
    """
    attributes: Dict[str, Any] = {}
    if not from_repository:
        return attributes

    import importlib

    from crewai.cli.authentication.token import get_auth_token
    from crewai.cli.plus_api import PlusAPI

    client = PlusAPI(api_key=get_auth_token())
    _print_current_organization()
    response = client.get_agent(from_repository)

    status = response.status_code
    if status == 404:
        raise AgentRepositoryError(
            f"Agent {from_repository} does not exist, make sure the name is correct or the agent is available on your organization."
            f"\nIf you are using the wrong organization, switch to the correct one using `crewai org switch <org_id>` command.",
        )
    if status != 200:
        raise AgentRepositoryError(
            f"Agent {from_repository} could not be loaded: {response.text}"
            f"\nIf you are using the wrong organization, switch to the correct one using `crewai org switch <org_id>` command.",
        )

    for key, value in response.json().items():
        if key != "tools":
            attributes[key] = value
            continue

        loaded_tools = []
        attributes[key] = loaded_tools
        for spec in value:
            try:
                module = importlib.import_module(spec["module"])
                tool_class = getattr(module, spec["name"])
                instance = tool_class(**spec["init_params"])
                # A tool class may produce several tools at once.
                if isinstance(instance, list):
                    loaded_tools.extend(instance)
                else:
                    loaded_tools.append(instance)
            except Exception as e:
                raise AgentRepositoryError(
                    f"Tool {spec['name']} could not be loaded: {e}"
                ) from e
    return attributes