pychatbot / service /FileService.py
kltn20133118's picture
Upload 258 files
13ba451 verified
from request import RequestFile as req
from response import ResponseFile as res
from response import ResponseDefault as res1
import function.dropbox as sf_dropbox
import os
import shutil
import re
from repository import UserRepository
from function import support_function as sf
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'}
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
def check_email(email):
if(re.fullmatch(regex, email)):
return True
else:
return False
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
async def listNameFiles(request: req.RequestGetNameFile ):
try:
user_id = request.user_id
email = sf.check_email_service(user_id)
if isinstance(email,res1.ReponseError):
return email
list_files = sf_dropbox.list_files(email)
return res.ResponseGetNameFile(
status= 200,
data = res.DataGetNameFile(files=list_files)
)
except:
return res.ReponseError(
status=500,
data =res.Message(message="Server Error")
)
async def deleteFile(request: req.RequestDeleteFile):
try:
user_id = request.user_id
name_file = request.name_file
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
if name_file is None or name_file == "":
return res.ReponseError(
status=400,
data =res.Message(message="Name file is empty")
)
sf_dropbox.delete_file(email,name_file)
return res.ResponseDeleteFile(
status=200,
data =res.Message(message=f"delete {name_file} success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"delete {name_file} error")
)
async def download_folder(request:req.RequestDownLoadFolder):
try:
user_id = request.user_id
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
sf_dropbox.download_folder(email)
return res.ResponseDownloadFolder(
status=200,
data =res.Message(message=f"Downloaded folder {email} success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"Server error")
)
async def download_file(request:req.RequestDownLoadFile):
try:
user_id = request.user_id
name_file = request.name_file
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
if name_file is None or name_file == "":
return res.ReponseError(
status=400,
data =res.Message(message="name_file is empty")
)
sf_dropbox.search_and_download_file(name_file,email)
return res.ResponseDownloadFile(
status=200,
data =res.Message(message=f"Downloaded file '{name_file}' by email: '{email}' success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"Server error")
)
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'}
def allowed_file1(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
async def upload_files(request: req.RequestUploadFile):
try:
user_id = request.user_id
files = request.files
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
for file in files:
if not allowed_file(file.filename):
return res.ReponseError(
status=415,
data =res.Message(message=f"File type not allow")
)
temp_dir = f"/code/temp/{email}"
os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
cloud_path = f"/{email}/{file.filename}"
sf_dropbox.upload_file(file_path, cloud_path)
return res.ResponseUploadedFile(
status=200,
data =res.Message(message=f"Load file success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"Load file error")
)
async def deleteAllFile(request: req.RequestDeleteAllFile):
try:
user_id = request.user_id
email = sf.check_email_service(user_id)
if isinstance(email, res.ReponseError):
return email
sf_dropbox.delete_all_files_in_folder(email)
return res.ResponseDeleteAllFile(
status=200,
data=res.Message(message=f"Delete all file success")
)
except:
return res.ReponseError(
status=500,
data=res.Message(message=f"Delete all file error")
)