diff --git a/pyproject.toml b/pyproject.toml
index b99c1e6ca..95b3fabcf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -131,7 +131,7 @@ select = [
     "I001", # sort imports
     "I002", # remove unused imports
 ]
-ignore = ["E501"] # ignore line too long
+ignore = ["E501", "S101"] # ignore line too long and assert statements
 
 [tool.mypy]
 exclude = ["src/crewai/cli/templates", "tests"]
diff --git a/src/crewai/agent.py b/src/crewai/agent.py
index f46581db2..9793df894 100644
--- a/src/crewai/agent.py
+++ b/src/crewai/agent.py
@@ -1,17 +1,10 @@
 import shutil
 import subprocess
 import time
+from collections.abc import Callable, Sequence
 from typing import (
     Any,
-    Callable,
-    Dict,
-    List,
     Literal,
-    Optional,
-    Sequence,
-    Tuple,
-    Type,
-    Union,
 )
 
 from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -19,12 +12,31 @@ from pydantic import Field, InstanceOf, PrivateAttr, model_validator
 from crewai.agents import CacheHandler
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.crew_agent_executor import CrewAgentExecutor
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.types.agent_events import (
+    AgentExecutionCompletedEvent,
+    AgentExecutionErrorEvent,
+    AgentExecutionStartedEvent,
+)
+from crewai.events.types.knowledge_events import (
+    KnowledgeQueryCompletedEvent,
+    KnowledgeQueryFailedEvent,
+    KnowledgeQueryStartedEvent,
+    KnowledgeRetrievalCompletedEvent,
+    KnowledgeRetrievalStartedEvent,
+    KnowledgeSearchQueryFailedEvent,
+)
+from crewai.events.types.memory_events import (
+    MemoryRetrievalCompletedEvent,
+    MemoryRetrievalStartedEvent,
+)
 from crewai.knowledge.knowledge import Knowledge
 from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
 from crewai.lite_agent import LiteAgent, LiteAgentOutput
 from crewai.llm import BaseLLM
 from crewai.memory.contextual.contextual_memory import ContextualMemory
+from crewai.responsibility.models import AgentCapability
 from crewai.security import Fingerprint
 from crewai.task import Task
 from crewai.tools import BaseTool
@@ -38,28 +50,9 @@ from crewai.utilities.agent_utils import (
 )
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
 from crewai.utilities.converter import generate_model_description
-from crewai.events.types.agent_events import (
-    AgentExecutionCompletedEvent,
-    AgentExecutionErrorEvent,
-    AgentExecutionStartedEvent,
-)
-from crewai.events.event_bus import crewai_event_bus
-from crewai.events.types.memory_events import (
-    MemoryRetrievalStartedEvent,
-    MemoryRetrievalCompletedEvent,
-)
-from crewai.events.types.knowledge_events import (
-    KnowledgeQueryCompletedEvent,
-    KnowledgeQueryFailedEvent,
-    KnowledgeQueryStartedEvent,
-    KnowledgeRetrievalCompletedEvent,
-    KnowledgeRetrievalStartedEvent,
-    KnowledgeSearchQueryFailedEvent,
-)
 from crewai.utilities.llm_utils import create_llm
 from crewai.utilities.token_counter_callback import TokenCalcHandler
 from crewai.utilities.training_handler import CrewTrainingHandler
-from crewai.responsibility.models import AgentCapability
 
 
 class Agent(BaseAgent):
