"""PowerPoint <-> XML translation pipeline backed by MongoDB GridFS."""
import json
import os
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Dict, List
from xml.dom import minidom

import gridfs
from bson import ObjectId
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
from pymongo import MongoClient

from powerpoint.pptx_object import get_table_properties, get_shape_properties
# SECURITY(review): this API key was committed to source control; it must be
# rotated and supplied via the GEMINI_API_KEY environment variable. The
# literal is kept only as a fallback so existing deployments keep working.
gemini_api = os.environ.get("GEMINI_API_KEY", "AIzaSyDtBIjTSfbvuEsobNwjtdyi9gVpDrCaWPM")
def extract_text_from_group(group_shape, slide_number, shape_index, slide_element):
    """Serialize a grouped shape into XML, keeping the group only if it has text.

    Args:
        group_shape: pptx group shape whose children are walked.
        slide_number: 1-based slide index (kept for interface parity; unused here).
        shape_index: Position of this group within its parent container.
        slide_element: Parent XML node that receives the <group_element>.

    Returns:
        True when the group contributed text and its node was kept,
        False when it was empty and removed from *slide_element*.
    """
    group_node = ET.SubElement(slide_element, "group_element")
    group_node.set("shape_index", str(shape_index))
    group_node.set("group_name", group_shape.name)

    has_text = False
    for child_index, child in enumerate(group_shape.shapes):
        if child.shape_type == MSO_SHAPE_TYPE.GROUP:
            # Nested groups prune themselves; fold their verdict into ours.
            if extract_text_from_group(child, slide_number, child_index, group_node):
                has_text = True
        elif child.shape_type == MSO_SHAPE_TYPE.TABLE:
            table_node = ET.SubElement(group_node, "table_element")
            table_node.set("shape_index", str(child_index))
            table_props = ET.SubElement(table_node, "properties")
            table_props.text = json.dumps(get_table_properties(child.table), indent=2)
            # Tables always count as textual content.
            has_text = True
        elif getattr(child, "text_frame", None):
            text_node = ET.SubElement(group_node, "text_element")
            text_node.set("shape_index", str(child_index))
            shape_props = get_shape_properties(child)
            text_props = ET.SubElement(text_node, "properties")
            text_props.text = json.dumps(shape_props, indent=2)
            # Count the shape only when it actually carries text, either
            # directly or inside one of its paragraphs.
            if shape_props.get("text") or (
                "paragraphs" in shape_props
                and any(p.get("text") for p in shape_props["paragraphs"])
            ):
                has_text = True

    if has_text:
        return True
    # Drop groups that carried no text at any depth.
    slide_element.remove(group_node)
    return False
def extract_text_from_slide(slide, slide_number, translate=False):
    """Build the <slide> XML node containing every text-bearing shape.

    Args:
        slide: pptx slide object to serialize.
        slide_number: 1-based slide index stored on the node.
        translate: Unused flag, kept for backward compatibility with callers.

    Returns:
        An xml.etree Element tagged "slide".
    """
    slide_node = ET.Element("slide")
    slide_node.set("number", str(slide_number))

    for idx, shape in enumerate(slide.shapes):
        if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
            # Groups manage (and possibly prune) their own subtree.
            extract_text_from_group(shape, slide_number, idx, slide_node)
            continue
        if shape.shape_type == MSO_SHAPE_TYPE.TABLE:
            node = ET.SubElement(slide_node, "table_element")
            payload = get_table_properties(shape.table)
        elif hasattr(shape, "text"):
            node = ET.SubElement(slide_node, "text_element")
            payload = get_shape_properties(shape)
        else:
            continue
        node.set("shape_index", str(idx))
        props = ET.SubElement(node, "properties")
        props.text = json.dumps(payload, indent=2)

    return slide_node
