Lin / backend /services /auth_service.py
Zelyanoth's picture
Increase stdout maxBuffer length to prevent overflow
9bb4b11
raw
history blame
11.7 kB
import logging
from datetime import datetime, timedelta, timezone

import bcrypt
from flask import current_app, request
from flask_jwt_extended import create_access_token, get_jwt
from supabase import Client

from backend.models.user import User
from backend.utils.database import authenticate_user, create_user
def register_user(email: str, password: str) -> dict:
    """
    Create a new account for the given email/password pair.

    The ``profiles`` table is consulted first so an already-used email is
    rejected early. If that lookup itself fails, the failure is only logged
    and sign-up is still attempted through ``create_user``.

    Args:
        email (str): Email address to register.
        password (str): Password for the new account.

    Returns:
        dict: Always contains ``success`` and ``message``. On success it also
        carries ``user`` and ``email_confirmed`` (plus
        ``requires_confirmation`` when the email still has to be confirmed).
    """
    def failure(message):
        # Every unsuccessful outcome shares this two-key shape.
        return {
            'success': False,
            'message': message
        }

    try:
        # Best-effort duplicate check against the profiles table.
        try:
            existing = (
                current_app.supabase
                .table("profiles")
                .select("id")
                .eq("email", email)
                .execute()
            )
            if existing.data:
                return failure('account with this mail already exist')
        except Exception as profile_check_error:
            # A failing lookup (e.g. a temporary database issue) is logged
            # but deliberately does not block the sign-up attempt below.
            current_app.logger.warning(f"Failed to check profiles table for email {email}: {str(profile_check_error)}")

        response = create_user(current_app.supabase, email, password)
        created = response.user
        if not created:
            return failure('Failed to register user')

        user = User.from_dict({
            'id': created.id,
            'email': created.email,
            'created_at': created.created_at,
            'email_confirmed_at': created.email_confirmed_at
        })

        if not created.email_confirmed_at:
            # The account exists but the address still has to be confirmed.
            return {
                'success': True,
                'message': 'Check your mail to confirm your account',
                'user': user.to_dict(),
                'email_confirmed': False,
                'requires_confirmation': True
            }

        # Already confirmed: the user can log in straight away.
        return {
            'success': True,
            'message': 'Account created successfully! You can now log in with your email and password.',
            'user': user.to_dict(),
            'email_confirmed': True
        }
    except Exception as e:
        # Keep the full error in the log; the caller only gets a mapped message.
        current_app.logger.error(f"Registration error for email {email}: {str(e)}")
        error_str = str(e).lower()

        # Fallback duplicate detection in case the profiles check missed it or failed.
        if 'already registered' in error_str or 'already exists' in error_str:
            return failure('account with this mail already exist')

        if 'invalid email' in error_str:
            return failure('Please enter a valid email address.')

        # Password-policy style errors mention "password" plus one of these hints.
        if 'password' in error_str and any(
            hint in error_str for hint in ('weak', 'policy', 'requirement')
        ):
            return failure('Password does not meet requirements. Please use at least 8 characters.')

        # Anything else gets a generic message; details stay in the log.
        return failure('Registration failed. Please check your information and try again.')
def login_user(email: str, password: str, remember_me: bool = False) -> dict:
    """
    Authenticate a user and issue a JWT access token.

    Args:
        email (str): User email.
        password (str): User password.
        remember_me (bool): When true the token lasts 7 days instead of 1 hour.

    Returns:
        dict: On success ``success``, ``token``, ``user``, ``rememberMe``,
        ``expiresAt`` (timezone-aware UTC ISO-8601 string) and ``tokenType``.
        On failure ``success`` is ``False`` with a ``message``, plus
        ``requires_confirmation`` when the email is not confirmed yet.
        Exceptions are logged and mapped to a failure dict, never raised.
    """
    try:
        response = authenticate_user(current_app.supabase, email, password)
        auth_user = response.user
        if not auth_user:
            return {
                'success': False,
                'message': 'Invalid email or password. Please check your credentials and try again.'
            }

        confirmed_at = auth_user.email_confirmed_at
        if not confirmed_at:
            return {
                'success': False,
                'message': 'Check your mail to confirm your account',
                'requires_confirmation': True
            }

        # Token lifetime depends on the "remember me" flag.
        if remember_me:
            expires_delta = timedelta(days=7)
            token_type = "remember"
        else:
            expires_delta = timedelta(hours=1)
            token_type = "session"

        access_token = create_access_token(
            identity=auth_user.id,
            additional_claims={
                'email': auth_user.email,
                # confirmed_at is known to be set here (guarded above).
                'email_confirmed_at': confirmed_at.isoformat(),
                'remember_me': remember_me,
                'token_type': token_type
            },
            expires_delta=expires_delta
        )

        user = User.from_dict({
            'id': auth_user.id,
            'email': auth_user.email,
            'created_at': auth_user.created_at,
            'email_confirmed_at': confirmed_at
        })

        return {
            'success': True,
            'token': access_token,
            'user': user.to_dict(),
            'rememberMe': remember_me,
            # Timezone-aware so the ISO string carries an explicit UTC offset;
            # a naive local timestamp is ambiguous to clients in other zones.
            'expiresAt': (datetime.now(timezone.utc) + expires_delta).isoformat(),
            'tokenType': token_type
        }
    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}")
        error_str = str(e).lower()

        incorrect_credentials = {
            'success': False,
            'message': 'Password/email Incorrect'
        }

        # Order matters: a credentials error wins over the other categories.
        if 'invalid credentials' in error_str or 'unauthorized' in error_str:
            return incorrect_credentials

        if 'email not confirmed' in error_str or 'email not verified' in error_str:
            return {
                'success': False,
                'message': 'Check your mail to confirm your account',
                'requires_confirmation': True
            }

        if 'user not found' in error_str:
            # NOTE(review): this message reveals whether an account exists;
            # consider returning the generic credentials message instead.
            return {
                'success': False,
                'message': 'No account found with this email. Please check your email or register for a new account.'
            }

        # Any unrecognised error is reported as a credentials failure.
        return incorrect_credentials
