import os import zipfile import copy import time import xml.etree.ElementTree as ET from typing import List, Dict, Any, Optional, Tuple from utils.utils import translate_text, unzip_office_file, preprocess_text, postprocess_text, translate_single_text from pymongo import MongoClient import gridfs from io import BytesIO import shutil import io import re from typing import Dict NS_MAIN = {'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'} NS_DRAWING = {'xdr': "http://schemas.openxmlformats.org/drawingml/2006/spreadsheetDrawing"} NS_A = {'a': "http://schemas.openxmlformats.org/drawingml/2006/main"} # --- Hàm đăng ký namespace (quan trọng khi ghi file) --- def register_namespaces(xml_file): """Đọc và đăng ký các namespace từ file XML.""" namespaces = dict([ node for _, node in ET.iterparse(xml_file, events=['start-ns']) ]) for ns, uri in namespaces.items(): ET.register_namespace(ns, uri) # Đăng ký thêm namespace phổ biến nếu chưa có if 'main' not in namespaces and '' not in namespaces and NS_MAIN['main'] not in namespaces.values(): ET.register_namespace('', NS_MAIN['main']) elif 'main' not in namespaces and NS_MAIN['main'] not in namespaces.values(): ET.register_namespace('main', NS_MAIN['main']) # Đăng ký namespaces cho drawing nếu cần if 'xdr' not in namespaces and NS_DRAWING['xdr'] not in namespaces.values(): ET.register_namespace('xdr', NS_DRAWING['xdr']) if 'a' not in namespaces and NS_A['a'] not in namespaces.values(): ET.register_namespace('a', NS_A['a']) def extract_text_from_sheet(unzipped_folder_path: str) -> Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]: """ Trích xuất text, lưu lại định dạng của run đầu tiên nếu là Rich Text, bao gồm cả text từ TextBoxes trong drawings. """ modifiable_nodes = [] shared_strings_path = os.path.join(unzipped_folder_path, "xl", "sharedStrings.xml") worksheets_folder = os.path.join(unzipped_folder_path, "xl", "worksheets") drawings_folder = os.path.join(unzipped_folder_path, "xl", "drawings") # Thêm dòng này shared_tree = None sheet_trees = {} drawing_trees = {} # Thêm dòng này # --- Xử lý sharedStrings.xml --- if os.path.exists(shared_strings_path): try: register_namespaces(shared_strings_path) # Đảm bảo register_namespaces được gọi shared_tree = ET.parse(shared_strings_path) root_shared = shared_tree.getroot() for si_element in root_shared.findall('main:si', NS_MAIN): text_parts = [] # Tìm tất cả con, bất kể chúng nằm trong hay không t_elements = si_element.findall('.//main:t', NS_MAIN) first_r = si_element.find('./main:r', NS_MAIN) first_rpr_clone = None is_rich_text = first_r is not None # Rich text nếu có ít nhất một if is_rich_text: # Cố gắng tìm bên trong đầu tiên first_rpr_candidate = si_element.find('./main:r/main:rPr', NS_MAIN) if first_rpr_candidate is not None: first_rpr_clone = copy.deepcopy(first_rpr_candidate) else: # Nếu đầu tiên không có , kiểm tra (Phonetic properties, ít gặp hơn) # Hoặc có thể không có định dạng nào cụ thể ở run đầu pass for t_node in t_elements: if t_node.text: text_parts.append(t_node.text) full_text = "".join(text_parts) if not full_text or full_text.isspace(): continue # Logic xác định type dựa trên sự hiện diện của đã được điều chỉnh if is_rich_text : # Chỉ cần có là đủ, first_rpr_clone có thể là None modifiable_nodes.append({ 'type': 'shared_rich', 'original_text': full_text, 'element': si_element, 'first_format': first_rpr_clone, # Sẽ là None nếu đầu không có 'source_file': os.path.join("xl", "sharedStrings.xml"), 'sheet_name': None }) elif t_elements: direct_t = si_element.find('./main:t', NS_MAIN) if direct_t is not None: modifiable_nodes.append({ 'type': 'shared_simple', 'original_text': full_text, 'element': direct_t, # Tham chiếu 'first_format': None, 'source_file': os.path.join("xl", "sharedStrings.xml"), 'sheet_name': None }) # else: ít khả năng xảy ra nếu t_elements có phần tử except Exception as e: print(f"Lỗi xử lý sharedStrings: {e}") import traceback traceback.print_exc() # --- Xử lý các file sheetX.xml (Inline Strings) --- if os.path.isdir(worksheets_folder): for sheet_filename in sorted(os.listdir(worksheets_folder)): if sheet_filename.lower().endswith(".xml"): sheet_file_path = os.path.join(worksheets_folder, sheet_filename) try: register_namespaces(sheet_file_path) # Đảm bảo register_namespaces được gọi sheet_tree = ET.parse(sheet_file_path) sheet_trees[sheet_filename] = sheet_tree root_sheet = sheet_tree.getroot() for cell in root_sheet.findall('.//main:c[@t="inlineStr"]', NS_MAIN): t_element = cell.find('.//main:is/main:t', NS_MAIN) # Sửa lại tìm kiếm if t_element is not None and t_element.text is not None and t_element.text.strip(): modifiable_nodes.append({ 'type': 'inline', 'original_text': t_element.text, 'element': t_element, 'first_format': None, 'source_file': os.path.join("xl", "worksheets", sheet_filename), 'sheet_name': sheet_filename }) except Exception as e: print(f"Lỗi xử lý sheet {sheet_filename}: {e}") import traceback traceback.print_exc() else: print(f"Cảnh báo: Không tìm thấy thư mục worksheets: {worksheets_folder}") # --- Xử lý các file drawingX.xml (Text Boxes, Shapes with Text) --- if os.path.isdir(drawings_folder): for drawing_filename in sorted(os.listdir(drawings_folder)): if drawing_filename.lower().endswith(".xml"): drawing_file_path = os.path.join(drawings_folder, drawing_filename) try: register_namespaces(drawing_file_path) # Đảm bảo register_namespaces được gọi drawing_tree = ET.parse(drawing_file_path) drawing_trees[drawing_filename] = drawing_tree root_drawing = drawing_tree.getroot() # TextBoxes và Shapes có text thường nằm trong (shape) -> (text body) # Bên trong là các (paragraph) for p_element in root_drawing.findall('.//xdr:txBody/a:p', {**NS_DRAWING, **NS_A}): text_parts = [] # Lấy text từ tất cả trong paragraph này t_elements = p_element.findall('.//a:t', NS_A) first_r = p_element.find('./a:r', NS_A) # Tìm con trực tiếp đầu tiên của first_rpr_clone = None # Định dạng của run đầu tiên trong paragraph is_rich_text_paragraph = first_r is not None # Coi là rich nếu có if is_rich_text_paragraph: # Tìm bên trong đầu tiên của first_rpr = first_r.find('./a:rPr', NS_A) if first_rpr is not None: first_rpr_clone = copy.deepcopy(first_rpr) for t_node in t_elements: if t_node.text: text_parts.append(t_node.text) full_text = "".join(text_parts) if not full_text or full_text.isspace(): continue # Lưu node là vì chúng ta sẽ thay thế toàn bộ nội dung của nó # (các bên trong) modifiable_nodes.append({ 'type': 'drawing_text', # Loại mới cho text trong drawing 'original_text': full_text, 'element': p_element, # Tham chiếu đến 'first_format': first_rpr_clone, # Lưu định dạng của đầu tiên (hoặc None) 'source_file': os.path.join("xl", "drawings", drawing_filename), 'sheet_name': None # Có thể tìm cách liên kết ngược lại sheet nếu cần }) except Exception as e: print(f"Lỗi xử lý drawing {drawing_filename}: {e}") import traceback traceback.print_exc() else: print(f"Thông tin: Không tìm thấy thư mục drawings: {drawings_folder}") global_data = { "shared_tree": shared_tree, "sheet_trees": sheet_trees, "drawing_trees": drawing_trees, # Thêm dòng này "shared_strings_path": shared_strings_path, "worksheets_folder": worksheets_folder, "drawings_folder": drawings_folder # Thêm dòng này } return modifiable_nodes, global_data\ def apply_and_save_changes(modified_nodes_data: List[Dict[str, Any]], global_data: Dict[str, Any]) -> bool: """ Cập nhật text, giữ lại định dạng đầu tiên cho Rich Text / Drawing Text, và lưu file XML. """ if not global_data: print("Lỗi: Thiếu global_data."); return False updated_files = set() try: ET.register_namespace('xml', "http://www.w3.org/XML/1998/namespace") # Đảm bảo các namespace chính được đăng ký trước khi thao tác ET.register_namespace('', NS_MAIN['main']) # Default cho spreadsheet ET.register_namespace('main', NS_MAIN['main']) # Hoặc với prefix 'main' ET.register_namespace('xdr', NS_DRAWING['xdr']) ET.register_namespace('a', NS_A['a']) except ValueError: # Có thể đã được đăng ký pass for node_info in modified_nodes_data: if 'modified_text' in node_info and node_info['element'] is not None: element = node_info['element'] modified_text = node_info['modified_text'] original_text = node_info.get('original_text', '') node_type = node_info.get('type', '') first_format = node_info.get('first_format') if original_text != modified_text: # --- Xử lý Rich Text (sharedStrings): Tạo lại cấu trúc [] --- if node_type == 'shared_rich': si_element = element for child in list(si_element): # Xóa con cũ của si_element.remove(child) new_r = ET.Element(f"{{{NS_MAIN['main']}}}r") if first_format is not None: new_r.append(first_format) new_t = ET.Element(f"{{{NS_MAIN['main']}}}t") new_t.text = modified_text new_t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve') new_r.append(new_t) si_element.append(new_r) updated_files.add(node_info['source_file']) # --- Xử lý Simple Text (sharedStrings) hoặc Inline Text: Cập nhật thẻ --- elif node_type in ['shared_simple', 'inline']: t_element = element # element ở đây là thẻ t_element.text = modified_text t_element.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve') updated_files.add(node_info['source_file']) # --- Xử lý Text trong Drawing (TextBoxes, Shapes): Tạo lại cấu trúc [] --- elif node_type == 'drawing_text': p_element = element # element ở đây là thẻ for child in list(p_element): # Xóa con cũ của (thường là các hoặc ) p_element.remove(child) # Tạo run mới new_r = ET.Element(f"{{{NS_A['a']}}}r") # Nếu có định dạng đã lưu, thêm nó vào mới if first_format is not None: # first_format ở đây là new_r.append(first_format) # Tạo thẻ text mới new_t = ET.Element(f"{{{NS_A['a']}}}t") new_t.text = modified_text new_r.append(new_t) # Thêm vào p_element.append(new_r) # Thêm vào updated_files.add(node_info['source_file']) # print(f"Applied first format to Drawing Text in {node_info['source_file']}") else: print(f"Cảnh báo: Loại node không xác định '{node_type}' cho text '{original_text}'") # --- Lưu lại các file XML đã thay đổi --- success = True shared_tree = global_data.get("shared_tree"); shared_strings_path = global_data.get("shared_strings_path") sheet_trees = global_data.get("sheet_trees", {}); worksheets_folder = global_data.get("worksheets_folder") drawing_trees = global_data.get("drawing_trees", {}); drawings_folder = global_data.get("drawings_folder") # Thêm shared_strings_relative_path = os.path.join("xl", "sharedStrings.xml") if shared_tree and shared_strings_path and shared_strings_relative_path in updated_files: try: shared_tree.write(shared_strings_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {shared_strings_path}: {e}"); success = False if worksheets_folder and os.path.exists(worksheets_folder): for sheet_filename, sheet_tree in sheet_trees.items(): sheet_relative_path = os.path.join("xl", "worksheets", sheet_filename) if sheet_relative_path in updated_files: sheet_file_path = os.path.join(worksheets_folder, sheet_filename) try: sheet_tree.write(sheet_file_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {sheet_file_path}: {e}"); success = False # Lưu các file drawing đã thay đổi if drawings_folder and os.path.exists(drawings_folder): for drawing_filename, drawing_tree in drawing_trees.items(): drawing_relative_path = os.path.join("xl", "drawings", drawing_filename) if drawing_relative_path in updated_files: drawing_file_path = os.path.join(drawings_folder, drawing_filename) try: drawing_tree.write(drawing_file_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {drawing_file_path}: {e}"); success = False if success and updated_files: print(f"Đã lưu thành công {len(updated_files)} file XML đã sửa đổi.") elif not updated_files: print("Không có file XML nào cần cập nhật.") ; return True # Vẫn coi là success nếu không có gì thay đổi return success def translate_sheet_names_via_regex( workbook_xml_path: str, source_lang: str = 'chinese', target_lang: str = 'vietnamese' ): original_to_translated_map: Dict[str, str] = {} modified_content: str = "" file_changed_flag: bool = False # Sử dụng tên biến rõ ràng hơn try: with open(workbook_xml_path, 'r', encoding='utf-8-sig') as f: content = f.read() current_content = content def replace_name_callback(match_obj): nonlocal file_changed_flag # Để sửa đổi biến bên ngoài nonlocal original_to_translated_map attr_prefix = match_obj.group(1) # Ví dụ: '' original_name = original_name_xml_encoded # Tạm thời bỏ qua unescape/escape cho đơn giản ví dụ if not original_name.strip(): return match_obj.group(0) # Trả về chuỗi gốc nếu tên rỗng translated_name = original_name # Mặc định giữ nguyên if original_name in original_to_translated_map and original_to_translated_map[original_name] != original_name: translated_name = original_to_translated_map[original_name] # Nếu đã dịch và có thay đổi, không cần gọi API dịch nữa if translated_name != original_name: # Cần kiểm tra lại vì map có thể lưu tên gốc nếu dịch lỗi print(f"Regex: Sử dụng bản dịch đã có cho '{original_name}' -> '{translated_name}'") file_changed_flag = True # Đảm bảo cờ được set nếu sử dụng bản dịch đã có mà khác gốc else: try: translated_name_raw = translate_single_text(original_name, source_lang, target_lang) if translated_name_raw and translated_name_raw.strip() and translated_name_raw != original_name: translated_name = translated_name_raw[:31] original_to_translated_map[original_name] = translated_name file_changed_flag = True print(f"Regex: Đã dịch sheet: '{original_name}' -> '{translated_name}'") else: original_to_translated_map[original_name] = original_name # Lưu tên gốc nếu dịch lỗi/không đổi # translated_name vẫn là original_name if translated_name_raw and translated_name_raw.strip() and translated_name_raw == original_name: print(f"Bản dịch cho '{original_name}' giống hệt bản gốc, không thay đổi.") elif not (translated_name_raw and translated_name_raw.strip()): print(f"Bản dịch cho '{original_name}' trống hoặc không hợp lệ, giữ nguyên.") except Exception as e_translate: print(f"Lỗi khi gọi hàm dịch cho '{original_name}': {e_translate}") original_to_translated_map[original_name] = original_name translated_name_xml_encoded = translated_name # Tạm thời bỏ qua escape return f"{attr_prefix}{opening_quote}{translated_name_xml_encoded}{opening_quote}{attr_suffix}" sheet_name_pattern = re.compile( r'(]*?\sname=)(["\'])((?:(?!\2).)*?)(\2)([^>]*?>)' ) modified_content = sheet_name_pattern.sub(replace_name_callback, current_content) if file_changed_flag: with open(workbook_xml_path, 'w', encoding='utf-8') as f: f.write(modified_content) print(f"Regex: Đã cập nhật thành công file: {workbook_xml_path}") return original_to_translated_map, True else: print(f"Regex: Không có tên sheet nào được thay đổi trong file: {workbook_xml_path}") except FileNotFoundError: print(f"Lỗi: Không tìm thấy file tại '{workbook_xml_path}'") except Exception as e: print(f"Đã xảy ra lỗi không mong muốn khi xử lý file '{workbook_xml_path}' bằng regex: {e}") import traceback traceback.print_exc() def zip_folder_to_excel_bytes(folder_path): """ Nén toàn bộ thư mục thành file Excel (.xlsx) dưới dạng BytesIO (trong RAM). Trả lại buffer BytesIO chứa nội dung file. """ try: xlsx_buffer = io.BytesIO() with zipfile.ZipFile(xlsx_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) archive_path = os.path.relpath(file_path, folder_path) zipf.write(file_path, archive_path) xlsx_buffer.seek(0) return xlsx_buffer except Exception as e: print(f"❌ Lỗi khi nén thư mục thành file Excel: {e}") return None def get_text_list_from_nodes(modifiable_nodes: Optional[List[Dict[str, Any]]]) -> List[str]: if modifiable_nodes is None: return [] # Trả về list rỗng nếu đầu vào là None # Sử dụng list comprehension để lấy giá trị của key 'original_text' từ mỗi dictionary text_list = [ node_info['original_text'] for node_info in modifiable_nodes if 'original_text' in node_info and node_info['original_text'] is not None ] # Thêm kiểm tra 'original_text' tồn tại và không phải None cho chắc chắn return text_list def count_words(text: str) -> int: """Đếm số từ trong một chuỗi bằng cách tách theo khoảng trắng.""" if not text or text.isspace(): return 0 return len(text.split()) # Helper function to process a batch of valid segments (Unchanged) def _translate_batch_helper(segments_to_translate, original_indices_1based, source_lang, target_lang): """Handles preprocessing, translation, postprocessing, and error handling for a batch.""" batch_results = [None] * len(segments_to_translate) if not segments_to_translate: return [] try: processed_segments = preprocess_text(segments_to_translate) translated_segments = translate_text(processed_segments, source_lang, target_lang) final_translated_segments = postprocess_text(translated_segments) if len(final_translated_segments) == len(segments_to_translate): batch_results = final_translated_segments else: print(f" *** CRITICAL ERROR: Batch translation result count mismatch! Expected {len(segments_to_translate)}, got {len(final_translated_segments)}. Marking batch as failed.") error_msg = "" batch_results = [error_msg] * len(segments_to_translate) except Exception as e: print(f" *** ERROR during batch translation: {e}. Marking batch as failed.") # traceback.print_exc() # Uncomment for detailed debug error_msg = "" batch_results = [error_msg] * len(segments_to_translate) return batch_results def translate_xlsx(file_io, file_name, source_lang='en', target_lang='vi', batch_size_segments=50, max_words_per_segment=100, delay_between_requests=1): file_io.seek(0) xml_folder = unzip_office_file(file_io) path_to_workbook_xml = os.path.join(xml_folder, "xl", "workbook.xml") translate_sheet_names_via_regex(path_to_workbook_xml, source_lang, target_lang) modifiable_nodes, global_data = extract_text_from_sheet(xml_folder) original_texts = get_text_list_from_nodes(modifiable_nodes) all_results = [None] * len(original_texts) current_index = 0 processed_count = 0 api_call_counter = 0 # Track API calls for delay logic while current_index < len(original_texts): batch_texts_to_translate = [] batch_original_indices = [] # 0-based indices for assignment batch_end_index = min(current_index + batch_size_segments, len(original_texts)) found_long_segment_at = -1 # 0-based index in original_texts # 1. Build the next potential batch, stopping if a long segment is found for i in range(current_index, batch_end_index): segment = original_texts[i] word_count = count_words(segment) if word_count <= max_words_per_segment: batch_texts_to_translate.append(segment) batch_original_indices.append(i) else: found_long_segment_at = i break # Stop building this batch # --- Process the findings --- # 2. Translate the VALID batch collected *before* the long segment (if any) if batch_texts_to_translate: # Add delay BEFORE the API call if it's not the very first call if api_call_counter > 0 and delay_between_requests > 0: time.sleep(delay_between_requests) translated_batch = _translate_batch_helper( batch_texts_to_translate, [idx + 1 for idx in batch_original_indices], # 1-based for logging source_lang, target_lang ) api_call_counter += 1 # Assign results back for batch_idx, original_idx in enumerate(batch_original_indices): all_results[original_idx] = translated_batch[batch_idx] processed_count += len(batch_texts_to_translate) # 3. Handle the long segment INDIVIDUALLY (if one was found) if found_long_segment_at != -1: long_segment_index = found_long_segment_at long_segment_text = str(original_texts[long_segment_index]) # word_count = count_words(long_segment_text) # Recalculate for log clarity try: translated = translate_single_text(long_segment_text, source_lang, target_lang) final = [translated] api_call_counter += 1 if len(final) == 1: all_results[long_segment_index] = final[0] else: print(f" *** CRITICAL ERROR: Long segment translation result count mismatch! Expected 1, got {len(final)}. Marking as failed.") all_results[long_segment_index] = "" except Exception as e: print(f" *** ERROR during translation of long segment {long_segment_index + 1}: {e}. Marking as failed.") # traceback.print_exc() # Uncomment for detailed debug all_results[long_segment_index] = "" # Do not increment api_call_counter if the API call itself failed before returning processed_count += 1 # Update current_index to start AFTER this long segment current_index = long_segment_index + 1 else: # No long segment was found in the range checked. # Move current_index to the end of the range examined. current_index = batch_end_index missing_count = 0 final_texts_for_nodes = [] for i, res in enumerate(all_results): if res is None: print(f"LỖI LOGIC: Segment {i+1} không được xử lý! Giữ lại text gốc: '{original_texts[i]}'") final_texts_for_nodes.append(original_texts[i]) missing_count += 1 else: final_texts_for_nodes.append(res) if missing_count > 0: print(f"CẢNH BÁO NGHIÊM TRỌNG: {missing_count} segments bị bỏ lỡ trong quá trình xử lý.") if len(final_texts_for_nodes) != len(original_texts): print(f"LỖI NGHIÊM TRỌNG: Số lượng text cuối cùng ({len(final_texts_for_nodes)}) không khớp với gốc ({len(original_texts)}). Hủy bỏ cập nhật.") else: # Gán vào node for i, node_info in enumerate(modifiable_nodes): node_info['modified_text'] = final_texts_for_nodes[i] save_success = apply_and_save_changes(modifiable_nodes, global_data) if not save_success: print("LỖI NGHIÊM TRỌNG: Không thể lưu thay đổi vào file XML.") else: # Only zip if saving XML was successful translated_buffer = zip_folder_to_excel_bytes(xml_folder) if translated_buffer: shutil.rmtree(xml_folder) # Mark folder as 'handled' by zipping else: print("LỖI NGHIÊM TRỌNG: Không thể tạo file XLSX đã dịch cuối cùng.") return translated_buffer, file_name