|
import time |
|
import threading |
|
from typing import Dict, List, Tuple, Optional, Iterator, Union |
|
from app.config import get_settings |
|
from loguru import logger |
|
|
|
class RequestLimitManager:
    """Process-wide tracker of rate-limit state for (API key, model) pairs.

    The manager keeps one "current" key/model pair and only scans for a
    replacement when that pair is not available. Failed requests mark a pair
    as blocked until a deadline; a blocked pair becomes usable again once that
    deadline has passed.

    NOTE: there is a single process-wide instance (see ``__new__``). The
    ``provider`` passed on later constructions is ignored; a warning is logged
    when it differs from the provider the instance was initialized with.
    """

    _instance = None

    # Guards singleton creation and the one-time initialization in __init__.
    _lock = threading.Lock()

    # Block duration (seconds) used by log_request when retry_delay is falsy.
    _DEFAULT_BLOCK_SECONDS = 60

    def __new__(cls, provider: str):
        # Double-checked creation so concurrent callers share one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, provider: str):
        # __init__ runs on every construction of the singleton; only the first
        # call may initialize. The check is repeated under the class lock so
        # two threads cannot both initialize (the second would otherwise
        # replace self.lock while the first is already using it).
        if not getattr(self, 'initialized', False):
            with type(self)._lock:
                if not getattr(self, 'initialized', False):
                    self.provider = provider
                    self.lock = threading.Lock()
                    self._init_keys_models()
                    # Set last so other threads never observe a half-built instance.
                    self.initialized = True
                    return
        if provider != self.provider:
            logger.warning(
                f"[LIMIT] Manager already initialized for provider {self.provider}; "
                f"ignoring requested provider {provider}"
            )

    def _init_keys_models(self):
        """Load keys/models for ``self.provider`` and mark every pair active.

        Only the ``"gemini"`` provider is wired to settings. Any other provider
        gets empty key/model lists (with a warning) instead of failing later
        with an AttributeError.
        """
        settings = get_settings()
        self.api_keys: List[str] = []
        self.models: List[str] = []
        if self.provider == "gemini":
            # `or []` also covers settings attributes that exist but are None.
            self.api_keys = getattr(settings, 'gemini_api_keys_list', []) or []
            self.models = getattr(settings, 'gemini_models_list', []) or []
        else:
            logger.warning(f"[LIMIT] Unsupported provider {self.provider}: no keys/models configured")

        now = time.time()
        # status[key][model] = {"status": "active" | "blocked", "timestamp": float}.
        # For "blocked" entries the timestamp is the time.time() deadline after
        # which the pair may be used again.
        self.status: Dict[str, Dict[str, Dict[str, Union[str, float]]]] = {
            key: {model: {"status": "active", "timestamp": now} for model in self.models}
            for key in self.api_keys
        }

        self.current_key: Optional[str] = self.api_keys[0] if self.api_keys else None
        self.current_model: Optional[str] = self.models[0] if self.models else None

        logger.info(f"[LIMIT] Initialized with current key={self._mask_key(self.current_key)} model={self.current_model}")

    @staticmethod
    def _mask_key(key: Optional[str]) -> str:
        """Return a log-safe form of ``key``.

        Longer keys show their first and last 5 characters. Keys of 10
        characters or fewer are fully hidden, because showing 5 + 5
        characters would reveal the whole key. A missing/empty key is
        rendered as ``"None"``.
        """
        if not key:
            return "None"
        if len(key) <= 10:
            return "***"
        return f"{key[:5]}...{key[-5:]}"

    @staticmethod
    def _is_available(info: Dict[str, Union[str, float]], now: float) -> bool:
        """Return True if a status entry is usable at time ``now``.

        Missing fields default to an active entry. A "blocked" entry becomes
        usable strictly after its timestamp; any other status is unusable.
        """
        status = info.get("status", "active")
        ts = float(info.get("timestamp", 0.0))
        return status == "active" or (status == "blocked" and now > ts)

    @staticmethod
    def _prefer_first(items: List[str], preferred: Optional[str]) -> List[str]:
        """Return a copy of ``items`` with ``preferred`` moved to the front, if present."""
        ordered = list(items)
        if preferred and preferred in ordered:
            ordered.remove(preferred)
            ordered.insert(0, preferred)
        return ordered

    def get_current_key_model(self) -> Tuple[str, str]:
        """Return the key/model pair to use for the next request.

        The current pair is returned as long as it is available. A new pair
        is scanned for only when the current pair is unset or blocked.

        Raises:
            RuntimeError: if no key/model pair is available.
        """
        with self.lock:
            now = time.time()

            # Fast path: keep using the current pair while it is available.
            if self.current_key and self.current_model:
                info = self.status.get(self.current_key, {}).get(self.current_model, {})
                if self._is_available(info, now):
                    logger.info(f"[LIMIT] Using current key={self._mask_key(self.current_key)} model={self.current_model}")
                    return self.current_key, self.current_model

            # Slow path: the current pair is unset or blocked; pick another one.
            logger.warning("[LIMIT] Current pair not available, scanning for new key/model...")
            new_key, new_model = self._find_available_key_model()

            if new_key and new_model:
                self.current_key = new_key
                self.current_model = new_model
                logger.info(f"[LIMIT] Switched to new key={self._mask_key(self.current_key)} model={self.current_model}")
                return self.current_key, self.current_model

            logger.error(f"[LIMIT] No available key/model found for provider {self.provider}")
            raise RuntimeError(f"No available key/model for provider {self.provider}")

    def _find_available_key_model(self) -> Tuple[Optional[str], Optional[str]]:
        """Find the first available key/model pair, or ``(None, None)``.

        The search order prefers the current key (trying every model on it
        before moving on), and within each key prefers the current model.
        Pairs with no status entry are treated as active. The caller is
        expected to hold ``self.lock``.
        """
        now = time.time()
        keys = self._prefer_first(self.api_keys, self.current_key)
        models = self._prefer_first(self.models, self.current_model)

        for key in keys:
            key_status = self.status.get(key, {})
            for model in models:
                info = key_status.get(model, {"status": "active", "timestamp": 0.0})
                if self._is_available(info, now):
                    logger.info(f"[LIMIT] Found available key={self._mask_key(key)} model={model}")
                    return key, model

        return None, None

    def log_request(self, key: str, model: str, success: bool, retry_delay: Optional[int] = None):
        """Record the outcome of a request and update the pair's status.

        On success the pair is marked active. On any failure the pair is
        marked blocked until ``now + retry_delay`` seconds. A falsy
        ``retry_delay`` (None or 0) falls back to ``_DEFAULT_BLOCK_SECONDS``.
        No scan happens here: if the blocked pair is the current one, the
        next ``get_current_key_model`` call picks a replacement.
        """
        with self.lock:
            now = time.time()
            entry = self.status.setdefault(key, {}).setdefault(model, {"status": "active", "timestamp": now})
            masked = self._mask_key(key)

            if success:
                logger.info(f"[LIMIT] Mark key={masked} - model={model} as active at {now}")
                entry["status"] = "active"
                entry["timestamp"] = now
                return

            blocked_until = now + (retry_delay or self._DEFAULT_BLOCK_SECONDS)
            logger.warning(f"[LIMIT] Mark key={masked} - model={model} as blocked until {blocked_until} (retry_delay={retry_delay})")
            entry["status"] = "blocked"
            entry["timestamp"] = blocked_until

            if key == self.current_key and model == self.current_model:
                logger.warning("[LIMIT] Current pair blocked, will scan for new pair on next request")

    def iterate_key_model(self) -> Iterator[Tuple[str, str]]:
        """Legacy generator that yields only the current key/model pair.

        Kept for compatibility with older callers. ``RuntimeError`` from
        ``get_current_key_model`` surfaces on the first ``next()``.
        """
        key, model = self.get_current_key_model()
        yield key, model