|
import time |
|
import random |
|
from functools import wraps |
|
from loguru import logger |
|
from typing import Any, Callable, List |
|
import os |
|
import asyncio |
|
import httpx |
|
|
|
def timing_decorator_async(func: Callable) -> Callable: |
|
""" |
|
Decorator đo thời gian thực thi của hàm async, log thời lượng xử lý. |
|
Dùng cho async def. |
|
""" |
|
@wraps(func) |
|
async def wrapper(*args: Any, **kwargs: Any) -> Any: |
|
start_time = time.time() |
|
result = await func(*args, **kwargs) |
|
end_time = time.time() |
|
duration = end_time - start_time |
|
logger.info(f"[TIMING][async] {func.__name__} took {duration:.2f} seconds to execute") |
|
return result |
|
return wrapper |
|
|
|
def timing_decorator_sync(func: Callable) -> Callable: |
|
""" |
|
Decorator đo thời gian thực thi của hàm sync, log thời lượng xử lý. |
|
Dùng cho def thường. |
|
""" |
|
@wraps(func) |
|
def wrapper(*args: Any, **kwargs: Any) -> Any: |
|
start_time = time.time() |
|
result = func(*args, **kwargs) |
|
end_time = time.time() |
|
duration = end_time - start_time |
|
logger.info(f"[TIMING][sync] {func.__name__} took {duration:.2f} seconds to execute") |
|
return result |
|
return wrapper |
|
|
|
def setup_logging(log_level: str = "INFO") -> None: |
|
""" |
|
Thiết lập logging với loguru, log ra file và console. |
|
Input: log_level (str) - mức log. |
|
Output: None. |
|
""" |
|
logger.remove() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.add( |
|
lambda msg: print(msg), |
|
level=log_level, |
|
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", |
|
) |
|
|
|
def extract_command(text: str) -> tuple[str, str]: |
|
""" |
|
Tách lệnh (bắt đầu bằng \) và phần còn lại từ message. |
|
Input: text (str) - message từ user. |
|
Output: (command, remaining_text) - tuple (str, str). |
|
""" |
|
if not text.startswith("\\"): |
|
return "", text |
|
|
|
parts = text.split(maxsplit=1) |
|
command = parts[0][1:] |
|
remaining = parts[1] if len(parts) > 1 else "" |
|
return command, remaining |
|
|
|
def extract_keywords(text: str, keywords: list[str]) -> list[str]: |
|
""" |
|
Tìm các từ khóa xuất hiện trong message. |
|
Input: text (str), keywords (list[str]) |
|
Output: list[str] các từ khóa tìm thấy. |
|
""" |
|
return [keyword for keyword in keywords if keyword.lower() in text.lower()] |
|
|
|
def ensure_log_dir(): |
|
""" |
|
Đảm bảo thư mục logs tồn tại, tạo nếu chưa có. |
|
Input: None |
|
Output: None |
|
""" |
|
|
|
|
|
def validate_config(settings) -> None: |
|
""" |
|
Kiểm tra các biến môi trường/config bắt buộc, raise lỗi nếu thiếu. |
|
Input: settings (Settings) |
|
Output: None (raise RuntimeError nếu thiếu) |
|
""" |
|
missing = [] |
|
|
|
if not getattr(settings, 'facebook_verify_token', None): |
|
missing.append('facebook_verify_token') |
|
if not getattr(settings, 'facebook_app_secret', None): |
|
missing.append('facebook_app_secret') |
|
|
|
if not (getattr(settings, 'google_sheets_credentials_file', None) or os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")): |
|
missing.append('google_sheets_credentials_file or GOOGLE_SHEETS_CREDENTIALS_JSON') |
|
if not getattr(settings, 'google_sheets_token_file', None): |
|
missing.append('google_sheets_token_file') |
|
if not getattr(settings, 'conversation_sheet_id', None): |
|
missing.append('conversation_sheet_id') |
|
|
|
if not getattr(settings, 'supabase_url', None): |
|
missing.append('supabase_url') |
|
if not getattr(settings, 'supabase_key', None): |
|
missing.append('supabase_key') |
|
if missing: |
|
raise RuntimeError(f"Missing config: {', '.join(missing)}") |
|
|
|
def get_logger(): |
|
return logger |
|
|
|
async def call_endpoint_with_retry(client, url, payload, max_retries=3, base_timeout=30, headers=None): |
|
logger = get_logger() |
|
timeout = base_timeout |
|
for attempt in range(1, max_retries + 1): |
|
try: |
|
response = await client.post(url, json=payload, timeout=timeout, headers=headers) |
|
response.raise_for_status() |
|
return response |
|
except httpx.TimeoutException as e: |
|
if attempt == max_retries: |
|
raise |
|
else: |
|
logger.warning(f"Timeout (attempt {attempt}/{max_retries}), retrying with timeout={timeout * 2}s...") |
|
timeout *= 2 |
|
await asyncio.sleep(1) |
|
except httpx.HTTPStatusError as e: |
|
logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}") |
|
raise |
|
except Exception as e: |
|
logger.error(f"Other error: {e}") |
|
raise |
|
|
|
def get_random_message(message_list: List[str]) -> str: |
|
""" |
|
Lấy ngẫu nhiên một message từ danh sách messages. |
|
|
|
Args: |
|
message_list (List[str]): Danh sách các messages có sẵn |
|
|
|
Returns: |
|
str: Message được chọn ngẫu nhiên, hoặc message mặc định nếu danh sách rỗng |
|
|
|
Example: |
|
>>> messages = ["Message 1", "Message 2", "Message 3"] |
|
>>> get_random_message(messages) |
|
"Message 2" # hoặc message khác ngẫu nhiên |
|
""" |
|
if not message_list: |
|
return "Đang xử lý..." |
|
|
|
return random.choice(message_list) |