Spaces:
Running
Running
import openpyxl | |
from typing import Dict, List | |
from translate.translator import translate_text_dict | |
import math | |
import chardet | |
import io | |
import pandas as pd | |
import pymongo | |
import gridfs | |
import tempfile | |
import os | |
def translate_xlsx(file_id: str, sheet_name: str = None, from_lang: str = 'en', target_lang: str = "fr", gemini_api: str = "", db_name: str = "excel"): | |
# Kết nối MongoDB | |
client = pymongo.MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0") | |
db = client[db_name] | |
fs_input = gridfs.GridFS(db, collection="root_file") | |
fs_output = gridfs.GridFS(db, collection="final_file") | |
# Tải file từ MongoDB | |
file_data = fs_input.get(file_id).read() | |
# Lưu file tạm thời | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as temp_file: | |
temp_file.write(file_data) | |
temp_file_path = temp_file.name | |
# Đọc file Excel bằng openpyxl | |
wb = openpyxl.load_workbook(temp_file_path) | |
# Chọn sheet được chỉ định hoặc tất cả các sheet | |
sheets = [wb[sheet_name]] if sheet_name else wb.worksheets | |
for ws in sheets: | |
max_row = ws.max_row | |
max_col = ws.max_column | |
# Tạo dictionary lưu trữ nội dung cần dịch và mapping từ key đến cell | |
text_dict: Dict[str, List[str]] = {} | |
cell_map: Dict[str, any] = {} # lưu mapping key -> cell object | |
for row in range(1, max_row + 1): | |
for col in range(1, max_col + 1): | |
cell = ws.cell(row=row, column=col) | |
if isinstance(cell.value, str): | |
key = f"R{row}C{col}" # key theo dạng R{row}C{col} | |
text_dict[key] = [cell.value] # Lưu giá trị dưới dạng danh sách với 1 phần tử | |
cell_map[key] = cell | |
# Gọi hàm dịch theo dạng bulk | |
translated_dict = translate_text_dict(text_dict, target_lang=target_lang, gemini_api=gemini_api) | |
# Cập nhật lại các cell với nội dung đã dịch | |
for key, cell in cell_map.items(): | |
if key in translated_dict: | |
translated_text_list = translated_dict[key] | |
if translated_text_list and len(translated_text_list) > 0: | |
cell.value = translated_text_list[0] | |
# Lưu workbook vào file tạm thời | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as output_file: | |
wb.save(output_file.name) | |
output_file.seek(0) | |
translated_file_id = fs_output.put(output_file.read(), filename=f"translated_{file_id}.xlsx") | |
# Đóng workbook và xóa file tạm | |
wb.close() | |
os.remove(temp_file_path) | |
print(f"✅ Dịch thành công! File đã lưu vào MongoDB với file_id: {translated_file_id}") | |
return translated_file_id | |
def read_csv_with_auto_encoding(csv_path): | |
# Đọc file dưới dạng nhị phân | |
with open(csv_path, "rb") as f: | |
raw_data = f.read() | |
# Dò tìm encoding | |
detect_result = chardet.detect(raw_data) | |
encoding = detect_result["encoding"] | |
confidence = detect_result["confidence"] | |
print(f"Chardet dự đoán file '{csv_path}' có encoding = {encoding} (độ tin cậy = {confidence})") | |
# Nếu chardet không phát hiện được, ta đặt fallback = 'utf-8' | |
if encoding is None: | |
encoding = "utf-8" | |
decoded_data = raw_data.decode(encoding, errors='replace') | |
# Sử dụng io.StringIO để chuyển đổi chuỗi thành đối tượng file-like | |
csv_data = io.StringIO(decoded_data) | |
df = pd.read_csv(csv_data) | |
return df | |
def translate_csv(file_id, source_lang="en", target_lang="vi", gemini_api="", chunk_size=50, text_columns=None, db_name="csv"): | |
# Kết nối MongoDB | |
client = pymongo.MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0") | |
db = client[db_name] | |
fs_input = gridfs.GridFS(db, collection="root_file") | |
fs_output = gridfs.GridFS(db, collection="final_file") | |
# Tải file từ MongoDB | |
file_data = fs_input.get(file_id).read() | |
# Lưu file tạm thời | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file: | |
temp_file.write(file_data) | |
temp_file_path = temp_file.name | |
df = read_csv_with_auto_encoding(temp_file_path) | |
# If text_columns is not specified, we assume we want to translate everything that looks like text. | |
# Otherwise, only translate the given columns. | |
if text_columns is None: | |
# Example heuristic: choose all object/string columns | |
text_columns = df.select_dtypes(include=["object"]).columns.tolist() | |
num_rows = len(df) | |
num_chunks = math.ceil(num_rows / chunk_size) | |
translated_df = df.copy() # copy to store the final translations | |
for chunk_index in range(num_chunks): | |
start_idx = chunk_index * chunk_size | |
end_idx = min((chunk_index + 1) * chunk_size, num_rows) | |
chunk_df = df.iloc[start_idx:end_idx] | |
# Build a dictionary structure. For example, row-based: | |
# { | |
# "0": {"colA": "some text", "colB": "some text"}, | |
# "1": {"colA": "some text", "colB": "some text"}, | |
# ... | |
# } | |
chunk_dict = {} | |
for i, row in chunk_df.iterrows(): | |
row_dict = {} | |
for col in text_columns: | |
row_dict[col] = str(row[col]) if pd.notnull(row[col]) else "" | |
chunk_dict[str(i)] = row_dict | |
# Now call your LLM translator on this dictionary | |
translated_chunk = translate_text_dict( | |
text_dict=chunk_dict, | |
source_lang=source_lang, | |
target_lang=target_lang, | |
gemini_api=gemini_api | |
) | |
# 'translated_chunk' should be the same structure, so let's re-inject into the DataFrame | |
for i_str, row_data in translated_chunk.items(): | |
i = int(i_str) | |
for col, translated_val in row_data.items(): | |
translated_df.at[i, col] = translated_val | |
# Lưu file dịch vào tệp tạm thời | |
translated_file_path = temp_file_path.replace(".csv", f"_translated_{target_lang}.csv") | |
translated_df.to_csv(translated_file_path, index=False, encoding='utf-8-sig') | |
# Đọc lại file tạm để lưu vào MongoDB | |
with open(translated_file_path, "rb") as f: | |
translated_file_id = fs_output.put(f, filename=f"translated_{file_id}.csv") | |
# Xóa file tạm | |
os.remove(temp_file_path) | |
os.remove(translated_file_path) | |
print(f"Translation complete! Saved to MongoDB with file_id: {translated_file_id}") | |
return translated_file_id |