Spaces:
Runtime error
Runtime error
| # build_v5.py | |
| import time | |
| import gc | |
| import os | |
| import logging | |
| from tqdm import tqdm | |
| from concurrent.futures import ProcessPoolExecutor, as_completed | |
| # Import các hàm cần thiết | |
| import config | |
| from db.weaviateDB import connect_to_weaviate | |
| from rag_components import ( | |
| get_huggingface_embeddings, | |
| create_weaviate_schema_if_not_exists, | |
| ingest_chunks_with_native_batching, | |
| filter_and_serialize_complex_metadata | |
| ) | |
| from utils.process_data import process_single_file | |
| logger = logging.getLogger(__name__) | |
| # --- CÁC HÀM HỖ TRỢ CHO CHECKPOINTING --- | |
| def load_processed_files() -> set: | |
| """Đọc file checkpoint và trả về một set các tên file đã xử lý.""" | |
| if not os.path.exists(config.CHECKPOINT_FILE): | |
| return set() | |
| with open(config.CHECKPOINT_FILE, 'r', encoding='utf-8') as f: | |
| return {line.strip() for line in f if line.strip()} | |
| def log_processed_file(filename: str): | |
| """Ghi tên file đã xử lý thành công vào file checkpoint.""" | |
| with open(config.CHECKPOINT_FILE, 'a', encoding='utf-8') as f: | |
| f.write(filename + '\n') | |
| def clear_weaviate_collection(client, collection_name: str): | |
| """Hàm helper để xóa collection và file checkpoint.""" | |
| try: | |
| if client.collections.exists(collection_name): | |
| logger.warning(f"🗑️ Đang xóa collection '{collection_name}'...") | |
| client.collections.delete(collection_name) | |
| logger.info(f"✅ Đã xóa collection '{collection_name}' thành công.") | |
| # Xóa cả file checkpoint khi rebuild | |
| if os.path.exists(config.CHECKPOINT_FILE): | |
| os.remove(config.CHECKPOINT_FILE) | |
| logger.info(f"🗑️ Đã xóa file checkpoint '{config.CHECKPOINT_FILE}'.") | |
| except Exception as e: | |
| logger.error(f"❌ Lỗi khi xóa collection: {e}") | |
| raise e | |
| def build_store_v5(force_rebuild: bool = False, pool_batch_size: int = 50): | |
| """ | |
| Hàm xây dựng Vector Store với Checkpointing và Pool Restart để xử lý các job dài hơi. | |
| Args: | |
| force_rebuild (bool): Xóa dữ liệu cũ và bắt đầu lại từ đầu. | |
| pool_batch_size (int): Số lượng file xử lý trước khi tái khởi động worker pool. | |
| """ | |
| logger.info(f"🚀 Bắt đầu Quá trình Xây dựng Vector Store v5 (Checkpointing & Pool Restart) 🚀") | |
| total_start_time = time.time() | |
| total_chunks_indexed = 0 | |
| weaviate_client = None | |
| try: | |
| # --- 1. SETUP PHASE --- | |
| logger.info("⚙️ 1. Thiết lập môi trường...") | |
| device = 'cpu' # Giả định không có GPU | |
| logger.info(f"💻 Sử dụng thiết bị: {device}") | |
| weaviate_client = connect_to_weaviate(run_diagnostics=False) | |
| if not weaviate_client: raise ConnectionError("Không thể kết nối đến Weaviate.") | |
| logger.info("🧠 Tải model embedding (chạy trên CPU)...") | |
| embeddings_model = get_huggingface_embeddings(config.EMBEDDING_MODEL_NAME, device) | |
| if not embeddings_model: raise RuntimeError("Không thể khởi tạo model embedding.") | |
| collection_name = config.WEAVIATE_COLLECTION_NAME | |
| if force_rebuild: | |
| clear_weaviate_collection(weaviate_client, collection_name) | |
| create_weaviate_schema_if_not_exists(weaviate_client, collection_name) | |
| # --- 2. LỌC FILE VÀ CHUẨN BỊ BATCH --- | |
| processed_files = load_processed_files() | |
| all_txt_paths = [os.path.join(config.CORE_DATA_FOLDER, f) for f in os.listdir(config.CORE_DATA_FOLDER) if f.lower().endswith('.txt')] | |
| # Lọc ra những file chưa được xử lý | |
| files_to_process = [path for path in all_txt_paths if os.path.basename(path) not in processed_files] | |
| if not files_to_process: | |
| logger.info("✅ Tất cả các file đã được xử lý. Không có gì để làm.") | |
| return | |
| logger.info(f"🔍 Đã xử lý {len(processed_files)} files. Còn lại {len(files_to_process)} files cần xử lý.") | |
| # --- 3. XỬ LÝ THEO LÔ ĐỂ CHỐNG RÒ RỈ BỘ NHỚ --- | |
| max_workers = os.cpu_count() or 1 | |
| # Tạo thanh tiến trình tổng | |
| main_progress_bar = tqdm(total=len(files_to_process), desc="Tổng tiến trình") | |
| # Chia danh sách file thành các lô nhỏ | |
| for i in range(0, len(files_to_process), pool_batch_size): | |
| file_batch = files_to_process[i:i + pool_batch_size] | |
| logger.info(f"\n🔄 Đang xử lý lô {i//pool_batch_size + 1}, gồm {len(file_batch)} files. Tái khởi động Worker Pool...") | |
| # Tạo một Worker Pool MỚI cho mỗi lô | |
| with ProcessPoolExecutor(max_workers=max_workers) as executor: | |
| future_to_file = {executor.submit(process_single_file, path): path for path in file_batch} | |
| for future in as_completed(future_to_file): | |
| path = future_to_file[future] | |
| filename = os.path.basename(path) | |
| try: | |
| chunks_from_file = future.result() | |
| if not chunks_from_file: | |
| logger.warning(f"File '{filename}' không có chunks.") | |
| log_processed_file(filename) # Vẫn ghi nhận là đã xử lý | |
| main_progress_bar.update(1) | |
| continue | |
| processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file) | |
| ingest_chunks_with_native_batching( | |
| client=weaviate_client, | |
| collection_name=collection_name, | |
| chunks=processed_chunks, | |
| embeddings_model=embeddings_model | |
| ) | |
| total_chunks_indexed += len(chunks_from_file) | |
| log_processed_file(filename) # Ghi nhận thành công | |
| main_progress_bar.set_description(f"✅ Ingested '{filename}'") | |
| main_progress_bar.update(1) # Cập nhật thanh tiến trình tổng | |
| del chunks_from_file, processed_chunks | |
| gc.collect() | |
| except Exception as e: | |
| logger.error(f"❌ Lỗi khi xử lý file '{filename}': {e}", exc_info=True) | |
| main_progress_bar.close() | |
| logger.info(f"\n✅ Đã ingest thành công thêm {total_chunks_indexed} chunks.") | |
| except (ConnectionError, RuntimeError, Exception) as e: | |
| logger.error(f"💥 Đã xảy ra lỗi nghiêm trọng: {e}", exc_info=True) | |
| finally: | |
| # --- 4. DỌN DẸP --- | |
| logger.info("\n⚙️ 4. Dọn dẹp tài nguyên...") | |
| if 'embeddings_model' in locals(): del embeddings_model | |
| if weaviate_client and weaviate_client.is_connected(): | |
| weaviate_client.close() | |
| logger.info("🔌 Đã đóng kết nối Weaviate.") | |
| gc.collect() | |
| total_end_time = time.time() | |
| logger.info(f"⏱️ Tổng thời gian chạy lần này: {total_end_time - total_start_time:.2f} giây") | |
| logger.info("🎉 Chương trình kết thúc! 🎉") | |
| if __name__ == "__main__": | |
| # Khi chạy lại, chỉ cần chạy lệnh này. Nó sẽ tự động tiếp tục. | |
| # Đặt force_rebuild=True chỉ khi bạn muốn xóa sạch và làm lại từ đầu. | |
| build_store_v5(force_rebuild=True) |