@@ -88,36 +81,36 @@ class Agent(BaseAgent):
     """
 
     _times_executed: int = PrivateAttr(default=0)
-    max_execution_time: Optional[int] = Field(
+    max_execution_time: int | None = Field(
         default=None,
         description="Maximum execution time for an agent to execute a task",
     )
     agent_ops_agent_name: str = None  # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
     agent_ops_agent_id: str = None  # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
-    step_callback: Optional[Any] = Field(
+    step_callback: Any | None = Field(
         default=None,
         description="Callback to be executed after each step of the agent execution.",
     )
-    use_system_prompt: Optional[bool] = Field(
+    use_system_prompt: bool | None = Field(
         default=True,
         description="Use system prompt for the agent.",
     )
-    llm: Union[str, InstanceOf[BaseLLM], Any] = Field(
+    llm: str | InstanceOf[BaseLLM] | Any = Field(
         description="Language model that will run the agent.", default=None
     )
-    function_calling_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
+    function_calling_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         description="Language model that will run the agent.", default=None
     )
-    system_template: Optional[str] = Field(
+    system_template: str | None = Field(
         default=None, description="System format for the agent."
     )
-    prompt_template: Optional[str] = Field(
+    prompt_template: str | None = Field(
         default=None, description="Prompt format for the agent."
     )
-    response_template: Optional[str] = Field(
+    response_template: str | None = Field(
         default=None, description="Response format for the agent."
     )
-    allow_code_execution: Optional[bool] = Field(
+    allow_code_execution: bool | None = Field(
         default=False, description="Enable code execution for the agent."
     )
     respect_context_window: bool = Field(
@@ -148,38 +141,38 @@ class Agent(BaseAgent):
         default=False,
         description="Whether the agent should reflect and create a plan before executing a task.",
     )
-    max_reasoning_attempts: Optional[int] = Field(
+    max_reasoning_attempts: int | None = Field(
         default=None,
         description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
     )
-    embedder: Optional[Dict[str, Any]] = Field(
+    embedder: dict[str, Any] | None = Field(
         default=None,
         description="Embedder configuration for the agent.",
     )
-    agent_knowledge_context: Optional[str] = Field(
+    agent_knowledge_context: str | None = Field(
         default=None,
         description="Knowledge context for the agent.",
     )
-    crew_knowledge_context: Optional[str] = Field(
+    crew_knowledge_context: str | None = Field(
         default=None,
         description="Knowledge context for the crew.",
     )
-    knowledge_search_query: Optional[str] = Field(
+    knowledge_search_query: str | None = Field(
         default=None,
         description="Knowledge search query for the agent dynamically generated by the agent.",
     )
-    from_repository: Optional[str] = Field(
+    from_repository: str | None = Field(
         default=None,
         description="The Agent's role to be used from your repository.",
     )
-    guardrail: Optional[Union[Callable[[Any], Tuple[bool, Any]], str]] = Field(
+    guardrail: Callable[[Any], tuple[bool, Any]] | str | None = Field(
         default=None,
         description="Function or string description of a guardrail to validate agent output",
     )
     guardrail_max_retries: int = Field(
         default=3, description="Maximum number of retries when guardrail fails"
     )
-    capabilities: Optional[List[AgentCapability]] = Field(
+    capabilities: list[AgentCapability] | None = Field(
         default_factory=list,
         description="List of agent capabilities for responsibility tracking"
     )
@@ -217,7 +210,7 @@ class Agent(BaseAgent):
     def set_responsibility_system(self, responsibility_system) -> None:
         """Set the responsibility tracking system for this agent."""
         self._responsibility_system = responsibility_system
-        
+
         if self.capabilities:
             self._responsibility_system.register_agent(self, self.capabilities)
 
@@ -226,19 +219,19 @@ class Agent(BaseAgent):
         if self.capabilities is None:
             self.capabilities = []
         self.capabilities.append(capability)
-        
+
         if self._responsibility_system:
             self._responsibility_system.hierarchy.add_agent(self, self.capabilities)
 
-    def get_capabilities(self) -> List[AgentCapability]:
+    def get_capabilities(self) -> list[AgentCapability]:
         """Get all capabilities for this agent."""
         return self.capabilities or []
-    
+
     def get_responsibility_system(self):
         """Get the responsibility tracking system for this agent."""
         return self._responsibility_system
 
-    def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
+    def set_knowledge(self, crew_embedder: dict[str, Any] | None = None):
         try:
             if self.embedder is None and crew_embedder:
                 self.embedder = crew_embedder
@@ -254,7 +247,7 @@ class Agent(BaseAgent):
                 )
                 self.knowledge.add_sources()
         except (TypeError, ValueError) as e:
-            raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
+            raise ValueError(f"Invalid Knowledge Configuration: {e!s}") from e
 
     def _is_any_available_memory(self) -> bool:
         """Check if any memory is available."""
@@ -274,8 +267,8 @@ class Agent(BaseAgent):
 
     def execute_task(
         self,
         task: Task,
-        context: Optional[str] = None,
-        tools: Optional[List[BaseTool]] = None,
+        context: str | None = None,
+        tools: list[BaseTool] | None = None,
     ) -> str:
         """Execute a task with the agent.
