# FBChatBot/app/gemini_client.py
import re
from typing import Optional

from google.generativeai.embedding import embed_content
from google.generativeai.client import configure
from google.generativeai.generative_models import GenerativeModel
from loguru import logger

from .request_limit_manager import RequestLimitManager


class GeminiClient:
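    """
    Thin wrapper around the google-generativeai SDK for FBChatBot.

    Pulls the current API key/model from RequestLimitManager, caches the
    GenerativeModel instance between calls, and rotates to another key/model
    when a rate limit (429) is reported.
    """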
def __init__(self):
self.limit_manager = RequestLimitManager("gemini")
self._cached_model = None
self._cached_key = None
self._cached_model_instance = None
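        # The _cached_* attributes remember the last configured key/model and
        # the GenerativeModel built for them, so repeated calls with the same
        # pair skip reconfiguration (see _get_model_instance).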
def _get_model_instance(self, key: str, model: str):
"""
        Cache the model instance so it is not recreated on every call.
"""
if (self._cached_key == key and
self._cached_model == model and
self._cached_model_instance is not None):
return self._cached_model_instance
        # Configure the API key and create a new model instance
configure(api_key=key)
self._cached_model_instance = GenerativeModel(model)
self._cached_key = key
self._cached_model = model
logger.info(f"[GEMINI] Created new model instance for key={key[:5]}...{key[-5:]} model={model}")
return self._cached_model_instance
def _clear_cache_if_needed(self, new_key: str, new_model: str):
"""
        Only clear the cached instance when the key/model actually changes.
"""
if (self._cached_key != new_key or self._cached_model != new_model):
logger.info(f"[GEMINI] Clearing cache due to key/model change: {self._cached_key}->{new_key}, {self._cached_model}->{new_model}")
self._cached_model_instance = None
self._cached_key = None
self._cached_model = None
def generate_text(self, prompt: str, **kwargs) -> str:
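        """
        Generate text for `prompt` with the current key/model, retrying up to
        max_retries times with a rotated key/model when a rate limit (429) is
        reported. Extra kwargs are passed through to generate_content.
        """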
last_error = None
max_retries = 3
for attempt in range(max_retries):
try:
                # Get the current key/model from the manager
                key, model = self.limit_manager.get_current_key_model()
                # Reuse the cached model instance when the key/model are unchanged
_model = self._get_model_instance(key, model)
response = _model.generate_content(prompt, **kwargs)
self.limit_manager.log_request(key, model, success=True)
if hasattr(response, 'usage_metadata'):
logger.info(f"[GEMINI][USAGE] Prompt Token Count: {response.usage_metadata.prompt_token_count} - Candidate Token Count: {response.usage_metadata.candidates_token_count} - Total Token Count: {response.usage_metadata.total_token_count}")
if hasattr(response, 'text'):
logger.info(f"[GEMINI][TEXT_RESPONSE] {response.text}")
return response.text
elif hasattr(response, 'candidates') and response.candidates:
logger.info(f"[GEMINI][CANDIDATES_RESPONSE] {response.candidates[0].content.parts[0].text}")
return response.candidates[0].content.parts[0].text
logger.info(f"[GEMINI][RAW_RESPONSE] {response}")
return str(response)
except Exception as e:
msg = str(e)
if "429" in msg or "rate limit" in msg.lower():
retry_delay = 60
m = re.search(r'retry_delay.*?seconds: (\d+)', msg)
if m:
retry_delay = int(m.group(1))
                    # Log the failure with the key/model that was actually used
                    self.limit_manager.log_request(key, model, success=False, retry_delay=retry_delay)
                    # Do not clear the cache here; it is only cleared when the
                    # key/model actually changes, which avoids needless recreation
logger.warning(f"[GEMINI] Rate limit hit, will retry with new key/model (attempt {attempt + 1}/{max_retries})")
last_error = e
continue
else:
                    # Non-rate-limit error: do not retry
logger.error(f"[GEMINI] Error generating text: {e}")
last_error = e
break
raise last_error or RuntimeError("No available Gemini API key/model")
def count_tokens(self, prompt: str) -> int:
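        """Return the token count for `prompt`, or 0 if counting fails."""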
try:
key, model = self.limit_manager.get_current_key_model()
_model = self._get_model_instance(key, model)
return _model.count_tokens(prompt).total_tokens
except Exception as e:
logger.error(f"[GEMINI] Error counting tokens: {e}")
return 0
def create_embedding(self, text: str, model: Optional[str] = None, task_type: str = "retrieval_query") -> list:
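        """
        Create an embedding for `text`. An explicit `model` argument takes
        precedence over the manager's default model; `task_type` is passed
        through to embed_content (e.g. "retrieval_query", "retrieval_document").
        """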
last_error = None
max_retries = 3
for attempt in range(max_retries):
try:
key, default_model = self.limit_manager.get_current_key_model()
                # Prefer the model passed as an argument; fall back to the manager's default only when none is given
use_model = model if model and model.strip() else default_model
if not use_model:
raise ValueError("No model specified for embedding")
logger.info(f"[GEMINI][EMBEDDING] Using model={use_model} (requested={model}, default={default_model}), task_type={task_type}")
configure(api_key=key)
response = embed_content(
model=use_model,
content=text,
task_type=task_type
)
self.limit_manager.log_request(key, use_model, success=True)
logger.info(f"[GEMINI][EMBEDDING][RAW_RESPONSE] {response['embedding'][:10]} ..... {response['embedding'][-10:]}")
return response['embedding']
except Exception as e:
msg = str(e)
if "429" in msg or "rate limit" in msg.lower():
retry_delay = 60
m_retry = re.search(r'retry_delay.*?seconds: (\d+)', msg)
if m_retry:
retry_delay = int(m_retry.group(1))
                    # Log the failure so the manager can rotate to a new key/model
self.limit_manager.log_request(key, use_model, success=False, retry_delay=retry_delay)
logger.warning(f"[GEMINI] Rate limit hit in embedding, will retry with new key/model (attempt {attempt + 1}/{max_retries})")
last_error = e
continue
else:
logger.error(f"[GEMINI] Error creating embedding: {e}")
last_error = e
break
raise last_error or RuntimeError("No available Gemini API key/model")
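

if __name__ == "__main__":
    # Minimal usage sketch, assuming RequestLimitManager is already configured
    # with at least one valid API key/model pair; the prompts are illustrative.
    client = GeminiClient()
    print(client.count_tokens("Hello from FBChatBot"))
    print(client.generate_text("Say hello in one short sentence."))
    print(len(client.create_embedding("Hello from FBChatBot", task_type="retrieval_document")))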