|
import hmac |
|
import hashlib |
|
import json |
|
from typing import Any, Dict, Optional |
|
import httpx |
|
from fastapi import HTTPException, Request |
|
from loguru import logger |
|
|
|
from .utils import timing_decorator_async, timing_decorator_sync |
|
|
|
class FacebookClient:
    """Client for the Facebook Messenger webhook handshake and Send API.

    The instance keeps an optional "context" (page id, page access token and
    sender id) that ``send_message`` falls back to when the corresponding
    arguments are omitted.

    The underlying ``httpx.AsyncClient`` holds network resources; release
    them with ``await client.aclose()`` or by using the instance as an
    ``async with`` context manager.
    """

    # Upper bound used for the text of a single Send API call.
    MAX_MESSAGE_LENGTH = 2000

    # Graph API endpoint used to send messages on behalf of a page.
    SEND_API_URL = "https://graph.facebook.com/v18.0/me/messages"

    def __init__(self, app_secret: str, page_id: Optional[str] = None, page_token: Optional[str] = None, sender_id: Optional[str] = None):
        """Create a client.

        Args:
            app_secret: Facebook App Secret, used as the HMAC key when
                verifying webhook signatures.
            page_id: Optional default page id stored on the instance.
            page_token: Optional default page access token used by
                ``send_message``.
            sender_id: Optional default recipient id used by
                ``send_message``.
        """
        self.app_secret = app_secret
        self._client = httpx.AsyncClient()
        self.page_id = page_id
        self.page_token = page_token
        self.sender_id = sender_id

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()

    async def __aenter__(self) -> "FacebookClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving the ``async with`` block."""
        await self.aclose()

    def update_context(self, page_id: Optional[str] = None, page_token: Optional[str] = None, sender_id: Optional[str] = None):
        """Update the stored context; arguments left as ``None`` are ignored.

        Args:
            page_id: New page id, or ``None`` to keep the current one.
            page_token: New page access token, or ``None`` to keep it.
            sender_id: New sender id, or ``None`` to keep it.
        """
        if page_id is not None:
            self.page_id = page_id
        if page_token is not None:
            self.page_token = page_token
        if sender_id is not None:
            self.sender_id = sender_id

    @timing_decorator_async
    async def verify_webhook(self, token: str, challenge: str, verify_token: str) -> int:
        """Validate the webhook verification request and echo the challenge.

        Args:
            token: Verify token supplied by the caller of the webhook.
            challenge: Challenge value that must be echoed back.
            verify_token: Verify token configured for this application.

        Returns:
            The challenge converted to ``int``.

        Raises:
            HTTPException: 403 when the token does not match, 400 when the
                challenge is not an integer.
        """
        tokens_are_text = isinstance(token, str) and isinstance(verify_token, str)
        # Compare as bytes: compare_digest() rejects non-ASCII ``str`` input,
        # and a constant-time comparison avoids leaking the token by timing.
        if not tokens_are_text or not hmac.compare_digest(
            token.encode("utf-8"), verify_token.encode("utf-8")
        ):
            raise HTTPException(status_code=403, detail="Invalid verify token")

        try:
            return int(challenge)
        except (TypeError, ValueError) as exc:
            # A malformed challenge is a client error, not a server failure.
            raise HTTPException(status_code=400, detail="Invalid challenge") from exc

    def verify_signature(self, request: Request, payload: bytes) -> bool:
        """Check the ``X-Hub-Signature-256`` header against the raw payload.

        Args:
            request: Incoming request whose headers carry the signature.
            payload: Raw request body, exactly as received.

        Returns:
            ``True`` when the header holds a valid ``sha256=`` HMAC of
            ``payload`` keyed with the app secret, ``False`` otherwise.
        """
        prefix = "sha256="
        signature = request.headers.get("X-Hub-Signature-256", "")
        if not signature.startswith(prefix):
            return False

        expected = hmac.new(
            self.app_secret.encode(),
            payload,
            hashlib.sha256,
        ).hexdigest()

        # The header is caller-controlled and may contain non-ASCII
        # characters; comparing bytes keeps compare_digest() from raising
        # TypeError on such input.
        provided = signature[len(prefix):]
        return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))

    def format_message(self, text: str) -> str:
        """Convert light Markdown into the plain-text style sent to Messenger.

        Transformations, applied in this order:
            1. A line starting with "* " becomes "- " and a line starting
               with " * " becomes " + " (bullets, including the first line).
            2. Double-asterisk and double-underscore emphasis is rewritten
               to single-asterisk emphasis.
            3. Leading "#" heading markers are removed.
            4. Runs of three or more newlines collapse to one blank line.

        Args:
            text: Message text to convert.

        Returns:
            The converted text.
        """
        import re  # kept function-local: the module does not import ``re``

        # Bullets: anchoring on line starts also covers the very first line,
        # which a replacement keyed on a preceding newline would skip.
        text = re.sub(r'^\* ', '- ', text, flags=re.MULTILINE)
        text = re.sub(r'^ \* ', ' + ', text, flags=re.MULTILINE)

        # Emphasis markers.
        text = re.sub(r'\*\*([^\*]+)\*\*', r'*\1*', text)
        text = re.sub(r'__([^_]+)__', r'*\1*', text)

        # Heading markers at the start of a line.
        text = re.sub(r'^#+\s+', '', text, flags=re.MULTILINE)

        # Excess blank lines.
        text = re.sub(r'\n{3,}', '\n\n', text)

        return text

    def split_message(self, text: str, max_length: int = MAX_MESSAGE_LENGTH) -> list:
        """Split ``text`` into chunks of at most ``max_length`` characters.

        Chunks are built from whole lines where possible. A single line
        longer than ``max_length`` is cut into ``max_length``-sized pieces.
        Trailing whitespace is stripped from every chunk, and chunks that
        would be empty are dropped.

        Args:
            text: Text to split.
            max_length: Maximum number of characters per chunk.

        Returns:
            List of non-empty string chunks, in order.

        Raises:
            ValueError: If ``max_length`` is smaller than 1.
        """
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")

        chunks = []
        current = ""  # lines collected so far, each terminated by "\n"

        def flush(pending: str) -> None:
            # Never emit an empty chunk: an empty text message is useless.
            chunk = pending.rstrip()
            if chunk:
                chunks.append(chunk)

        for line in text.split('\n'):
            # The trailing newline is stripped on flush, so only the line
            # itself counts against the limit.
            if len(current) + len(line) > max_length:
                flush(current)
                current = ""
            # ``current`` is empty here whenever the line alone is too long.
            while len(line) > max_length:
                flush(line[:max_length])
                line = line[max_length:]
            current += line + '\n'

        flush(current)
        return chunks

    @staticmethod
    def _describe_send_error(exc: Exception, secret: str) -> str:
        """Build a log-safe description of a failed Send API call."""
        response = getattr(exc, "response", None)
        if response is not None:
            # Status errors embed the full request URL (including the access
            # token) in their message, so report status and body instead.
            return f"HTTP {response.status_code}: {response.text}"
        return f"{type(exc).__name__}: {str(exc).replace(str(secret), '***')}"

    @timing_decorator_async
    async def send_message(self, page_access_token: Optional[str] = None, recipient_id: Optional[str] = None, message: str = "") -> dict:
        """Format ``message``, split it and send each chunk to the recipient.

        Args:
            page_access_token: Page access token; defaults to the token
                stored on the instance.
            recipient_id: Recipient id; defaults to the stored sender id.
            message: Text to send. ``None`` is treated as an empty message
                and other non-string values are converted with ``str()``.

        Returns:
            The decoded JSON response for the first chunk, or ``{}`` when
            there was nothing to send.

        Raises:
            ValueError: If no access token or recipient id is available.
            HTTPException: 500 when a request to Facebook fails.
        """
        page_access_token = page_access_token or self.page_token
        recipient_id = recipient_id or self.sender_id
        if not page_access_token or not recipient_id:
            raise ValueError("FacebookClient: page_access_token and recipient_id must not be None when sending a message.")

        if message is None:
            message = ""
        elif not isinstance(message, str):
            message = str(message)
        formatted = self.format_message(message.replace('**', '*'))

        # The token travels as a query parameter rather than being spliced
        # into the URL string, so it is always encoded correctly.
        query = {"access_token": page_access_token}
        results = []
        for chunk in self.split_message(formatted, self.MAX_MESSAGE_LENGTH):
            payload = {
                "recipient": {"id": recipient_id},
                "message": {"text": chunk},
            }
            try:
                response = await self._client.post(self.SEND_API_URL, params=query, json=payload)
                response.raise_for_status()
                results.append(response.json())
            except httpx.HTTPError as exc:
                logger.error(
                    "Error sending message to Facebook: {}",
                    self._describe_send_error(exc, page_access_token),
                )
                raise HTTPException(status_code=500, detail="Failed to send message to Facebook") from exc
        return results[0] if results else {}

    @timing_decorator_sync
    def parse_message(self, body: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract the first messaging event from a webhook payload.

        Only ``body["entry"][0]["messaging"][0]`` is inspected; any further
        entries or events in the payload are ignored.

        Args:
            body: Decoded JSON payload of the webhook request.

        Returns:
            A dict with ``sender_id``, ``page_id`` (the event's recipient
            id), ``timestamp``, ``text`` (``None`` when absent) and
            ``attachments`` (empty list when absent), or ``None`` when the
            payload does not have the expected structure.
        """
        try:
            entry = body["entry"][0]
            messaging = entry["messaging"][0]

            message_data = {
                "sender_id": messaging["sender"]["id"],
                "page_id": messaging["recipient"]["id"],
                "timestamp": messaging["timestamp"],
                "text": None,
                "attachments": [],
            }

            if "message" in messaging:
                message = messaging["message"]
                if "text" in message:
                    message_data["text"] = message["text"]
                if "attachments" in message:
                    message_data["attachments"] = message["attachments"]

            return message_data
        except (KeyError, IndexError, TypeError) as e:
            # TypeError covers payloads whose nodes have the wrong type
            # (e.g. ``None`` or a string where a dict/list is expected).
            logger.error(f"Error parsing Facebook message: {e}")
            return None