@@ -309,10 +302,10 @@ class Agent(BaseAgent):
             except Exception as e:
                 if hasattr(self, "_logger"):
                     self._logger.log(
-                        "error", f"Error during reasoning process: {str(e)}"
+                        "error", f"Error during reasoning process: {e!s}"
                     )
                 else:
-                    print(f"Error during reasoning process: {str(e)}")
+                    print(f"Error during reasoning process: {e!s}")
 
         self._inject_date_to_task(task)
 
@@ -555,14 +548,14 @@ class Agent(BaseAgent):
 
             try:
                 return future.result(timeout=timeout)
-            except concurrent.futures.TimeoutError:
+            except concurrent.futures.TimeoutError as e:
                 future.cancel()
                 raise TimeoutError(
                     f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task."
-                )
+                ) from e
             except Exception as e:
                 future.cancel()
-                raise RuntimeError(f"Task execution failed: {str(e)}")
+                raise RuntimeError(f"Task execution failed: {e!s}") from e
 
     def _execute_without_timeout(self, task_prompt: str, task: Task) -> str:
         """Execute a task without a timeout.
@@ -584,14 +577,14 @@ class Agent(BaseAgent):
         )["output"]
 
     def create_agent_executor(
-        self, tools: Optional[List[BaseTool]] = None, task=None
+        self, tools: list[BaseTool] | None = None, task=None
     ) -> None:
         """Create an agent executor for the agent.
 
         Returns:
             An instance of the CrewAgentExecutor class.
         """
-        raw_tools: List[BaseTool] = tools or self.tools or []
+        raw_tools: list[BaseTool] = tools or self.tools or []
         parsed_tools = parse_tools(raw_tools)
 
         prompt = Prompts(
@@ -633,10 +626,9 @@ class Agent(BaseAgent):
             callbacks=[TokenCalcHandler(self._token_process)],
         )
 
-    def get_delegation_tools(self, agents: List[BaseAgent]):
+    def get_delegation_tools(self, agents: list[BaseAgent]):
         agent_tools = AgentTools(agents=agents)
-        tools = agent_tools.tools()
-        return tools
+        return agent_tools.tools()
 
     def get_multimodal_tools(self) -> Sequence[BaseTool]:
         from crewai.tools.agent_tools.add_image_tool import AddImageTool
@@ -684,7 +676,7 @@ class Agent(BaseAgent):
             )
         return task_prompt
 
-    def _render_text_description(self, tools: List[Any]) -> str:
+    def _render_text_description(self, tools: list[Any]) -> str:
         """Render the tool name and description in plain text.
 
         Output will be in the format of:
