from typing import Any, Dict, List, Optional from postgrest.types import CountMethod from supabase.client import create_client, Client from loguru import logger import re from .utils import timing_decorator_sync from .constants import VEHICLE_KEYWORD_TO_COLUMN, VIETNAMESE_STOP_WORDS, VIETNAMESE_STOP_PHRASES from .config import get_settings def remove_stop_phrases(text, stop_phrases): for phrase in stop_phrases: # Sửa: Không escape dấu cách trong phrase, chỉ escape các ký tự đặc biệt khác # Loại bỏ cụm từ, chỉ xóa khi là từ nguyên vẹn pattern = rf"\b{phrase}\b" text = re.sub(pattern, " ", text) return text class SupabaseClient: def __init__(self, url: str, key: str): """ Khởi tạo SupabaseClient với url và key. Input: url (str), key (str) Output: SupabaseClient instance. """ self.client: Client = create_client(url, key) settings = get_settings() self.default_match_count = settings.match_count @timing_decorator_sync def get_page_token(self, page_id: str): """ Lấy access token của Facebook page từ Supabase. Input: page_id (str) Output: access_token (str) hoặc None nếu không có. """ try: response = self.client.table('PageToken').select('token').eq('id', page_id).execute() if response.data and len(response.data) > 0: return response.data[0]['token'] return None except Exception as e: logger.error(f"Error getting page token: {e}") return None @timing_decorator_sync def match_documents(self, embedding: List[float], match_count: Optional[int] = None, vehicle_keywords: Optional[List[str]] = None, user_question: str = '', min_rank_threshold: float = 0.001, rrf_k: int = 60): """ Truy vấn vector similarity search qua RPC match_documents. Input: embedding (list[float]), match_count (int), vehicle_keywords (list[str] hoặc None) Output: list[dict] kết quả truy vấn. """ # Sử dụng match_count từ config nếu không được truyền vào if match_count is None: match_count = self.default_match_count # Chuẩn bị chuỗi truy vấn trong Python # Tách từ và nối bằng '|' """ Xử lý câu hỏi thô: tách từ, loại bỏ stop words, và trả về chuỗi text sạch để truyền vào RPC. """ # Lọc bỏ các từ có trong danh sách stop words và nối thành chuỗi với dấu cách # 1. Loại bỏ stop phrase (từ ghép) cleaned_text = remove_stop_phrases(user_question.lower(), VIETNAMESE_STOP_PHRASES) # 2. Tách từ và loại bỏ stop word đơn lẻ words = cleaned_text.split() or_query_tsquery = " ".join([word for word in words if word not in VIETNAMESE_STOP_WORDS]) logger.info(f"[DEBUG][RPC]: or_query_tsquery: {or_query_tsquery}") try: payload = { 'query_text': or_query_tsquery, 'query_embedding': embedding, 'match_count': match_count, 'min_rank_threshold': min_rank_threshold, 'vehicle_filters': None, 'rrf_k': rrf_k } if vehicle_keywords: vehicle_columns = [VEHICLE_KEYWORD_TO_COLUMN[k] for k in vehicle_keywords if k in VEHICLE_KEYWORD_TO_COLUMN] if vehicle_columns: payload['vehicle_filters'] = vehicle_columns response = self.client.rpc( 'match_documents', payload ).execute() if response.data: return response.data return [] except Exception as e: logger.error(f"Error matching documents: {e}") return [] @timing_decorator_sync def store_embedding(self, text: str, embedding: List[float], metadata: Dict[str, Any]): """ Lưu embedding vào Supabase. Input: text (str), embedding (list[float]), metadata (dict) Output: bool (True nếu thành công, False nếu lỗi) """ try: response = self.client.table('embeddings').insert({ 'content': text, 'embedding': embedding, 'metadata': metadata }).execute() return bool(response.data) except Exception as e: logger.error(f"Error storing embedding: {e}") return False @timing_decorator_sync def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool: """ Lưu document chunk vào Supabase. Input: chunk_data (dict) - chứa tất cả thông tin chunk Output: bool (True nếu thành công, False nếu lỗi) """ try: # Xử lý các giá trị null/empty cho integer fields processed_data = chunk_data.copy() # Giữ lại embedding để lưu vào database if 'embedding' in processed_data: processed_data['embedding'] = processed_data['embedding'] # Xử lý article_number - chỉ gửi nếu có giá trị hợp lệ if 'article_number' in processed_data: if processed_data['article_number'] is None or processed_data['article_number'] == "": processed_data['article_number'] = None elif isinstance(processed_data['article_number'], str): try: processed_data['article_number'] = int(processed_data['article_number']) except (ValueError, TypeError): processed_data['article_number'] = None # Xử lý vanbanid - đảm bảo là integer if 'vanbanid' in processed_data: if isinstance(processed_data['vanbanid'], str): try: processed_data['vanbanid'] = int(processed_data['vanbanid']) except (ValueError, TypeError): logger.error(f"Invalid vanbanid: {processed_data['vanbanid']}") return False # Xử lý các trường text - chuyển empty string thành None text_fields = ['document_title', 'article_title', 'clause_number', 'sub_clause_letter', 'context_summary'] for field in text_fields: if field in processed_data and processed_data[field] == "": processed_data[field] = None # Xử lý cha field - chuyển empty string thành None if 'cha' in processed_data and processed_data['cha'] == "": processed_data['cha'] = None response = self.client.table('document_chunks').insert(processed_data).execute() if response.data: logger.info(f"Successfully stored chunk {processed_data.get('id', 'unknown')}") return True else: logger.error(f"Failed to store chunk {processed_data.get('id', 'unknown')}") return False except Exception as e: logger.error(f"Error storing document chunk: {e}") return False @timing_decorator_sync def delete_all_document_chunks(self) -> bool: """ Xóa toàn bộ bảng document_chunks. Output: bool (True nếu thành công, False nếu lỗi) """ try: # Xóa tất cả records trong bảng response = self.client.table('document_chunks').delete().execute() logger.info(f"Successfully deleted all document chunks") return True except Exception as e: logger.error(f"Error deleting all document chunks: {e}") return False @timing_decorator_sync def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]: """ Lấy tất cả chunks của một văn bản theo vanbanid. Input: vanbanid (int) Output: List[Dict] - danh sách chunks """ try: response = self.client.table('document_chunks').select('*').eq('vanbanid', vanbanid).execute() if response.data: logger.info(f"Found {len(response.data)} chunks for vanbanid {vanbanid}") return response.data return [] except Exception as e: logger.error(f"Error getting document chunks for vanbanid {vanbanid}: {e}") return [] @timing_decorator_sync def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool: """ Xóa tất cả chunks của một văn bản theo vanbanid. Input: vanbanid (int) Output: bool (True nếu thành công, False nếu lỗi) """ try: response = self.client.table('document_chunks').delete().eq('vanbanid', vanbanid).execute() logger.info(f"Successfully deleted all chunks for vanbanid {vanbanid}") return True except Exception as e: logger.error(f"Error deleting chunks for vanbanid {vanbanid}: {e}") return False @timing_decorator_sync def get_all_document_chunks(self) -> List[Dict[str, Any]]: """ Lấy toàn bộ dữ liệu từ bảng document_chunks. Output: List[Dict] - danh sách tất cả chunks """ try: logger.info("[SUPABASE] Fetching all document chunks") # Đếm tổng số records trước count_response = self.client.table('document_chunks').select('*', count=CountMethod.exact).execute() total_count = count_response.count if hasattr(count_response, 'count') else 'unknown' logger.info(f"[SUPABASE] Total records in table: {total_count}") all_chunks = [] page_size = 1000 last_id = 0 page_count = 0 while True: page_count += 1 # Sử dụng cursor-based pagination với id if last_id == 0: # Lần đầu: lấy từ đầu response = self.client.table('document_chunks').select('*').order('id').limit(page_size).execute() else: # Các lần sau: lấy từ id > last_id response = self.client.table('document_chunks').select('*').order('id').gt('id', last_id).limit(page_size).execute() actual_count = len(response.data) if response.data else 0 logger.info(f"[SUPABASE] Page {page_count}: last_id={last_id}, requested={page_size}, actual={actual_count}") if not response.data: logger.info(f"[SUPABASE] No more data after id {last_id}") break all_chunks.extend(response.data) # Cập nhật last_id cho page tiếp theo if response.data: last_id = max(chunk.get('id', 0) for chunk in response.data) if actual_count < page_size: logger.info(f"[SUPABASE] Last page with {actual_count} records") break logger.info(f"[SUPABASE] Cursor-based pagination fetched {len(all_chunks)} document chunks (expected: {total_count})") logger.info(f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}") return all_chunks except Exception as e: logger.error(f"[SUPABASE] Error fetching document chunks: {e}") return []