# NOTE: stray hosting-platform status text ("Spaces:" / "Running") removed —
# it was a paste artifact, not Python, and broke parsing of this file.
# --- Imports ----------------------------------------------------------------
# Standard library.
import ast
import io
import json
import os
import re

# Third-party. (The duplicate `from docx import Document` that appeared twice
# in this file has been collapsed to a single import.)
import docx
from docx import Document
import dotenv
import google.generativeai as genai
from gridfs import GridFS
from pymongo import MongoClient

# --- Gemini configuration (import-time side effects) ------------------------
# Load GEMINI_API_KEY from the local .env file and configure the client once.
dotenv.load_dotenv(".env")
api_key = os.getenv("GEMINI_API_KEY")  # None if unset; genai rejects it at call time
genai.configure(api_key=api_key)
# Module-level default model instance shared by the helpers below.
model = genai.GenerativeModel("gemini-2.0-flash")
def batch_translate(texts, target_lang="Vietnamese"):
    """Translate multiple text segments in a single Gemini API call.

    Parameters
    ----------
    texts : sequence of str
        Segments to translate; order is preserved in the result.
    target_lang : str
        Target language name passed verbatim to the prompt.

    Returns
    -------
    list of str
        One translated segment per input segment (empty list if the model
        returns no items).  An empty/falsy ``texts`` is returned unchanged.
    """
    if not texts:
        return texts  # Skip if empty

    system_prompt = """ You are given three inputs: source language, target language and a json file.
- Your task is to translate the JSON file from the source language (you have to detect the source language yourself) to the target language.
- The original JSON file contains a Python array of objects, each with "index" and "text" keys.
- Ensure **one-to-one correspondence** — the output must have exactly as many items as the input.
- The array contains text that makes up whole paragraphs. Make sure that the translation makes sense when the text is put together and retains the same context.
- This is very important: Empty spaces should be left as is. For example: From English, "Hello " should be translated into Vietnamese as "Xin chào ", with the same space at the end.
- Very frequently there are spaces before or after a string. Do not remove these spaces.
- If the source language is English and the target language is Vietnamese and a string contains "'s" in the possessive sense, translate it as "của".
- Example: [["WorldQuant's"], ["Mission"]] should be translated as [["Nhiệm vụ"], ["của WorldQuant"]]
- Words might be split into multiple continuous arrays. Translate them such that the translation corresponds to the full word.
- If a word is split up into multiple arrays, the translation should be such that the word is not split up.
- Exampe: ['Tesla sells its pro', 'ducts'] should be translated as ['Tesla bán sản phẩm của mình', ''.]. Note that the number of elements in the output is the same as the input.
- Example: [["Hello"], ["World"]] should be translated as [["Xin chào"], ["Thế giới"]]
- Do **not** merge, split, or omit strings. Each input object corresponds to exactly one output object.
- Return a JSON object that is a Python array.
- Each object in the array is a dictionary with two keys: "index" and "text".
- The text should be the translated version of the text in the original object, and the index should stay consistent.
- The number of objects in the output MUST the same as the number of objects in the input.
- The format of the output should look exactly like the example.
- Example:
**Input**: Target language: Vietnamese. JSON file:
[{"index": 0, "text": ["My name is "]}, {"index": 1, "text": ["Huy"]}, {"index": 2, "text": ["."]}, {"index": 3, "text": ["Today is "]}, {"index": 4, "text": ["a"]}, {"index": 5, "text": ["good day"]}, {"index": 6, "text": ["."]}, {"index": 7, "text": [""]}]'
**Output**: [{"index": 0, "text": ["Tên tôi là "]}, {"index": 1, "text": ["Huy"]}, {"index": 2, "text": ["."]}, {"index": 4, "text": ["Hôm nay là "]}, {"index": 3, "text": ["Một"]}, {"index": 5, "text": ["ngày đẹp"]}, {"index": 6, "text": ["."]}, {"index": 7, "text": [""]}]
- Return the result of translation according to the format. Do NOT return code for translating.
"""
    json_data = json.dumps([{"index": i, "text": t} for i, t in enumerate(texts)])
    user_prompt = f"Target language: {target_lang}. JSON file: {json_data}"

    # Use the module-level model instead of constructing a fresh
    # GenerativeModel("gemini-2.0-flash") on every call (same model name).
    response = model.generate_content(
        contents=system_prompt.strip() + "\n" + user_prompt.strip(),
        generation_config={
            'temperature': 1,  # Adjust temperature for desired creativity
            'top_p': 1,
            'top_k': 1,
        },
    )

    raw = response.text.strip()
    # BUG FIX: the original used raw.strip("json```"), but str.strip() removes
    # a *character set*, not a prefix — it could eat leading/trailing 'j', 's',
    # 'o', 'n' or backticks belonging to the payload.  Remove a Markdown code
    # fence explicitly instead.
    fence = re.match(r"^```(?:json)?\s*(.*?)\s*```$", raw, re.DOTALL)
    if fence:
        raw = fence.group(1)
    try:
        # Prefer strict JSON; the prompt asks for JSON output.
        response_items = json.loads(raw)
    except json.JSONDecodeError:
        # Fallback: the model occasionally emits Python-literal syntax
        # (single quotes), which literal_eval handles safely.
        response_items = ast.literal_eval(raw)

    # BUG FIX: the original left `translated_texts` unbound (NameError) when
    # the response was empty or "text" was neither list nor str.
    translated_texts = []
    for item in sorted(response_items, key=lambda x: x['index']):
        text = item['text']
        translated_texts.append(text[0] if isinstance(text, list) else text)
    return translated_texts