def get_user_by_id(user_id: str) -> dict:
    """
    Fetch a user through Supabase Auth and return it as a plain dict.

    NOTE(review): supabase-py documents ``auth.get_user()`` as taking an
    access-token JWT rather than a user id -- confirm that passing
    ``user_id`` here resolves users as intended (the admin API's
    ``get_user_by_id`` may be what is wanted).

    Args:
        user_id (str): Identifier passed to ``supabase.auth.get_user``.

    Returns:
        dict: ``User.to_dict()`` output, or ``None`` when no user is returned
        or the lookup raises. Failures are logged, never propagated.
    """
    try:
        response = current_app.supabase.auth.get_user(user_id)

        # Treat a missing response the same as a missing user.
        found = response.user if response is not None else None
        if not found:
            return None

        user = User.from_dict({
            'id': found.id,
            'email': found.email,
            'created_at': found.created_at,
            'email_confirmed_at': found.email_confirmed_at
        })
        return user.to_dict()
    except Exception as e:
        # Still best-effort (callers get None), but the failure is now logged
        # so lookup problems are visible instead of silently swallowed.
        current_app.logger.warning(f"Failed to load user {user_id}: {str(e)}")
        return None
def request_password_reset(supabase: Client, email: str) -> dict:
    """
    Ask Supabase Auth to send a password-reset email.

    The response is identical whether the email belongs to an account or
    not, so the endpoint cannot be used to enumerate registered users.
    Unexpected failures are logged server-side and reported with a fixed
    message; raw exception text is never returned to the caller.

    Args:
        supabase (Client): Supabase client instance.
        email (str): Email address to send reset instructions to.

    Returns:
        dict: ``success`` flag and a user-facing ``message``.
    """
    # One shared payload for "sent" and "no such user", so the two cases
    # are indistinguishable from the outside.
    accepted = {
        'success': True,
        'message': 'Password reset instructions sent to your email. Please check your inbox.'
    }

    try:
        supabase.auth.reset_password_for_email(email)
    except Exception as e:
        if 'user not found' in str(e).lower():
            return accepted

        # Keep details in the log only; the exception text may expose internals.
        logging.getLogger(__name__).error("Password reset request failed: %s", e)
        return {
            'success': False,
            'message': 'Failed to process password reset request. Please try again later.'
        }

    return accepted
def reset_user_password(supabase: Client, token: str, new_password: str) -> dict:
    """
    Deprecated stub: the backend no longer performs password resets.

    Resets are expected to happen in the frontend with the Supabase
    JavaScript client once the user arrives from the reset email:
    1. The reset link is clicked; Supabase verifies the token and opens a
       recovery session.
    2. The user is redirected into the app (e.g., /reset-password).
    3. The frontend calls supabase.auth.updateUser({ password: newPassword }).

    Args:
        supabase (Client): Supabase client instance (ignored).
        token (str): Password reset token (ignored).
        new_password (str): New password (ignored).

    Returns:
        dict: A fixed failure payload telling the caller this endpoint is
        deprecated.
    """
    # None of the arguments are read; the reply is always the same.
    deprecation_notice = dict(
        success=False,
        message='Password reset should be handled by the frontend. Please update your frontend code to use the Supabase JavaScript client directly.',
    )
    return deprecation_notice