def ppt_to_xml_mongodb(ppt_file_id: str, db_name="ppt"):
    """Convert a PowerPoint file stored in MongoDB GridFS into XML and store it back.

    The source .pptx is read from the ``root_file`` GridFS collection, each
    slide is serialized with :func:`extract_text_from_slide`, and the
    pretty-printed XML document is written to the ``original_xml`` collection.

    :param ppt_file_id: ID of the source PPT file (str or ObjectId) in ``root_file``.
    :param db_name: MongoDB database name.
    :return: ObjectId of the stored XML file in ``original_xml``, or None on failure.
    """
    # SECURITY(review): credentials were hard-coded in source; prefer MONGODB_URI
    # from the environment. The literal fallback keeps existing deployments
    # working but should be removed once the secret is rotated.
    mongo_uri = os.environ.get(
        "MONGODB_URI",
        "mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0",
    )
    client = MongoClient(
        mongo_uri,
        connectTimeoutMS=60000,          # allow slow cluster handshakes
        serverSelectionTimeoutMS=60000,  # wait longer for a reachable node
        socketTimeoutMS=60000,           # tolerate slow reads/writes
        tls=True,
        # NOTE(review): disables certificate validation — confirm this is intended.
        tlsAllowInvalidCertificates=True,
    )
    db = client[db_name]
    fs_ppt = gridfs.GridFS(db, collection="root_file")     # source .pptx files
    fs_xml = gridfs.GridFS(db, collection="original_xml")  # generated XML files
    try:
        if not isinstance(ppt_file_id, ObjectId):
            ppt_file_id = ObjectId(ppt_file_id)
        ppt_file = fs_ppt.get(ppt_file_id)
        prs = Presentation(BytesIO(ppt_file.read()))

        root = ET.Element("presentation")
        root.set("file_name", ppt_file.filename)

        # NOTE(review): python-pptx is not documented as thread-safe; the pool
        # only reads shape data, which has worked in practice, but confirm
        # before raising max_workers.
        with ThreadPoolExecutor(max_workers=4) as executor:
            future_to_slide = {
                executor.submit(extract_text_from_slide, slide, slide_number): slide_number
                for slide_number, slide in enumerate(prs.slides, 1)
            }
            # Iterate in submission order so slides stay in document order.
            for future in future_to_slide:
                slide_number = future_to_slide[future]
                try:
                    root.append(future.result())
                except Exception as e:
                    print(f"Error processing slide {slide_number}: {str(e)}")

        xml_str = minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")
        xml_output = BytesIO(xml_str.encode("utf-8"))
        xml_file_id = fs_xml.put(xml_output, filename=f"{ppt_file.filename}.xml")
        print(f"✅ XML đã được lưu vào MongoDB (original_xml) với file_id: {xml_file_id}")
        return xml_file_id
    except Exception as e:
        print(f"❌ Lỗi khi chuyển PPT sang XML: {str(e)}")
        return None
    finally:
        # FIX: single close point. The old code also called client.close()
        # inside the try block, producing a redundant second close here.
        client.close()
def _collect_texts(element, texts):
    """Append all text found under *element* (text/table/group nodes) to *texts*.

    Mirrors the node layout written by extract_text_from_slide: text_element
    and table_element carry a JSON blob in a <properties> child; group_element
    nests further elements. Malformed JSON payloads are skipped silently, as
    the original inline implementation did.
    """
    if element.tag == "text_element":
        props = element.find("properties")
        if props is None or not props.text:
            return
        try:
            shape_data = json.loads(props.text)
        except json.JSONDecodeError:
            return
        # Direct 'text' wins; otherwise fall back to paragraph, then run level.
        if 'text' in shape_data:
            texts.append(shape_data['text'])
        elif 'paragraphs' in shape_data:
            for paragraph in shape_data['paragraphs']:
                if 'text' in paragraph:
                    texts.append(paragraph['text'])
                elif 'runs' in paragraph:
                    for run in paragraph['runs']:
                        if 'text' in run:
                            texts.append(run['text'])
    elif element.tag == "table_element":
        props = element.find("properties")
        if props is None or not props.text:
            return
        try:
            table_data = json.loads(props.text)
        except json.JSONDecodeError:
            return
        for row in table_data.get("cells", []):
            for cell in row:
                texts.append(cell.get("text", ""))
    elif element.tag == "group_element":
        for child in element:
            _collect_texts(child, texts)