def full_translate(texts, target_lang="Vietnamese"):
    """Translate an arbitrary number of segments, batching API calls.

    Segments are accumulated until a batch would reach ~1000 words, at
    which point the pending batch is sent to ``batch_translate``.  The
    final partial batch is always flushed.  Returns the translated
    segments in input order.
    """
    translated = []
    pending = []
    pending_words = 0
    for segment in texts:
        segment_words = len(segment.split())
        # Flush the current batch *before* appending if adding this
        # segment would push it to the word limit.
        if pending_words + segment_words >= 1000:
            print('Translating a batch.')
            translated.extend(batch_translate(pending, target_lang))
            pending = []
            pending_words = 0
        pending.append(segment)
        pending_words += segment_words
    # Flush whatever remains (possibly the only batch).
    translated.extend(batch_translate(pending, target_lang))
    return translated
def merge_runs(runs):
    """Collapse consecutive identically-formatted runs into one run.

    Two adjacent ``docx`` runs are merged (text concatenated onto the
    earlier run, in place) when they share style, bold/italic/underline
    flags, font size, font color and font name.  Non-Run items and runs
    with differing formatting are kept as separate entries.
    """
    merged = []
    for current in runs:
        previous = merged[-1] if merged else None
        same_format = (
            previous is not None
            and isinstance(current, docx.text.run.Run)
            and isinstance(previous, docx.text.run.Run)
            and current.style == previous.style
            and previous.bold == current.bold
            and previous.italic == current.italic
            and previous.underline == current.underline
            and previous.font.size == current.font.size
            and previous.font.color.rgb == current.font.color.rgb
            and previous.font.name == current.font.name
        )
        if same_format:
            # Mutates the underlying run object (and thus the document).
            previous.text += current.text
        else:
            merged.append(current)
    return merged
# WordprocessingML main namespace in lxml "Clark notation" ({uri}localname),
# usable as a prefix when matching fully-qualified XML element tags.
# NOTE(review): not referenced anywhere in this chunk — possibly used elsewhere.
NS_W = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
def translate_header_footer(doc, target_lang):
    """Translate every header and footer run of *doc* in place.

    Pass 1 collects run texts across all sections (headers first, then
    footers, per section); pass 2 walks the sections in the same order
    and writes the translations back, so positions line up one-to-one.
    """
    # Pass 1: collect original run texts in a stable traversal order.
    originals = []
    for section in doc.sections:
        for paragraph in section.header.paragraphs:
            originals.extend(run.text for run in paragraph.runs)
        for paragraph in section.footer.paragraphs:
            originals.extend(run.text for run in paragraph.runs)

    translated = full_translate(originals, target_lang)

    # Pass 2: identical traversal, writing translations back in place.
    cursor = 0
    for section in doc.sections:
        for paragraph in section.header.paragraphs:
            for run in paragraph.runs:
                run.text = translated[cursor]
                cursor += 1
        for paragraph in section.footer.paragraphs:
            for run in paragraph.runs:
                run.text = translated[cursor]
                cursor += 1
