Spaces:
Running
Running
| import os | |
| import docx | |
| from docx import Document | |
| import google.generativeai as genai | |
| import ast | |
| import json | |
| import re | |
| import time | |
| import dotenv | |
| import os | |
| from pymongo import MongoClient | |
| import gridfs | |
| from io import BytesIO | |
# Load environment variables from a local .env file before any os.getenv() lookups below.
dotenv.load_dotenv(".env")
# Configure the Gemini client with the API key taken from the environment.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
# Running total of seconds spent in time.sleep() between translation batches.
time_spent_sleeping = 0
# Number of batches whose translated item count did not match the input and had to be force-fixed.
mismatches = 0
def batch_translate(texts, source_lang = 'English', target_lang="Vietnamese"):
    """Translate multiple text segments in a single Gemini API call.

    The segments are serialised as a JSON object keyed by their position
    (json.dumps turns the integer keys into strings) and appended to a fixed
    instruction prompt that asks the model to return an object with the same
    keys and one translated value per input value.

    Args:
        texts: List of strings to translate.
        source_lang: Name of the language the texts are written in.
        target_lang: Name of the language to translate into.

    Returns:
        The raw response object from ``model.generate_content`` (parsing is
        left to the caller), or ``texts`` itself when it is empty.
    """
    if not texts:
        return texts # Skip if empty
    system_prompt = """
Translate the contents of a JSON file from the specified source language to the specified target language while preserving the structure, spaces, and context of the original text.
Instructions:
1. You will be given three inputs: source language, target language, and a JSON file.
2. The JSON file contains a Python dictionary where each key is an integer, and each value is a string.
3. Ensure one-to-one correspondence—each input item must correspond to exactly one output item with the same number of items.
4. The names of people, places, and organizations should be preserved in the translation.
5. Preserve spaces before or after strings. Do not remove, merge, split, or omit any strings.
6. Translate paragraphs and ensure the translation makes sense when text is put together.
7. Translate split words so that the word is not split in the translation.
8. Return a JSON object that is a Python dictionary containing as many items as the original JSON file, with keys and order preserved.
9. The output must be a syntactically correct Python dictionary.
Additional Examples:
**Input 1**:
- Source language: English
- Target language: Vietnamese
- JSON file:
```json
{"0": "My name is ", "1": "Huy", "2": ".", "3": " Today is ", "4": "a ", "5": "good day", "6": ".", "7": ""}
```
**Output 1**:
```json
{"0": "Tên tôi là ", "1": "Huy", "2": ".", "3": " Hôm nay là ", "4": "một ", "5": "ngày đẹp", "6": ".", "7": ""}
```
**Input 2**:
- Source language: English
- Target language: Spanish
- JSON file:
```json
{"0": "The sky is ", "1": "blue", "2": ".", "3": " Water is ", "4": "essential", "5": " for ", "6": "life", "7": "."}
```
**Output 2**:
```json
{"0": "El cielo es ", "1": "azul", "2": ".", "3": " El agua es ", "4": "esencial", "5": " para ", "6": "la vida", "7": "."}
```
**Input 3**:
- Source language: English
- Target language: French
- JSON file:
```json
{"0": "The quick brown ", "1": "fox ", "2": "jumps ", "3": "over ", "4": "the ", "5": "lazy ", "6": "dog", "7": "."}
```
**Output 3**:
```json
{"0": "Le renard brun ", "1": "rapide ", "2": "saute ", "3": "par-dessus ", "4": "le ", "5": "chien ", "6": "paresseux", "7": "."}
```
Perform the translation and return the result as specified above. Do not include any additional text other than the translated JSON object.
"""
    # Position-keyed payload so the model's output can be matched back to the input order.
    json_data = json.dumps({i: t for i, t in enumerate(texts)})
    user_prompt = f"Source language: {source_lang}. Target language: {target_lang}. JSON file: {json_data}"
    model = genai.GenerativeModel('gemini-2.0-flash')
    # Instructions and payload are sent together as one prompt string.
    response = model.generate_content(contents = system_prompt.strip() + "\n" + user_prompt.strip(), generation_config={
        'temperature': 1, # Adjust temperature for desired creativity
        'top_p': 1,
        'top_k': 1,})
    # Earlier inline parsing, kept for reference; response_to_dict() now parses response.text.
    # response_dict = ast.literal_eval(response.text.strip().strip("json```").strip("```").strip().strip("\""))
    # print(len(texts), len(list(response_dict.values())))
    # return list(response_dict.values())
    return response
