import threading
import time
from typing import Dict, Iterator, List, Optional, Tuple, Union

from loguru import logger

from app.config import get_settings


class RequestLimitManager:
    """Track which API key / model pairs of a provider are usable.

    One instance is kept per provider name: constructing the class again
    with the same provider returns the already-initialised object, while a
    different provider gets its own independent state.

    Every (key, model) pair has a status record of the form
    ``{"status": "active" | "blocked", "timestamp": float}``. For a blocked
    pair the timestamp is the ``time.time()`` value after which the pair may
    be used again.
    """

    # Fallback block duration (seconds) used when log_request() is given no
    # (or a zero) retry delay.
    DEFAULT_BLOCK_SECONDS = 60

    # Instances keyed by (class, provider) so subclasses and providers never
    # share state by accident.
    _instances: Dict[Tuple[type, str], "RequestLimitManager"] = {}
    _lock = threading.Lock()

    def __new__(cls, provider: str):
        """Return the cached instance for *provider*, creating it if needed."""
        cache_key = (cls, provider)
        instance = cls._instances.get(cache_key)
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while we were waiting.
                instance = cls._instances.get(cache_key)
                if instance is None:
                    instance = super().__new__(cls)
                    cls._instances[cache_key] = instance
        return instance

    def __init__(self, provider: str):
        """Initialise the state once; later constructions are no-ops.

        Args:
            provider: Provider name (only ``"gemini"`` has keys configured).
        """
        # The class lock makes "check the flag, then initialise" atomic, so
        # two threads constructing the manager at the same time cannot both
        # run the initialisation and replace each other's lock/status table.
        with type(self)._lock:
            if getattr(self, "initialized", False):
                return
            self.provider = provider
            self.lock = threading.Lock()
            self._init_keys_models()
            self.initialized = True

    @staticmethod
    def _mask_key(key: Optional[str]) -> str:
        """Return a log-safe form of *key*: first and last five characters."""
        if not key:
            return "None"
        return f"{key[:5]}...{key[-5:]}"

    def _is_available(self, key: str, model: str, now: float) -> bool:
        """Tell whether the pair is active or its block has expired at *now*.

        A pair without a status record counts as active.
        """
        info = self.status.get(key, {}).get(model, {})
        status = info.get("status", "active")
        blocked_until = float(info.get("timestamp", 0.0))
        return status == "active" or (status == "blocked" and now > blocked_until)

    def _init_keys_models(self):
        """Load keys/models from settings and mark every pair as active."""
        settings = get_settings()
        # Default to empty lists so an unsupported provider yields a manager
        # with no usable pair instead of an AttributeError further down.
        api_keys: List[str] = []
        models: List[str] = []
        if self.provider == "gemini":
            api_keys = list(getattr(settings, "gemini_api_keys_list", None) or [])
            models = list(getattr(settings, "gemini_models_list", None) or [])
        else:
            # Other providers can be added here.
            logger.warning(f"[LIMIT] No key/model configuration for provider {self.provider}")
        self.api_keys: List[str] = api_keys
        self.models: List[str] = models

        now = time.time()
        self.status: Dict[str, Dict[str, Dict[str, Union[str, float]]]] = {
            key: {
                model: {"status": "active", "timestamp": now}
                for model in self.models
            }
            for key in self.api_keys
        }
        self.current_key: Optional[str] = self.api_keys[0] if self.api_keys else None
        self.current_model: Optional[str] = self.models[0] if self.models else None
        key_display = self._mask_key(self.current_key)
        logger.info(f"[LIMIT] Initialized with current key={key_display} model={self.current_model}")

    def get_current_key_model(self) -> Tuple[str, str]:
        """Return the key/model pair that should be used for the next request.

        The current pair is returned as long as it is usable; a scan for a
        replacement only happens when the current pair is blocked (or unset).

        Returns:
            The ``(key, model)`` pair to use.

        Raises:
            RuntimeError: If no pair is currently usable.
        """
        with self.lock:
            now = time.time()
            if (
                self.current_key
                and self.current_model
                and self._is_available(self.current_key, self.current_model, now)
            ):
                logger.info(f"[LIMIT] Using current key={self._mask_key(self.current_key)} model={self.current_model}")
                return self.current_key, self.current_model

            # Current pair is not usable: look for another one.
            logger.warning("[LIMIT] Current pair not available, scanning for new key/model...")
            new_key, new_model = self._find_available_key_model()
            if new_key and new_model:
                self.current_key = new_key
                self.current_model = new_model
                logger.info(f"[LIMIT] Switched to new key={self._mask_key(self.current_key)} model={self.current_model}")
                return self.current_key, self.current_model

            logger.error(f"[LIMIT] No available key/model found for provider {self.provider}")
            raise RuntimeError(f"No available key/model for provider {self.provider}")

    def _find_available_key_model(self) -> Tuple[Optional[str], Optional[str]]:
        """Find the first usable key/model pair.

        The current key and the current model are tried first, then the
        remaining ones in configuration order. The caller is expected to hold
        ``self.lock``.

        Returns:
            ``(key, model)`` of the first usable pair, or ``(None, None)``.
        """
        now = time.time()
        keys = self.api_keys[:]
        models = self.models[:]
        # Move the current key/model to the front so they are preferred.
        if self.current_key and self.current_key in keys:
            keys.remove(self.current_key)
            keys.insert(0, self.current_key)
        if self.current_model and self.current_model in models:
            models.remove(self.current_model)
            models.insert(0, self.current_model)

        for key in keys:
            for model in models:
                if self._is_available(key, model, now):
                    logger.info(f"[LIMIT] Found available key={self._mask_key(key)} model={model}")
                    return key, model
        return None, None

    def log_request(self, key: str, model: str, success: bool, retry_delay: Optional[int] = None):
        """Record the outcome of a request and update the pair's status.

        Args:
            key: API key the request was made with.
            model: Model the request was made with.
            success: ``True`` marks the pair active; ``False`` blocks it.
            retry_delay: Seconds to keep the pair blocked after a failure.
                ``None`` or ``0`` falls back to ``DEFAULT_BLOCK_SECONDS``.
        """
        with self.lock:
            now = time.time()
            # Pairs that were not configured at start-up are tracked lazily.
            entry = self.status.setdefault(key, {}).setdefault(
                model, {"status": "active", "timestamp": now}
            )
            masked = self._mask_key(key)
            if success:
                logger.info(f"[LIMIT] Mark key={masked} - model={model} as active at {now}")
                entry["status"] = "active"
                entry["timestamp"] = now
                return

            blocked_until = now + (retry_delay or self.DEFAULT_BLOCK_SECONDS)
            logger.warning(f"[LIMIT] Mark key={masked} - model={model} as blocked until {blocked_until} (retry_delay={retry_delay})")
            entry["status"] = "blocked"
            entry["timestamp"] = blocked_until
            # The current pair is deliberately not cleared here; the next
            # get_current_key_model() call notices the block and rescans,
            # which avoids triggering scans that are not needed.
            if key == self.current_key and model == self.current_model:
                logger.warning("[LIMIT] Current pair blocked, will scan for new pair on next request")

    def iterate_key_model(self) -> Iterator[Tuple[str, str]]:
        """Yield only the current pair (legacy method kept for old callers).

        Raises:
            RuntimeError: If no pair is currently usable.
        """
        yield self.get_current_key_model()