def get_text_elements_para(doc):
    """Collect translatable text fragments from the paragraphs of *doc*.

    Walks every XML element under each paragraph whose tag ends in 't'
    (text-bearing nodes) and splits their text around astral-plane code
    points (emoji).  The emoji themselves are skipped — they stay in the
    document untouched — while the surrounding fragments (including empty
    strings) are returned in document order.
    """
    emoji_pattern = r'[\U00010000-\U0010FFFF]'  # astral-plane chars (emoji, etc.)
    fragments = []
    for paragraph in doc.paragraphs:
        for node in paragraph._element.iter():
            if not node.tag.endswith('t') or not node.text:
                continue
            # Split with a capture group so emoji appear as their own parts.
            for piece in re.split(f'({emoji_pattern})', node.text):
                if not re.match(emoji_pattern, piece):
                    fragments.append(piece)
    return fragments
def get_text_elements_table(doc):
    """Collect translatable text fragments from every table cell of *doc*.

    Delegates to ``get_text_elements_para`` per cell (a cell exposes the
    same ``paragraphs`` interface as a document) and concatenates the
    results in table/row/cell order.
    """
    fragments = []
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                fragments.extend(get_text_elements_para(cell))
    return fragments
def translate_paragraphs(doc, translated_texts, i=0):
    """Write translated fragments back into the paragraphs of *doc*.

    Mirrors the traversal of ``get_text_elements_para``: each non-emoji
    fragment of every text-bearing node is replaced by the next entry of
    ``translated_texts`` (starting at index ``i``), while emoji are copied
    through untouched.  Returns ``(doc, next_index)`` so table cells can
    continue consuming from the same translation list.
    """
    emoji_pattern = r'[\U00010000-\U0010FFFF]'  # astral-plane chars (emoji, etc.)
    for paragraph in doc.paragraphs:
        for node in paragraph._element.iter():
            if not node.tag.endswith('t') or not node.text:
                continue
            pieces = re.split(f'({emoji_pattern})', node.text)
            for j, piece in enumerate(pieces):
                if re.match(emoji_pattern, piece):
                    continue  # emoji stay in place, untranslated
                pieces[j] = translated_texts[i]
                i += 1
            node.text = "".join(pieces)
    return doc, i
def translate_tables(doc, translated_texts):
    """Write translated fragments back into every table cell of *doc*.

    Consumes ``translated_texts`` sequentially across all cells by
    threading the running index through ``translate_paragraphs``, which
    mirrors the collection order of ``get_text_elements_table``.
    """
    cursor = 0
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                cell, cursor = translate_paragraphs(cell, translated_texts, cursor)
    return doc
def translate_docx_from_mongodb(file_id, target_lang="Vietnamese"):
    """Fetch a .docx from GridFS, translate it, and store the result.

    Reads the file from the ``root_file`` GridFS collection, translates
    body paragraphs, tables, and headers/footers, then writes the
    translated document to the ``final_file`` collection under the same
    filename.

    Parameters
    ----------
    file_id : ObjectId
        GridFS id of the source document in ``root_file``.
    target_lang : str
        Target language for translation.

    Returns
    -------
    ObjectId
        GridFS id of the translated document in ``final_file``.
    """
    # SECURITY: credentials were hard-coded in source.  Prefer the
    # MONGODB_URI environment variable; the literal is kept only as a
    # backward-compatible fallback and should be rotated/removed.
    mongo_uri = os.getenv(
        "MONGODB_URI",
        "mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0",
    )
    client = MongoClient(mongo_uri)
    try:
        db = client["word"]
        fs_input = GridFS(db, collection="root_file")
        fs_output = GridFS(db, collection="final_file")

        # BUG FIX: the original called fs_input.get(file_id) twice (two
        # round-trips); fetch once and take both the bytes and filename.
        grid_out = fs_input.get(file_id)
        original_file = grid_out.filename  # keep the original filename
        doc = Document(io.BytesIO(grid_out.read()))

        # Collect and translate body paragraphs, then tables (same call
        # order as before, so API batching behavior is unchanged).
        para_texts = get_text_elements_para(doc)
        translated_para = full_translate(para_texts, target_lang)
        table_texts = get_text_elements_table(doc)
        translated_tables = full_translate(table_texts, target_lang)

        # Write translations back into the document, then headers/footers.
        doc, _ = translate_paragraphs(doc, translated_para)
        doc = translate_tables(doc, translated_tables)
        translate_header_footer(doc, target_lang)

        # Persist the translated document under the original name.
        output_stream = io.BytesIO()
        doc.save(output_stream)
        output_stream.seek(0)
        return fs_output.put(output_stream, filename=original_file)
    finally:
        # BUG FIX: the original leaked the client if any step raised.
        client.close()