def response_to_dict(response):
    """Parse a model response into the list of translated strings.

    Despite the name, the return value is a list: the values of the
    dictionary found in ``response.text``, in the order the model emitted
    them.

    The outermost ``{...}`` span is extracted, so Markdown code fences,
    surrounding quotes or stray prose around the object are ignored. The
    payload is decoded as JSON first, which handles JSON-only escapes such as
    ``\\/`` and surrogate pairs correctly. If that fails it is evaluated as a
    Python literal, because the prompt also allows a Python dictionary.

    Args:
        response: Object exposing the model output as a ``text`` attribute.

    Returns:
        List of the dictionary's values.

    Raises:
        ValueError: If no object is found or the payload is not a dictionary.
        SyntaxError: If the payload is neither valid JSON nor a Python literal.
    """
    text = response.text
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("No JSON object found in the model response.")
    payload = text[start:end + 1]

    try:
        # strict=False tolerates raw newlines/tabs inside translated strings.
        parsed = json.loads(payload, strict=False)
    except ValueError:
        # Not valid JSON (e.g. single-quoted keys); literal_eval is safe on
        # untrusted text because it only accepts Python literals.
        parsed = ast.literal_eval(payload)

    if not isinstance(parsed, dict):
        raise ValueError("The model response is not a dictionary.")
    return list(parsed.values())
def brute_force_fix(batch, translated_batch):
    """Force ``translated_batch`` to have as many items as ``batch``.

    A short translation is padded in place with empty strings and the same
    list object is returned; a long one is returned as a truncated copy.
    A translation of the right length is returned unchanged.
    """
    shortfall = len(batch) - len(translated_batch)
    if shortfall > 0:
        # In-place padding, matching the caller-visible effect of ``+=``.
        translated_batch.extend("" for _ in range(shortfall))
        return translated_batch
    if shortfall < 0:
        return translated_batch[:len(batch)]
    return translated_batch
def batch_translate_loop(batch, source_lang, target_lang):
    """Translate one batch, retrying until the result lines up with the input.

    The batch is sent once; if the response cannot be parsed or has a
    different number of items than ``batch``, the request is repeated up to
    10 more times. If no attempt yields the right number of items, the most
    recent parseable result is padded/truncated with ``brute_force_fix`` and
    the module-level ``mismatches`` counter is incremented.

    Args:
        batch: List of source strings.
        source_lang: Name of the source language.
        target_lang: Name of the target language.

    Returns:
        List of translated strings with ``len(batch)`` items, or ``batch``
        itself when it is empty.

    Raises:
        ValueError: If none of the responses could be parsed.
        Exception: Whatever the first API request raises; only failures of
            retry requests are swallowed.
    """
    global mismatches
    if not batch:
        return batch

    max_retries = 10
    translated_batch = None  # last parseable result, possibly of the wrong length

    # The first request is deliberately outside any try: configuration or
    # network errors on the initial call surface immediately.
    response = batch_translate(batch, source_lang, target_lang)
    for attempt in range(max_retries + 1):
        if attempt:
            print(f"Translation response unusable, retrying ({attempt}/{max_retries})...")
            try:
                response = batch_translate(batch, source_lang, target_lang)
            except Exception as exc:  # best-effort retry: keep going
                print(f"Translation request failed: {exc}")
                continue
        try:
            candidate = response_to_dict(response)
        except Exception as exc:  # malformed model output
            print(f"Could not parse translation response: {exc}")
            continue
        translated_batch = candidate
        if len(candidate) == len(batch):
            break

    if translated_batch is None:
        raise ValueError("The translated batch is not a list.")

    if len(translated_batch) != len(batch):
        print("Length mismatch after translation. Brute Force Fixing...")
        translated_batch = brute_force_fix(batch, translated_batch)
        mismatches += 1
    print(len(batch), len(translated_batch))
    return translated_batch
