kltn20133118 commited on
Commit
7c33a36
·
verified ·
1 Parent(s): 6422f3e

Update service/UserService.py

Browse files
Files changed (1) hide show
  1. service/UserService.py +321 -321
service/UserService.py CHANGED
@@ -1,322 +1,322 @@
1
- from datetime import timedelta, datetime
2
- from request import RequestUser as req_login
3
- from response import ResponseUser as res_login
4
- import requests
5
- import json, re
6
- from auth.authentication import signJWT
7
- from firebase_admin import credentials, auth, exceptions
8
- import firebase_admin
9
- from repository import UserLoginRepository, UserRepository, UserInfoRepository, OTPRepository
10
- import service.OTPService
11
- from function import support_function as sf
12
- from dotenv import load_dotenv
13
- import os
14
- from response import ResponseUser as res
15
- from response import ResponseDefault as res1
16
- load_dotenv()
17
- CLIENT_ID_GOOGLE = os.getenv('CLIENT_ID')
18
- API_SIGN_UP_FIREBASE_PATH = os.getenv('API_SIGN_UP_FIREBASE')
19
- regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
20
-
21
- def get_user1(email):
22
- try:
23
- user = auth.get_user_by_email(email)
24
- return user
25
- except exceptions.FirebaseError as e:
26
- return None
27
-
28
- def check_email(email):
29
- if isinstance(email, str) and re.fullmatch(regex, email):
30
- return True
31
- else:
32
- return False
33
- from pathlib import Path
34
- try:
35
- if not firebase_admin._apps:
36
- json_path = Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json'
37
- cred = credentials.Certificate(str(json_path))
38
- fred = firebase_admin.initialize_app(cred)
39
- except:
40
- if not firebase_admin._apps:
41
- cred = credentials.Certificate("firebase_certificate.json")
42
- fred = firebase_admin.initialize_app(cred)
43
-
44
- def sign_up_with_email_and_password(email, password, username=None, return_secure_token=True):
45
- try:
46
- rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signUp"
47
- payload = {
48
- "email": email,
49
- "password": password,
50
- "returnSecureToken": return_secure_token
51
- }
52
- if username:
53
- payload["displayName"] = username
54
- payload = json.dumps(payload)
55
- r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload)
56
- try:
57
- return r.json()['email']
58
- except Exception as e:
59
- pass
60
- except Exception as e:
61
- pass
62
-
63
- def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True):
64
- rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"
65
- try:
66
- payload = {
67
- "returnSecureToken": return_secure_token
68
- }
69
- if email:
70
- payload["email"] = email
71
- if password:
72
- payload["password"] = password
73
- payload = json.dumps(payload)
74
- r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload)
75
- r.raise_for_status()
76
- data = r.json()
77
- if 'idToken' in data:
78
- return data['email']
79
- else:
80
- return False
81
- except requests.exceptions.RequestException as e:
82
- print(f"Error signing in: {e}")
83
- return False
84
-
85
- def update_info_user(uid, email=None, user_name=None, photo_url=None):
86
- user_data = {}
87
- if email is not None:
88
- user_data['email'] = email
89
- if user_name is not None:
90
- user_data['display_name'] = user_name
91
- if photo_url is not None and photo_url != 'N/A':
92
- user_data['photo_url'] = photo_url
93
- if user_data:
94
- auth.update_user(uid, **user_data)
95
-
96
- async def update_user_info(request: req_login.RequestUpdateUserInfo):
97
- try:
98
- email = request.email
99
- user_id = request.user_id
100
- email_check = sf.check_email_service(user_id)
101
- if isinstance(email_check, res1.ReponseError):
102
- return email_check
103
- check_email_fc = sf.check_email_empty_invalid(email)
104
- if check_email_fc is not True:
105
- return check_email_fc
106
- user = get_user1(email)
107
- if user:
108
- user_info = UserInfoRepository.getUserInfo(user_id)
109
- if user_info:
110
- UserInfoRepository.updateUserInfo(
111
- request.user_id,
112
- request.uid,
113
- request.email,
114
- request.display_name,
115
- request.photo_url)
116
- else:
117
- UserInfoRepository.addUserInfo(
118
- request.uid,
119
- request.email,
120
- request.display_name,
121
- request.photo_url
122
- )
123
- update_info_user(request.uid,
124
- request.email,
125
- request.display_name,
126
- request.photo_url
127
- )
128
- return res_login.ResponseUpdateUserInfo(status=200,
129
- data = res_login.Message(message=f"User info updated successfully"))
130
- else:
131
- return res_login.ReponseError(
132
- status = 404,
133
- data = res_login.Message(message="Not found user")
134
- )
135
- except:
136
- return res_login.ReponseError(
137
- status=500,
138
- data=res_login.Message(message="Server Error")
139
- )
140
-
141
- async def check_info_google(request: req_login.RequestCheckInfoGoogle):
142
- try:
143
- user_id = request.user_id
144
- check = UserRepository.getEmailUserByIdFix(user_id)
145
- if check is None:
146
- return res_login.ReponseError(
147
- status = 404,
148
- data = res_login.Message(message="user_id not exist")
149
- )
150
- email = sf.check_email_service(str(user_id))
151
- if isinstance(email, res.ReponseError):
152
- return email
153
- user_info = UserInfoRepository.getUserInfo(user_id)
154
- if user_info is not None:
155
- email_check = True
156
- else:
157
- email_check = False
158
- if email_check is not None:
159
- return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check=email_check))
160
- except:
161
- return res_login.ReponseError(
162
- status=500,
163
- data=res_login.Message(message="Server Error")
164
- )
165
-
166
- async def check_info_google_email(request: req_login.RequestCheckInfoGoogleEmail):
167
- try:
168
- email = request.email
169
- check_email_fc = sf.check_email_empty_invalid(email)
170
- if check_email_fc is not True:
171
- return check_email_fc
172
- user_info = UserInfoRepository.getUserInfoByEmail(email)
173
- if user_info is not None:
174
- email_check = True
175
- else:
176
- email_check = False
177
- if email_check is not None:
178
- return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check=email_check))
179
- except:
180
- return res_login.ReponseError(
181
- status=500,
182
- data=res_login.Message(message="Server Error")
183
- )
184
-
185
- async def check_state_login(request: req_login.RequestCheckStateLogin):
186
- try:
187
- user_id = request.user_id
188
- session_id_now = request.session_id_now
189
- email = sf.check_email_service(user_id)
190
- if isinstance(email, res1.ReponseError):
191
- return email
192
- elif session_id_now is None or session_id_now=="":
193
- return res_login.ReponseError(
194
- status= 400,
195
- data =res_login.Message(message="session_id is empty")
196
- )
197
- user = get_user1(email)
198
- if user:
199
- check1 = False
200
- session_id = UserLoginRepository.getUserSessionIdByUserEmail(user_id)
201
- print(f"session_id: {session_id}")
202
- if session_id != session_id_now:
203
- check1 = False
204
- else:
205
- check1 = True
206
- return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check = check1))
207
- else:
208
- return res_login.ReponseError(
209
- status=404,
210
- data =res_login.Message(message="Not found user")
211
- )
212
- except:
213
- return res_login.ReponseError(
214
- status=500,
215
- data=res_login.Message(message="Server Error")
216
- )
217
-
218
- import string, random
219
- def generate_otp(length=6):
220
- characters = string.ascii_uppercase + string.digits
221
- otp = ''.join(random.choice(characters) for _ in range(length))
222
- return otp
223
-
224
- def createOTPReset(email):
225
- otp = generate_otp()
226
- check_email_fc = sf.check_email_empty_invalid(email)
227
- if check_email_fc is not True:
228
- return check_email_fc
229
- OTPRepository.addOTP(email,otp)
230
- return otp
231
-
232
- async def reset_password(request: req_login.RequestResetPassword):
233
- try:
234
- email = request.email
235
- check_email_fc = sf.check_email_empty_invalid(email)
236
- if check_email_fc is not True:
237
- return check_email_fc
238
- try:
239
- user = get_user1(email)
240
- if user is not None:
241
- otp = createOTPReset(email)
242
- return res_login.ResponseCreateOTP(
243
- status= 200,
244
- data= res_login.CheckModel(check = True),
245
- otp = otp
246
- )
247
- else:
248
- return res_login.ReponseError(
249
- status= 404,
250
- data =res_login.Message(message="Email not exist")
251
- )
252
- except auth.UserNotFoundError as e:
253
- return res_login.ReponseError(
254
- status=500,
255
- data =res_login.Message(message=str(e))
256
- )
257
- except:
258
- return res_login.ReponseError(
259
- status=500,
260
- data=res_login.Message(message="Server Error")
261
- )
262
-
263
- async def change_password(request: req_login.RequestChangePassword):
264
- try:
265
- user_id = request.user_id
266
- email = sf.check_email_service(user_id)
267
- new_password = request.new_password
268
- current_password= request.current_password
269
- confirm_new_password = request.confirm_new_password
270
- if isinstance(email, res1.ReponseError):
271
- return email
272
- if new_password is None:
273
- return res_login.ReponseError(
274
- status=400,
275
- data =res_login.Message(message="new_password is empty")
276
- )
277
- if current_password is None or confirm_new_password == "":
278
- return res_login.ReponseError(
279
- status=400,
280
- data =res_login.Message(message="current_password is empty")
281
- )
282
- if confirm_new_password is None or confirm_new_password == "":
283
- return res_login.ReponseError(
284
- status=400,
285
- data =res_login.Message(message="confirm_new_password is empty")
286
- )
287
- if current_password == new_password:
288
- return res_login.ReponseError(
289
- status=400,
290
- data=res_login.Message(message="The new_password and the current_password must be different")
291
- )
292
- if confirm_new_password != new_password:
293
- return res_login.ReponseError(
294
- status=400,
295
- data=res_login.Message(message="The new_password and the confirm_new_password must be similar")
296
- )
297
- user = sign_in_with_email_and_password(email, current_password)
298
- try:
299
- if user:
300
- user_email = auth.get_user_by_email(email)
301
- auth.update_user(
302
- user_email.uid,
303
- password=new_password
304
- )
305
- return res_login.ResponseChangePassword(
306
- status= 200,
307
- data = res_login.Message(message=f"Update password success"))
308
- else:
309
- return res_login.ReponseError(
310
- status=400,
311
- data =res_login.Message(message="Current password not valid")
312
- )
313
- except :
314
- return res_login.ReponseError(
315
- status=500,
316
- data =res_login.Message(message="Server Error")
317
- )
318
- except:
319
- return res_login.ReponseError(
320
- status=500,
321
- data=res_login.Message(message="Server Error!!")
322
  )
 
