# FBChatBot / app / law_document_chunker.py
import re
import os
import uuid
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
from loguru import logger
from .supabase_db import SupabaseClient
from .embedding import EmbeddingClient
from .config import get_settings
@dataclass
class ChunkMetadata:
"""Metadata cho một chunk."""
id: str
content: str
vanbanid: int
cha: Optional[str] = None
document_title: str = ""
article_number: Optional[int] = None
article_title: str = ""
clause_number: str = ""
sub_clause_letter: str = ""
context_summary: str = ""
class LawDocumentChunker:
"""Module xử lý chunking văn bản luật và tích hợp với Supabase."""
def __init__(self):
"""Khởi tạo chunker với các regex patterns."""
settings = get_settings()
self.supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key)
self.embedding_client = EmbeddingClient()
self.llm_client: Optional[Any] = None
        # Regex patterns for the structural levels - revised for better accuracy
        # Make sure each pattern exposes the expected number of capture groups
self.PHAN_REGEX = r"^(Phần|PHẦN|Phần thứ)\s+(\d+|[IVXLCDM]+|nhất|hai|ba|tư|năm|sáu|bảy|tám|chín|mười)\.?\s*(.*)"
self.PHU_LUC_REGEX = r"^(Phụ lục|PHỤ LỤC)\s+(\d+|[A-Z]+)\.?\s*(.*)"
self.CHUONG_REGEX = r"^(Chương|CHƯƠNG)\s+(\d+|[IVXLCDM]+)\.?\s*(.*)"
self.MUC_REGEX = r"^(Mục|MỤC)\s+(\d+)\.?\s*(.*)"
self.DIEU_REGEX = r"^Điều\s+(\d+)\.\s*(.*)"
self.KHOAN_REGEX = r"^\s*(\d+(\.\d+)*)\.\s*(.*)"
self.DIEM_REGEX_A = r"^\s*([a-zđ])\)\s*(.*)"
self.DIEM_REGEX_NUM = r"^\s*(\d+\.\d+\.\d+)\.\s*(.*)"
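        # Illustrative lines each pattern is meant to match (made-up samples, not from a real statute):
        #   "Chương I. QUY ĐỊNH CHUNG"    -> CHUONG_REGEX
        #   "Mục 1. Tên mục"              -> MUC_REGEX
        #   "Điều 5. Phạm vi điều chỉnh"  -> DIEU_REGEX
        #   "1. Nội dung khoản"           -> KHOAN_REGEX
        #   "a) nội dung điểm"            -> DIEM_REGEX_A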
        # Chunking configuration
self.CHUNK_SIZE = 500
self.CHUNK_OVERLAP = 100
logger.info("[CHUNKER] Initialized LawDocumentChunker")
def _create_data_directory(self):
"""Tạo thư mục data nếu chưa tồn tại."""
data_dir = "data"
if not os.path.exists(data_dir):
os.makedirs(data_dir)
logger.info(f"[CHUNKER] Created directory: {data_dir}")
return data_dir
def _extract_document_title(self, file_path: str) -> str:
"""Trích xuất tiêu đề văn bản từ tên file."""
filename = os.path.basename(file_path)
        # Strip the extension
name_without_ext = os.path.splitext(filename)[0]
        # Replace underscores with spaces and capitalize each word
title = name_without_ext.replace('_', ' ').title()
logger.info(f"[CHUNKER] Extracted document title: {title}")
return title
def _read_document(self, file_path: str) -> str:
"""Đọc nội dung văn bản từ file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
logger.info(f"[CHUNKER] Read document: {file_path}, length: {len(content)}")
return content
except Exception as e:
logger.error(f"[CHUNKER] Error reading file {file_path}: {e}")
raise
def _detect_structure_level(self, line: str) -> Tuple[str, Optional[str], Optional[str]]:
"""Phát hiện cấp độ cấu trúc của một dòng."""
line = line.strip()
try:
            # Part (Phần)
            match = re.match(self.PHAN_REGEX, line, re.IGNORECASE)
            if match:
                return "PHAN", match.group(2), match.group(3)
            # Appendix (Phụ lục)
            match = re.match(self.PHU_LUC_REGEX, line, re.IGNORECASE)
            if match:
                return "PHU_LUC", match.group(2), match.group(3)
            # Chapter (Chương)
            match = re.match(self.CHUONG_REGEX, line, re.IGNORECASE)
            if match:
                return "CHUONG", match.group(2), match.group(3)
            # Section (Mục)
            match = re.match(self.MUC_REGEX, line, re.IGNORECASE)
            if match:
                return "MUC", match.group(2), match.group(3)
            # Article (Điều)
            match = re.match(self.DIEU_REGEX, line)
            if match:
                return "DIEU", match.group(1), match.group(2)
            # Clause (Khoản)
            match = re.match(self.KHOAN_REGEX, line)
            if match:
                clause_num = match.group(1)
                # Make sure this is not a point (points have three or more numeric parts)
                if len(clause_num.split('.')) < 3:
                    return "KHOAN", clause_num, match.group(3)
            # Lettered point (Điểm)
            match = re.match(self.DIEM_REGEX_A, line)
            if match:
                return "DIEM", match.group(1), match.group(2)
            # Numbered point (Điểm)
            match = re.match(self.DIEM_REGEX_NUM, line)
            if match:
                return "DIEM", match.group(1), match.group(2)
return "CONTENT", None, None
except Exception as e:
logger.error(f"[CHUNKER] Error in _detect_structure_level for line '{line}': {e}")
return "CONTENT", None, None
def _build_structure_summary(self, article_number, clause_number, sub_clause_letter):
if sub_clause_letter and clause_number and article_number:
return f"Điểm {sub_clause_letter} Khoản {clause_number} Điều {article_number}"
elif clause_number and article_number:
return f"Khoản {clause_number} Điều {article_number}"
elif article_number:
return f"Điều {article_number}"
return ""
def _create_chunk_metadata(self, content: str, level: str, level_value: Optional[str],
parent_id: Optional[str], vanbanid: int,
document_title: str, chunk_stack: List[Tuple[str, str, Optional[str], str]], chunk_dict: dict) -> 'ChunkMetadata':
"""Tạo metadata cho chunk."""
chunk_id = str(uuid.uuid4())
metadata = ChunkMetadata(
id=chunk_id,
content=content,
vanbanid=vanbanid,
cha=parent_id,
document_title=document_title
)
        # Fill metadata from the current chunk
if level == "DIEU" and level_value:
metadata.article_number = int(level_value) if level_value.isdigit() else None
metadata.article_title = content.split('\n')[0].strip() if content else ""
elif level == "KHOAN" and level_value:
metadata.clause_number = level_value
elif level == "DIEM" and level_value:
metadata.sub_clause_letter = level_value
        # Fill metadata from parent chunks, if any
logger.debug(f"[CHUNKER] Creating chunk with level: {level}, parent_id: {parent_id}, stack_size: {len(chunk_stack)}")
if chunk_dict is not None and parent_id:
self._fill_metadata_from_parents(metadata, parent_id, chunk_dict)
else:
logger.debug(f"[CHUNKER] Skipping metadata fill - no parent_id or chunk_dict")
        # Build context_summary in legal-citation format
        metadata.context_summary = self._build_structure_summary(
            metadata.article_number, metadata.clause_number, metadata.sub_clause_letter
        )
logger.debug(f"[CHUNKER] Final metadata for chunk {chunk_id[:8]}... - Level: {level}, Article: {metadata.article_number}, Clause: {metadata.clause_number}, Point: {metadata.sub_clause_letter}")
return metadata
def _fill_metadata_from_parents(self, metadata: ChunkMetadata, parent_id: str, chunk_dict: Dict[str, ChunkMetadata]):
"""
        Fill metadata from the parent and its ancestors, using an id -> chunk dict.
"""
parent = chunk_dict.get(parent_id)
if not parent:
logger.warning(f"[CHUNKER] Parent chunk {parent_id} not found in chunk_dict")
return
        # Fill from the direct parent
if parent.article_number and not metadata.article_number:
metadata.article_number = parent.article_number
if parent.article_title and not metadata.article_title:
            metadata.article_title = parent.article_title
if parent.clause_number and not metadata.clause_number:
metadata.clause_number = parent.clause_number
if parent.sub_clause_letter and not metadata.sub_clause_letter:
metadata.sub_clause_letter = parent.sub_clause_letter
        # If the parent is a clause (Khoản), look up the grandparent article (Điều)
if parent.clause_number and not metadata.article_number:
grandparent = chunk_dict.get(parent.cha) if parent.cha else None
if grandparent and grandparent.article_number:
metadata.article_number = grandparent.article_number
if grandparent and grandparent.article_title:
metadata.article_title = grandparent.article_title
def _split_into_chunks(self, text: str, chunk_size: int, overlap: int) -> List[str]:
"""Chia text thành các chunk với overlap."""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
            # Find the best break position for the chunk (end of sentence or end of line)
            if end < len(text):
                # Look for the nearest period or newline within the chunk
                last_period = chunk.rfind('.')
                last_newline = chunk.rfind('\n')
                best_break = max(last_period, last_newline)
                if best_break > chunk_size * 0.7:  # Only break if the cut is not too early in the chunk
                    end = start + best_break + 1
                    chunk = text[start:end]
chunks.append(chunk)
start = end - overlap
if start >= len(text):
break
return chunks
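    # e.g. with CHUNK_SIZE=500 and CHUNK_OVERLAP=100, a 1,200-character text yields
    # chunks starting near offsets 0, ~400, ~800, ..., each overlapping the previous
    # one by roughly 100 characters (exact boundaries depend on sentence breaks).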
def _process_document_recursive(self, content: str, vanbanid: int,
document_title: str) -> List[ChunkMetadata]:
"""Xử lý văn bản theo cấu trúc phân cấp."""
lines = content.split('\n')
chunks = []
chunk_stack = [] # (chunk_id, level, level_value, content)
chunk_dict = {} # id -> ChunkMetadata
current_chunk_content = ""
current_level = None
current_level_value = None
current_parent = None
current_level_priority = None
level_priority = {
"PHAN": 1,
"PHU_LUC": 1,
"CHUONG": 2,
"MUC": 3,
"DIEU": 4,
"KHOAN": 5,
"DIEM": 6,
"CONTENT": 7
}
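        # Lower number = outer level: a PHAN contains CHUONG, which contains MUC,
        # then DIEU, KHOAN, DIEM; CONTENT is plain body text.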
preamble_done = False
for line in lines:
level, level_value, _ = self._detect_structure_level(line)
line_priority = level_priority.get(level, 7)
            # First line(s) or preamble before any structural marker
if not preamble_done and (level == "CONTENT" or not level_value):
current_chunk_content += line + "\n"
current_level = "CONTENT"
current_level_value = None
current_parent = None
current_level_priority = 7
continue
if not preamble_done and (level != "CONTENT" and level_value):
                # End of the preamble
if current_chunk_content.strip():
metadata = self._create_chunk_metadata(
current_chunk_content.strip(),
"CONTENT",
None,
None,
vanbanid,
document_title,
chunk_stack,
chunk_dict
)
chunks.append(metadata)
chunk_stack.append((metadata.id, "CONTENT", None, current_chunk_content.strip()))
chunk_dict[metadata.id] = metadata
preamble_done = True
current_chunk_content = ""
current_level = level
current_level_value = level_value
current_level_priority = line_priority
current_parent = self._find_parent_for_level(chunk_stack, level, level_priority)
current_chunk_content += line + "\n"
continue
            # A new structural level begins
if level != "CONTENT" and level_value:
if current_level is not None and current_level_priority is not None and line_priority <= current_level_priority:
                # Close the current chunk
if current_chunk_content.strip():
metadata = self._create_chunk_metadata(
current_chunk_content.strip(),
str(current_level),
current_level_value,
current_parent,
vanbanid,
document_title,
chunk_stack,
chunk_dict
)
chunks.append(metadata)
chunk_stack.append((metadata.id, str(current_level), current_level_value, current_chunk_content.strip()))
chunk_dict[metadata.id] = metadata
                # Start a new chunk
current_parent = self._find_parent_for_level(chunk_stack, level, level_priority)
current_chunk_content = line + "\n"
current_level = level
current_level_value = level_value
current_level_priority = line_priority
else:
                # New level nested deeper (higher priority number), e.g. a Mục inside a Chương
if current_chunk_content.strip() and current_level is not None:
metadata = self._create_chunk_metadata(
current_chunk_content.strip(),
str(current_level),
current_level_value,
current_parent,
vanbanid,
document_title,
chunk_stack,
chunk_dict
)
chunks.append(metadata)
chunk_stack.append((metadata.id, str(current_level), current_level_value, current_chunk_content.strip()))
chunk_dict[metadata.id] = metadata
current_parent = self._find_parent_for_level(chunk_stack, level, level_priority)
current_chunk_content = line + "\n"
current_level = level
current_level_value = level_value
current_level_priority = line_priority
else:
                # Plain CONTENT is appended to the current chunk
current_chunk_content += line + "\n"
            # Split the chunk if it has grown too large
if len(current_chunk_content) > self.CHUNK_SIZE and current_level is not None:
sub_chunks = self._split_into_chunks(current_chunk_content, self.CHUNK_SIZE, self.CHUNK_OVERLAP)
for sub_chunk in sub_chunks:
metadata = self._create_chunk_metadata(
sub_chunk.strip(),
str(current_level),
current_level_value,
current_parent,
vanbanid,
document_title,
chunk_stack,
chunk_dict
)
chunks.append(metadata)
chunk_stack.append((metadata.id, str(current_level), current_level_value, sub_chunk.strip()))
chunk_dict[metadata.id] = metadata
current_chunk_content = ""
        # Save the final chunk
if current_chunk_content.strip() and current_level is not None:
metadata = self._create_chunk_metadata(
current_chunk_content.strip(),
str(current_level),
current_level_value,
current_parent,
vanbanid,
document_title,
chunk_stack,
chunk_dict
)
chunks.append(metadata)
chunk_stack.append((metadata.id, str(current_level), current_level_value, current_chunk_content.strip()))
chunk_dict[metadata.id] = metadata
root_count = sum(1 for chunk in chunks if chunk.cha is None)
logger.info(f"[CHUNKER] Created {len(chunks)} chunks, {root_count} root chunks")
for i, chunk in enumerate(chunks[:10]):
logger.debug(f"[CHUNKER] Chunk {i+1}: {chunk.content[:100]}... -> Parent: {chunk.cha}")
if len(chunks) > 10:
logger.debug(f"[CHUNKER] ... and {len(chunks) - 10} more chunks")
return chunks
def _find_parent_for_level(self, chunk_stack: List[Tuple[str, str, Optional[str], str]],
current_level: str, level_priority: Dict[str, int]) -> Optional[str]:
"""
        Find the nearest parent with a higher level (lower priority number), validating the parent-child relationship.
"""
current_priority = level_priority.get(current_level, 999)
valid_parents = {
"MUC": ["CHUONG", "PHAN"],
"DIEU": ["MUC", "CHUONG", "PHAN"],
"CHUONG": ["PHAN"],
            # Other levels keep the generic priority-based logic
}
for chunk_id, level, level_value, content in reversed(chunk_stack):
if level_priority.get(level, 999) < current_priority:
if current_level in valid_parents:
if level in valid_parents[current_level]:
return chunk_id
else:
return chunk_id
return None
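    # e.g. with chunk_stack [("id1", "CHUONG", ...), ("id2", "DIEU", ...)] and
    # current_level "KHOAN", the nearest higher level is the DIEU chunk, so "id2" is returned.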
async def _create_embeddings_for_chunks(self, chunks: List[ChunkMetadata]) -> int:
"""Tạo embeddings cho các chunks và lưu ngay lập tức vào Supabase."""
logger.info(f"[CHUNKER] Creating embeddings and storing {len(chunks)} chunks")
success_count = 0
failed_count = 0
        # Debug: log detailed metadata for each chunk
        logger.info("[CHUNKER] === DETAILED METADATA ANALYSIS ===")
        for i, chunk in enumerate(chunks[:20]):  # Log the first 20 chunks only
logger.info(f"[CHUNKER] Chunk {i+1}:")
logger.info(f" - ID: {chunk.id[:8]}...")
logger.info(f" - Content: {chunk.content[:100]}...")
logger.info(f" - Parent: {chunk.cha}")
logger.info(f" - Article: {chunk.article_number}")
logger.info(f" - Article Title: {chunk.article_title}")
logger.info(f" - Clause: {chunk.clause_number}")
logger.info(f" - Point: {chunk.sub_clause_letter}")
logger.info(f" - Document: {chunk.document_title}")
logger.info(f" ---")
for i, chunk in enumerate(chunks, 1):
try:
                # Create the embedding
embedding = await self.embedding_client.create_embedding(chunk.content, task_type="retrieval_document")
                # Generate a semantic summary with the LLM
semantic_summary = await self._create_semantic_summary_with_llm(chunk.content)
                # Prepare the row for Supabase
chunk_dict = {
'id': chunk.id,
'content': chunk.content,
                    'embedding': embedding if embedding is not None else [0.0] * 768,  # Use the real embedding when available, else a zero vector
'vanbanid': chunk.vanbanid,
'cha': chunk.cha,
'document_title': chunk.document_title,
'article_number': chunk.article_number,
'article_title': chunk.article_title,
'clause_number': chunk.clause_number,
'sub_clause_letter': chunk.sub_clause_letter,
'context_summary': f"Structure: {chunk.context_summary}|Semantic: {semantic_summary}"
}
                # Store in Supabase right away
success = self.supabase_client.store_document_chunk(chunk_dict)
if success:
success_count += 1
                    if i % 100 == 0:  # Log every 100 chunks
logger.info(f"[CHUNKER] Stored chunk {i}/{len(chunks)}: {chunk.id[:8]}...")
else:
failed_count += 1
logger.error(f"[CHUNKER] Failed to store chunk {chunk.id}")
except Exception as e:
failed_count += 1
logger.error(f"[CHUNKER] Error processing chunk {chunk.id}: {e}")
continue
logger.info(f"[CHUNKER] Successfully processed {success_count}/{len(chunks)} chunks, {failed_count} failed")
return success_count
async def _store_chunks_to_supabase(self, chunk_data: List[Dict]) -> bool:
"""Legacy method - không còn sử dụng."""
logger.warning("[CHUNKER] _store_chunks_to_supabase is deprecated, use _create_embeddings_for_chunks instead")
return True
async def process_law_document(self, file_path: str, document_id: int) -> bool:
"""
        Main entry point for processing a legal document.
        Args:
            file_path: Path to the legal document file
            document_id: Unique ID of the legal document
        Returns:
            bool: True on success, False on failure
"""
try:
logger.info(f"[CHUNKER] Starting processing for file: {file_path}, document_id: {document_id}")
            # 1. Create the data directory if needed
self._create_data_directory()
            # 2. Check that the file exists
if not os.path.exists(file_path):
logger.error(f"[CHUNKER] File not found: {file_path}")
return False
            # 3. Read the document
content = self._read_document(file_path)
            # 4. Extract the title
document_title = self._extract_document_title(file_path)
            # 5. Chunk according to document structure
chunks = self._process_document_recursive(content, document_id, document_title)
if not chunks:
logger.warning(f"[CHUNKER] No chunks created for document {document_id}")
return False
            # 6. Create embeddings
success_count = await self._create_embeddings_for_chunks(chunks)
if success_count == 0:
logger.error(f"[CHUNKER] No embeddings created for document {document_id}")
return False
logger.info(f"[CHUNKER] Successfully processed document {document_id} with {success_count} chunks")
return True
except Exception as e:
logger.error(f"[CHUNKER] Error processing document {document_id}: {e}") ##
return False
async def _create_semantic_summary_with_llm(self, chunk_content: str) -> str:
"""
        Generate a short, concise semantic summary for the chunk using the LLM.
"""
if not hasattr(self, "llm_client") or self.llm_client is None:
logger.warning("[CHUNKER] llm_client chưa được gán, bỏ qua semantic summary.")
return ""
        # The prompt is intentionally kept in Vietnamese so the summary matches the
        # document's language; it asks for a 1-2 sentence summary, with no title
        # repetition and no explanations.
        prompt = (
"Tóm tắt thật ngắn gọn, súc tích nội dung luật sau (1-2 câu, không lặp lại tiêu đề, không giải thích):\n"
f"{chunk_content.strip()}"
)
try:
summary = await self.llm_client.generate_text(prompt)
return summary.strip() if summary else ""
except Exception as e:
logger.error(f"[CHUNKER] Lỗi khi sinh semantic summary bằng LLM: {e}")
return ""