entidi2608 commited on
Commit
25e6e74
·
1 Parent(s): fc0cc58

update: retriever

Browse files
config.py CHANGED
@@ -35,8 +35,8 @@ ALLOWED_EXTENSIONS = {".pdf", ".docx", ".doc"}
35
 
36
 
37
  LEGAL_DOC_TYPES = ["Luật", "Bộ luật", "Nghị định", "Thông tư", "Quyết định", "Pháp lệnh", "Nghị quyết", "Chỉ thị", "Hiến pháp"]
38
- MAX_CHUNK_SIZE = 3000 # Kích thước tối đa cho một chunk trước khi bị chia nhỏ hơn
39
- CHUNK_OVERLAP = 300
40
 
41
  REDIS_URL = os.environ.get("REDIS_URL")
42
 
 
35
 
36
 
37
  LEGAL_DOC_TYPES = ["Luật", "Bộ luật", "Nghị định", "Thông tư", "Quyết định", "Pháp lệnh", "Nghị quyết", "Chỉ thị", "Hiến pháp"]
38
+ MAX_CHUNK_SIZE = 1000 # Kích thước tối đa cho một chunk trước khi bị chia nhỏ hơn
39
+ CHUNK_OVERLAP = 200
40
 
41
  REDIS_URL = os.environ.get("REDIS_URL")
42
 
dependencies.py CHANGED
@@ -104,7 +104,9 @@ async def initialize_api_components(app_state: AppState):
104
  logger.error("🚨 KHÔNG CÓ GOOGLE API KEYS NÀO ĐƯỢC CẤP PHÁT!")
105
  raise HTTPException(status_code=500, detail="No Google API keys found")
106
 
107
- app_state.llm = rag_components.create_llm_from_google_key_list(google_api_keys=google_api_keys_list)
 
 
108
 
109
 
110
 
 
104
  logger.error("🚨 KHÔNG CÓ GOOGLE API KEYS NÀO ĐƯỢC CẤP PHÁT!")
105
  raise HTTPException(status_code=500, detail="No Google API keys found")
106
 
107
+ # app_state.llm = rag_components.create_llm_from_google_key_list(google_api_keys=google_api_keys_list)
108
+
109
+ app_state.llm = rag_components.get_google_llm(config.GOOGLE_API_KEYS)
110
 
111
 
112
 
rag_components.py CHANGED
@@ -308,7 +308,7 @@ def get_google_llm(google_api_key):
308
  try:
309
  def create_chat_google():
310
  return ChatGoogleGenerativeAI(
311
- model="gemini-2.5-flash-preview-05-20",
312
  google_api_key=google_api_key,
313
  temperature=0.0, # Điều chỉnh nhiệt độ nếu cần, 0.1-0.3 thường tốt cho RAG
314
  safety_settings={},
 
308
  try:
309
  def create_chat_google():
310
  return ChatGoogleGenerativeAI(
311
+ model="gemini-2.5-flash",
312
  google_api_key=google_api_key,
313
  temperature=0.0, # Điều chỉnh nhiệt độ nếu cần, 0.1-0.3 thường tốt cho RAG
314
  safety_settings={},
services/document_service.py CHANGED
@@ -2,25 +2,25 @@ from pathlib import Path
2
  from llama_parse import LlamaParse
3
  import docx
4
  import pypandoc
5
- from datetime import datetime, timezone
6
- from rag_components import get_huggingface_embeddings
7
- from io import BytesIO
8
  import os
9
  import logging
10
  from langchain_core.documents import Document
11
  import config
12
  from db.weaviateDB import connect_to_weaviate
13
- from db.mongoDB import mongo_db
14
-
15
- from fastapi.concurrency import run_in_threadpool
16
  logger = logging.getLogger(__name__)
17
 
18
  from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
19
  from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
20
 
21
  class ApiKeyManager:
22
- """Quản lý một danh sách các API key."""
 
 
 
23
  def __init__(self, api_key_string: str):
 
24
  self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
25
  if not self.keys:
26
  raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
@@ -39,171 +39,105 @@ class ApiKeyManager:
39
  logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
40
  return self.get_key()
41
 
42
- def reset(self):
43
- """Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới."""
44
- self.current_key_index = 0
45
- logger.info("Key Manager đã được reset.")
46
-
47
  llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)
48
 
49
- # --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
50
- def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
51
- """Trích xuất nội dung text từ một stream trong bộ nhớ."""
52
- file_extension = Path(original_filename).suffix.lower()
53
- logger.info(f"Extracting content from: {original_filename}")
54
- content = ""
55
 
56
- source_stream.seek(0)
 
 
 
 
57
 
58
  if file_extension == ".pdf":
59
- # Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN
60
- # tái sử dụng đường dẫn này trong vòng lặp thử key.
61
-
62
- # Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song
63
-
64
- temp_pdf_path = f"/tmp/{original_filename}"
65
-
66
- try:
67
- with open(temp_pdf_path, "wb") as f:
68
- f.write(source_stream.getvalue())
69
-
70
- # Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên
71
- llama_key_manager.reset()
72
-
73
- # Bắt đầu vòng lặp để thử các API key
74
- while (current_key := llama_key_manager.get_key()) is not None:
75
- try:
76
- logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...")
77
- parser = LlamaParse(
78
- api_key=current_key,
79
- result_type="text",
80
- verbose=True, # Giữ để debug
81
- language="vi"
82
- )
83
- # Sử dụng đường dẫn file tạm đã tạo
84
- documents = parser.load_data([temp_pdf_path])
85
-
86
- if documents and documents[0].text.strip():
87
- content = documents[0].text
88
- logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
89
- break # Thành công, thoát khỏi vòng lặp
90
- else:
91
- raise ValueError("LlamaParse trả về nội dung rỗng.")
92
-
93
- except Exception as e:
94
- logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}")
95
- if llama_key_manager.get_next_key() is None:
96
- logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.")
97
- raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e
98
-
99
- if not content:
100
- raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.")
101
-
102
- finally:
103
- # Luôn dọn dẹp file tạm, dù thành công hay thất bại
104
- if os.path.exists(temp_pdf_path):
105
- os.remove(temp_pdf_path)
106
- logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}")
107
 
