Spaces:
Running
Running
File size: 7,382 Bytes
0e9ff78 6ae64ab 0e9ff78 fad6c52 0e9ff78 fad6c52 0e9ff78 fad6c52 0e9ff78 6ae64ab 0e9ff78 fad6c52 0e9ff78 fad6c52 0e9ff78 fad6c52 0e9ff78 6ae64ab 0e9ff78 6ae64ab 0e9ff78 6ae64ab 0e9ff78 6ae64ab 0e9ff78 d586fe1 0e9ff78 d586fe1 0e9ff78 d586fe1 0e9ff78 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
from pymongo import MongoClient
import gridfs
from bson import ObjectId
import os
from io import BytesIO
import magic
def connect_mongodb(db_name, collection_name):
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client[db_name]
fs = gridfs.GridFS(db, collection=collection_name)
return fs
def save_file_to_mongodb(uploaded_file, db_name="ppt", collection_name="root_file", file_tail=".pptx"):
"""
Lưu file vào MongoDB bằng GridFS mà không kiểm tra trùng lặp.
:param uploaded_file: đối tượng UploadedFile từ Streamlit
:param db_name: Tên database trong MongoDB
:param collection_name: Tên collection GridFS
:param file_tail: Phần mở rộng mặc định của file nếu không có
:return: file_id nếu lưu thành công
"""
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client[db_name]
fs = gridfs.GridFS(db, collection=collection_name)
# Xác định tên file
file_name = uploaded_file.name
# Đảm bảo con trỏ file đang ở đầu
uploaded_file.seek(0)
file_bytes = uploaded_file.read()
# Lưu file vào MongoDB (không kiểm tra trùng lặp)
file_id = fs.put(file_bytes, filename=file_name)
print(f"✅ File '{file_name}' đã được lưu vào '{collection_name}' với ID: {file_id}")
client.close()
return file_id
def delete_pptx_from_mongodb(file_id, db_name="ppt", collection_name="root_file"):
"""
Xóa file PowerPoint khỏi MongoDB theo ID.
:param file_id: ID của file cần xóa (chuỗi hoặc ObjectId)
:param db_name: Tên database trong MongoDB
:param collection_name: Tên collection GridFS
"""
# Kết nối đến MongoDB
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client[db_name]
fs = gridfs.GridFS(db, collection=collection_name)
try:
# Chuyển đổi ID nếu cần
if not isinstance(file_id, ObjectId):
file_id = ObjectId(file_id)
# Kiểm tra file có tồn tại không
if fs.exists(file_id):
fs.delete(file_id)
print(f"✅ Đã xóa file với ID: {file_id}")
else:
print(f"⚠️ Không tìm thấy file với ID: {file_id}")
except Exception as e:
print(f"❌ Lỗi khi xóa file: {e}")
client.close()
def download_pptx_from_mongodb(file_id, save_path, save_name, db_name="ppt", collection_name="root_file"):
"""
Tải file PowerPoint từ MongoDB GridFS và lưu về máy.
:param file_id: ID của file cần tải (dạng chuỗi hoặc ObjectId)
:param save_path: Đường dẫn đến thư mục sẽ lưu file (VD: 'D:/output')
:param save_name: Tên file khi lưu (VD: 'my_presentation.pptx')
:param db_name: Tên database trong MongoDB (mặc định: 'ppt')
:param collection_name: Tên collection GridFS (mặc định: 'root_file')
"""
# Đảm bảo thư mục lưu file tồn tại
os.makedirs(save_path, exist_ok=True)
# Tạo đường dẫn đầy đủ cho file
full_file_path = os.path.join(save_path, save_name)
# Kết nối đến MongoDB
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client[db_name]
fs = gridfs.GridFS(db, collection=collection_name)
try:
# Chuyển đổi ID nếu cần
if not isinstance(file_id, ObjectId):
file_id = ObjectId(file_id)
# Lấy dữ liệu file từ GridFS
file_data = fs.get(file_id)
# Ghi dữ liệu ra file
with open(full_file_path, "wb") as f:
f.write(file_data.read())
print(f"✅ File đã được tải về: {full_file_path}")
except Exception as e:
print(f"❌ Lỗi khi tải file: {e}")
finally:
client.close()
def save_xml_to_gridfs(xml_content, file_name, db_name="ppt", collection_name="original_xml"):
"""
Lưu XML vào MongoDB GridFS.
:param xml_content: Chuỗi XML cần lưu
:param file_name: Tên file XML
:param db_name: Tên database MongoDB
:param collection_name: Tên collection GridFS
"""
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0")
db = client[db_name]
fs = gridfs.GridFS(db, collection=collection_name)
# Kiểm tra file đã tồn tại chưa
existing_file = fs.find_one({"filename": file_name})
if existing_file:
print(f"⚠️ File '{file_name}' đã tồn tại trong GridFS. Không lưu lại.")
return
# Chuyển đổi chuỗi XML thành bytes và lưu vào GridFS
file_id = fs.put(xml_content.encode("utf-8"), filename=file_name)
print(f"✅ XML '{file_name}' đã được lưu vào GridFS với ID: {file_id}")
def fetch_file_from_mongodb(db_name, collection_name, file_id):
client = MongoClient("mongodb+srv://admin:[email protected]/?retryWrites=true&w=majority&appName=Cluster0") # Cập nhật nếu cần
db = client[db_name]
fs = gridfs.GridFS(db, collection_name)
try:
file_data = fs.get(file_id)
pptx_io = BytesIO(file_data.read())
pptx_io.seek(0) # Đặt lại vị trí đầu file
return pptx_io, file_data.filename
except Exception as e:
print(f"Lỗi khi lấy file từ MongoDB: {e}")
return None, None
def detect_file_type(uploaded_file):
if uploaded_file is not None:
try:
# Ưu tiên kiểm tra phần mở rộng trước
ext = os.path.splitext(uploaded_file.name)[1].lower()
ext_mapping = {
".csv": "CSV", ".docx": "Word", ".doc": "Word",
".xlsx": "Excel", ".pptx": "PPTX", ".pdf": "PDF"
}
detected_type = ext_mapping.get(ext)
if detected_type:
return detected_type # Nếu có trong danh sách, trả về ngay
# Nếu không có phần mở rộng hợp lệ, fallback vào MIME type
file_bytes = uploaded_file.read(4096)
mime = magic.Magic(mime=True)
file_type = mime.from_buffer(file_bytes)
mime_types = {
"application/pdf": "PDF",
"application/vnd.ms-powerpoint": "PPTX",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": "PPTX",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "Excel",
"application/vnd.ms-excel": "Excel",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": "Word",
"application/msword": "Word",
"text/csv": "CSV",
"text/plain": "CSV"
}
return mime_types.get(file_type, "Unknown")
except Exception as e:
print(f"Error detecting file type: {e}")
return "Unknown"
return None |