import os import zipfile import copy import time import xml.etree.ElementTree as ET from typing import List, Dict, Any, Optional, Tuple from utils.utils import translate_text, unzip_office_file, preprocess_text, postprocess_text, translate_single_text from pymongo import MongoClient import gridfs from io import BytesIO import shutil import io NS_MAIN = {'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'} # --- Hàm đăng ký namespace (quan trọng khi ghi file) --- def register_namespaces(xml_file): """Đọc và đăng ký các namespace từ file XML.""" namespaces = dict([ node for _, node in ET.iterparse(xml_file, events=['start-ns']) ]) for ns, uri in namespaces.items(): ET.register_namespace(ns, uri) # Đăng ký thêm namespace phổ biến nếu chưa có if 'main' not in namespaces and '' not in namespaces: # Kiểm tra cả prefix rỗng ET.register_namespace('', NS_MAIN['main']) # Đăng ký default namespace elif 'main' not in namespaces: ET.register_namespace('main', NS_MAIN['main']) # Đăng ký với prefix 'main' def extract_text_from_sheet(unzipped_folder_path: str) -> Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]: """ Trích xuất text, lưu lại định dạng của run đầu tiên nếu là Rich Text. """ modifiable_nodes = [] shared_strings_path = os.path.join(unzipped_folder_path, "xl", "sharedStrings.xml") worksheets_folder = os.path.join(unzipped_folder_path, "xl", "worksheets") shared_tree = None sheet_trees = {} # --- Xử lý sharedStrings.xml --- if os.path.exists(shared_strings_path): try: register_namespaces(shared_strings_path) shared_tree = ET.parse(shared_strings_path) root_shared = shared_tree.getroot() for si_element in root_shared.findall('main:si', NS_MAIN): text_parts = [] t_elements = si_element.findall('.//main:t', NS_MAIN) # Tìm tất cả con # Tìm run đầu tiên () và properties () của nó first_r = si_element.find('./main:r', NS_MAIN) # Tìm con trực tiếp đầu tiên first_rpr_clone = None # Lưu bản sao của đầu tiên is_rich_text = first_r is not None if is_rich_text: # Tìm bên trong đầu tiên first_rpr = first_r.find('./main:rPr', NS_MAIN) if first_rpr is not None: # Sao chép sâu để không ảnh hưởng cây gốc và để dùng sau first_rpr_clone = copy.deepcopy(first_rpr) # Lấy toàn bộ text for t_node in t_elements: if t_node.text: text_parts.append(t_node.text) full_text = "".join(text_parts) if not full_text: continue # Bỏ qua nếu không có text if is_rich_text: modifiable_nodes.append({ 'type': 'shared_rich', 'original_text': full_text, 'element': si_element, # Tham chiếu 'first_format': first_rpr_clone, # Lưu định dạng đầu tiên (hoặc None) 'source_file': os.path.join("xl", "sharedStrings.xml"), 'sheet_name': None }) elif t_elements: # Không phải rich text, tìm thẻ đơn giản first_t = si_element.find('./main:t', NS_MAIN) if first_t is not None: modifiable_nodes.append({ 'type': 'shared_simple', 'original_text': full_text, 'element': first_t, # Tham chiếu 'first_format': None, # Không có định dạng đặc biệt 'source_file': os.path.join("xl", "sharedStrings.xml"), 'sheet_name': None }) except Exception as e: print(f"Lỗi xử lý sharedStrings: {e}") import traceback traceback.print_exc() # --- Xử lý các file sheetX.xml (Inline Strings - không có định dạng phức tạp) --- if os.path.isdir(worksheets_folder): for sheet_filename in sorted(os.listdir(worksheets_folder)): if sheet_filename.lower().endswith(".xml"): # ... (phần đọc và parse sheet tree như cũ) ... sheet_file_path = os.path.join(worksheets_folder, sheet_filename) try: register_namespaces(sheet_file_path) sheet_tree = ET.parse(sheet_file_path) sheet_trees[sheet_filename] = sheet_tree root_sheet = sheet_tree.getroot() for cell in root_sheet.findall('.//main:c[@t="inlineStr"]', NS_MAIN): t_element = cell.find('.//main:is/main:t', NS_MAIN) if t_element is not None and t_element.text is not None: modifiable_nodes.append({ 'type': 'inline', 'original_text': t_element.text, 'element': t_element, # Tham chiếu 'first_format': None, # Inline string không có định dạng 'source_file': os.path.join("xl", "worksheets", sheet_filename), 'sheet_name': sheet_filename }) except Exception as e: print(f"Lỗi xử lý sheet {sheet_filename}: {e}") import traceback traceback.print_exc() else: print(f"Lỗi: Không tìm thấy thư mục worksheets: {worksheets_folder}") global_data = {"shared_tree": shared_tree, "sheet_trees": sheet_trees, "shared_strings_path": shared_strings_path, "worksheets_folder": worksheets_folder} return modifiable_nodes, global_data def apply_and_save_changes(modified_nodes_data: List[Dict[str, Any]], global_data: Dict[str, Any]) -> bool: """ Cập nhật text, giữ lại định dạng đầu tiên cho Rich Text, và lưu file XML. """ if not global_data: print("Lỗi: Thiếu global_data."); return False updated_files = set() try: ET.register_namespace('xml', "http://www.w3.org/XML/1998/namespace") except ValueError: pass for node_info in modified_nodes_data: if 'modified_text' in node_info and node_info['element'] is not None: element = node_info['element'] modified_text = node_info['modified_text'] original_text = node_info.get('original_text', '') node_type = node_info.get('type', '') first_format = node_info.get('first_format') # Lấy đã lưu (hoặc None) if original_text != modified_text: # --- Xử lý Rich Text: Tạo lại cấu trúc [] --- if node_type == 'shared_rich': si_element = element # Xóa con cũ for child in list(si_element): si_element.remove(child) # Tạo run mới new_r = ET.Element(f"{{{NS_MAIN['main']}}}r") # Nếu có định dạng đầu tiên (), thêm nó vào mới if first_format is not None: new_r.append(first_format) # Thêm bản sao đã lưu # Tạo thẻ text mới new_t = ET.Element(f"{{{NS_MAIN['main']}}}t") new_t.text = modified_text xml_space_attr = '{http://www.w3.org/XML/1998/namespace}space' new_t.set(xml_space_attr, 'preserve') # Thêm vào new_r.append(new_t) # Thêm vào si_element.append(new_r) updated_files.add(node_info['source_file']) # print(f"Applied first format to Rich Text in {node_info['source_file']}") # --- Xử lý Simple/Inline Text: Cập nhật thẻ --- elif node_type in ['shared_simple', 'inline']: t_element = element t_element.text = modified_text xml_space_attr = '{http://www.w3.org/XML/1998/namespace}space' if xml_space_attr not in t_element.attrib or t_element.attrib[xml_space_attr] != 'preserve': t_element.set(xml_space_attr, 'preserve') updated_files.add(node_info['source_file']) # print(f"Updated Simple/Inline Text in {node_info['source_file']}") else: print(f"Cảnh báo: Loại node không xác định '{node_type}'") # --- Lưu lại các file XML đã thay đổi (Giữ nguyên) --- success = True # ... (Phần code lưu file như cũ) ... shared_tree = global_data.get("shared_tree"); shared_strings_path = global_data.get("shared_strings_path") sheet_trees = global_data.get("sheet_trees", {}); worksheets_folder = global_data.get("worksheets_folder") shared_strings_relative_path = os.path.join("xl", "sharedStrings.xml") if shared_tree and shared_strings_path and shared_strings_relative_path in updated_files: try: # print(f"Saving modified file: {shared_strings_path}") shared_tree.write(shared_strings_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {shared_strings_path}: {e}"); success = False if worksheets_folder and os.path.exists(worksheets_folder): for sheet_filename, sheet_tree in sheet_trees.items(): sheet_relative_path = os.path.join("xl", "worksheets", sheet_filename) if sheet_relative_path in updated_files: sheet_file_path = os.path.join(worksheets_folder, sheet_filename) try: # print(f"Saving modified file: {sheet_file_path}") sheet_tree.write(sheet_file_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {sheet_file_path}: {e}"); success = False if success and updated_files: print(f"Đã lưu thành công {len(updated_files)} file XML đã sửa đổi (đã giữ lại định dạng đầu tiên cho Rich Text).") elif not updated_files: print("Không có file XML nào cần cập nhật.") ; return True return success def zip_folder_to_excel_file(folder_path, file_name): try: # Nén thư mục thành file .xlsx trong RAM xlsx_buffer = io.BytesIO() with zipfile.ZipFile(xlsx_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) archive_path = os.path.relpath(file_path, folder_path) zipf.write(file_path, archive_path) xlsx_buffer.seek(0) client = MongoClient("mongodb+srv://admin:1highbar456@cluster0.equkm.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0") db = client['excel'] fs = gridfs.GridFS(db, collection='final_file') file_id = fs.put(xlsx_buffer.read(), filename=file_name) print(f"✅ Đã lưu file Excel vào MongoDB với ID: {file_id}") return file_id except Exception as e: print(f"❌ Lỗi khi nén và lưu Excel vào MongoDB: {e}") return None def get_text_list_from_nodes(modifiable_nodes: Optional[List[Dict[str, Any]]]) -> List[str]: if modifiable_nodes is None: return [] # Trả về list rỗng nếu đầu vào là None # Sử dụng list comprehension để lấy giá trị của key 'original_text' từ mỗi dictionary text_list = [ node_info['original_text'] for node_info in modifiable_nodes if 'original_text' in node_info and node_info['original_text'] is not None ] # Thêm kiểm tra 'original_text' tồn tại và không phải None cho chắc chắn return text_list def count_words(text: str) -> int: """Đếm số từ trong một chuỗi bằng cách tách theo khoảng trắng.""" if not text or text.isspace(): return 0 return len(text.split()) # Helper function to process a batch of valid segments (Unchanged) def _translate_batch_helper(segments_to_translate, original_indices_1based, source_lang, target_lang): """Handles preprocessing, translation, postprocessing, and error handling for a batch.""" batch_results = [None] * len(segments_to_translate) if not segments_to_translate: return [] try: processed_segments = preprocess_text(segments_to_translate) translated_segments = translate_text(processed_segments, source_lang, target_lang) final_translated_segments = postprocess_text(translated_segments) if len(final_translated_segments) == len(segments_to_translate): batch_results = final_translated_segments else: print(f" *** CRITICAL ERROR: Batch translation result count mismatch! Expected {len(segments_to_translate)}, got {len(final_translated_segments)}. Marking batch as failed.") error_msg = "" batch_results = [error_msg] * len(segments_to_translate) except Exception as e: print(f" *** ERROR during batch translation: {e}. Marking batch as failed.") # traceback.print_exc() # Uncomment for detailed debug error_msg = "" batch_results = [error_msg] * len(segments_to_translate) return batch_results def translate_xlsx(file_id, file_name, source_lang='en', target_lang='vi', batch_size_segments=50, max_words_per_segment=100, delay_between_requests=1): """ Dịch file XLSX, chia thành batch động, dịch riêng các segment quá dài. Args: input_filepath (str): Đường dẫn đến file XLSX đầu vào. output_filepath (str): Đường dẫn để lưu file XLSX đã dịch. source_lang (str): Mã ngôn ngữ nguồn. target_lang (str): Mã ngôn ngữ đích. batch_size_segments (int): Số lượng đoạn text tối đa MONG MUỐN trong mỗi lần gọi API. max_words_per_segment (int): Giới hạn từ tối đa cho một segment để được dịch theo batch. Các segment dài hơn sẽ được dịch riêng lẻ. delay_between_requests (int): Thời gian chờ (giây) giữa các lần gọi API dịch. """ client = MongoClient("mongodb+srv://admin:1highbar456@cluster0.equkm.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0") db = client['excel'] fs = gridfs.GridFS(db, collection='root_file') ppt_file = fs.get(file_id) excel_file = BytesIO(ppt_file.read()) xml_folder = unzip_office_file(excel_file) modifiable_nodes, global_data = extract_text_from_sheet(xml_folder) original_texts = get_text_list_from_nodes(modifiable_nodes) all_results = [None] * len(original_texts) current_index = 0 processed_count = 0 api_call_counter = 0 # Track API calls for delay logic while current_index < len(original_texts): batch_texts_to_translate = [] batch_original_indices = [] # 0-based indices for assignment batch_end_index = min(current_index + batch_size_segments, len(original_texts)) found_long_segment_at = -1 # 0-based index in original_texts # 1. Build the next potential batch, stopping if a long segment is found for i in range(current_index, batch_end_index): segment = original_texts[i] word_count = count_words(segment) if word_count <= max_words_per_segment: batch_texts_to_translate.append(segment) batch_original_indices.append(i) else: found_long_segment_at = i break # Stop building this batch # --- Process the findings --- # 2. Translate the VALID batch collected *before* the long segment (if any) if batch_texts_to_translate: # Add delay BEFORE the API call if it's not the very first call if api_call_counter > 0 and delay_between_requests > 0: time.sleep(delay_between_requests) translated_batch = _translate_batch_helper( batch_texts_to_translate, [idx + 1 for idx in batch_original_indices], # 1-based for logging source_lang, target_lang ) api_call_counter += 1 # Assign results back for batch_idx, original_idx in enumerate(batch_original_indices): all_results[original_idx] = translated_batch[batch_idx] processed_count += len(batch_texts_to_translate) # 3. Handle the long segment INDIVIDUALLY (if one was found) if found_long_segment_at != -1: long_segment_index = found_long_segment_at long_segment_text = str(original_texts[long_segment_index]) # word_count = count_words(long_segment_text) # Recalculate for log clarity try: translated = translate_single_text(long_segment_text, source_lang, target_lang) final = [translated] api_call_counter += 1 if len(final) == 1: all_results[long_segment_index] = final[0] else: print(f" *** CRITICAL ERROR: Long segment translation result count mismatch! Expected 1, got {len(final)}. Marking as failed.") all_results[long_segment_index] = "" except Exception as e: print(f" *** ERROR during translation of long segment {long_segment_index + 1}: {e}. Marking as failed.") # traceback.print_exc() # Uncomment for detailed debug all_results[long_segment_index] = "" # Do not increment api_call_counter if the API call itself failed before returning processed_count += 1 # Update current_index to start AFTER this long segment current_index = long_segment_index + 1 else: # No long segment was found in the range checked. # Move current_index to the end of the range examined. current_index = batch_end_index missing_count = 0 final_texts_for_nodes = [] for i, res in enumerate(all_results): if res is None: print(f"LỖI LOGIC: Segment {i+1} không được xử lý! Giữ lại text gốc: '{original_texts[i]}'") final_texts_for_nodes.append(original_texts[i]) missing_count += 1 else: final_texts_for_nodes.append(res) if missing_count > 0: print(f"CẢNH BÁO NGHIÊM TRỌNG: {missing_count} segments bị bỏ lỡ trong quá trình xử lý.") if len(final_texts_for_nodes) != len(original_texts): print(f"LỖI NGHIÊM TRỌNG: Số lượng text cuối cùng ({len(final_texts_for_nodes)}) không khớp với gốc ({len(original_texts)}). Hủy bỏ cập nhật.") else: # Gán vào node for i, node_info in enumerate(modifiable_nodes): node_info['modified_text'] = final_texts_for_nodes[i] save_success = apply_and_save_changes(modifiable_nodes, global_data) if not save_success: print("LỖI NGHIÊM TRỌNG: Không thể lưu thay đổi vào file XML.") else: # Only zip if saving XML was successful final_id = zip_folder_to_excel_file(xml_folder, file_name) if final_id: shutil.rmtree(xml_folder) # Mark folder as 'handled' by zipping else: print("LỖI NGHIÊM TRỌNG: Không thể tạo file XLSX đã dịch cuối cùng.") return final_id