def get_batches(texts, limit = 1000):
    """Group consecutive strings into batches of fewer than ``limit`` words.

    Words are counted by whitespace splitting. A new batch is started when
    adding the next string would bring the running word count to ``limit``
    or more. A single string that alone reaches the limit still gets its own
    batch, and no empty batches are produced.

    Args:
        texts: Iterable of strings, kept in order.
        limit: Word budget per batch.

    Returns:
        List of non-empty lists of strings; concatenating them reproduces
        ``texts``. Empty input yields an empty list.
    """
    batches = []
    batch = []
    word_count = 0
    for string in texts:
        words = len(string.split())
        # Only flush a batch that has content, otherwise an oversize first
        # item would emit an empty batch.
        if batch and word_count + words >= limit:
            batches.append(batch)
            batch = []
            word_count = 0
        batch.append(string)
        # Accumulate words (not characters) so the total is comparable to the
        # word-based threshold above.
        word_count += words
    if batch:
        batches.append(batch)
    return batches
def full_translate(texts, source_lang = 'English', target_lang="Vietnamese"):
    """Translate a list of strings batch by batch.

    The texts are split with ``get_batches`` and each batch is translated
    with ``batch_translate_loop``. After every translated batch the function
    sleeps for a fixed pause and adds it to the module-level
    ``time_spent_sleeping`` counter. Empty batches are skipped entirely, so
    no pause is taken when nothing was sent.

    Args:
        texts: List of source strings.
        source_lang: Name of the source language.
        target_lang: Name of the target language.

    Returns:
        List of translated strings in input order.
    """
    global time_spent_sleeping
    pause_seconds = 3
    full_translated_texts = []
    for batch in get_batches(texts, limit = 1000):
        if not batch:
            # Nothing to send, so there is no request to space out.
            continue
        full_translated_texts += batch_translate_loop(batch, source_lang, target_lang)
        # Pause after every request, including the last one, so consecutive
        # full_translate() calls stay spaced out as well.
        time.sleep(pause_seconds)
        time_spent_sleeping += pause_seconds
    return full_translated_texts
def merge_runs(runs):
    """Collapse neighbouring runs that share style and character formatting.

    When an item and the previously kept item are both ``Run`` objects with
    equal style, bold, italic, underline, font size, font colour and font
    name, the item's text is appended to the previous run and the item is
    dropped. Everything else is kept as is.

    Returns:
        New list holding the kept items in their original order.
    """
    def _can_absorb(previous, current):
        # Both sides must be real runs before any formatting is compared.
        if not isinstance(current, docx.text.run.Run):
            return False
        if not isinstance(previous, docx.text.run.Run):
            return False
        return (
            current.style == previous.style
            and previous.bold == current.bold
            and previous.italic == current.italic
            and previous.underline == current.underline
            and previous.font.size == current.font.size
            and previous.font.color.rgb == current.font.color.rgb
            and previous.font.name == current.font.name
        )

    kept = []
    for candidate in runs:
        if kept and _can_absorb(kept[-1], candidate):
            kept[-1].text += candidate.text
            continue
        kept.append(candidate)
    return kept
# WordprocessingML main namespace URI wrapped in braces, i.e. the prefix of
# Clark-notation tag names such as NS_W + "t".
NS_W = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
def translate_header_footer(doc, source_lang, target_lang):
    """Translate the text of every header and footer run in place.

    For each section the runs of the header paragraphs are visited first,
    then those of the footer paragraphs. All run texts are translated in one
    ``batch_translate_loop`` call and written back in the same order.
    """
    targets = []
    for section in doc.sections:
        for story in (section.header, section.footer):
            for paragraph in story.paragraphs:
                targets.extend(paragraph.runs)

    originals = [run.text for run in targets]
    translated_head_foot = batch_translate_loop(originals, source_lang, target_lang)

    # Indexed access (not zip) so a short result still fails loudly.
    for position, run in enumerate(targets):
        run.text = translated_head_foot[position]