108
  elif file_extension == ".docx":
109
- # docx có thể đọc trực tiếp từ stream
110
- doc = docx.Document(source_stream)
111
  content = '\n'.join([para.text for para in doc.paragraphs])
112
-
113
  elif file_extension == ".doc":
114
- # pypandoc cần file trên đĩa
115
-
116
- temp_doc_path = f"/tmp/{original_filename}"
117
- try:
118
- with open(temp_doc_path, "wb") as f:
119
- f.write(source_stream.getvalue())
120
- content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc')
121
- finally:
122
- if os.path.exists(temp_doc_path):
123
- os.remove(temp_doc_path)
124
- logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}")
125
-
126
  else:
127
  raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")
128
 
129
  if not content.strip():
130
- raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.")
 
131
 
132
- logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.")
133
  return content
134
 
135
-
136
-
137
-
138
- async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str):
139
- """
140
- Pipeline xử lý nền đã được tối ưu hoàn toàn.
141
- """
142
- logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)")
143
  weaviate_client = None
144
  try:
145
- embeddings_model = get_huggingface_embeddings(
146
- config.EMBEDDING_MODEL_NAME
147
- )
148
- # Giai đoạn 1: Xử lý NLP (CPU-bound)
149
- doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename)
150
- doc_metadata["source"] = filename
151
 
152
- cleaned_content = await run_in_threadpool(clean_document_text, raw_content)
153
- doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban"))
154
- doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", ""))
 
 
155
 
156
  doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
157
- chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split)
158
 
159
  if not chunks_from_file:
160
  raise ValueError("File did not yield any chunks after processing.")
161
 
162
  processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
163
 
164
- # Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
165
- weaviate_client = connect_to_weaviate(run_diagnostics=False)
166
-
167
-
168
- await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
169
- await run_in_threadpool(
170
- ingest_chunks_with_native_batching,
171
- weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model
172
- )
173
 
174
- # Giai đoạn 3: Ghi log thành công
175
- await log_processed_hash(file_hash, filename)
176
- logger.info(f"✅✅✅ All tasks completed for '{filename}'.")
177
 
 
 
 
 
178
  except Exception as e:
179
  logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
180
- await log_failed_process(file_hash, filename, str(e))
 
181
  finally:
182
  if weaviate_client and weaviate_client.is_connected():
183
- weaviate_client.close()
184
-
185
-
186
- async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None):
187
- """Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB."""
188
- try:
189
- record = {
190
- "file_hash": file_hash,
191
- "original_filename": filename,
192
- "processed_at": datetime.now(timezone.utc),
193
- "status": status,
194
- }
195
- if error_message:
196
- record["error_message"] = error_message
197
-
198
- await mongo_db.processed_documents.insert_one(record)
199
- except Exception as e:
200
- logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}")
201
-
202
- # Wrapper cho việc log lỗi
203
- async def log_failed_process(file_hash: str, filename: str, error_message: str):
204
- await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message)
205
-
206
- # Hàm kiểm tra trùng lặp
207
- async def check_if_hash_exists(file_hash: str) -> bool:
208
- count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
209
- return count > 0
 