def extract_text_from_xml(file_id=None, filename=None, db_name="ppt", collection_name="original_xml") -> Dict[str, List[str]]:
    """Load an XML document from MongoDB GridFS and extract per-slide texts.

    :param file_id: GridFS file ID (ObjectId or string). Takes precedence.
    :param filename: Filename to look up when no file_id is given.
    :param db_name: MongoDB database name.
    :param collection_name: GridFS collection holding the XML.
    :return: Dict mapping slide number (str) to the list of extracted texts;
             empty dict on any failure.
    """
    # SECURITY(review): credentials were hard-coded; prefer MONGODB_URI from
    # the environment (literal fallback kept for backward compatibility).
    client = MongoClient(os.environ.get(
        "MONGODB_URI",
        "mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0",
    ))
    db = client[db_name]
    fs = gridfs.GridFS(db, collection=collection_name)
    try:
        if file_id:
            if not isinstance(file_id, ObjectId):
                file_id = ObjectId(file_id)
            file_data = fs.get(file_id)
        elif filename:
            file_data = fs.find_one({"filename": filename})
            if not file_data:
                # BUGFIX: the message previously printed the literal
                # "(unknown)" instead of the requested filename.
                print(f"❌ Không tìm thấy file '{filename}' trong MongoDB!")
                return {}
        else:
            print("❌ Cần cung cấp 'file_id' hoặc 'filename' để tải file.")
            return {}

        root = ET.fromstring(file_data.read().decode("utf-8"))
        slide_texts = {}
        for slide in root.findall("slide"):
            texts = []
            for child in slide:
                _collect_texts(child, texts)
            # Keys are strings to match translated_dict lookups downstream.
            slide_texts[str(slide.get("number"))] = texts
        # NOTE: removed the leftover debug print of the full result dict.
        return slide_texts
    except Exception as e:
        print(f"❌ Lỗi khi xử lý XML: {e}")
        return {}
    finally:
        client.close()
def adjust_size(original_text, translated_text, data_container):
    """Shrink font sizes in *data_container* when the translation grew a lot.

    If the translated text is more than 1.5x the length of the original,
    every font size found (run-level inside 'paragraphs', or a top-level
    'font') is scaled to 80%, clamped to a 6pt minimum. Mutates
    *data_container* in place; returns None.
    """
    if not original_text or not translated_text:
        return

    # original_text is non-empty here, so the ratio is always well-defined.
    ratio = len(translated_text) / len(original_text) if len(original_text) > 0 else 1
    if ratio <= 1.5:  # threshold: only react to substantial growth
        return

    def _shrink(font):
        # 80% of the current size, never below a readable 6pt.
        font['size'] = max(6, int(font['size'] * 0.8))

    if 'paragraphs' in data_container:
        for paragraph in data_container['paragraphs']:
            if 'runs' in paragraph:
                for run in paragraph['runs']:
                    if run.get('font') and run['font'].get('size'):
                        _shrink(run['font'])
    elif 'font' in data_container and data_container['font'].get('size'):
        _shrink(data_container['font'])