# Matches one character outside the Basic Multilingual Plane (where emoji
# live). The capture group makes split() keep those characters as parts.
_ASTRAL_CHAR_RE = re.compile(r'([\U00010000-\U0010FFFF])')


def _iter_text_elements(container):
    """Yield the non-empty ``t`` XML elements under ``container.paragraphs``.

    Only elements whose local tag name is exactly ``t`` are selected. Matching
    on a trailing "t" would also pick up ``instrText`` (field codes) and
    ``delText``, whose contents must not be sent for translation or
    overwritten.
    """
    for para in container.paragraphs:
        for element in para._element.iter():
            tag = element.tag
            # Comments / processing instructions expose a non-string tag.
            if not isinstance(tag, str):
                continue
            if tag.rpartition('}')[2] == 't' and element.text:
                yield element


def _is_translatable(part):
    """Return True for a part that has visible text and is not a split-out emoji."""
    return bool(part.strip()) and not _ASTRAL_CHAR_RE.fullmatch(part)


def get_text_elements_para(doc):
    """Collect the translatable text pieces of ``doc.paragraphs``.

    Each text element is split around non-BMP characters (emoji). The emoji
    themselves and whitespace-only pieces are skipped.

    Args:
        doc: Any object with a ``paragraphs`` attribute (document or cell).

    Returns:
        List of strings in document order. ``translate_paragraphs`` consumes
        translations in exactly this order.
    """
    para_texts = []
    for element in _iter_text_elements(doc):
        para_texts.extend(
            part for part in _ASTRAL_CHAR_RE.split(element.text)
            if _is_translatable(part)
        )
    return para_texts


def get_text_elements_table(doc):
    """Collect the translatable text pieces of every cell of ``doc.tables``.

    Cells are visited table by table, row by row, and each cell is handled
    like a small document by ``get_text_elements_para``.
    """
    table_texts = []
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                table_texts += get_text_elements_para(cell)
    return table_texts


def translate_paragraphs(doc, translated_texts, i = 0):
    """Write translations back into the text elements of ``doc.paragraphs``.

    The elements and pieces are visited exactly as in
    ``get_text_elements_para``. Every translatable piece is replaced by
    ``translated_texts[i]`` and ``i`` advances by one; emoji and
    whitespace-only pieces stay where they are.

    Args:
        doc: Any object with a ``paragraphs`` attribute (document or cell).
        translated_texts: Translations aligned with the extraction order.
        i: Index of the first translation to use.

    Returns:
        Tuple ``(doc, i)`` where ``i`` is the index of the next unused
        translation.

    Raises:
        IndexError: If ``translated_texts`` runs out before the text does.
    """
    for element in _iter_text_elements(doc):
        parts = _ASTRAL_CHAR_RE.split(element.text)
        for j, part in enumerate(parts):
            if _is_translatable(part):
                parts[j] = translated_texts[i]
                i += 1
        element.text = "".join(parts)
    return doc, i
def translate_tables(doc, translated_texts):
    """Insert table translations cell by cell and return ``doc``.

    Cells are visited table by table and row by row. One shared cursor into
    ``translated_texts`` is threaded through ``translate_paragraphs`` so each
    cell continues where the previous one stopped.
    """
    cursor = 0
    every_cell = (
        cell
        for table in doc.tables
        for row in table.rows
        for cell in row.cells
    )
    for cell in every_cell:
        _, cursor = translate_paragraphs(cell, translated_texts, cursor)
    return doc
