import hashlib
import hmac
import json
import re
from typing import Any, Dict, List, Optional

import httpx
from fastapi import HTTPException, Request
from loguru import logger

from .utils import timing_decorator_async, timing_decorator_sync


class FacebookClient:
    """Small client for Facebook webhook verification and message sending.

    The client keeps an optional "context" (page id, page token, sender id)
    that ``send_message`` falls back to when its arguments are omitted.
    """

    # Endpoint used by ``send_message``; the access token is passed as a
    # query parameter rather than being interpolated into this string.
    GRAPH_MESSAGES_URL = "https://graph.facebook.com/v18.0/me/messages"

    # Maximum number of characters this client puts into one outgoing message.
    MAX_MESSAGE_LENGTH = 2000

    def __init__(
        self,
        app_secret: str,
        page_id: Optional[str] = None,
        page_token: Optional[str] = None,
        sender_id: Optional[str] = None,
    ):
        """Create a client.

        Args:
            app_secret: Facebook App Secret, used as the HMAC key in
                ``verify_signature``.
            page_id: Optional default page id stored on the client.
            page_token: Optional default page access token used by
                ``send_message``.
            sender_id: Optional default recipient id used by ``send_message``.
        """
        self.app_secret = app_secret
        self._client = httpx.AsyncClient()
        self.page_id = page_id
        self.page_token = page_token
        self.sender_id = sender_id

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()

    def update_context(
        self,
        page_id: Optional[str] = None,
        page_token: Optional[str] = None,
        sender_id: Optional[str] = None,
    ):
        """Update the stored context values.

        Only arguments that are not ``None`` overwrite the stored value, so a
        caller can update a single field without clearing the others.

        Args:
            page_id: New page id, or ``None`` to keep the current one.
            page_token: New page access token, or ``None`` to keep the
                current one.
            sender_id: New sender id, or ``None`` to keep the current one.
        """
        if page_id is not None:
            self.page_id = page_id
        if page_token is not None:
            self.page_token = page_token
        if sender_id is not None:
            self.sender_id = sender_id

    @timing_decorator_async
    async def verify_webhook(self, token: str, challenge: str, verify_token: str) -> int:
        """Validate a webhook verification request and return the challenge.

        Args:
            token: Verify token received in the request.
            challenge: Challenge value received in the request.
            verify_token: Verify token the application expects.

        Returns:
            The challenge converted to ``int``.

        Raises:
            HTTPException: 403 if the tokens do not match, 400 if the
                challenge cannot be converted to an integer.
        """
        # Compare as bytes in constant time: compare_digest rejects non-ASCII
        # str, and a plain ``!=`` leaks timing information about the secret.
        tokens_match = (
            isinstance(token, str)
            and isinstance(verify_token, str)
            and hmac.compare_digest(token.encode(), verify_token.encode())
        )
        if not tokens_match:
            raise HTTPException(status_code=403, detail="Invalid verify token")

        try:
            return int(challenge)
        except (TypeError, ValueError):
            # A malformed challenge is a client error, not a server failure.
            raise HTTPException(status_code=400, detail="Invalid challenge") from None

    def verify_signature(self, request: Request, payload: bytes) -> bool:
        """Check the ``X-Hub-Signature-256`` header against the payload.

        The expected value is the hex HMAC-SHA256 of ``payload`` keyed with
        ``self.app_secret``.

        Args:
            request: Incoming request whose headers carry the signature.
            payload: Raw request body bytes that were signed.

        Returns:
            ``True`` if the header is present, has the ``sha256=`` prefix and
            matches the expected digest; ``False`` otherwise.
        """
        prefix = "sha256="
        signature = request.headers.get("X-Hub-Signature-256", "")
        if not signature.startswith(prefix):
            return False

        expected = hmac.new(
            self.app_secret.encode(),
            payload,
            hashlib.sha256,
        ).hexdigest()

        # Compare bytes, not str: compare_digest raises TypeError for str
        # containing non-ASCII characters, which a hostile header could carry.
        received = signature[len(prefix):]
        return hmac.compare_digest(received.encode(), expected.encode())

    def format_message(self, text: str) -> str:
        """Convert a subset of Markdown into the plainer markup sent out.

        Args:
            text: Message text, possibly containing Markdown.

        Returns:
            The text with bullets, bold markers, headings and runs of blank
            lines rewritten as described by the inline comments.
        """
        # 1. Replace Markdown bullets with other symbols.
        text = text.replace("\n* ", "\n- ")
        text = text.replace("\n * ", "\n + ")

        # 2. Turn **text** or __text__ into *text*.
        text = re.sub(r"\*\*([^\*]+)\*\*", r"*\1*", text)
        text = re.sub(r"__([^_]+)__", r"*\1*", text)

        # 3. Drop Markdown heading markers (#, ##, ###, ...).
        text = re.sub(r"^#+\s+", "", text, flags=re.MULTILINE)

        # 4. Collapse three or more consecutive newlines into one blank line.
        text = re.sub(r"\n{3,}", "\n\n", text)

        # 5. Remove other unsupported Markdown here if needed.
        return text

    def split_message(self, text: str, max_length: int = 2000) -> list:
        """Split ``text`` into chunks of at most ``max_length`` characters.

        Splitting happens on line boundaries where possible. A single line
        longer than ``max_length`` is cut into ``max_length``-sized pieces so
        no content is lost. Trailing whitespace is stripped from every chunk
        and empty chunks are never returned.

        Args:
            text: Text to split.
            max_length: Maximum length of each returned chunk.

        Returns:
            List of non-empty chunks, each no longer than ``max_length``.

        Raises:
            ValueError: If ``max_length`` is smaller than 1.
        """
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")

        messages: List[str] = []
        current = ""

        def flush(chunk: str) -> None:
            # Skip chunks that are empty or whitespace-only after stripping.
            chunk = chunk.rstrip()
            if chunk:
                messages.append(chunk)

        for line in text.split("\n"):
            # +1 for the newline character appended after the line.
            if len(current) + len(line) + 1 > max_length:
                flush(current)
                current = ""

            # A line that cannot fit even in an empty chunk is cut hard;
            # ``current`` is always empty here because of the flush above.
            while len(line) > max_length:
                flush(line[:max_length])
                line = line[max_length:]

            current += line + "\n"

        flush(current)
        return messages

    @timing_decorator_async
    async def send_message(
        self,
        page_access_token: Optional[str] = None,
        recipient_id: Optional[str] = None,
        message: str = "",
    ) -> dict:
        """Format, split and send a text message to a recipient.

        Args:
            page_access_token: Page access token; falls back to
                ``self.page_token`` when falsy.
            recipient_id: Recipient id; falls back to ``self.sender_id`` when
                falsy.
            message: Text to send. It is formatted with ``format_message`` and
                split into chunks of at most ``MAX_MESSAGE_LENGTH`` characters,
                each posted as a separate message.

        Returns:
            The decoded JSON response of the first chunk that was sent, or an
            empty dict if there was nothing to send.

        Raises:
            ValueError: If no token or no recipient id is available.
            HTTPException: 500 if any request to Facebook fails.
        """
        page_access_token = page_access_token or self.page_token
        recipient_id = recipient_id or self.sender_id
        if not page_access_token or not recipient_id:
            raise ValueError("FacebookClient: page_access_token and recipient_id must not be None when sending a message.")

        # Normalize non-string input so the splitting step cannot crash.
        if message is None:
            message = ""
        elif not isinstance(message, str):
            message = str(message)

        response_to_send = self.format_message(message.replace("**", "*"))

        # split_message guarantees every chunk fits the limit without
        # dropping content, so no further truncation is needed.
        messages = self.split_message(response_to_send, self.MAX_MESSAGE_LENGTH)

        results = []
        for msg in messages:
            payload = {
                "recipient": {"id": recipient_id},
                "message": {"text": msg},
            }
            try:
                response = await self._client.post(
                    self.GRAPH_MESSAGES_URL,
                    params={"access_token": page_access_token},
                    json=payload,
                )
                response.raise_for_status()
                results.append(response.json())
            except httpx.HTTPError as e:
                # httpx error messages can embed the request URL, which holds
                # the access token; never write the token to the logs.
                safe_error = str(e).replace(str(page_access_token), "***")
                logger.error(f"Error sending message to Facebook: {safe_error}")
                raise HTTPException(status_code=500, detail="Failed to send message to Facebook") from e

        return results[0] if results else {}

    @timing_decorator_sync
    def parse_message(self, body: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract the first messaging event from a webhook payload.

        Only ``body["entry"][0]["messaging"][0]`` is read; any further entries
        or events in the payload are ignored.

        Args:
            body: Decoded JSON payload of the webhook request.

        Returns:
            A dict with the keys ``sender_id``, ``page_id`` (the recipient id
            of the event), ``timestamp``, ``text`` (``None`` if absent) and
            ``attachments`` (empty list if absent), or ``None`` if the payload
            does not have the expected structure.
        """
        try:
            entry = body["entry"][0]
            messaging = entry["messaging"][0]

            message_data = {
                "sender_id": messaging["sender"]["id"],
                "page_id": messaging["recipient"]["id"],
                "timestamp": messaging["timestamp"],
                "text": None,
                "attachments": [],
            }

            if "message" in messaging:
                message = messaging["message"]
                if "text" in message:
                    message_data["text"] = message["text"]
                if "attachments" in message:
                    message_data["attachments"] = message["attachments"]

            return message_data
        except (KeyError, IndexError, TypeError) as e:
            # TypeError covers payloads where a level is None or not
            # subscriptable, which is just another malformed-payload case.
            logger.error(f"Error parsing Facebook message: {e}")
            return None