@@ -694,15 +686,13 @@ class Agent(BaseAgent):
         search: This tool is used for search
         calculator: This tool is used for math
         """
-        description = "\n".join(
+        return "\n".join(
             [
                 f"Tool name: {tool.name}\nTool description:\n{tool.description}"
                 for tool in tools
             ]
         )
 
-        return description
-
     def _inject_date_to_task(self, task):
         """Inject the current date into the task description if inject_date is enabled."""
         if self.inject_date:
@@ -730,9 +720,9 @@ class Agent(BaseAgent):
                 task.description += f"\n\nCurrent Date: {current_date}"
             except Exception as e:
                 if hasattr(self, "_logger"):
-                    self._logger.log("warning", f"Failed to inject date: {str(e)}")
+                    self._logger.log("warning", f"Failed to inject date: {e!s}")
                 else:
-                    print(f"Warning: Failed to inject date: {str(e)}")
+                    print(f"Warning: Failed to inject date: {e!s}")
 
     def _validate_docker_installation(self) -> None:
         """Check if Docker is installed and running."""
@@ -748,10 +738,10 @@ class Agent(BaseAgent):
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
             )
-        except subprocess.CalledProcessError:
+        except subprocess.CalledProcessError as e:
             raise RuntimeError(
                 f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
-            )
+            ) from e
 
     def __repr__(self):
         return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
@@ -826,8 +816,8 @@ class Agent(BaseAgent):
 
     def kickoff(
         self,
-        messages: Union[str, List[Dict[str, str]]],
-        response_format: Optional[Type[Any]] = None,
+        messages: str | list[dict[str, str]],
+        response_format: type[Any] | None = None,
     ) -> LiteAgentOutput:
         """
         Execute the agent with the given messages using a LiteAgent instance.
@@ -866,8 +856,8 @@ class Agent(BaseAgent):
 
     async def kickoff_async(
         self,
-        messages: Union[str, List[Dict[str, str]]],
-        response_format: Optional[Type[Any]] = None,
+        messages: str | list[dict[str, str]],
+        response_format: type[Any] | None = None,
     ) -> LiteAgentOutput:
         """
         Execute the agent asynchronously with the given messages using a LiteAgent instance.
diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py
index ba2596f63..fa0e0a529 100644
--- a/src/crewai/agents/agent_builder/base_agent.py
+++ b/src/crewai/agents/agent_builder/base_agent.py
@@ -1,8 +1,9 @@
 import uuid
 from abc import ABC, abstractmethod
+from collections.abc import Callable
 from copy import copy as shallow_copy
 from hashlib import md5
-from typing import Any, Callable, Dict, List, Optional, TypeVar
+from typing import Any, TypeVar
 
 from pydantic import (
     UUID4,
@@ -25,7 +26,6 @@ from crewai.security.security_config import SecurityConfig
 from crewai.tools.base_tool import BaseTool, Tool
 from crewai.utilities import I18N, Logger, RPMController
 from crewai.utilities.config import process_config
-from crewai.utilities.converter import Converter
 from crewai.utilities.string_utils import interpolate_only
 
 T = TypeVar("T", bound="BaseAgent")
@@ -81,17 +81,17 @@ class BaseAgent(ABC, BaseModel):
 
     __hash__ = object.__hash__  # type: ignore
     _logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
-    _rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
+    _rpm_controller: RPMController | None = PrivateAttr(default=None)
     _request_within_rpm_limit: Any = PrivateAttr(default=None)
-    _original_role: Optional[str] = PrivateAttr(default=None)
-    _original_goal: Optional[str] = PrivateAttr(default=None)
-    _original_backstory: Optional[str] = PrivateAttr(default=None)
+    _original_role: str | None = PrivateAttr(default=None)
+    _original_goal: str | None = PrivateAttr(default=None)
+    _original_backstory: str | None = PrivateAttr(default=None)
     _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
     id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
     role: str = Field(description="Role of the agent")
     goal: str = Field(description="Objective of the agent")
     backstory: str = Field(description="Backstory of the agent")
-    config: Optional[Dict[str, Any]] = Field(
+    config: dict[str, Any] | None = Field(
         description="Configuration for the agent", default=None, exclude=True
     )
     cache: bool = Field(
@@ -100,7 +100,7 @@ class BaseAgent(ABC, BaseModel):
     verbose: bool = Field(
         default=False, description="Verbose mode for the Agent Execution"
     )
-    max_rpm: Optional[int] = Field(
+    max_rpm: int | None = Field(
         default=None,
         description="Maximum number of requests per minute for the agent execution to be respected.",
     )
@@ -108,7 +108,7 @@ class BaseAgent(ABC, BaseModel):
         default=False,
         description="Enable agent to delegate and ask questions among each other.",
     )
-    tools: Optional[List[BaseTool]] = Field(
+    tools: list[BaseTool] | None = Field(
         default_factory=list, description="Tools at agents' disposal"
     )
     max_iter: int = Field(
@@ -122,27 +122,27 @@ class BaseAgent(ABC, BaseModel):
     )
     crew: Any = Field(default=None, description="Crew to which the agent belongs.")
     i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
-    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
+    cache_handler: InstanceOf[CacheHandler] | None = Field(
         default=None, description="An instance of the CacheHandler class."
     )
     tools_handler: InstanceOf[ToolsHandler] = Field(
         default_factory=ToolsHandler,
         description="An instance of the ToolsHandler class.",
     )
-    tools_results: List[Dict[str, Any]] = Field(
+    tools_results: list[dict[str, Any]] = Field(
         default=[], description="Results of the tools used by the agent."
     )
-    max_tokens: Optional[int] = Field(
+    max_tokens: int | None = Field(
         default=None, description="Maximum number of tokens for the agent's execution."
     )
-    knowledge: Optional[Knowledge] = Field(
+    knowledge: Knowledge | None = Field(
         default=None, description="Knowledge for the agent."
     )
-    knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
+    knowledge_sources: list[BaseKnowledgeSource] | None = Field(
         default=None,
         description="Knowledge sources for the agent.",
     )
-    knowledge_storage: Optional[Any] = Field(
+    knowledge_storage: Any | None = Field(
         default=None,
         description="Custom knowledge storage for the agent.",
     )
@@ -150,13 +150,13 @@ class BaseAgent(ABC, BaseModel):
         default_factory=SecurityConfig,
         description="Security configuration for the agent, including fingerprinting.",
     )
-    callbacks: List[Callable] = Field(
+    callbacks: list[Callable] = Field(
         default=[], description="Callbacks to be used for the agent"
     )
     adapted_agent: bool = Field(
         default=False, description="Whether the agent is adapted"
     )
-    knowledge_config: Optional[KnowledgeConfig] = Field(
+    knowledge_config: KnowledgeConfig | None = Field(
         default=None,
         description="Knowledge configuration for the agent such as limits and threshold",
     )
@@ -168,7 +168,7 @@ class BaseAgent(ABC, BaseModel):
 
     @field_validator("tools")
     @classmethod
-    def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
+    def validate_tools(cls, tools: list[Any]) -> list[BaseTool]:
         """Validate and process the tools provided to the agent.
 
         This method ensures that each tool is either an instance of BaseTool