def is_same_formatting(text1, text2):
    """
    Tell whether two runs carry the same character formatting.

    Compared, in this order and stopping at the first difference: bold,
    italic, underline, font size, font colour (RGB) and font name.
    """
    readers = (
        lambda run: run.bold,
        lambda run: run.italic,
        lambda run: run.underline,
        lambda run: run.font.size,
        lambda run: run.font.color.rgb,
        lambda run: run.font.name,
    )
    return all(read(text1) == read(text2) for read in readers)
def _is_plain_text_run(run):
    """Return True when the run's XML holds only properties, text and tabs.

    python-docx replaces the whole content of a run when ``Run.text`` is
    assigned, so a run carrying anything else (picture, break, field
    character, ...) must not be merged or blanked.
    """
    for child in run._element:
        tag = child.tag
        if not isinstance(tag, str) or tag.rpartition('}')[2] not in ("rPr", "t", "tab"):
            return False
    return True


def _merge_paragraph_runs(para):
    """Fold adjacent plain-text runs with equal formatting into the first one."""
    anchor = None
    for element in para.iter_inner_content():
        if not isinstance(element, docx.text.run.Run) or not _is_plain_text_run(element):
            # A hyperlink or a run with non-text content sits between the
            # neighbours: merging across it would move text past it.
            anchor = None
            continue
        if anchor is not None and is_same_formatting(anchor, element):
            anchor.text += element.text
            element.text = ""
        else:
            anchor = element


def merge_elements(doc):
    """Merge consecutive same-format runs throughout the document.

    Within each paragraph of the body and of every table cell, a run whose
    formatting equals that of the current anchor run has its text appended
    to the anchor and is then emptied (the empty run stays in the document).
    Merging only happens between directly neighbouring plain-text runs:
    hyperlinks and runs with non-text content are left untouched and start a
    new group.

    Args:
        doc: Document exposing ``paragraphs`` and ``tables``.

    Returns:
        The same ``doc`` object, modified in place.
    """
    for para in doc.paragraphs:
        _merge_paragraph_runs(para)
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                for para in cell.paragraphs:
                    _merge_paragraph_runs(para)
    return doc
def translate_docx(file_id, source_lang="English", target_lang="Vietnamese", file_name=''):
    """Translate a Word document stored in MongoDB and store the result there.

    The source file is read from the ``root_file`` GridFS collection of the
    ``word`` database. Its body paragraphs, tables and headers/footers are
    translated, and the resulting document is written to the ``final_file``
    GridFS collection.

    Args:
        file_id: GridFS id of the source document.
        source_lang: Name of the source language.
        target_lang: Name of the target language.
        file_name: Filename to store with the translated document.

    Returns:
        The GridFS id of the stored translated document.
    """
    client = MongoClient(os.getenv("MONGODB_URI"))
    try:
        db = client["word"]
        fs_input = gridfs.GridFS(db, collection="root_file")
        fs_output = gridfs.GridFS(db, collection="final_file")

        # Fetch the original file from MongoDB
        input_file = fs_input.get(file_id)
        doc = Document(BytesIO(input_file.read()))

        # Translate the content
        doc = merge_elements(doc)
        print('Translating paragraphs.')
        para_texts = get_text_elements_para(doc)
        translated_para = full_translate(para_texts, source_lang=source_lang, target_lang=target_lang)
        print('Done translating paragraphs.')
        print('Translating tables.')
        table_texts = get_text_elements_table(doc)
        translated_tables = full_translate(table_texts, source_lang=source_lang, target_lang=target_lang)
        print('Done translating tables.')
        print('Inserting paragraphs.')
        doc, _ = translate_paragraphs(doc, translated_para)
        print('Inserting tables.')
        doc = translate_tables(doc, translated_tables)
        translate_header_footer(doc, source_lang, target_lang)
        print('Done translating headers & footers.')

        # Save the translated document to MongoDB
        output_stream = BytesIO()
        doc.save(output_stream)
        output_stream.seek(0)
        translated_file_id = fs_output.put(output_stream, filename=file_name)
    finally:
        # Release the connection even when reading, translating or saving fails.
        client.close()
    print(f"Translation complete! Saved to MongoDB with id: {translated_file_id}")
    return translated_file_id