import os import zipfile import copy import time import xml.etree.ElementTree as ET from typing import List, Dict, Any, Optional, Tuple from utils.utils import translate_text, unzip_office_file, preprocess_text, postprocess_text, translate_single_text from pymongo import MongoClient import gridfs from io import BytesIO import shutil import io NS_MAIN = {'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'} NS_DRAWING = {'xdr': "http://schemas.openxmlformats.org/drawingml/2006/spreadsheetDrawing"} NS_A = {'a': "http://schemas.openxmlformats.org/drawingml/2006/main"} # --- Hàm đăng ký namespace (quan trọng khi ghi file) --- def register_namespaces(xml_file): """Đọc và đăng ký các namespace từ file XML.""" namespaces = dict([ node for _, node in ET.iterparse(xml_file, events=['start-ns']) ]) for ns, uri in namespaces.items(): ET.register_namespace(ns, uri) # Đăng ký thêm namespace phổ biến nếu chưa có if 'main' not in namespaces and '' not in namespaces and NS_MAIN['main'] not in namespaces.values(): ET.register_namespace('', NS_MAIN['main']) elif 'main' not in namespaces and NS_MAIN['main'] not in namespaces.values(): ET.register_namespace('main', NS_MAIN['main']) # Đăng ký namespaces cho drawing nếu cần if 'xdr' not in namespaces and NS_DRAWING['xdr'] not in namespaces.values(): ET.register_namespace('xdr', NS_DRAWING['xdr']) if 'a' not in namespaces and NS_A['a'] not in namespaces.values(): ET.register_namespace('a', NS_A['a']) def extract_text_from_sheet(unzipped_folder_path: str) -> Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]: """ Trích xuất text, lưu lại định dạng của run đầu tiên nếu là Rich Text, bao gồm cả text từ TextBoxes trong drawings. """ modifiable_nodes = [] shared_strings_path = os.path.join(unzipped_folder_path, "xl", "sharedStrings.xml") worksheets_folder = os.path.join(unzipped_folder_path, "xl", "worksheets") drawings_folder = os.path.join(unzipped_folder_path, "xl", "drawings") # Thêm dòng này shared_tree = None sheet_trees = {} drawing_trees = {} # Thêm dòng này # --- Xử lý sharedStrings.xml --- if os.path.exists(shared_strings_path): try: register_namespaces(shared_strings_path) # Đảm bảo register_namespaces được gọi shared_tree = ET.parse(shared_strings_path) root_shared = shared_tree.getroot() for si_element in root_shared.findall('main:si', NS_MAIN): text_parts = [] # Tìm tất cả con, bất kể chúng nằm trong hay không t_elements = si_element.findall('.//main:t', NS_MAIN) first_r = si_element.find('./main:r', NS_MAIN) first_rpr_clone = None is_rich_text = first_r is not None # Rich text nếu có ít nhất một if is_rich_text: # Cố gắng tìm bên trong đầu tiên first_rpr_candidate = si_element.find('./main:r/main:rPr', NS_MAIN) if first_rpr_candidate is not None: first_rpr_clone = copy.deepcopy(first_rpr_candidate) else: # Nếu đầu tiên không có , kiểm tra (Phonetic properties, ít gặp hơn) # Hoặc có thể không có định dạng nào cụ thể ở run đầu pass for t_node in t_elements: if t_node.text: text_parts.append(t_node.text) full_text = "".join(text_parts) if not full_text or full_text.isspace(): continue # Logic xác định type dựa trên sự hiện diện của đã được điều chỉnh if is_rich_text : # Chỉ cần có là đủ, first_rpr_clone có thể là None modifiable_nodes.append({ 'type': 'shared_rich', 'original_text': full_text, 'element': si_element, 'first_format': first_rpr_clone, # Sẽ là None nếu đầu không có 'source_file': os.path.join("xl", "sharedStrings.xml"), 'sheet_name': None }) elif t_elements: # Trường hợp có nhưng không có nào (simple shared string) # Tìm đầu tiên trực tiếp dưới nếu không phải rich text # Hoặc nếu cấu trúc là ... # Trong trường hợp này, element nên là si_element để khi apply_changes, # ta sẽ tạo cấu trúc ... nếu có định dạng # hoặc ... nếu không. # Tuy nhiên, để đơn giản hóa, nếu không có , ta coi element là đầu tiên # và không áp dụng "first_format" (vì nó sẽ là None). # Hoặc, ta có thể luôn coi là element cho shared strings. # Lựa chọn hiện tại: nếu không có , element là đầu tiên tìm thấy. direct_t = si_element.find('./main:t', NS_MAIN) if direct_t is not None: modifiable_nodes.append({ 'type': 'shared_simple', 'original_text': full_text, 'element': direct_t, # Tham chiếu 'first_format': None, 'source_file': os.path.join("xl", "sharedStrings.xml"), 'sheet_name': None }) # else: ít khả năng xảy ra nếu t_elements có phần tử except Exception as e: print(f"Lỗi xử lý sharedStrings: {e}") import traceback traceback.print_exc() # --- Xử lý các file sheetX.xml (Inline Strings) --- if os.path.isdir(worksheets_folder): for sheet_filename in sorted(os.listdir(worksheets_folder)): if sheet_filename.lower().endswith(".xml"): sheet_file_path = os.path.join(worksheets_folder, sheet_filename) try: register_namespaces(sheet_file_path) # Đảm bảo register_namespaces được gọi sheet_tree = ET.parse(sheet_file_path) sheet_trees[sheet_filename] = sheet_tree root_sheet = sheet_tree.getroot() for cell in root_sheet.findall('.//main:c[@t="inlineStr"]', NS_MAIN): t_element = cell.find('.//main:is/main:t', NS_MAIN) # Sửa lại tìm kiếm if t_element is not None and t_element.text is not None and t_element.text.strip(): modifiable_nodes.append({ 'type': 'inline', 'original_text': t_element.text, 'element': t_element, 'first_format': None, 'source_file': os.path.join("xl", "worksheets", sheet_filename), 'sheet_name': sheet_filename }) except Exception as e: print(f"Lỗi xử lý sheet {sheet_filename}: {e}") import traceback traceback.print_exc() else: print(f"Cảnh báo: Không tìm thấy thư mục worksheets: {worksheets_folder}") # --- Xử lý các file drawingX.xml (Text Boxes, Shapes with Text) --- if os.path.isdir(drawings_folder): for drawing_filename in sorted(os.listdir(drawings_folder)): if drawing_filename.lower().endswith(".xml"): drawing_file_path = os.path.join(drawings_folder, drawing_filename) try: register_namespaces(drawing_file_path) # Đảm bảo register_namespaces được gọi drawing_tree = ET.parse(drawing_file_path) drawing_trees[drawing_filename] = drawing_tree root_drawing = drawing_tree.getroot() # TextBoxes và Shapes có text thường nằm trong (shape) -> (text body) # Bên trong là các (paragraph) for p_element in root_drawing.findall('.//xdr:txBody/a:p', {**NS_DRAWING, **NS_A}): text_parts = [] # Lấy text từ tất cả trong paragraph này t_elements = p_element.findall('.//a:t', NS_A) first_r = p_element.find('./a:r', NS_A) # Tìm con trực tiếp đầu tiên của first_rpr_clone = None # Định dạng của run đầu tiên trong paragraph is_rich_text_paragraph = first_r is not None # Coi là rich nếu có if is_rich_text_paragraph: # Tìm bên trong đầu tiên của first_rpr = first_r.find('./a:rPr', NS_A) if first_rpr is not None: first_rpr_clone = copy.deepcopy(first_rpr) for t_node in t_elements: if t_node.text: text_parts.append(t_node.text) full_text = "".join(text_parts) if not full_text or full_text.isspace(): continue # Lưu node là vì chúng ta sẽ thay thế toàn bộ nội dung của nó # (các bên trong) modifiable_nodes.append({ 'type': 'drawing_text', # Loại mới cho text trong drawing 'original_text': full_text, 'element': p_element, # Tham chiếu đến 'first_format': first_rpr_clone, # Lưu định dạng của đầu tiên (hoặc None) 'source_file': os.path.join("xl", "drawings", drawing_filename), 'sheet_name': None # Có thể tìm cách liên kết ngược lại sheet nếu cần }) except Exception as e: print(f"Lỗi xử lý drawing {drawing_filename}: {e}") import traceback traceback.print_exc() else: print(f"Thông tin: Không tìm thấy thư mục drawings: {drawings_folder}") global_data = { "shared_tree": shared_tree, "sheet_trees": sheet_trees, "drawing_trees": drawing_trees, # Thêm dòng này "shared_strings_path": shared_strings_path, "worksheets_folder": worksheets_folder, "drawings_folder": drawings_folder # Thêm dòng này } return modifiable_nodes, global_data\ def apply_and_save_changes(modified_nodes_data: List[Dict[str, Any]], global_data: Dict[str, Any]) -> bool: """ Cập nhật text, giữ lại định dạng đầu tiên cho Rich Text / Drawing Text, và lưu file XML. """ if not global_data: print("Lỗi: Thiếu global_data."); return False updated_files = set() try: ET.register_namespace('xml', "http://www.w3.org/XML/1998/namespace") # Đảm bảo các namespace chính được đăng ký trước khi thao tác ET.register_namespace('', NS_MAIN['main']) # Default cho spreadsheet ET.register_namespace('main', NS_MAIN['main']) # Hoặc với prefix 'main' ET.register_namespace('xdr', NS_DRAWING['xdr']) ET.register_namespace('a', NS_A['a']) except ValueError: # Có thể đã được đăng ký pass for node_info in modified_nodes_data: if 'modified_text' in node_info and node_info['element'] is not None: element = node_info['element'] modified_text = node_info['modified_text'] original_text = node_info.get('original_text', '') node_type = node_info.get('type', '') first_format = node_info.get('first_format') if original_text != modified_text: # --- Xử lý Rich Text (sharedStrings): Tạo lại cấu trúc [] --- if node_type == 'shared_rich': si_element = element for child in list(si_element): # Xóa con cũ của si_element.remove(child) new_r = ET.Element(f"{{{NS_MAIN['main']}}}r") if first_format is not None: new_r.append(first_format) new_t = ET.Element(f"{{{NS_MAIN['main']}}}t") new_t.text = modified_text new_t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve') new_r.append(new_t) si_element.append(new_r) updated_files.add(node_info['source_file']) # --- Xử lý Simple Text (sharedStrings) hoặc Inline Text: Cập nhật thẻ --- elif node_type in ['shared_simple', 'inline']: t_element = element # element ở đây là thẻ t_element.text = modified_text t_element.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve') updated_files.add(node_info['source_file']) # --- Xử lý Text trong Drawing (TextBoxes, Shapes): Tạo lại cấu trúc [] --- elif node_type == 'drawing_text': p_element = element # element ở đây là thẻ for child in list(p_element): # Xóa con cũ của (thường là các hoặc ) p_element.remove(child) # Tạo run mới new_r = ET.Element(f"{{{NS_A['a']}}}r") # Nếu có định dạng đã lưu, thêm nó vào mới if first_format is not None: # first_format ở đây là new_r.append(first_format) # Tạo thẻ text mới new_t = ET.Element(f"{{{NS_A['a']}}}t") new_t.text = modified_text # Trong DrawingML, xml:space="preserve" thường không cần thiết cho # vì việc xuống dòng được kiểm soát bởi hoặc các paragraph riêng biệt. # Tuy nhiên, việc thêm nó không gây hại. new_r.append(new_t) # Thêm vào p_element.append(new_r) # Thêm vào # Một số text box có thể có để định dạng cuối paragraph. # Nếu muốn giữ lại, cần logic phức tạp hơn. # Hiện tại, chúng ta chỉ tạo lại với một run duy nhất. updated_files.add(node_info['source_file']) # print(f"Applied first format to Drawing Text in {node_info['source_file']}") else: print(f"Cảnh báo: Loại node không xác định '{node_type}' cho text '{original_text}'") # --- Lưu lại các file XML đã thay đổi --- success = True shared_tree = global_data.get("shared_tree"); shared_strings_path = global_data.get("shared_strings_path") sheet_trees = global_data.get("sheet_trees", {}); worksheets_folder = global_data.get("worksheets_folder") drawing_trees = global_data.get("drawing_trees", {}); drawings_folder = global_data.get("drawings_folder") # Thêm shared_strings_relative_path = os.path.join("xl", "sharedStrings.xml") if shared_tree and shared_strings_path and shared_strings_relative_path in updated_files: try: shared_tree.write(shared_strings_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {shared_strings_path}: {e}"); success = False if worksheets_folder and os.path.exists(worksheets_folder): for sheet_filename, sheet_tree in sheet_trees.items(): sheet_relative_path = os.path.join("xl", "worksheets", sheet_filename) if sheet_relative_path in updated_files: sheet_file_path = os.path.join(worksheets_folder, sheet_filename) try: sheet_tree.write(sheet_file_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {sheet_file_path}: {e}"); success = False # Lưu các file drawing đã thay đổi if drawings_folder and os.path.exists(drawings_folder): for drawing_filename, drawing_tree in drawing_trees.items(): drawing_relative_path = os.path.join("xl", "drawings", drawing_filename) if drawing_relative_path in updated_files: drawing_file_path = os.path.join(drawings_folder, drawing_filename) try: # Đảm bảo namespaces được đăng ký đúng cách TRƯỚC KHI GHI # register_namespaces(drawing_file_path) # Có thể không cần nếu đã làm ở extract # Hoặc đăng ký cứng các namespace cần thiết: # ET.register_namespace('xdr', NS_DRAWING['xdr']) # ET.register_namespace('a', NS_A['a']) # (Đã chuyển lên đầu hàm apply_and_save_changes) drawing_tree.write(drawing_file_path, encoding='utf-8', xml_declaration=True) except Exception as e: print(f"Lỗi lưu {drawing_file_path}: {e}"); success = False if success and updated_files: print(f"Đã lưu thành công {len(updated_files)} file XML đã sửa đổi.") elif not updated_files: print("Không có file XML nào cần cập nhật.") ; return True # Vẫn coi là success nếu không có gì thay đổi return success def zip_folder_to_excel_file(folder_path, file_name): try: # Nén thư mục thành file .xlsx trong RAM xlsx_buffer = io.BytesIO() with zipfile.ZipFile(xlsx_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) archive_path = os.path.relpath(file_path, folder_path) zipf.write(file_path, archive_path) xlsx_buffer.seek(0) client = MongoClient("mongodb+srv://admin:1highbar456@cluster0.equkm.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0") db = client['excel'] fs = gridfs.GridFS(db, collection='final_file') file_id = fs.put(xlsx_buffer.read(), filename=file_name) print(f"✅ Đã lưu file Excel vào MongoDB với ID: {file_id}") return file_id except Exception as e: print(f"❌ Lỗi khi nén và lưu Excel vào MongoDB: {e}") return None def get_text_list_from_nodes(modifiable_nodes: Optional[List[Dict[str, Any]]]) -> List[str]: if modifiable_nodes is None: return [] # Trả về list rỗng nếu đầu vào là None # Sử dụng list comprehension để lấy giá trị của key 'original_text' từ mỗi dictionary text_list = [ node_info['original_text'] for node_info in modifiable_nodes if 'original_text' in node_info and node_info['original_text'] is not None ] # Thêm kiểm tra 'original_text' tồn tại và không phải None cho chắc chắn return text_list def count_words(text: str) -> int: """Đếm số từ trong một chuỗi bằng cách tách theo khoảng trắng.""" if not text or text.isspace(): return 0 return len(text.split()) # Helper function to process a batch of valid segments (Unchanged) def _translate_batch_helper(segments_to_translate, original_indices_1based, source_lang, target_lang): """Handles preprocessing, translation, postprocessing, and error handling for a batch.""" batch_results = [None] * len(segments_to_translate) if not segments_to_translate: return [] try: processed_segments = preprocess_text(segments_to_translate) translated_segments = translate_text(processed_segments, source_lang, target_lang) final_translated_segments = postprocess_text(translated_segments) if len(final_translated_segments) == len(segments_to_translate): batch_results = final_translated_segments else: print(f" *** CRITICAL ERROR: Batch translation result count mismatch! Expected {len(segments_to_translate)}, got {len(final_translated_segments)}. Marking batch as failed.") error_msg = "" batch_results = [error_msg] * len(segments_to_translate) except Exception as e: print(f" *** ERROR during batch translation: {e}. Marking batch as failed.") # traceback.print_exc() # Uncomment for detailed debug error_msg = "" batch_results = [error_msg] * len(segments_to_translate) return batch_results def translate_xlsx(file_id, file_name, source_lang='en', target_lang='vi', batch_size_segments=50, max_words_per_segment=100, delay_between_requests=1): """ Dịch file XLSX, chia thành batch động, dịch riêng các segment quá dài. Args: input_filepath (str): Đường dẫn đến file XLSX đầu vào. output_filepath (str): Đường dẫn để lưu file XLSX đã dịch. source_lang (str): Mã ngôn ngữ nguồn. target_lang (str): Mã ngôn ngữ đích. batch_size_segments (int): Số lượng đoạn text tối đa MONG MUỐN trong mỗi lần gọi API. max_words_per_segment (int): Giới hạn từ tối đa cho một segment để được dịch theo batch. Các segment dài hơn sẽ được dịch riêng lẻ. delay_between_requests (int): Thời gian chờ (giây) giữa các lần gọi API dịch. """ client = MongoClient("mongodb+srv://admin:1highbar456@cluster0.equkm.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0") db = client['excel'] fs = gridfs.GridFS(db, collection='root_file') ppt_file = fs.get(file_id) excel_file = BytesIO(ppt_file.read()) xml_folder = unzip_office_file(excel_file) modifiable_nodes, global_data = extract_text_from_sheet(xml_folder) original_texts = get_text_list_from_nodes(modifiable_nodes) all_results = [None] * len(original_texts) current_index = 0 processed_count = 0 api_call_counter = 0 # Track API calls for delay logic while current_index < len(original_texts): batch_texts_to_translate = [] batch_original_indices = [] # 0-based indices for assignment batch_end_index = min(current_index + batch_size_segments, len(original_texts)) found_long_segment_at = -1 # 0-based index in original_texts # 1. Build the next potential batch, stopping if a long segment is found for i in range(current_index, batch_end_index): segment = original_texts[i] word_count = count_words(segment) if word_count <= max_words_per_segment: batch_texts_to_translate.append(segment) batch_original_indices.append(i) else: found_long_segment_at = i break # Stop building this batch # --- Process the findings --- # 2. Translate the VALID batch collected *before* the long segment (if any) if batch_texts_to_translate: # Add delay BEFORE the API call if it's not the very first call if api_call_counter > 0 and delay_between_requests > 0: time.sleep(delay_between_requests) translated_batch = _translate_batch_helper( batch_texts_to_translate, [idx + 1 for idx in batch_original_indices], # 1-based for logging source_lang, target_lang ) api_call_counter += 1 # Assign results back for batch_idx, original_idx in enumerate(batch_original_indices): all_results[original_idx] = translated_batch[batch_idx] processed_count += len(batch_texts_to_translate) # 3. Handle the long segment INDIVIDUALLY (if one was found) if found_long_segment_at != -1: long_segment_index = found_long_segment_at long_segment_text = str(original_texts[long_segment_index]) # word_count = count_words(long_segment_text) # Recalculate for log clarity try: translated = translate_single_text(long_segment_text, source_lang, target_lang) final = [translated] api_call_counter += 1 if len(final) == 1: all_results[long_segment_index] = final[0] else: print(f" *** CRITICAL ERROR: Long segment translation result count mismatch! Expected 1, got {len(final)}. Marking as failed.") all_results[long_segment_index] = "" except Exception as e: print(f" *** ERROR during translation of long segment {long_segment_index + 1}: {e}. Marking as failed.") # traceback.print_exc() # Uncomment for detailed debug all_results[long_segment_index] = "" # Do not increment api_call_counter if the API call itself failed before returning processed_count += 1 # Update current_index to start AFTER this long segment current_index = long_segment_index + 1 else: # No long segment was found in the range checked. # Move current_index to the end of the range examined. current_index = batch_end_index missing_count = 0 final_texts_for_nodes = [] for i, res in enumerate(all_results): if res is None: print(f"LỖI LOGIC: Segment {i+1} không được xử lý! Giữ lại text gốc: '{original_texts[i]}'") final_texts_for_nodes.append(original_texts[i]) missing_count += 1 else: final_texts_for_nodes.append(res) if missing_count > 0: print(f"CẢNH BÁO NGHIÊM TRỌNG: {missing_count} segments bị bỏ lỡ trong quá trình xử lý.") if len(final_texts_for_nodes) != len(original_texts): print(f"LỖI NGHIÊM TRỌNG: Số lượng text cuối cùng ({len(final_texts_for_nodes)}) không khớp với gốc ({len(original_texts)}). Hủy bỏ cập nhật.") else: # Gán vào node for i, node_info in enumerate(modifiable_nodes): node_info['modified_text'] = final_texts_for_nodes[i] save_success = apply_and_save_changes(modifiable_nodes, global_data) if not save_success: print("LỖI NGHIÊM TRỌNG: Không thể lưu thay đổi vào file XML.") else: # Only zip if saving XML was successful final_id = zip_folder_to_excel_file(xml_folder, file_name) if final_id: shutil.rmtree(xml_folder) # Mark folder as 'handled' by zipping else: print("LỖI NGHIÊM TRỌNG: Không thể tạo file XLSX đã dịch cuối cùng.") return final_id