def update_xml_with_translated_text_mongodb(file_id: str, translated_dict: Dict[str, List[str]], db_name="ppt"):
    """Load XML from original_xml, splice in translated texts, save to final_xml.

    Translations are consumed from ``translated_dict[slide_number]`` in the
    same depth-first order that ``extract_text_from_xml`` produced them, so
    the two functions must stay structurally in sync. Font sizes are shrunk
    via :func:`adjust_size` whenever a translation is much longer than its
    original.

    :param file_id: ID of the source file in original_xml (str or ObjectId).
    :param translated_dict: {slide_number: [translated_text1, ...]}.
    :param db_name: MongoDB database name.
    :return: ObjectId of the new file in final_xml, or None on failure.
    """
    # SECURITY(review): credentials were hard-coded; prefer MONGODB_URI from
    # the environment (literal fallback kept for backward compatibility).
    client = MongoClient(os.environ.get(
        "MONGODB_URI",
        "mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0",
    ))
    db = client[db_name]
    fs_original = gridfs.GridFS(db, collection="original_xml")  # source XML
    fs_final = gridfs.GridFS(db, collection="final_xml")        # translated XML
    try:
        if not isinstance(file_id, ObjectId):
            file_id = ObjectId(file_id)
        file_data = fs_original.get(file_id)
        root = ET.fromstring(file_data.read().decode("utf-8"))

        for slide in root.findall("slide"):
            slide_num = slide.get("number")
            if slide_num not in translated_dict:
                continue
            translated_texts = translated_dict[slide_num]
            text_index = 0  # position within translated_texts for this slide

            def update_element_recursive(element):
                # Shares text_index across the whole slide's traversal.
                nonlocal text_index
                if element.tag == "text_element":
                    props = element.find("properties")
                    if props is not None and props.text:
                        try:
                            shape_data = json.loads(props.text)
                            # Direct 'text' wins; otherwise paragraph, then run level —
                            # the same precedence the extractor used.
                            if 'text' in shape_data:
                                original_text = shape_data['text']
                                if text_index < len(translated_texts):
                                    shape_data['text'] = translated_texts[text_index]
                                    adjust_size(original_text, translated_texts[text_index], shape_data)
                                    text_index += 1
                            elif 'paragraphs' in shape_data:
                                for paragraph in shape_data['paragraphs']:
                                    if 'text' in paragraph:
                                        original_text = paragraph['text']
                                        if text_index < len(translated_texts):
                                            paragraph['text'] = translated_texts[text_index]
                                            adjust_size(original_text, translated_texts[text_index], paragraph)
                                            text_index += 1
                                    elif 'runs' in paragraph:
                                        for run in paragraph['runs']:
                                            if 'text' in run:
                                                original_text = run['text']
                                                if text_index < len(translated_texts):
                                                    run['text'] = translated_texts[text_index]
                                                    adjust_size(original_text, translated_texts[text_index], run)
                                                    text_index += 1
                            props.text = json.dumps(shape_data, indent=2)
                        except json.JSONDecodeError:
                            print(f"JSONDecodeError in text_element on slide {slide_num}")
                elif element.tag == "table_element":
                    props = element.find("properties")
                    if props is not None and props.text:
                        try:
                            table_data = json.loads(props.text)
                            for row in table_data.get("cells", []):
                                for cell in row:
                                    original_text = cell.get('text', '')
                                    if text_index < len(translated_texts):
                                        cell['text'] = translated_texts[text_index]
                                        adjust_size(original_text, translated_texts[text_index], cell)
                                        text_index += 1
                            props.text = json.dumps(table_data, indent=2)
                        except json.JSONDecodeError:
                            print(f"JSONDecodeError in table_element on slide {slide_num}")
                elif element.tag == "group_element":
                    # FIX: dropped the leftover "Group element found" debug print.
                    for child in element:
                        update_element_recursive(child)

            for child in slide:
                update_element_recursive(child)

        updated_xml_str = minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")
        new_file_id = fs_final.put(updated_xml_str.encode("utf-8"), filename=f"{file_data.filename}_translated.xml")
        print(f"✅ XML đã cập nhật được lưu vào MongoDB (final_xml) với file_id: {new_file_id}")
        return new_file_id
    except Exception as e:
        print(f"❌ Lỗi khi cập nhật XML: {e}")
        return None
    finally:
        client.close()