# data_access/src/services/user_router.py
# QuentinL52: Update src/services/user_router.py (commit 32c9c7b, verified)
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.database import get_db
from src.models.postgres.user_model import User
# Router instance on which the user endpoints below are registered.
router = APIRouter()
class UserCreate(BaseModel):
    """Request body accepted when creating a user.

    ``email`` and ``hashed_password`` are required; every other field is
    optional and defaults to ``None``.
    """

    # Required fields.
    email: str
    hashed_password: str

    # Optional fields.
    name: Optional[str] = None
    picture_url: Optional[str] = None
    google_id: Optional[str] = None
    candidate_mongo_id: Optional[str] = None
class UserUpdate(BaseModel):
    """Request body accepted when updating a user.

    Every field is optional and defaults to ``None``, so a client may send
    any subset of them.
    """

    email: Optional[str] = None
    hashed_password: Optional[str] = None
    name: Optional[str] = None
    picture_url: Optional[str] = None
    google_id: Optional[str] = None
    candidate_mongo_id: Optional[str] = None
class UserResponse(BaseModel):
    """Representation of a user returned by the endpoints in this module.

    The model declares no ``hashed_password`` field, so that value is not
    among the fields this schema describes.
    """

    # Pydantic v2 configuration, replacing the deprecated nested
    # ``class Config``. ``from_attributes=True`` lets the model be built from
    # an object's attributes instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    name: Optional[str] = None
    picture_url: Optional[str] = None
    google_id: Optional[str] = None
    candidate_mongo_id: Optional[str] = None
@router.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    """Create a user from the request body and return the stored record.

    Args:
        user: Validated creation payload; its fields are passed as keyword
            arguments to the ``User`` constructor.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` instance after it has been committed and refreshed.
    """
    payload = user.model_dump()
    new_user = User(**payload)

    db.add(new_user)
    await db.commit()
    # Reload the instance's state from the database after the commit.
    await db.refresh(new_user)

    return new_user
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user_by_id(user_id: int, db: AsyncSession = Depends(get_db)):
    """Return the user whose ``id`` equals ``user_id``.

    Args:
        user_id: Value matched against ``User.id``.
        db: Async database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 with detail "User not found" when no row matches.
    """
    stmt = select(User).where(User.id == user_id)
    rows = await db.execute(stmt)
    found = rows.scalar_one_or_none()

    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@router.get("/users/email/{email}", response_model=UserResponse)
async def get_user_by_email(email: str, db: AsyncSession = Depends(get_db)):
    """Return the user whose ``email`` equals the given address.

    Args:
        email: Value matched against ``User.email``.
        db: Async database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 with detail "User not found" when no row matches.
    """
    stmt = select(User).where(User.email == email)
    rows = await db.execute(stmt)
    found = rows.scalar_one_or_none()

    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user: UserUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Apply a partial update to a user and return the stored record.

    Only the fields explicitly present in the request body are written
    (``model_dump(exclude_unset=True)``). A body that sets no fields performs
    no write and simply returns the current record.

    Args:
        user_id: Value matched against ``User.id``.
        user: Validated update payload.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        The user as read back through ``get_user_by_id`` after the update.

    Raises:
        HTTPException: 404 (raised by ``get_user_by_id``) when no user with
            ``user_id`` exists.
    """
    changes = user.model_dump(exclude_unset=True)

    # Nothing to write: do not issue an UPDATE with an empty set of values.
    if changes:
        stmt = update(User).where(User.id == user_id).values(**changes)
        await db.execute(stmt)
        await db.commit()

    # Re-read the row so the response reflects what is stored; this also
    # produces the 404 when the id does not exist.
    return await get_user_by_id(user_id, db)