@@ -221,7 +221,7 @@ class BaseAgent(ABC, BaseModel):
 
     @field_validator("id", mode="before")
     @classmethod
-    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
+    def _deny_user_set_id(cls, v: UUID4 | None) -> None:
         if v:
             raise PydanticCustomError(
                 "may_not_set_field", "This field is not to be set by the user.", {}
@@ -252,8 +252,8 @@ class BaseAgent(ABC, BaseModel):
 
     def execute_task(
         self,
         task: Any,
-        context: Optional[str] = None,
-        tools: Optional[List[BaseTool]] = None,
+        context: str | None = None,
+        tools: list[BaseTool] | None = None,
     ) -> str:
         pass
@@ -262,9 +262,8 @@ class BaseAgent(ABC, BaseModel):
         pass
 
     @abstractmethod
-    def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
+    def get_delegation_tools(self, agents: list["BaseAgent"]) -> list[BaseTool]:
         """Set the task tools that init BaseAgenTools class."""
-        pass
 
     def copy(self: T) -> T:  # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
         """Create a deep copy of the Agent."""
@@ -309,7 +308,7 @@ class BaseAgent(ABC, BaseModel):
 
         copied_data = self.model_dump(exclude=exclude)
         copied_data = {k: v for k, v in copied_data.items() if v is not None}
-        copied_agent = type(self)(
+        return type(self)(
             **copied_data,
             llm=existing_llm,
             tools=self.tools,
@@ -318,9 +317,7 @@ class BaseAgent(ABC, BaseModel):
             knowledge_storage=copied_knowledge_storage,
         )
 
-        return copied_agent
-
-    def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
+    def interpolate_inputs(self, inputs: dict[str, Any]) -> None:
         """Interpolate inputs into the agent description and backstory."""
         if self._original_role is None:
             self._original_role = self.role
@@ -362,5 +359,8 @@ class BaseAgent(ABC, BaseModel):
             self._rpm_controller = rpm_controller
             self.create_agent_executor()
 
-    def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
+    def set_knowledge(self, crew_embedder: dict[str, Any] | None = None):
         pass
+
+    def set_responsibility_system(self, responsibility_system: Any) -> None:
+        """Set the responsibility system for the agent."""
diff --git a/src/crewai/responsibility/accountability.py b/src/crewai/responsibility/accountability.py
index 89d4b85da..dd4f26e02 100644
--- a/src/crewai/responsibility/accountability.py
+++ b/src/crewai/responsibility/accountability.py
@@ -36,7 +36,9 @@ class AccountabilityLogger:
             action_type=action_type,
             action_description=action_description,
             task_id=task_id,
-            context=context or {}
+            context=context or {},
+            outcome=None,
+            success=None
         )
 
         self.records.append(record)
@@ -161,9 +163,9 @@ class AccountabilityLogger:
             records = [r for r in records if r.timestamp >= since]
 
         agent_id = "all_agents"
-        action_counts = defaultdict(int)
-        success_counts = defaultdict(int)
-        failure_counts = defaultdict(int)
+        action_counts: dict[str, int] = defaultdict(int)
+        success_counts: dict[str, int] = defaultdict(int)
+        failure_counts: dict[str, int] = defaultdict(int)
 
         for record in records:
             action_counts[record.action_type] += 1
@@ -172,7 +174,7 @@ class AccountabilityLogger:
             elif record.success is False:
                 failure_counts[record.action_type] += 1
 
-        success_rates = {}
+        success_rates: dict[str, float | None] = {}
         for action_type in action_counts:
             total = success_counts[action_type] + failure_counts[action_type]
             if total > 0:
diff --git a/src/crewai/responsibility/assignment.py b/src/crewai/responsibility/assignment.py
index a95c5cb0d..f9f56d600 100644
--- a/src/crewai/responsibility/assignment.py
+++ b/src/crewai/responsibility/assignment.py
@@ -53,8 +53,8 @@ class ResponsibilityCalculator:
         strategy: AssignmentStrategy = AssignmentStrategy.OPTIMAL
     ) -> list[ResponsibilityAssignment]:
         """Calculate assignment for tasks requiring multiple agents."""
