mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
74 lines
2.2 KiB
Python
74 lines
2.2 KiB
Python
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
from pydantic import BaseModel, Field, PrivateAttr, model_validator
|
|
|
|
from crewai.utilities.logger import Logger
|
|
|
|
|
|
class RPMController(BaseModel):
|
|
max_rpm: Optional[int] = Field(default=None)
|
|
logger: Logger = Field(default_factory=lambda: Logger(verbose=False))
|
|
_current_rpm: int = PrivateAttr(default=0)
|
|
_timer: Optional[threading.Timer] = PrivateAttr(default=None)
|
|
_lock: Optional[threading.Lock] = PrivateAttr(default=None)
|
|
_shutdown_flag: bool = PrivateAttr(default=False)
|
|
|
|
@model_validator(mode="after")
|
|
def reset_counter(self):
|
|
if self.max_rpm is not None:
|
|
if not self._shutdown_flag:
|
|
self._lock = threading.Lock()
|
|
self._reset_request_count()
|
|
return self
|
|
|
|
def check_or_wait(self):
|
|
if self.max_rpm is None:
|
|
return True
|
|
|
|
def _check_and_increment():
|
|
if self.max_rpm is not None and self._current_rpm < self.max_rpm:
|
|
self._current_rpm += 1
|
|
return True
|
|
elif self.max_rpm is not None:
|
|
self.logger.log(
|
|
"info", "Max RPM reached, waiting for next minute to start."
|
|
)
|
|
self._wait_for_next_minute()
|
|
self._current_rpm = 1
|
|
return True
|
|
return True
|
|
|
|
if self._lock:
|
|
with self._lock:
|
|
return _check_and_increment()
|
|
else:
|
|
return _check_and_increment()
|
|
|
|
def stop_rpm_counter(self):
|
|
if self._timer:
|
|
self._timer.cancel()
|
|
self._timer = None
|
|
|
|
def _wait_for_next_minute(self):
|
|
time.sleep(60)
|
|
self._current_rpm = 0
|
|
|
|
def _reset_request_count(self):
|
|
def _reset():
|
|
self._current_rpm = 0
|
|
if not self._shutdown_flag:
|
|
self._timer = threading.Timer(60.0, self._reset_request_count)
|
|
self._timer.start()
|
|
|
|
if self._lock:
|
|
with self._lock:
|
|
_reset()
|
|
else:
|
|
_reset()
|
|
|
|
if self._timer:
|
|
self._shutdown_flag = True
|
|
self._timer.cancel()
|