1
+ from datetime import timedelta, datetime
2
+ from request import RequestUser as req_login
3
+ from response import ResponseUser as res_login
4
+ import requests
5
+ import json, re
6
+ from auth.authentication import signJWT
7
+ from firebase_admin import credentials, auth, exceptions
8
+ import firebase_admin
9
+ from repository import UserLoginRepository, UserRepository, UserInfoRepository, OTPRepository
10
+ import service.OTPService
11
+ from function import support_function as sf
12
+ from dotenv import load_dotenv
13
+ import os
14
+ from response import ResponseUser as res
15
+ from response import ResponseDefault as res1
16
+ load_dotenv()
17
+ CLIENT_ID_GOOGLE = os.getenv('CLIENT_ID')
18
+ API_SIGN_UP_FIREBASE_PATH = os.getenv('API_SIGN_UP_FIREBASE')
19
+ regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
20
+
21
+ def get_user1(email):
22
+ try:
23
+ user = auth.get_user_by_email(email)
24
+ return user
25
+ except exceptions.FirebaseError as e:
26
+ return None
27
+
28
+ def check_email(email):
29
+ if isinstance(email, str) and re.fullmatch(regex, email):
30
+ return True
31
+ else:
32
+ return False
33
+ from pathlib import Path
34
+ try:
35
+ if not firebase_admin._apps:
36
+ json_path = Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json'
37
+ cred = credentials.Certificate(str(json_path))
38
+ fred = firebase_admin.initialize_app(cred)
39
+ except:
40
+ if not firebase_admin._apps:
41
+ cred = credentials.Certificate("firebase_certificate.json")
42
+ fred = firebase_admin.initialize_app(cred)
43
+
44
+ def sign_up_with_email_and_password(email, password, username=None, return_secure_token=True):
45
+ try:
46
+ rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signUp"
47
+ payload = {
48
+ "email": email,
49
+ "password": password,
50
+ "returnSecureToken": return_secure_token
51
+ }
52
+ if username:
53
+ payload["displayName"] = username
54
+ payload = json.dumps(payload)
55
+ r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload)
56
+ try:
57
+ return r.json()['email']
58
+ except Exception as e:
59
+ pass
60
+ except Exception as e:
61
+ pass
62
+
63
+ def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True):
64
+ rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"
65
+ try:
66
+ payload = {
67
+ "returnSecureToken": return_secure_token
68
+ }
69
+ if email:
70
+ payload["email"] = email
71
+ if password:
72
+ payload["password"] = password
73
+ payload = json.dumps(payload)
74
+ r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload)
75
+ r.raise_for_status()
76
+ data = r.json()
77
+ if 'idToken' in data:
78
+ return data['email']
79
+ else:
80
+ return False
81
+ except requests.exceptions.RequestException as e:
82
+ print(f"Error signing in: {e}")
83
+ return False
84
+
85
+ def update_info_user(uid, email=None, user_name=None, photo_url=None):
86
+ user_data = {}
87
+ if email is not None:
88
+ user_data['email'] = email
89
+ if user_name is not None:
90
+ user_data['display_name'] = user_name
91
+ if photo_url is not None and photo_url != 'N/A':
92
+ user_data['photo_url'] = photo_url
93
+ if user_data:
94
+ auth.update_user(uid, **user_data)
95
+
96
+ async def update_user_info(request: req_login.RequestUpdateUserInfo):
97
+ try:
98
+ email = request.email
99
+ user_id = request.user_id
100
+ email_check = sf.check_email_service(user_id)
101
+ if isinstance(email_check, res1.ReponseError):
102
+ return email_check
103
+ check_email_fc = sf.check_email_empty_invalid(email)
104
+ if check_email_fc is not True:
105
+ return check_email_fc
106
+ user = get_user1(email)
107
+ if user:
108
+ user_info = UserInfoRepository.getUserInfo(user_id)
109
+ if user_info:
110
+ UserInfoRepository.updateUserInfo(
111
+ request.user_id,
112
+ request.uid,
113
+ request.email,
114
+ request.display_name,
115
+ request.photo_url)
116
+ else:
117
+ UserInfoRepository.addUserInfo(
118
+ request.uid,
119
+ request.email,
120
+ request.display_name,
121
+ request.photo_url
122
+ )
123
+ update_info_user(request.uid,
124
+ request.email,
125
+ request.display_name,
126
+ request.photo_url
127
+ )
128
+ return res_login.ResponseUpdateUserInfo(status=200,
129
+ data = res_login.Message(message=f"User info updated successfully"))
130
+ else:
131
+ return res_login.ReponseError(
132
+ status = 404,
133
+ data = res_login.Message(message="Not found user")
134
+ )
135
+ except:
136
+ return res_login.ReponseError(
137
+ status=500,
138
+ data=res_login.Message(message="Server Error")
139
+ )
140
+
141
+ async def check_info_google(request: req_login.RequestCheckInfoGoogle):
142
+ try:
143
+ user_id = request.user_id
144
+ check = UserRepository.getEmailUserByIdFix(user_id)
145
+ if check is None:
146
+ return res_login.ReponseError(
147
+ status = 404,
148
+ data = res_login.Message(message="user_id not exist")
149
+ )
150
+ email = sf.check_email_service(str(user_id))
151
+ if isinstance(email, res.ReponseError):
152
+ return email
153
+ user_info = UserInfoRepository.getUserInfo(user_id)
154
+ if user_info is not None:
155
+ email_check = True
156
+ else:
157
+ email_check = False
158
+ if email_check is not None:
159
+ return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check=email_check))
160
+ except:
161
+ return res_login.ReponseError(
162
+ status=500,
163
+ data=res_login.Message(message="Server Error")
164
+ )
165
+
166
+ async def check_info_google_email(request: req_login.RequestCheckInfoGoogleEmail):
167
+ try:
168
+ email = request.email
169
+ check_email_fc = sf.check_email_empty_invalid(email)
170
+ if check_email_fc is not True:
171
+ return check_email_fc
172
+ user_info = UserInfoRepository.getUserInfoByEmail(email)
173
+ if user_info is not None:
174
+ email_check = True
175
+ else:
176
+ email_check = False
177
+ if email_check is not None:
178
+ return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check=email_check))
179
+ except:
180
+ return res_login.ReponseError(
181
+ status=500,
182
+ data=res_login.Message(message="Server Error")
183
+ )
184
+
185
+ async def check_state_login(request: req_login.RequestCheckStateLogin):
186
+ try:
187
+ user_id = request.user_id
188
+ session_id_now = request.session_id_now
189
+ email = sf.check_email_service(user_id)
190
+ if isinstance(email, res1.ReponseError):
191
+ return email
192
+ elif session_id_now is None or session_id_now=="":
193
+ return res_login.ReponseError(
194
+ status= 400,
195
+ data =res_login.Message(message="session_id is empty")
196
+ )
197
+ user = get_user1(email)
198
+ if user:
199
+ check1 = False
200
+ session_id = UserLoginRepository.getUserSessionIdByUserEmail(user_id)
201
+ print(f"session_id: {session_id}")
202
+ if session_id != session_id_now:
203
+ check1 = False
204
+ else:
205
+ check1 = True
206
+ return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check = check1))
207
+ else:
208
+ return res_login.ReponseError(
209
+ status=404,
210
+ data =res_login.Message(message="Not found user")
211
+ )
212
+ except:
213
+ return res_login.ReponseError(
214
+ status=500,
215
+ data=res_login.Message(message="Server Error")
216
+ )
217
+
218
+ import string, random
219
+ def generate_otp(length=6):
220
+ characters = string.ascii_uppercase + string.digits
221
+ otp = ''.join(random.choice(characters) for _ in range(length))
222
+ return otp
223
+
224
+ def createOTPReset(email):
225
+ otp = generate_otp()
226
+ check_email_fc = sf.check_email_empty_invalid(email)
227
+ if check_email_fc is not True:
228
+ return check_email_fc
229
+ OTPRepository.addOTP(email,otp)
230
+ return otp
231
+
232
+ def reset_password(request: req_login.RequestResetPassword):
233
+ try:
234
+ email = request.email
235
+ check_email_fc = sf.check_email_empty_invalid(email)
236
+ if check_email_fc is not True:
237
+ return check_email_fc
238
+ try:
239
+ user = get_user1(email)
240
+ if user is not None:
241
+ otp = createOTPReset(email)
242
+ return res_login.ResponseCreateOTP(
243
+ status= 200,
244
+ data= res_login.CheckModel(check = True),
245
+ otp = otp
246
+ )
247
+ else:
248
+ return res_login.ReponseError(
249
+ status= 404,
250
+ data =res_login.Message(message="Email not exist")
251
+ )
252
+ except auth.UserNotFoundError as e:
253
+ return res_login.ReponseError(
254
+ status=500,
255
+ data =res_login.Message(message=str(e))
256
+ )
257
+ except:
258
+ return res_login.ReponseError(
259
+ status=500,
260
+ data=res_login.Message(message="Server Error")
261
+ )
262
+
263
+ async def change_password(request: req_login.RequestChangePassword):
264
+ try:
265
+ user_id = request.user_id
266
+ email = sf.check_email_service(user_id)
267
+ new_password = request.new_password
268
+ current_password= request.current_password
269
+ confirm_new_password = request.confirm_new_password
270
+ if isinstance(email, res1.ReponseError):
271
+ return email
272
+ if new_password is None:
273
+ return res_login.ReponseError(
274
+ status=400,
275
+ data =res_login.Message(message="new_password is empty")
276
+ )
277
+ if current_password is None or confirm_new_password == "":
278
+ return res_login.ReponseError(
279
+ status=400,
280
+ data =res_login.Message(message="current_password is empty")
281
+ )
282
+ if confirm_new_password is None or confirm_new_password == "":
283
+ return res_login.ReponseError(
284
+ status=400,
285
+ data =res_login.Message(message="confirm_new_password is empty")
286
+ )
287
+ if current_password == new_password:
288
+ return res_login.ReponseError(
289
+ status=400,
290
+ data=res_login.Message(message="The new_password and the current_password must be different")
291
+ )
292
+ if confirm_new_password != new_password:
293
+ return res_login.ReponseError(
294
+ status=400,
295
+ data=res_login.Message(message="The new_password and the confirm_new_password must be similar")
296
+ )
297
+ user = sign_in_with_email_and_password(email, current_password)
298
+ try:
299
+ if user:
300
+ user_email = auth.get_user_by_email(email)
301
+ auth.update_user(
302
+ user_email.uid,
303
+ password=new_password
304
+ )
305
+ return res_login.ResponseChangePassword(
306
+ status= 200,
307
+ data = res_login.Message(message=f"Update password success"))
308
+ else:
309
+ return res_login.ReponseError(
310
+ status=400,
311
+ data =res_login.Message(message="Current password not valid")
312
+ )
313
+ except :
314
+ return res_login.ReponseError(
315
+ status=500,
316
+ data =res_login.Message(message="Server Error")
317
+ )
318
+ except:
319
+ return res_login.ReponseError(
320
+ status=500,
321
+ data=res_login.Message(message="Server Error!!")
322
  )