-        assignments = []
-        used_agents = set()
+        assignments: list[ResponsibilityAssignment] = []
+        used_agents: set[str] = set()
 
         sorted_requirements = sorted(requirements, key=lambda r: r.weight, reverse=True)
 
@@ -104,7 +104,9 @@ class ResponsibilityCalculator:
                 task_id=str(task.id),
                 responsibility_score=score,
                 capability_matches=matches,
-                reasoning=f"Greedy assignment: highest capability match score ({score:.3f})"
+                reasoning=f"Greedy assignment: highest capability match score ({score:.3f})",
+                completed_at=None,
+                success=None
             )
 
     def _balanced_assignment(
@@ -121,7 +123,7 @@ class ResponsibilityCalculator:
 
         best_agent = None
         best_score = -1.0
-        best_matches = []
+        best_matches: list[str] = []
 
         for agent, capability_score in capable_agents:
             agent_id = self.hierarchy._get_agent_id(agent)
@@ -146,7 +148,9 @@ class ResponsibilityCalculator:
                 task_id=str(task.id),
                 responsibility_score=best_score,
                 capability_matches=best_matches,
-                reasoning=f"Balanced assignment: capability ({capability_score:.3f}) with workload consideration"
+                reasoning=f"Balanced assignment: capability ({capability_score:.3f}) with workload consideration",
+                completed_at=None,
+                success=None
             )
 
         return None
