FBChatBot / app /supabase_db.py
VietCat's picture
update variables
c025e27
from typing import Any, Dict, List, Optional
from postgrest.types import CountMethod
from supabase.client import create_client, Client
from loguru import logger
import re
from .utils import timing_decorator_sync
from .constants import VEHICLE_KEYWORD_TO_COLUMN, VIETNAMESE_STOP_WORDS, VIETNAMESE_STOP_PHRASES
from .config import get_settings
def remove_stop_phrases(text, stop_phrases):
for phrase in stop_phrases:
# Sửa: Không escape dấu cách trong phrase, chỉ escape các ký tự đặc biệt khác
# Loại bỏ cụm từ, chỉ xóa khi là từ nguyên vẹn
pattern = rf"\b{phrase}\b"
text = re.sub(pattern, " ", text)
return text
class SupabaseClient:
def __init__(self, url: str, key: str):
"""
Khởi tạo SupabaseClient với url và key.
Input: url (str), key (str)
Output: SupabaseClient instance.
"""
self.client: Client = create_client(url, key)
settings = get_settings()
self.default_match_count = settings.match_count
@timing_decorator_sync
def get_page_token(self, page_id: str):
"""
Lấy access token của Facebook page từ Supabase.
Input: page_id (str)
Output: access_token (str) hoặc None nếu không có.
"""
try:
response = self.client.table('PageToken').select('token').eq('id', page_id).execute()
if response.data and len(response.data) > 0:
return response.data[0]['token']
return None
except Exception as e:
logger.error(f"Error getting page token: {e}")
return None
@timing_decorator_sync
def match_documents(self, embedding: List[float], match_count: Optional[int] = None, vehicle_keywords: Optional[List[str]] = None, user_question: str = '', min_rank_threshold: float = 0.001, rrf_k: int = 60):
"""
Truy vấn vector similarity search qua RPC match_documents.
Input: embedding (list[float]), match_count (int), vehicle_keywords (list[str] hoặc None)
Output: list[dict] kết quả truy vấn.
"""
# Sử dụng match_count từ config nếu không được truyền vào
if match_count is None:
match_count = self.default_match_count
# Chuẩn bị chuỗi truy vấn trong Python
# Tách từ và nối bằng '|'
"""
Xử lý câu hỏi thô: tách từ, loại bỏ stop words,
và trả về chuỗi text sạch để truyền vào RPC.
"""
# Lọc bỏ các từ có trong danh sách stop words và nối thành chuỗi với dấu cách
# 1. Loại bỏ stop phrase (từ ghép)
cleaned_text = remove_stop_phrases(user_question.lower(), VIETNAMESE_STOP_PHRASES)
# 2. Tách từ và loại bỏ stop word đơn lẻ
words = cleaned_text.split()
or_query_tsquery = " ".join([word for word in words if word not in VIETNAMESE_STOP_WORDS])
logger.info(f"[DEBUG][RPC]: or_query_tsquery: {or_query_tsquery}")
try:
payload = {
'query_text': or_query_tsquery,
'query_embedding': embedding,
'match_count': match_count,
'min_rank_threshold': min_rank_threshold,
'vehicle_filters': None,
'rrf_k': rrf_k
}
if vehicle_keywords:
vehicle_columns = [VEHICLE_KEYWORD_TO_COLUMN[k] for k in vehicle_keywords if k in VEHICLE_KEYWORD_TO_COLUMN]
if vehicle_columns:
payload['vehicle_filters'] = vehicle_columns
response = self.client.rpc(
'match_documents',
payload
).execute()
if response.data:
return response.data
return []
except Exception as e:
logger.error(f"Error matching documents: {e}")
return []
@timing_decorator_sync
def store_embedding(self, text: str, embedding: List[float], metadata: Dict[str, Any]):
"""
Lưu embedding vào Supabase.
Input: text (str), embedding (list[float]), metadata (dict)
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
response = self.client.table('embeddings').insert({
'content': text,
'embedding': embedding,
'metadata': metadata
}).execute()
return bool(response.data)
except Exception as e:
logger.error(f"Error storing embedding: {e}")
return False
@timing_decorator_sync
def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool:
"""
Lưu document chunk vào Supabase.
Input: chunk_data (dict) - chứa tất cả thông tin chunk
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
# Xử lý các giá trị null/empty cho integer fields
processed_data = chunk_data.copy()
# Giữ lại embedding để lưu vào database
if 'embedding' in processed_data:
processed_data['embedding'] = processed_data['embedding']
# Xử lý article_number - chỉ gửi nếu có giá trị hợp lệ
if 'article_number' in processed_data:
if processed_data['article_number'] is None or processed_data['article_number'] == "":
processed_data['article_number'] = None
elif isinstance(processed_data['article_number'], str):
try:
processed_data['article_number'] = int(processed_data['article_number'])
except (ValueError, TypeError):
processed_data['article_number'] = None
# Xử lý vanbanid - đảm bảo là integer
if 'vanbanid' in processed_data:
if isinstance(processed_data['vanbanid'], str):
try:
processed_data['vanbanid'] = int(processed_data['vanbanid'])
except (ValueError, TypeError):
logger.error(f"Invalid vanbanid: {processed_data['vanbanid']}")
return False
# Xử lý các trường text - chuyển empty string thành None
text_fields = ['document_title', 'article_title', 'clause_number', 'sub_clause_letter', 'context_summary']
for field in text_fields:
if field in processed_data and processed_data[field] == "":
processed_data[field] = None
# Xử lý cha field - chuyển empty string thành None
if 'cha' in processed_data and processed_data['cha'] == "":
processed_data['cha'] = None
response = self.client.table('document_chunks').insert(processed_data).execute()
if response.data:
logger.info(f"Successfully stored chunk {processed_data.get('id', 'unknown')}")
return True
else:
logger.error(f"Failed to store chunk {processed_data.get('id', 'unknown')}")
return False
except Exception as e:
logger.error(f"Error storing document chunk: {e}")
return False
@timing_decorator_sync
def delete_all_document_chunks(self) -> bool:
"""
Xóa toàn bộ bảng document_chunks.
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
# Xóa tất cả records trong bảng
response = self.client.table('document_chunks').delete().execute()
logger.info(f"Successfully deleted all document chunks")
return True
except Exception as e:
logger.error(f"Error deleting all document chunks: {e}")
return False
@timing_decorator_sync
def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]:
"""
Lấy tất cả chunks của một văn bản theo vanbanid.
Input: vanbanid (int)
Output: List[Dict] - danh sách chunks
"""
try:
response = self.client.table('document_chunks').select('*').eq('vanbanid', vanbanid).execute()
if response.data:
logger.info(f"Found {len(response.data)} chunks for vanbanid {vanbanid}")
return response.data
return []
except Exception as e:
logger.error(f"Error getting document chunks for vanbanid {vanbanid}: {e}")
return []
@timing_decorator_sync
def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool:
"""
Xóa tất cả chunks của một văn bản theo vanbanid.
Input: vanbanid (int)
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
response = self.client.table('document_chunks').delete().eq('vanbanid', vanbanid).execute()
logger.info(f"Successfully deleted all chunks for vanbanid {vanbanid}")
return True
except Exception as e:
logger.error(f"Error deleting chunks for vanbanid {vanbanid}: {e}")
return False
@timing_decorator_sync
def get_all_document_chunks(self) -> List[Dict[str, Any]]:
"""
Lấy toàn bộ dữ liệu từ bảng document_chunks.
Output: List[Dict] - danh sách tất cả chunks
"""
try:
logger.info("[SUPABASE] Fetching all document chunks")
# Đếm tổng số records trước
count_response = self.client.table('document_chunks').select('*', count=CountMethod.exact).execute()
total_count = count_response.count if hasattr(count_response, 'count') else 'unknown'
logger.info(f"[SUPABASE] Total records in table: {total_count}")
all_chunks = []
page_size = 1000
last_id = 0
page_count = 0
while True:
page_count += 1
# Sử dụng cursor-based pagination với id
if last_id == 0:
# Lần đầu: lấy từ đầu
response = self.client.table('document_chunks').select('*').order('id').limit(page_size).execute()
else:
# Các lần sau: lấy từ id > last_id
response = self.client.table('document_chunks').select('*').order('id').gt('id', last_id).limit(page_size).execute()
actual_count = len(response.data) if response.data else 0
logger.info(f"[SUPABASE] Page {page_count}: last_id={last_id}, requested={page_size}, actual={actual_count}")
if not response.data:
logger.info(f"[SUPABASE] No more data after id {last_id}")
break
all_chunks.extend(response.data)
# Cập nhật last_id cho page tiếp theo
if response.data:
last_id = max(chunk.get('id', 0) for chunk in response.data)
if actual_count < page_size:
logger.info(f"[SUPABASE] Last page with {actual_count} records")
break
logger.info(f"[SUPABASE] Cursor-based pagination fetched {len(all_chunks)} document chunks (expected: {total_count})")
logger.info(f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}")
return all_chunks
except Exception as e:
logger.error(f"[SUPABASE] Error fetching document chunks: {e}")
return []