2
  from llama_parse import LlamaParse
3
  import docx
4
  import pypandoc
5
+ import shutil
 
 
6
  import os
7
  import logging
8
  from langchain_core.documents import Document
9
  import config
10
  from db.weaviateDB import connect_to_weaviate
11
+ import utils.utils as utils
 
 
12
  logger = logging.getLogger(__name__)
13
 
14
  from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
15
  from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
16
 
17
  class ApiKeyManager:
18
+ """
19
+ Quản lý một danh sách các API key.
20
+ Cung cấp key tiếp theo trong danh sách và cho phép xoay vòng.
21
+ """
22
  def __init__(self, api_key_string: str):
23
+ # Tách chuỗi thành danh sách các key, loại bỏ khoảng trắng thừa
24
  self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
25
  if not self.keys:
26
  raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
 
39
  logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
40
  return self.get_key()
41
 
42
+ # Khởi tạo một instance của Key Manager với chuỗi key từ config
43
+ # Đặt ở cấp module để duy trì trạng thái qua các lần gọi hàm
 
 
 
44
  llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)
45
 
 
 
 
 
 
 
46
 
47
+ def convert_to_text_content(source_path: str) -> str:
48
+ source_file = Path(source_path)
49
+ file_extension = source_file.suffix.lower()
50
+ logger.info(f"Đang trích xuất nội dung từ: {source_file.name}")
51
+ content = ""
52
 
53
  if file_extension == ".pdf":
54
+ # Bắt đầu vòng lặp để thử các API key
55
+ while (current_key := llama_key_manager.get_key()) is not None:
56
+ try:
57
+ logger.info(f"Đang thử chuyển đổi PDF bằng key index: {llama_key_manager.current_key_index}...")
58
+ parser = LlamaParse(
59
+ api_key=current_key,
60
+ result_type="text",
61
+ verbose=True,
62
+ language="vi"
63
+ )
64
+ # parser.load_data có thể ném ra lỗi nếu key không hợp lệ/hết hạn
65
+ documents = parser.load_data([str(source_file)])
66
+
67
+ if documents and documents[0].text.strip():
68
+ content = documents[0].text
69
+ logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
70
+ # Nếu thành công, thoát khỏi vòng lặp
71
+ break
72
+ else:
73
+ # Trường hợp hiếm: không có lỗi nhưng nội dung rỗng
74
+ raise ValueError("LlamaParse trả về nội dung rỗng.")
75
+
76
+ except Exception as e:
77
+ logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index}: {e}")
78
+ # Nếu lỗi, thử key tiếp theo trong lần lặp tới
79
+ if llama_key_manager.get_next_key() is None:
80
+ # Nếu đã hết key để thử
81
+ logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại.")
82
+ raise Exception("Không thể chuyển đổi file PDF sau khi đã thử tất cả các API key.") from e
83
+ # Nếu còn key, vòng lặp while sẽ tiếp tục với key mới
84
+
85
+ # Sau vòng lặp, nếu không có content, nghĩa là đã có lỗi nghiêm trọng
86
+ if not content:
87
+ raise ValueError("Không thể trích xuất nội dung PDF sau khi thử các key.")
88
+
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
  elif file_extension == ".docx":
91
+ doc = docx.Document(source_path)
 
92
  content = '\n'.join([para.text for para in doc.paragraphs])
 
93
  elif file_extension == ".doc":
94
+ content = pypandoc.convert_file(source_path, 'plain', format='doc')
 
 
 
 
 
 
 
 
 
 
 
95
  else:
96
  raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")
97
 
98
  if not content.strip():
99
+ # Lỗi này chỉ nên xảy ra với docx/doc hoặc nếu LlamaParse thất bại một cách thầm lặng
100
+ raise ValueError("Nội dung trích xuất bị rỗng.")
101
 
102
+ logger.info(f"✅ Trích xuất nội dung thành công từ {source_file.name}.")
103
  return content
104
 
105
+ def full_process_and_ingest_pipeline(filepath: str, file_hash: str, embedding_model):
106
+ filename = os.path.basename(filepath)
107
+ logger.info(f"BACKGROUND TASK: Starting full pipeline for: {filename} (Hash: {file_hash[:10]}...)")
 
 
 
 
 
108
  weaviate_client = None
109
  try:
110
+ raw_content = convert_to_text_content(filepath)
 
 
 
 
 
111
 