@@ -165,7 +169,7 @@ class ResponsibilityCalculator:
 
         best_agent = None
         best_score = -1.0
-        best_matches = []
+        best_matches: list[str] = []
 
         for agent, capability_score in capable_agents:
             agent_id = self.hierarchy._get_agent_id(agent)
@@ -189,7 +193,9 @@ class ResponsibilityCalculator:
                 task_id=str(task.id),
                 responsibility_score=best_score,
                 capability_matches=best_matches,
-                reasoning=f"Optimal assignment: multi-factor optimization score ({best_score:.3f})"
+                reasoning=f"Optimal assignment: multi-factor optimization score ({best_score:.3f})",
+                completed_at=None,
+                success=None
             )
 
         return None
diff --git a/src/crewai/responsibility/hierarchy.py b/src/crewai/responsibility/hierarchy.py
index 790018437..be16c1c13 100644
--- a/src/crewai/responsibility/hierarchy.py
+++ b/src/crewai/responsibility/hierarchy.py
@@ -152,7 +152,7 @@ class CapabilityHierarchy:
 
     def get_capability_distribution(self) -> dict[CapabilityType, dict[str, int]]:
         """Get distribution of capabilities across all agents."""
-        distribution = defaultdict(lambda: defaultdict(int))
+        distribution: dict[CapabilityType, dict[str, int]] = defaultdict(lambda: defaultdict(int))
 
         for capabilities in self.agent_capabilities.values():
             for capability in capabilities:
diff --git a/src/crewai/responsibility/performance.py b/src/crewai/responsibility/performance.py
index 1da7ce7da..e56a6000b 100644
--- a/src/crewai/responsibility/performance.py
+++ b/src/crewai/responsibility/performance.py
@@ -3,6 +3,7 @@ Performance-based capability adjustment system.
 """
 
 from datetime import timedelta
+from typing import Any
 
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.responsibility.hierarchy import CapabilityHierarchy
@@ -101,7 +102,7 @@ class PerformanceTracker:
 
     def identify_improvement_opportunities(
         self, agent: BaseAgent
-    ) -> list[dict[str, any]]:
+    ) -> list[dict[str, Any]]:
         """Identify areas where an agent could improve."""
         agent_id = self._get_agent_id(agent)
         metrics = self.performance_metrics.get(agent_id)
diff --git a/src/crewai/utilities/rpm_controller.py b/src/crewai/utilities/rpm_controller.py
index ec59b8304..75ddb289b 100644
--- a/src/crewai/utilities/rpm_controller.py
+++ b/src/crewai/utilities/rpm_controller.py
@@ -1,6 +1,6 @@
 import threading
 import time
-from typing import Optional
+from typing import Any
 
 from pydantic import BaseModel, Field, PrivateAttr, model_validator
 
@@ -12,11 +12,11 @@ from crewai.utilities.logger import Logger
 class RPMController(BaseModel):
     """Manages requests per minute limiting."""
 
-    max_rpm: Optional[int] = Field(default=None)
+    max_rpm: int | None = Field(default=None)
     logger: Logger = Field(default_factory=lambda: Logger(verbose=False))
     _current_rpm: int = PrivateAttr(default=0)
-    _timer: Optional[threading.Timer] = PrivateAttr(default=None)
-    _lock: Optional[threading.Lock] = PrivateAttr(default=None)
+    _timer: Any = PrivateAttr(default=None)
+    _lock: Any = PrivateAttr(default=None)
     _shutdown_flag: bool = PrivateAttr(default=False)
 
     @model_validator(mode="after")
@@ -35,7 +35,7 @@ class RPMController(BaseModel):
         if self.max_rpm is not None and self._current_rpm < self.max_rpm:
             self._current_rpm += 1
             return True
-        elif self.max_rpm is not None:
+        if self.max_rpm is not None:
             self.logger.log(
                 "info", "Max RPM reached, waiting for next minute to start."
             )