chore: fix ruff linting issues in agents module

fix(agents): linting, import paths, cache key alignment, and static method
This commit is contained in:
Greyson LaLonde
2025-09-19 22:11:21 -04:00
committed by GitHub
parent 24b84a4b68
commit d879be8b66
16 changed files with 286 additions and 294 deletions

View File

@@ -1,14 +1,18 @@
import json
import re
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from collections.abc import Callable, Sequence
from typing import Any
from rich.console import Console
from crewai.agents.constants import FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE
from crewai.agents.parser import (
AgentAction,
AgentFinish,
OutputParserException,
OutputParserError,
parse,
)
from crewai.cli.config import Settings
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.tools import BaseTool as CrewAITool
@@ -20,13 +24,11 @@ from crewai.utilities.errors import AgentRepositoryError
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from rich.console import Console
from crewai.cli.config import Settings
console = Console()
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
"""Parse tools to be used for the task."""
tools_list = []
@@ -39,13 +41,13 @@ def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
return tools_list
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool]]) -> str:
def get_tool_names(tools: Sequence[CrewStructuredTool | BaseTool]) -> str:
"""Get the names of the tools."""
return ", ".join([t.name for t in tools])
def render_text_description_and_args(
tools: Sequence[Union[CrewStructuredTool, BaseTool]],
tools: Sequence[CrewStructuredTool | BaseTool],
) -> str:
"""Render the tool name, description, and args in plain text.
@@ -53,10 +55,7 @@ def render_text_description_and_args(
calculator: This tool is used for math, \
args: {"expression": {"type": "string"}}
"""
tool_strings = []
for tool in tools:
tool_strings.append(tool.description)
tool_strings = [tool.description for tool in tools]
return "\n".join(tool_strings)
@@ -66,13 +65,13 @@ def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
def handle_max_iterations_exceeded(
formatted_answer: Union[AgentAction, AgentFinish, None],
formatted_answer: AgentAction | AgentFinish | None,
printer: Printer,
i18n: I18N,
messages: List[Dict[str, str]],
llm: Union[LLM, BaseLLM],
callbacks: List[Any],
) -> Union[AgentAction, AgentFinish]:
messages: list[dict[str, str]],
llm: LLM | BaseLLM,
callbacks: list[Any],
) -> AgentAction | AgentFinish:
"""
Handles the case when the maximum number of iterations is exceeded.
Performs one more LLM call to get the final answer.
@@ -90,7 +89,7 @@ def handle_max_iterations_exceeded(
if formatted_answer and hasattr(formatted_answer, "text"):
assistant_message = (
formatted_answer.text + f'\n{i18n.errors("force_final_answer")}'
formatted_answer.text + f"\n{i18n.errors('force_final_answer')}"
)
else:
assistant_message = i18n.errors("force_final_answer")
@@ -110,17 +109,16 @@ def handle_max_iterations_exceeded(
)
raise ValueError("Invalid response from LLM call - None or empty.")
formatted_answer = format_answer(answer)
# Return the formatted answer, regardless of its type
return formatted_answer
return format_answer(answer)
def format_message_for_llm(prompt: str, role: str = "user") -> Dict[str, str]:
def format_message_for_llm(prompt: str, role: str = "user") -> dict[str, str]:
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
def format_answer(answer: str) -> Union[AgentAction, AgentFinish]:
def format_answer(answer: str) -> AgentAction | AgentFinish:
"""Format a response from the LLM into an AgentAction or AgentFinish."""
try:
return parse(answer)
@@ -134,7 +132,7 @@ def format_answer(answer: str) -> Union[AgentAction, AgentFinish]:
def enforce_rpm_limit(
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
request_within_rpm_limit: Callable[[], bool] | None = None,
) -> None:
"""Enforce the requests per minute (RPM) limit if applicable."""
if request_within_rpm_limit:
@@ -142,12 +140,12 @@ def enforce_rpm_limit(
def get_llm_response(
llm: Union[LLM, BaseLLM],
messages: List[Dict[str, str]],
callbacks: List[Any],
llm: LLM | BaseLLM,
messages: list[dict[str, str]],
callbacks: list[Any],
printer: Printer,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> str:
"""Call the LLM and return the response, handling any invalid responses."""
try:
@@ -171,13 +169,13 @@ def get_llm_response(
def process_llm_response(
answer: str, use_stop_words: bool
) -> Union[AgentAction, AgentFinish]:
) -> AgentAction | AgentFinish:
"""Process the LLM response and format it into an AgentAction or AgentFinish."""
if not use_stop_words:
try:
# Preliminary parsing to check for errors.
format_answer(answer)
except OutputParserException as e:
except OutputParserError as e:
if FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE in e.error:
answer = answer.split("Observation:")[0].strip()
@@ -187,10 +185,10 @@ def process_llm_response(
def handle_agent_action_core(
formatted_answer: AgentAction,
tool_result: ToolResult,
messages: Optional[List[Dict[str, str]]] = None,
step_callback: Optional[Callable] = None,
show_logs: Optional[Callable] = None,
) -> Union[AgentAction, AgentFinish]:
messages: list[dict[str, str]] | None = None,
step_callback: Callable | None = None,
show_logs: Callable | None = None,
) -> AgentAction | AgentFinish:
"""Core logic for handling agent actions and tool results.
Args:
@@ -245,16 +243,16 @@ def handle_unknown_error(printer: Any, exception: Exception) -> None:
def handle_output_parser_exception(
e: OutputParserException,
messages: List[Dict[str, str]],
e: OutputParserError,
messages: list[dict[str, str]],
iterations: int,
log_error_after: int = 3,
printer: Optional[Any] = None,
printer: Any | None = None,
) -> AgentAction:
"""Handle OutputParserException by updating messages and formatted_answer.
"""Handle OutputParserError by updating messages and formatted_answer.
Args:
e: The OutputParserException that occurred
e: The OutputParserError that occurred
messages: List of messages to append to
iterations: Current iteration count
log_error_after: Number of iterations after which to log errors
@@ -298,9 +296,9 @@ def is_context_length_exceeded(exception: Exception) -> bool:
def handle_context_length(
respect_context_window: bool,
printer: Any,
messages: List[Dict[str, str]],
messages: list[dict[str, str]],
llm: Any,
callbacks: List[Any],
callbacks: list[Any],
i18n: Any,
) -> None:
"""Handle context length exceeded by either summarizing or raising an error.
@@ -330,9 +328,9 @@ def handle_context_length(
def summarize_messages(
messages: List[Dict[str, str]],
messages: list[dict[str, str]],
llm: Any,
callbacks: List[Any],
callbacks: list[Any],
i18n: Any,
) -> None:
"""Summarize messages to fit within context window.
@@ -344,12 +342,12 @@ def summarize_messages(
i18n: I18N instance for messages
"""
messages_string = " ".join([message["content"] for message in messages])
messages_groups = []
cut_size = llm.get_context_window_size()
for i in range(0, len(messages_string), cut_size):
messages_groups.append({"content": messages_string[i : i + cut_size]})
messages_groups = [
{"content": messages_string[i : i + cut_size]}
for i in range(0, len(messages_string), cut_size)
]
summarized_contents = []
@@ -385,8 +383,8 @@ def summarize_messages(
def show_agent_logs(
printer: Printer,
agent_role: str,
formatted_answer: Optional[Union[AgentAction, AgentFinish]] = None,
task_description: Optional[str] = None,
formatted_answer: AgentAction | AgentFinish | None = None,
task_description: str | None = None,
verbose: bool = False,
) -> None:
"""Show agent logs for both start and execution states.
@@ -458,8 +456,8 @@ def _print_current_organization():
)
def load_agent_from_repository(from_repository: str) -> Dict[str, Any]:
attributes: Dict[str, Any] = {}
def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
attributes: dict[str, Any] = {}
if from_repository:
import importlib
@@ -497,7 +495,7 @@ def load_agent_from_repository(from_repository: str) -> Dict[str, Any]:
else:
attributes[key].append(tool_value)
except Exception as e:
except Exception as e: # noqa: PERF203
raise AgentRepositoryError(
f"Tool {tool['name']} could not be loaded: {e}"
) from e

