import re import os import uuid from typing import List, Dict, Optional, Tuple, Any from dataclasses import dataclass from loguru import logger from .supabase_db import SupabaseClient from .embedding import EmbeddingClient from .config import get_settings @dataclass class ChunkMetadata: """Metadata cho một chunk.""" id: str content: str vanbanid: int cha: Optional[str] = None document_title: str = "" article_number: Optional[int] = None article_title: str = "" clause_number: str = "" sub_clause_letter: str = "" context_summary: str = "" class LawDocumentChunker: """Module xử lý chunking văn bản luật và tích hợp với Supabase.""" def __init__(self): """Khởi tạo chunker với các regex patterns.""" settings = get_settings() self.supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key) self.embedding_client = EmbeddingClient() self.llm_client: Optional[Any] = None # Regex patterns cho các cấp độ cấu trúc - SỬA LẠI ĐỂ CHÍNH XÁC HƠN # Đảm bảo mỗi pattern có đúng số group self.PHAN_REGEX = r"^(Phần|PHẦN|Phần thứ)\s+(\d+|[IVXLCDM]+|nhất|hai|ba|tư|năm|sáu|bảy|tám|chín|mười)\.?\s*(.*)" self.PHU_LUC_REGEX = r"^(Phụ lục|PHỤ LỤC)\s+(\d+|[A-Z]+)\.?\s*(.*)" self.CHUONG_REGEX = r"^(Chương|CHƯƠNG)\s+(\d+|[IVXLCDM]+)\.?\s*(.*)" self.MUC_REGEX = r"^(Mục|MỤC)\s+(\d+)\.?\s*(.*)" self.DIEU_REGEX = r"^Điều\s+(\d+)\.\s*(.*)" self.KHOAN_REGEX = r"^\s*(\d+(\.\d+)*)\.\s*(.*)" self.DIEM_REGEX_A = r"^\s*([a-zđ])\)\s*(.*)" self.DIEM_REGEX_NUM = r"^\s*(\d+\.\d+\.\d+)\.\s*(.*)" # Cấu hình chunking self.CHUNK_SIZE = 500 self.CHUNK_OVERLAP = 100 logger.info("[CHUNKER] Initialized LawDocumentChunker") def _create_data_directory(self): """Tạo thư mục data nếu chưa tồn tại.""" data_dir = "data" if not os.path.exists(data_dir): os.makedirs(data_dir) logger.info(f"[CHUNKER] Created directory: {data_dir}") return data_dir def _extract_document_title(self, file_path: str) -> str: """Trích xuất tiêu đề văn bản từ tên file.""" filename = os.path.basename(file_path) # Loại bỏ extension name_without_ext = os.path.splitext(filename)[0] # Thay _ bằng khoảng trắng và viết hoa chữ cái đầu title = name_without_ext.replace('_', ' ').title() logger.info(f"[CHUNKER] Extracted document title: {title}") return title def _read_document(self, file_path: str) -> str: """Đọc nội dung văn bản từ file.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() logger.info(f"[CHUNKER] Read document: {file_path}, length: {len(content)}") return content except Exception as e: logger.error(f"[CHUNKER] Error reading file {file_path}: {e}") raise def _detect_structure_level(self, line: str) -> Tuple[str, Optional[str], Optional[str]]: """Phát hiện cấp độ cấu trúc của một dòng.""" line = line.strip() try: # Phần match = re.match(self.PHAN_REGEX, line, re.IGNORECASE) if match: return "PHAN", match.group(1), match.group(2) # Phụ lục match = re.match(self.PHU_LUC_REGEX, line, re.IGNORECASE) if match: return "PHU_LUC", match.group(1), match.group(2) # Chương match = re.match(self.CHUONG_REGEX, line, re.IGNORECASE) if match: return "CHUONG", match.group(1), match.group(2) # Mục match = re.match(self.MUC_REGEX, line, re.IGNORECASE) if match: return "MUC", match.group(1), match.group(2) # Điều match = re.match(self.DIEU_REGEX, line) if match: return "DIEU", match.group(1), match.group(2) # Khoản match = re.match(self.KHOAN_REGEX, line) if match: clause_num = match.group(1) # Kiểm tra không phải điểm (có từ 3 số trở lên) if len(clause_num.split('.')) < 3: return "KHOAN", clause_num, match.group(3) # Điểm chữ cái match = re.match(self.DIEM_REGEX_A, line) if match: return "DIEM", match.group(1), match.group(2) # Điểm số match = re.match(self.DIEM_REGEX_NUM, line) if match: return "DIEM", match.group(1), match.group(2) return "CONTENT", None, None except Exception as e: logger.error(f"[CHUNKER] Error in _detect_structure_level for line '{line}': {e}") return "CONTENT", None, None def _build_structure_summary(self, article_number, clause_number, sub_clause_letter): if sub_clause_letter and clause_number and article_number: return f"Điểm {sub_clause_letter} Khoản {clause_number} Điều {article_number}" elif clause_number and article_number: return f"Khoản {clause_number} Điều {article_number}" elif article_number: return f"Điều {article_number}" return "" def _create_chunk_metadata(self, content: str, level: str, level_value: Optional[str], parent_id: Optional[str], vanbanid: int, document_title: str, chunk_stack: List[Tuple[str, str, Optional[str], str]], chunk_dict: dict) -> 'ChunkMetadata': """Tạo metadata cho chunk.""" chunk_id = str(uuid.uuid4()) metadata = ChunkMetadata( id=chunk_id, content=content, vanbanid=vanbanid, cha=parent_id, document_title=document_title ) # Điền metadata từ chunk hiện tại if level == "DIEU" and level_value: metadata.article_number = int(level_value) if level_value.isdigit() else None metadata.article_title = content.split('\n')[0].strip() if content else "" elif level == "KHOAN" and level_value: metadata.clause_number = level_value elif level == "DIEM" and level_value: metadata.sub_clause_letter = level_value # Điền metadata từ parent chunks nếu có logger.debug(f"[CHUNKER] Creating chunk with level: {level}, parent_id: {parent_id}, stack_size: {len(chunk_stack)}") if chunk_dict is not None and parent_id: self._fill_metadata_from_parents(metadata, parent_id, chunk_dict) else: logger.debug(f"[CHUNKER] Skipping metadata fill - no parent_id or chunk_dict") # Gán context_summary theo format pháp lý metadata.context_summary = self._build_structure_summary( metadata.article_number, metadata.clause_number, metadata.sub_clause_letter # ) logger.debug(f"[CHUNKER] Final metadata for chunk {chunk_id[:8]}... - Level: {level}, Article: {metadata.article_number}, Clause: {metadata.clause_number}, Point: {metadata.sub_clause_letter}") return metadata def _fill_metadata_from_parents(self, metadata: ChunkMetadata, parent_id: str, chunk_dict: Dict[str, ChunkMetadata]): """ Điền metadata từ parent và ancestor (cha, ông, ...), sử dụng dict id->chunk. """ parent = chunk_dict.get(parent_id) if not parent: logger.warning(f"[CHUNKER] Parent chunk {parent_id} not found in chunk_dict") return # Điền từ cha if parent.article_number and not metadata.article_number: metadata.article_number = parent.article_number if parent.article_title and not metadata.article_title: metadata.article_title = parent.article_title # if parent.clause_number and not metadata.clause_number: metadata.clause_number = parent.clause_number if parent.sub_clause_letter and not metadata.sub_clause_letter: metadata.sub_clause_letter = parent.sub_clause_letter # Nếu cha là Khoản, tìm ông là Điều if parent.clause_number and not metadata.article_number: grandparent = chunk_dict.get(parent.cha) if parent.cha else None if grandparent and grandparent.article_number: metadata.article_number = grandparent.article_number if grandparent and grandparent.article_title: metadata.article_title = grandparent.article_title def _split_into_chunks(self, text: str, chunk_size: int, overlap: int) -> List[str]: """Chia text thành các chunk với overlap.""" chunks = [] start = 0 while start < len(text): end = start + chunk_size chunk = text[start:end] # Tìm vị trí kết thúc chunk tốt nhất (cuối câu hoặc cuối từ) if end < len(text): # Tìm dấu chấm hoặc xuống dòng gần nhất last_period = chunk.rfind('.') last_newline = chunk.rfind('\n') best_break = max(last_period, last_newline) if best_break > start + chunk_size * 0.7: # Chỉ break nếu không quá sớm end = start + best_break + 1 chunk = text[start:end] chunks.append(chunk) start = end - overlap if start >= len(text): break return chunks def _process_document_recursive(self, content: str, vanbanid: int, document_title: str) -> List[ChunkMetadata]: """Xử lý văn bản theo cấu trúc phân cấp.""" lines = content.split('\n') chunks = [] chunk_stack = [] # (chunk_id, level, level_value, content) chunk_dict = {} # id -> ChunkMetadata current_chunk_content = "" current_level = None current_level_value = None current_parent = None current_level_priority = None level_priority = { "PHAN": 1, "PHU_LUC": 1, "CHUONG": 2, "MUC": 3, "DIEU": 4, "KHOAN": 5, "DIEM": 6, "CONTENT": 7 } preamble_done = False for line in lines: level, level_value, _ = self._detect_structure_level(line) line_priority = level_priority.get(level, 7) # Nếu là dòng đầu tiên hoặc preamble if not preamble_done and (level == "CONTENT" or not level_value): current_chunk_content += line + "\n" current_level = "CONTENT" current_level_value = None current_parent = None current_level_priority = 7 continue if not preamble_done and (level != "CONTENT" and level_value): # Kết thúc preamble if current_chunk_content.strip(): metadata = self._create_chunk_metadata( current_chunk_content.strip(), "CONTENT", None, None, vanbanid, document_title, chunk_stack, chunk_dict ) chunks.append(metadata) chunk_stack.append((metadata.id, "CONTENT", None, current_chunk_content.strip())) chunk_dict[metadata.id] = metadata preamble_done = True current_chunk_content = "" current_level = level current_level_value = level_value current_level_priority = line_priority current_parent = self._find_parent_for_level(chunk_stack, level, level_priority) current_chunk_content += line + "\n" continue # Nếu gặp level mới if level != "CONTENT" and level_value: if current_level is not None and current_level_priority is not None and line_priority <= current_level_priority: # Kết thúc chunk hiện tại if current_chunk_content.strip(): metadata = self._create_chunk_metadata( current_chunk_content.strip(), str(current_level), current_level_value, current_parent, vanbanid, document_title, chunk_stack, chunk_dict ) chunks.append(metadata) chunk_stack.append((metadata.id, str(current_level), current_level_value, current_chunk_content.strip())) chunk_dict[metadata.id] = metadata # Bắt đầu chunk mới current_parent = self._find_parent_for_level(chunk_stack, level, level_priority) current_chunk_content = line + "\n" current_level = level current_level_value = level_value current_level_priority = line_priority else: # Level mới nhưng priority cao hơn (ví dụ: Mục trong Chương) if current_chunk_content.strip() and current_level is not None: metadata = self._create_chunk_metadata( current_chunk_content.strip(), str(current_level), current_level_value, current_parent, vanbanid, document_title, chunk_stack, chunk_dict ) chunks.append(metadata) chunk_stack.append((metadata.id, str(current_level), current_level_value, current_chunk_content.strip())) chunk_dict[metadata.id] = metadata current_parent = self._find_parent_for_level(chunk_stack, level, level_priority) current_chunk_content = line + "\n" current_level = level current_level_value = level_value current_level_priority = line_priority else: # CONTENT nối vào chunk hiện tại current_chunk_content += line + "\n" # Nếu chunk quá lớn thì chia nhỏ if len(current_chunk_content) > self.CHUNK_SIZE and current_level is not None: sub_chunks = self._split_into_chunks(current_chunk_content, self.CHUNK_SIZE, self.CHUNK_OVERLAP) for sub_chunk in sub_chunks: metadata = self._create_chunk_metadata( sub_chunk.strip(), str(current_level), current_level_value, current_parent, vanbanid, document_title, chunk_stack, chunk_dict ) chunks.append(metadata) chunk_stack.append((metadata.id, str(current_level), current_level_value, sub_chunk.strip())) chunk_dict[metadata.id] = metadata current_chunk_content = "" # Lưu chunk cuối cùng if current_chunk_content.strip() and current_level is not None: metadata = self._create_chunk_metadata( current_chunk_content.strip(), str(current_level), current_level_value, current_parent, vanbanid, document_title, chunk_stack, chunk_dict ) chunks.append(metadata) chunk_stack.append((metadata.id, str(current_level), current_level_value, current_chunk_content.strip())) chunk_dict[metadata.id] = metadata root_count = sum(1 for chunk in chunks if chunk.cha is None) logger.info(f"[CHUNKER] Created {len(chunks)} chunks, {root_count} root chunks") for i, chunk in enumerate(chunks[:10]): logger.debug(f"[CHUNKER] Chunk {i+1}: {chunk.content[:100]}... -> Parent: {chunk.cha}") if len(chunks) > 10: logger.debug(f"[CHUNKER] ... and {len(chunks) - 10} more chunks") return chunks def _find_parent_for_level(self, chunk_stack: List[Tuple[str, str, Optional[str], str]], current_level: str, level_priority: Dict[str, int]) -> Optional[str]: """ Tìm parent gần nhất có level cao hơn (priority thấp hơn) cho level hiện tại, kiểm tra hợp lệ cha-con. """ current_priority = level_priority.get(current_level, 999) valid_parents = { "MUC": ["CHUONG", "PHAN"], "DIEU": ["MUC", "CHUONG", "PHAN"], "CHUONG": ["PHAN"], # Các level khác giữ nguyên logic cũ } for chunk_id, level, level_value, content in reversed(chunk_stack): if level_priority.get(level, 999) < current_priority: if current_level in valid_parents: if level in valid_parents[current_level]: return chunk_id else: return chunk_id return None async def _create_embeddings_for_chunks(self, chunks: List[ChunkMetadata]) -> int: """Tạo embeddings cho các chunks và lưu ngay lập tức vào Supabase.""" logger.info(f"[CHUNKER] Creating embeddings and storing {len(chunks)} chunks") success_count = 0 failed_count = 0 # Debug: Log chi tiết metadata của từng chunk logger.info(f"[CHUNKER] === DETAILED METADATA ANALYSIS ===") for i, chunk in enumerate(chunks[:20]): # Log 20 chunks đầu tiên logger.info(f"[CHUNKER] Chunk {i+1}:") logger.info(f" - ID: {chunk.id[:8]}...") logger.info(f" - Content: {chunk.content[:100]}...") logger.info(f" - Parent: {chunk.cha}") logger.info(f" - Article: {chunk.article_number}") logger.info(f" - Article Title: {chunk.article_title}") logger.info(f" - Clause: {chunk.clause_number}") logger.info(f" - Point: {chunk.sub_clause_letter}") logger.info(f" - Document: {chunk.document_title}") logger.info(f" ---") for i, chunk in enumerate(chunks, 1): try: # Tạo embedding embedding = await self.embedding_client.create_embedding(chunk.content, task_type="retrieval_document") # Sinh semantic summary bằng LLM semantic_summary = await self._create_semantic_summary_with_llm(chunk.content) # Chuẩn bị data cho Supabase chunk_dict = { 'id': chunk.id, 'content': chunk.content, 'embedding': embedding if embedding is not None else [0.0] * 768, # Sử dụng embedding thực tế nếu có 'vanbanid': chunk.vanbanid, 'cha': chunk.cha, 'document_title': chunk.document_title, 'article_number': chunk.article_number, 'article_title': chunk.article_title, 'clause_number': chunk.clause_number, 'sub_clause_letter': chunk.sub_clause_letter, 'context_summary': f"Structure: {chunk.context_summary}|Semantic: {semantic_summary}" } # Lưu ngay lập tức vào Supabase success = self.supabase_client.store_document_chunk(chunk_dict) if success: success_count += 1 if i % 100 == 0: # Log mỗi 100 chunks logger.info(f"[CHUNKER] Stored chunk {i}/{len(chunks)}: {chunk.id[:8]}...") else: failed_count += 1 logger.error(f"[CHUNKER] Failed to store chunk {chunk.id}") except Exception as e: failed_count += 1 logger.error(f"[CHUNKER] Error processing chunk {chunk.id}: {e}") continue logger.info(f"[CHUNKER] Successfully processed {success_count}/{len(chunks)} chunks, {failed_count} failed") return success_count async def _store_chunks_to_supabase(self, chunk_data: List[Dict]) -> bool: """Legacy method - không còn sử dụng.""" logger.warning("[CHUNKER] _store_chunks_to_supabase is deprecated, use _create_embeddings_for_chunks instead") return True async def process_law_document(self, file_path: str, document_id: int) -> bool: """ Hàm chính để xử lý văn bản luật. Args: file_path: Đường dẫn đến file văn bản luật document_id: ID duy nhất của văn bản luật Returns: bool: True nếu thành công, False nếu thất bại """ try: logger.info(f"[CHUNKER] Starting processing for file: {file_path}, document_id: {document_id}") # 1. Tạo thư mục data nếu cần self._create_data_directory() # 2. Kiểm tra file tồn tại if not os.path.exists(file_path): logger.error(f"[CHUNKER] File not found: {file_path}") return False # 3. Đọc văn bản content = self._read_document(file_path) # 4. Trích xuất tiêu đề document_title = self._extract_document_title(file_path) # 5. Xử lý chunking theo cấu trúc chunks = self._process_document_recursive(content, document_id, document_title) if not chunks: logger.warning(f"[CHUNKER] No chunks created for document {document_id}") return False # 6. Tạo embeddings success_count = await self._create_embeddings_for_chunks(chunks) if success_count == 0: logger.error(f"[CHUNKER] No embeddings created for document {document_id}") return False logger.info(f"[CHUNKER] Successfully processed document {document_id} with {success_count} chunks") return True except Exception as e: logger.error(f"[CHUNKER] Error processing document {document_id}: {e}") ## return False async def _create_semantic_summary_with_llm(self, chunk_content: str) -> str: """ Sinh semantic summary ngắn gọn, súc tích cho chunk bằng LLM. """ if not hasattr(self, "llm_client") or self.llm_client is None: logger.warning("[CHUNKER] llm_client chưa được gán, bỏ qua semantic summary.") return "" prompt = ( "Tóm tắt thật ngắn gọn, súc tích nội dung luật sau (1-2 câu, không lặp lại tiêu đề, không giải thích):\n" f"{chunk_content.strip()}" ) try: summary = await self.llm_client.generate_text(prompt) return summary.strip() if summary else "" except Exception as e: logger.error(f"[CHUNKER] Lỗi khi sinh semantic summary bằng LLM: {e}") return ""