MT_deploy / excel /xlsx.py
mintlee's picture
update xlsx
4d84219
raw
history blame
20.8 kB
import os
import zipfile
import copy
import time
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional, Tuple
from utils.utils import translate_text, unzip_office_file, preprocess_text, postprocess_text, translate_single_text
from pymongo import MongoClient
import gridfs
from io import BytesIO
import shutil
import io
NS_MAIN = {'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
# --- Hàm đăng ký namespace (quan trọng khi ghi file) ---
def register_namespaces(xml_file):
"""Đọc và đăng ký các namespace từ file XML."""
namespaces = dict([
node for _, node in ET.iterparse(xml_file, events=['start-ns'])
])
for ns, uri in namespaces.items():
ET.register_namespace(ns, uri)
# Đăng ký thêm namespace phổ biến nếu chưa có
if 'main' not in namespaces and '' not in namespaces: # Kiểm tra cả prefix rỗng
ET.register_namespace('', NS_MAIN['main']) # Đăng ký default namespace
elif 'main' not in namespaces:
ET.register_namespace('main', NS_MAIN['main']) # Đăng ký với prefix 'main'
def extract_text_from_sheet(unzipped_folder_path: str) -> Optional[Tuple[List[Dict[str, Any]], Dict[str, Any]]]:
"""
Trích xuất text, lưu lại định dạng của run đầu tiên nếu là Rich Text.
"""
modifiable_nodes = []
shared_strings_path = os.path.join(unzipped_folder_path, "xl", "sharedStrings.xml")
worksheets_folder = os.path.join(unzipped_folder_path, "xl", "worksheets")
shared_tree = None
sheet_trees = {}
# --- Xử lý sharedStrings.xml ---
if os.path.exists(shared_strings_path):
try:
register_namespaces(shared_strings_path)
shared_tree = ET.parse(shared_strings_path)
root_shared = shared_tree.getroot()
for si_element in root_shared.findall('main:si', NS_MAIN):
text_parts = []
t_elements = si_element.findall('.//main:t', NS_MAIN) # Tìm tất cả <t> con
# Tìm run đầu tiên (<r>) và properties (<rPr>) của nó
first_r = si_element.find('./main:r', NS_MAIN) # Tìm <r> con trực tiếp đầu tiên
first_rpr_clone = None # Lưu bản sao của <rPr> đầu tiên
is_rich_text = first_r is not None
if is_rich_text:
# Tìm <rPr> bên trong <r> đầu tiên
first_rpr = first_r.find('./main:rPr', NS_MAIN)
if first_rpr is not None:
# Sao chép sâu để không ảnh hưởng cây gốc và để dùng sau
first_rpr_clone = copy.deepcopy(first_rpr)
# Lấy toàn bộ text
for t_node in t_elements:
if t_node.text:
text_parts.append(t_node.text)
full_text = "".join(text_parts)
if not full_text: continue # Bỏ qua nếu không có text
if is_rich_text:
modifiable_nodes.append({
'type': 'shared_rich',
'original_text': full_text,
'element': si_element, # Tham chiếu <si>
'first_format': first_rpr_clone, # Lưu định dạng <rPr> đầu tiên (hoặc None)
'source_file': os.path.join("xl", "sharedStrings.xml"),
'sheet_name': None
})
elif t_elements: # Không phải rich text, tìm thẻ <t> đơn giản
first_t = si_element.find('./main:t', NS_MAIN)
if first_t is not None:
modifiable_nodes.append({
'type': 'shared_simple',
'original_text': full_text,
'element': first_t, # Tham chiếu <t>
'first_format': None, # Không có định dạng đặc biệt
'source_file': os.path.join("xl", "sharedStrings.xml"),
'sheet_name': None
})
except Exception as e:
print(f"Lỗi xử lý sharedStrings: {e}")
import traceback
traceback.print_exc()
# --- Xử lý các file sheetX.xml (Inline Strings - không có định dạng phức tạp) ---
if os.path.isdir(worksheets_folder):
for sheet_filename in sorted(os.listdir(worksheets_folder)):
if sheet_filename.lower().endswith(".xml"):
# ... (phần đọc và parse sheet tree như cũ) ...
sheet_file_path = os.path.join(worksheets_folder, sheet_filename)
try:
register_namespaces(sheet_file_path)
sheet_tree = ET.parse(sheet_file_path)
sheet_trees[sheet_filename] = sheet_tree
root_sheet = sheet_tree.getroot()
for cell in root_sheet.findall('.//main:c[@t="inlineStr"]', NS_MAIN):
t_element = cell.find('.//main:is/main:t', NS_MAIN)
if t_element is not None and t_element.text is not None:
modifiable_nodes.append({
'type': 'inline',
'original_text': t_element.text,
'element': t_element, # Tham chiếu <t>
'first_format': None, # Inline string không có định dạng <rPr>
'source_file': os.path.join("xl", "worksheets", sheet_filename),
'sheet_name': sheet_filename
})
except Exception as e:
print(f"Lỗi xử lý sheet {sheet_filename}: {e}")
import traceback
traceback.print_exc()
else:
print(f"Lỗi: Không tìm thấy thư mục worksheets: {worksheets_folder}")
global_data = {"shared_tree": shared_tree, "sheet_trees": sheet_trees, "shared_strings_path": shared_strings_path, "worksheets_folder": worksheets_folder}
return modifiable_nodes, global_data
def apply_and_save_changes(modified_nodes_data: List[Dict[str, Any]], global_data: Dict[str, Any]) -> bool:
"""
Cập nhật text, giữ lại định dạng đầu tiên cho Rich Text, và lưu file XML.
"""
if not global_data: print("Lỗi: Thiếu global_data."); return False
updated_files = set()
try: ET.register_namespace('xml', "http://www.w3.org/XML/1998/namespace")
except ValueError: pass
for node_info in modified_nodes_data:
if 'modified_text' in node_info and node_info['element'] is not None:
element = node_info['element']
modified_text = node_info['modified_text']
original_text = node_info.get('original_text', '')
node_type = node_info.get('type', '')
first_format = node_info.get('first_format') # Lấy <rPr> đã lưu (hoặc None)
if original_text != modified_text:
# --- Xử lý Rich Text: Tạo lại cấu trúc <si><r>[<rPr>]<t></r></si> ---
if node_type == 'shared_rich':
si_element = element
# Xóa con cũ
for child in list(si_element):
si_element.remove(child)
# Tạo run mới <r>
new_r = ET.Element(f"{{{NS_MAIN['main']}}}r")
# Nếu có định dạng đầu tiên (<rPr>), thêm nó vào <r> mới
if first_format is not None:
new_r.append(first_format) # Thêm bản sao <rPr> đã lưu
# Tạo thẻ text mới <t>
new_t = ET.Element(f"{{{NS_MAIN['main']}}}t")
new_t.text = modified_text
xml_space_attr = '{http://www.w3.org/XML/1998/namespace}space'
new_t.set(xml_space_attr, 'preserve')
# Thêm <t> vào <r>
new_r.append(new_t)
# Thêm <r> vào <si>
si_element.append(new_r)
updated_files.add(node_info['source_file'])
# print(f"Applied first format to Rich Text in {node_info['source_file']}")
# --- Xử lý Simple/Inline Text: Cập nhật thẻ <t> ---
elif node_type in ['shared_simple', 'inline']:
t_element = element
t_element.text = modified_text
xml_space_attr = '{http://www.w3.org/XML/1998/namespace}space'
if xml_space_attr not in t_element.attrib or t_element.attrib[xml_space_attr] != 'preserve':
t_element.set(xml_space_attr, 'preserve')
updated_files.add(node_info['source_file'])
# print(f"Updated Simple/Inline Text in {node_info['source_file']}")
else:
print(f"Cảnh báo: Loại node không xác định '{node_type}'")
# --- Lưu lại các file XML đã thay đổi (Giữ nguyên) ---
success = True
# ... (Phần code lưu file như cũ) ...
shared_tree = global_data.get("shared_tree"); shared_strings_path = global_data.get("shared_strings_path")
sheet_trees = global_data.get("sheet_trees", {}); worksheets_folder = global_data.get("worksheets_folder")
shared_strings_relative_path = os.path.join("xl", "sharedStrings.xml")
if shared_tree and shared_strings_path and shared_strings_relative_path in updated_files:
try:
# print(f"Saving modified file: {shared_strings_path}")
shared_tree.write(shared_strings_path, encoding='utf-8', xml_declaration=True)
except Exception as e: print(f"Lỗi lưu {shared_strings_path}: {e}"); success = False
if worksheets_folder and os.path.exists(worksheets_folder):
for sheet_filename, sheet_tree in sheet_trees.items():
sheet_relative_path = os.path.join("xl", "worksheets", sheet_filename)
if sheet_relative_path in updated_files:
sheet_file_path = os.path.join(worksheets_folder, sheet_filename)
try:
# print(f"Saving modified file: {sheet_file_path}")
sheet_tree.write(sheet_file_path, encoding='utf-8', xml_declaration=True)
except Exception as e: print(f"Lỗi lưu {sheet_file_path}: {e}"); success = False
if success and updated_files: print(f"Đã lưu thành công {len(updated_files)} file XML đã sửa đổi (đã giữ lại định dạng đầu tiên cho Rich Text).")
elif not updated_files: print("Không có file XML nào cần cập nhật.") ; return True
return success
def zip_folder_to_excel_file(folder_path, file_name):
try:
# Nén thư mục thành file .xlsx trong RAM
xlsx_buffer = io.BytesIO()
with zipfile.ZipFile(xlsx_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
archive_path = os.path.relpath(file_path, folder_path)
zipf.write(file_path, archive_path)
xlsx_buffer.seek(0)
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client['excel']
fs = gridfs.GridFS(db, collection='final_file')
file_id = fs.put(xlsx_buffer.read(), filename=file_name)
print(f"✅ Đã lưu file Excel vào MongoDB với ID: {file_id}")
return file_id
except Exception as e:
print(f"❌ Lỗi khi nén và lưu Excel vào MongoDB: {e}")
return None
def get_text_list_from_nodes(modifiable_nodes: Optional[List[Dict[str, Any]]]) -> List[str]:
if modifiable_nodes is None:
return [] # Trả về list rỗng nếu đầu vào là None
# Sử dụng list comprehension để lấy giá trị của key 'original_text' từ mỗi dictionary
text_list = [
node_info['original_text']
for node_info in modifiable_nodes
if 'original_text' in node_info and node_info['original_text'] is not None
]
# Thêm kiểm tra 'original_text' tồn tại và không phải None cho chắc chắn
return text_list
def count_words(text: str) -> int:
"""Đếm số từ trong một chuỗi bằng cách tách theo khoảng trắng."""
if not text or text.isspace():
return 0
return len(text.split())
# Helper function to process a batch of valid segments (Unchanged)
def _translate_batch_helper(segments_to_translate, original_indices_1based, source_lang, target_lang):
"""Handles preprocessing, translation, postprocessing, and error handling for a batch."""
batch_results = [None] * len(segments_to_translate)
if not segments_to_translate:
return []
try:
processed_segments = preprocess_text(segments_to_translate)
translated_segments = translate_text(processed_segments, source_lang, target_lang)
final_translated_segments = postprocess_text(translated_segments)
if len(final_translated_segments) == len(segments_to_translate):
batch_results = final_translated_segments
else:
print(f" *** CRITICAL ERROR: Batch translation result count mismatch! Expected {len(segments_to_translate)}, got {len(final_translated_segments)}. Marking batch as failed.")
error_msg = "<translation_length_mismatch_error>"
batch_results = [error_msg] * len(segments_to_translate)
except Exception as e:
print(f" *** ERROR during batch translation: {e}. Marking batch as failed.")
# traceback.print_exc() # Uncomment for detailed debug
error_msg = "<translation_api_error>"
batch_results = [error_msg] * len(segments_to_translate)
return batch_results
def translate_xlsx(file_id, file_name, source_lang='en', target_lang='vi', batch_size_segments=50, max_words_per_segment=100, delay_between_requests=1):
"""
Dịch file XLSX, chia thành batch động, dịch riêng các segment quá dài.
Args:
input_filepath (str): Đường dẫn đến file XLSX đầu vào.
output_filepath (str): Đường dẫn để lưu file XLSX đã dịch.
source_lang (str): Mã ngôn ngữ nguồn.
target_lang (str): Mã ngôn ngữ đích.
batch_size_segments (int): Số lượng đoạn text tối đa MONG MUỐN trong mỗi lần gọi API.
max_words_per_segment (int): Giới hạn từ tối đa cho một segment để được dịch theo batch.
Các segment dài hơn sẽ được dịch riêng lẻ.
delay_between_requests (int): Thời gian chờ (giây) giữa các lần gọi API dịch.
"""
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client['excel']
fs = gridfs.GridFS(db, collection='root_file')
ppt_file = fs.get(file_id)
excel_file = BytesIO(ppt_file.read())
xml_folder = unzip_office_file(excel_file)
modifiable_nodes, global_data = extract_text_from_sheet(xml_folder)
original_texts = get_text_list_from_nodes(modifiable_nodes)
all_results = [None] * len(original_texts)
current_index = 0
processed_count = 0
api_call_counter = 0 # Track API calls for delay logic
while current_index < len(original_texts):
batch_texts_to_translate = []
batch_original_indices = [] # 0-based indices for assignment
batch_end_index = min(current_index + batch_size_segments, len(original_texts))
found_long_segment_at = -1 # 0-based index in original_texts
# 1. Build the next potential batch, stopping if a long segment is found
for i in range(current_index, batch_end_index):
segment = original_texts[i]
word_count = count_words(segment)
if word_count <= max_words_per_segment:
batch_texts_to_translate.append(segment)
batch_original_indices.append(i)
else:
found_long_segment_at = i
break # Stop building this batch
# --- Process the findings ---
# 2. Translate the VALID batch collected *before* the long segment (if any)
if batch_texts_to_translate:
# Add delay BEFORE the API call if it's not the very first call
if api_call_counter > 0 and delay_between_requests > 0:
time.sleep(delay_between_requests)
translated_batch = _translate_batch_helper(
batch_texts_to_translate,
[idx + 1 for idx in batch_original_indices], # 1-based for logging
source_lang,
target_lang
)
api_call_counter += 1
# Assign results back
for batch_idx, original_idx in enumerate(batch_original_indices):
all_results[original_idx] = translated_batch[batch_idx]
processed_count += len(batch_texts_to_translate)
# 3. Handle the long segment INDIVIDUALLY (if one was found)
if found_long_segment_at != -1:
long_segment_index = found_long_segment_at
long_segment_text = str(original_texts[long_segment_index])
# word_count = count_words(long_segment_text) # Recalculate for log clarity
try:
translated = translate_single_text(long_segment_text, source_lang, target_lang)
final = [translated]
api_call_counter += 1
if len(final) == 1:
all_results[long_segment_index] = final[0]
else:
print(f" *** CRITICAL ERROR: Long segment translation result count mismatch! Expected 1, got {len(final)}. Marking as failed.")
all_results[long_segment_index] = "<translation_length_mismatch_error>"
except Exception as e:
print(f" *** ERROR during translation of long segment {long_segment_index + 1}: {e}. Marking as failed.")
# traceback.print_exc() # Uncomment for detailed debug
all_results[long_segment_index] = "<translation_api_error>"
# Do not increment api_call_counter if the API call itself failed before returning
processed_count += 1
# Update current_index to start AFTER this long segment
current_index = long_segment_index + 1
else:
# No long segment was found in the range checked.
# Move current_index to the end of the range examined.
current_index = batch_end_index
missing_count = 0
final_texts_for_nodes = []
for i, res in enumerate(all_results):
if res is None:
print(f"LỖI LOGIC: Segment {i+1} không được xử lý! Giữ lại text gốc: '{original_texts[i]}'")
final_texts_for_nodes.append(original_texts[i])
missing_count += 1
else:
final_texts_for_nodes.append(res)
if missing_count > 0:
print(f"CẢNH BÁO NGHIÊM TRỌNG: {missing_count} segments bị bỏ lỡ trong quá trình xử lý.")
if len(final_texts_for_nodes) != len(original_texts):
print(f"LỖI NGHIÊM TRỌNG: Số lượng text cuối cùng ({len(final_texts_for_nodes)}) không khớp với gốc ({len(original_texts)}). Hủy bỏ cập nhật.")
else:
# Gán vào node
for i, node_info in enumerate(modifiable_nodes):
node_info['modified_text'] = final_texts_for_nodes[i]
save_success = apply_and_save_changes(modifiable_nodes, global_data)
if not save_success:
print("LỖI NGHIÊM TRỌNG: Không thể lưu thay đổi vào file XML.")
else:
# Only zip if saving XML was successful
final_id = zip_folder_to_excel_file(xml_folder, file_name)
if final_id:
shutil.rmtree(xml_folder) # Mark folder as 'handled' by zipping
else:
print("LỖI NGHIÊM TRỌNG: Không thể tạo file XLSX đã dịch cuối cùng.")
return final_id