View File

@@ -1,17 +1,18 @@
import json
from typing import Any, Type
from typing import Any
import regex
from pydantic import BaseModel, ValidationError
from crewai.agents.parser import OutputParserException
from crewai.agents.parser import OutputParserError
"""Parser for converting text outputs into Pydantic models."""
class CrewPydanticOutputParser:
"""Parses text outputs into specified Pydantic models."""
pydantic_object: Type[BaseModel]
pydantic_object: type[BaseModel]
def parse_result(self, result: str) -> Any:
result = self._transform_in_valid_json(result)
@@ -27,7 +28,7 @@ class CrewPydanticOutputParser:
except ValidationError as e:
name = self.pydantic_object.__name__
msg = f"Failed to parse {name} from completion {json_object}. Got: {e}"
raise OutputParserException(error=msg)
raise OutputParserError(error=msg) from e
def _transform_in_valid_json(self, text) -> str:
text = text.replace("```", "").replace("json", "")
@@ -41,7 +42,7 @@ class CrewPydanticOutputParser:
# Return the first successfully parsed JSON object
json_obj = json.dumps(json_obj)
return str(json_obj)
except json.JSONDecodeError:
except json.JSONDecodeError: # noqa: PERF203
# If parsing fails, skip to the next match
continue
return text

View File

@@ -1,24 +1,24 @@
from typing import Any, Dict, List, Optional
from typing import Any
from crewai.agents.parser import AgentAction
from crewai.security import Fingerprint
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.tools.tool_usage import ToolUsage, ToolUsageError
from crewai.utilities.i18n import I18N
def execute_tool_and_check_finality(
agent_action: AgentAction,
tools: List[CrewStructuredTool],
tools: list[CrewStructuredTool],
i18n: I18N,
agent_key: Optional[str] = None,
agent_role: Optional[str] = None,
tools_handler: Optional[Any] = None,
task: Optional[Any] = None,
agent: Optional[Any] = None,
function_calling_llm: Optional[Any] = None,
fingerprint_context: Optional[Dict[str, str]] = None,
agent_key: str | None = None,
agent_role: str | None = None,
tools_handler: Any | None = None,
task: Any | None = None,
agent: Any | None = None,
function_calling_llm: Any | None = None,
fingerprint_context: dict[str, str] | None = None,
) -> ToolResult:
"""Execute a tool and check if the result should be treated as a final answer.
@@ -50,7 +50,7 @@ def execute_tool_and_check_finality(
fingerprint_obj = Fingerprint.from_dict(fingerprint_context)
agent.set_fingerprint(fingerprint_obj)
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}")
raise ValueError(f"Failed to set fingerprint: {e}") from e
# Create tool usage instance
tool_usage = ToolUsage(
@@ -65,7 +65,7 @@ def execute_tool_and_check_finality(
# Parse tool calling
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
if isinstance(tool_calling, ToolUsageError):
return ToolResult(tool_calling.message, False)
# Check if tool name matches