mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 17:18:29 +00:00
60 lines
1.7 KiB
Python
60 lines
1.7 KiB
Python
import threading
|
|
import time
|
|
from typing import Union
|
|
|
|
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator
|
|
|
|
from crewai.utilities.logger import Logger
|
|
|
|
|
|
class RPMController(BaseModel):
    """Limit callers to at most ``max_rpm`` requests per 60-second window.

    When ``max_rpm`` is falsy (``None`` or ``0``) the controller is a no-op:
    ``check_or_wait`` returns immediately and no lock or timer is created.
    Otherwise a background ``threading.Timer`` zeroes the request counter
    every 60 seconds until ``stop_rpm_counter`` is called.

    Attributes:
        max_rpm: Maximum number of requests allowed per window, or ``None``
            to disable throttling.
        logger: Optional logger used to report that the limit was reached.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    max_rpm: Union[int, None] = Field(default=None)
    # The default is None, so the annotation admits it explicitly.
    logger: Union[Logger, None] = Field(default=None)
    # Number of requests granted in the current window.
    _current_rpm: int = PrivateAttr(default=0)
    # Pending timer that will perform the next periodic reset, if any.
    _timer: threading.Timer | None = PrivateAttr(default=None)
    # Guards _current_rpm; only created when throttling is enabled.
    _lock: threading.Lock = PrivateAttr(default=None)
    # Set once stop_rpm_counter() has been called; blocks timer re-arming.
    _shutdown_flag: bool = PrivateAttr(default=False)

    @model_validator(mode="after")
    def reset_counter(self):
        """Create the lock and start the periodic reset when throttling is on.

        Returns:
            The validated model instance.
        """
        if self.max_rpm and not self._shutdown_flag:
            self._lock = threading.Lock()
            self._reset_request_count()
        return self

    def check_or_wait(self):
        """Account for one request, blocking until the next window if needed.

        Returns:
            Always ``True``. If the limit has been reached, the call first
            sleeps for 60 seconds and then counts itself as the first request
            of the new window.
        """
        if not self.max_rpm:
            return True

        with self._lock:
            if self._current_rpm < self.max_rpm:
                self._current_rpm += 1
                return True

            # A logger is optional; hitting the limit must not crash without one.
            if self.logger is not None:
                self.logger.log(
                    "info", "Max RPM reached, waiting for next minute to start."
                )
            # The lock stays held while sleeping, so concurrent callers and the
            # periodic reset queue up behind this wait.
            self._wait_for_next_minute()
            self._current_rpm = 1
            return True

    def stop_rpm_counter(self):
        """Stop the periodic reset and cancel any pending timer."""
        # Raise the flag first so a reset callback that is already running
        # does not schedule yet another timer after this call.
        self._shutdown_flag = True
        if self._timer:
            self._timer.cancel()
            self._timer = None

    def _wait_for_next_minute(self):
        """Sleep for 60 seconds and zero the request counter."""
        time.sleep(60)
        self._current_rpm = 0

    def _reset_request_count(self):
        """Zero the request counter and schedule the next reset in 60 seconds.

        No new timer is scheduled once ``stop_rpm_counter`` has been called.
        """
        with self._lock:
            self._current_rpm = 0

        if self._timer:
            # No-op for the timer that is currently firing; cancels a pending
            # one if this method was invoked directly.
            self._timer.cancel()

        if self._shutdown_flag:
            self._timer = None
            return

        next_timer = threading.Timer(60.0, self._reset_request_count)
        # Daemon thread: a forgotten controller must not keep the interpreter
        # alive by re-arming itself forever.
        next_timer.daemon = True
        self._timer = next_timer
        next_timer.start()
|