112
+ doc_metadata = extract_document_metadata(raw_content, filename)
113
+ doc_metadata["source"] = filename
114
+ cleaned_content = clean_document_text(raw_content)
115
+ doc_metadata["field"] = infer_field(cleaned_content, doc_metadata.get("ten_van_ban"))
116
+ doc_metadata["entity_type"] = infer_entity_type(cleaned_content, doc_metadata.get("field", ""))
117
 
118
  doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
119
+ chunks_from_file = hierarchical_split_law_document(doc_to_split)
120
 
121
  if not chunks_from_file:
122
  raise ValueError("File did not yield any chunks after processing.")
123
 
124
  processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
125
 
126
+ weaviate_client = connect_to_weaviate()
127
+ embeddings_model = embedding_model
128
+ collection_name = config.WEAVIATE_COLLECTION_NAME
129
+ create_weaviate_schema_if_not_exists(weaviate_client, collection_name)
 
 
 
 
 
130
 
131
+ ingest_chunks_with_native_batching(weaviate_client, collection_name, processed_chunks, embeddings_model)
 
 
132
 
133
+ utils.log_processed_hash(file_hash)
134
+ logger.info(f"✅ Successfully ingested '{filename}'.")
135
+ shutil.move(filepath, os.path.join(config.PROCESSED_FILES_FOLDER, filename))
136
+ logger.info(f"Moved '{filename}' to processed folder.")
137
  except Exception as e:
138
  logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
139
+ shutil.move(filepath, os.path.join(config.FAILED_FILES_FOLDER, filename))
140
+ logger.info(f"Moved '{filename}' to failed folder.")
141
  finally:
142
  if weaviate_client and weaviate_client.is_connected():
143
+ weaviate_client.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
utils/AdvancedLawRetriever.py CHANGED
@@ -21,8 +21,8 @@ class AdvancedLawRetriever(BaseRetriever):
21
  reranker: Any
22
  embeddings_model: Any
23
 
24
- default_k: int = 5
25
- initial_k: int = 15 # Lấy nhiều ứng viên ban đầu
26
  hybrid_search_alpha: float = 0.5
27
  doc_type_boost: float = 0.4
28
 
 
21
  reranker: Any
22
  embeddings_model: Any
23
 
24
+ default_k: int = 3
25
+ initial_k: int = 10 # Lấy nhiều ứng viên ban đầu
26
  hybrid_search_alpha: float = 0.5
27
  doc_type_boost: float = 0.4
28
 
utils/process_data.py CHANGED
@@ -38,23 +38,19 @@ def filter_and_serialize_complex_metadata(documents: List[Document]) -> List[Doc
38
  return updated_documents
39
 
40
 
41
- class SimpleTextSplitter:
42
- """Một text splitter đơn giản để chia nhỏ các chunk quá lớn."""
43
- def __init__(self, chunk_size: int, chunk_overlap: int):
44
- self.chunk_size = chunk_size
45
- self.chunk_overlap = chunk_overlap
46
-
47
- def split_text(self, text: str) -> List[str]:
48
- if not text: return []
49
- chunks = []
50
- start = 0
51
- while start < len(text):
52
- end = start + self.chunk_size
53
- chunks.append(text[start:end])
54
- start += self.chunk_size - self.chunk_overlap
55
- return chunks
56
-
57
- base_text_splitter = SimpleTextSplitter(chunk_size=MAX_CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
58
 
59
  def generate_structured_id(doc_so_hieu: Optional[str], structure_path: List[str], filename: str) -> str:
60
  """
 
38
  return updated_documents
39
 
40
 
41
+ from langchain.text_splitter import RecursiveCharacterTextSplitter
42
+
43
+ # Đây là cấu hình tốt nhất, cân bằng, đã được kiểm chứng và không cần regex phức tạp
44
+ base_text_splitter = RecursiveCharacterTextSplitter(
45
+ chunk_size=MAX_CHUNK_SIZE,
46
+ chunk_overlap=CHUNK_OVERLAP,
47
+ length_function=len,
48
+ add_start_index=True,
49
+ # Các separator này tôn trọng cấu trúc tự nhiên của văn bản (đoạn, dòng, câu)
50
+ # Đây là cách tiếp cận được khuyến nghị và hiệu quả nhất.
51
+ separators=["\n\n", "\n", ". ", ", ", " ", ""],
52
+ is_separator_regex=False # Không cần bật regex cho các separator đơn giản này
53
+ )
 
 
 
 
54
 
55
  def generate_structured_id(doc_so_hieu: Optional[str], structure_path: List[str], filename: str) -> str:
56
  """