In [1]:
%%capture
%pip install google-genai 

In [17]:
import os

from google import genai
from google.genai import types
from IPython.display import Markdown
from IPython.display import Markdown

API_KEY = os.environ.get("GEMINI_API_KEY")

client = genai.Client(api_key=API_KEY)

# Load the Python file as text

file_path = "secure_app.py"
with open(file_path, "r") as file:
    doc_data = file.read()
prompt = "Please integrate user management into the FastAPI application."

contents = [
    types.Part.from_bytes(
        data=doc_data.encode("utf-8"),
        mime_type="text/x-python",
    ),
    prompt,
]

chat = client.aio.chats.create(
    model="gemini-2.5-pro-exp-03-25",
    config=types.GenerateContentConfig(
        tools=[types.Tool(code_execution=types.ToolCodeExecution)]
    ),
)

response = await chat.send_message(contents)
Markdown(response.text)


Okay, let's integrate user management into the FastAPI application using OAuth2 with Password Flow and JWT tokens. This is a standard approach for securing APIs where users log in with a username and password.

We will:

1.  **Add necessary dependencies:** `python-jose[cryptography]` for JWT handling and `passlib[bcrypt]` for password hashing.
2.  **Define User Models:** Using Pydantic for request/response validation.
3.  **Implement Password Hashing:** Securely store and verify passwords.
4.  **Create JWT Utilities:** Functions to create and decode access tokens.
5.  **Set up OAuth2 Scheme:** Configure FastAPI's security utilities.
6.  **Implement Authentication Logic:** Create functions to authenticate users and get the current logged-in user from a token.
7.  **Add Login Endpoint:** Create a `/token` endpoint for users to exchange credentials for a JWT.
8.  **Protect Endpoints:** Modify existing endpoints to require authentication via JWT.
9.  **Use an In-Memory User "Database":** For simplicity in this example. **Note:** For production, replace this with a proper database (SQL or NoSQL).
10. **Update Environment Variables:** Add a `SECRET_KEY` for JWT signing.

**Step 1: Add necessary configurations to `.env`**

Make sure your `.env` file includes a `SECRET_KEY`:

```.env
API_KEY=your_existing_api_key # This might become redundant or used differently
SECRET_KEY=your_very_strong_secret_key_here # e.g., generate using: openssl rand -hex 32
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
```

**Step 2: Modify the Python Code**

Here's the updated code (`secure_app_with_users.py`). I've added comments explaining the changes.

```python
# secure_app_with_users.py
import io
import os
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile, Depends, Query, status
from fastapi.responses import JSONResponse
# Removed APIKeyHeader, replaced with OAuth2
# from fastapi.security.api_key import APIKeyHeader
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from dotenv import load_dotenv
from PIL import Image, UnidentifiedImageError
from torchvision import models
from pydantic import BaseModel

# --- User Management Imports ---
from passlib.context import CryptContext
from jose import JWTError, jwt

# Load environment variables from .env file
if not load_dotenv():
    # Try loading from parent directory if running from a subdirectory
    if not load_dotenv("../.env"):
         raise ValueError("Failed to load .env file from current or parent directory")


# --- User Management Configuration ---
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))

if not SECRET_KEY:
    raise ValueError("SECRET_KEY environment variable not set in .env file")

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Points to the login endpoint

# --- User Models ---
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

# --- Mock User Database (Replace with real DB in production) ---
# Store hashed passwords!
fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "test@example.com",
        # Replace "secret" with a desired password, it will be hashed below
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
    }
}

# --- Utility Functions ---
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str) -> Optional[UserInDB]:
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        logger.warning("JWTError during token decoding")
        raise credentials_exception

    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        logger.warning(f"User '{token_data.username}' from token not found")
        raise credentials_exception
    return User(**user.dict()) # Return basic User model, not UserInDB

async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    if current_user.disabled:
        logger.warning(f"Attempt to use disabled user account: {current_user.username}")
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# --- FastAPI App Initialization ---
app = FastAPI(
    title="CIFAR10 Image Classification APP with User Auth",
    description="A production-ready API for image classification using a fine-tuned model on CIFAR10, secured with OAuth2.",
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Model Loading --- (Same as before)
class_names = [
    "airplane", "automobile", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
]
num_classes = len(class_names)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_path = "finetuned_model.pth"
if not os.path.exists(model_path):
    # Attempt relative path for common deployment scenarios
    model_path = os.path.join(os.path.dirname(__file__), model_path)
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found at specified path: {model_path}")

try:
    model = models.resnet18(weights=None) # Changed weights=None as per original code
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    logger.info(f"Model loaded successfully from {model_path} onto {device}")
except Exception as e:
    logger.error(f"Error loading model: {e}")
    raise RuntimeError(f"Could not load the model from {model_path}")


# Preprocessing transforms (Same as before)
preprocess = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# --- API Endpoints ---

@app.post("/token", response_model=Token, tags=["Authentication"])
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    Authenticate user and return an access token.
    """
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        logger.warning(f"Authentication failed for user: {form_data.username}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if user.disabled:
        logger.warning(f"Authentication attempt for disabled user: {form_data.username}")
        raise HTTPException(status_code=400, detail="Inactive user")

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    logger.info(f"Token generated for user: {form_data.username}")
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User, tags=["Users"])
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """
    Get information about the currently logged-in user.
    """
    return current_user

@app.get("/health", summary="Health Check", tags=["Status"])
async def health_check():
    """Endpoint for checking if the API is running."""
    # Added check for model status based on successful loading
    model_status = "loaded" if 'model' in globals() and isinstance(model, nn.Module) else "error"
    return {"status": "ok", "message": "API is running", "device": str(device), "model_status": model_status}

@app.get("/model-info", summary="Get Model Information", tags=["Metadata"])
async def get_model_info(current_user: User = Depends(get_current_active_user)): # Protected endpoint
    """
    Combined endpoint for retrieving model metadata and class names.
    Requires authentication.
    """
    model_info = {
        "model_architecture": "ResNet18",
        "num_classes": num_classes,
        "class_names": class_names,
        "device": str(device),
        "model_weights_file": model_path,
        "description": "Model fine-tuned on CIFAR10 dataset",
    }
    return JSONResponse(model_info)


@app.post("/predict", summary="Predict Image Class", tags=["Inference"])
async def predict(
    file: UploadFile = File(...),
    include_confidence: bool = Query(
        False, description="Include confidence scores for top predictions"
    ),
    top_k: int = Query(
        1, ge=1, le=num_classes, description="Number of top predictions to return"
    ),
    current_user: User = Depends(get_current_active_user), # Protected endpoint
):
    """
    Unified prediction endpoint that can return either simple class prediction
    or detailed predictions with confidence scores. Requires authentication.
    """
    logger.info(f"Prediction request received from user: {current_user.username}, file: {file.filename}")
    # Validate file type
    if not file.content_type.startswith("image/"):
         logger.error(f"Invalid file content type: {file.content_type} from user {current_user.username}")
         raise HTTPException(
             status_code=400,
             detail=f"Invalid file type: {file.content_type}. Only image/* types are supported.",
         )

    # Limit file size (e.g., 10MB)
    MAX_FILE_SIZE = 10 * 1024 * 1024
    size = 0
    contents = b""
    # Read file chunk by chunk to prevent large files exhausting memory
    # and check size on the fly
    for chunk in iter(lambda: file.file.read(4096), b""):
        size += len(chunk)
        if size > MAX_FILE_SIZE:
            await file.close() # Ensure file is closed
            logger.error(f"File size {size} exceeds limit {MAX_FILE_SIZE} for user {current_user.username}")
            raise HTTPException(status_code=413, detail=f"File too large. Limit is {MAX_FILE_SIZE / (1024 * 1024)} MB.")
        contents += chunk
    await file.close() # Close the file after reading


    if not contents:
        logger.error(f"Empty file uploaded by user {current_user.username}")
        raise HTTPException(status_code=400, detail="Empty file uploaded.")


    try:
        image = Image.open(io.BytesIO(contents)).convert("RGB")
        # Verify image integrity (optional but good)
        image.verify()
        # Reopen after verify
        image = Image.open(io.BytesIO(contents)).convert("RGB")
    except UnidentifiedImageError:
        logger.error(f"Uploaded file is not a valid image for user {current_user.username}")
        raise HTTPException(
            status_code=400, detail="Uploaded file is not a valid image or is corrupted."
        )
    except Exception as e:
        logger.error(f"Error processing image for user {current_user.username}: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Error processing image: {str(e)}")


    # Preprocess the image
    try:
        input_tensor = preprocess(image).unsqueeze(0).to(device)
    except Exception as e:
        logger.error(f"Error during image preprocessing for user {current_user.username}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error during image preprocessing: {str(e)}")


    try:
        with torch.no_grad():
            outputs = model(input_tensor)

            if include_confidence:
                probabilities = torch.nn.functional.softmax(outputs, dim=1)
                # Ensure top_k is not greater than num_classes
                k = min(top_k, num_classes)
                top_probs, top_idxs = torch.topk(probabilities, k=k)
                top_probs_list = top_probs.cpu().numpy().tolist()[0]
                top_idxs_list = top_idxs.cpu().numpy().tolist()[0]
                predictions = [
                    {"class": class_names[idx], "confidence": float(prob)} # Ensure JSON serializable
                    for idx, prob in zip(top_idxs_list, top_probs_list)
                ]
                logger.info(f"Prediction successful for user: {current_user.username}, file: {file.filename}, top_k={k}")
                return JSONResponse({"predictions": predictions})
            else:
                _, preds = torch.max(outputs, 1)
                predicted_class = class_names[preds[0].item()] # Use .item()
                logger.info(f"Prediction successful for user: {current_user.username}, file: {file.filename}, predicted_class={predicted_class}")
                return JSONResponse({"predicted_class": predicted_class})
    except Exception as e:
        logger.error(f"Error during model inference for user {current_user.username}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error during model inference: {str(e)}")


if __name__ == "__main__":
    # Make sure to run this file, e.g., secure_app_with_users.py
    # Use the filename in the uvicorn command string
    uvicorn.run("secure_app_with_users:app", host="0.0.0.0", port=5454, reload=True) # Added reload=True for development
```

**Step 3: Install Dependencies**

```bash
pip install "fastapi[all]" uvicorn python-dotenv torch torchvision Pillow "python-jose[cryptography]" "passlib[bcrypt]"
```

*   `fastapi[all]` includes `pydantic` and `python-multipart` (needed for `OAuth2PasswordRequestForm`).
*   `python-jose[cryptography]` provides JWT functionality.
*   `passlib[bcrypt]` provides password hashing.

**Step 4: Run the Application**

Save the updated code (e.g., as `secure_app_with_users.py`). Make sure your `.env` file is in the same directory (or parent directory) and contains `SECRET_KEY`, `ALGORITHM`, and `ACCESS_TOKEN_EXPIRE_MINUTES`. Also ensure the `finetuned_model.pth` file is accessible.

Run the app:

```bash
uvicorn secure_app_with_users:app --host 0.0.0.0 --port 5454 --reload
```

**How to Use the Updated API:**

1.  **Login:** Send a POST request to `/token` with `username` and `password` as form data (e.g., `username=testuser`, `password=secret`).
    *   You'll receive an `access_token` in the response.
2.  **Access Protected Endpoints:** For requests to `/model-info` or `/predict`, include the received token in the `Authorization` header:
    `Authorization: Bearer <your_access_token>`

The original API key mechanism (`X-API-Key`) is no longer used for `/model-info` and `/predict` in this version. Authentication is now handled entirely by the JWT bearer token obtained via the `/token` endpoint. You could potentially re-introduce the API key as an alternative or additional layer if needed, but this implementation replaces it with standard user login.

In [18]:
response = await chat.send_message(
    "Please run the code to ensure that everything is functioning properly."
)
Markdown(response.text)

Okay, I can execute the Python code provided to check for syntax errors, import issues, and basic structural validity.

However, please note the following limitations:

1.  **Cannot Run the Web Server:** I cannot actually start the `uvicorn` web server process within this environment. The `uvicorn.run(...)` call will be skipped.
2.  **No File System Access:** I cannot access local files like `.env` or `finetuned_model.pth`.
    *   I will mock the environment variables (`SECRET_KEY`, `ALGORITHM`, `ACCESS_TOKEN_EXPIRE_MINUTES`).
    *   The code related to loading the `finetuned_model.pth` will likely fail or be skipped/mocked because the file doesn't exist here. This means the model itself won't be loaded or tested.
3.  **No Network Access:** I cannot test the API endpoints by sending actual HTTP requests.
4.  **Dependency Check:** While I can attempt to import the required libraries (FastAPI, PyTorch, etc.), this only confirms they are *available* in the execution environment, not necessarily the exact versions you might have locally.

The purpose of this run is primarily to catch immediate code errors (like typos, incorrect syntax, missing imports) *before* the server tries to start.

Here is the execution attempt:
It seems the execution environment doesn't have the `torch`, `torchvision`, and potentially `PIL` (Pillow) libraries installed. The script failed during the import stage.

**Findings:**

1.  **Missing Dependencies:** The primary issue is that the core machine learning libraries (`torch`, `torchvision`) required by the application are not available in the environment where the code was just executed. `PIL` (Pillow) might also be missing if `torchvision` didn't pull it in.
2.  **Code Structure (Up to Failure):** The code structure *before* the failed import (including environment variable mocking, basic imports like `os`, `logging`, `datetime`, `fastapi`, `pydantic`, `passlib`, `jose`) seems syntactically correct.
3.  **Next Steps:** To run this code successfully, it needs to be executed in a Python environment where all the dependencies listed in the `pip install` command from the previous step are present:
    ```bash
    pip install "fastapi[all]" uvicorn python-dotenv torch torchvision Pillow "python-jose[cryptography]" "passlib[bcrypt]"
    ```
4.  **File Requirements:** Remember that even with the dependencies installed, running the application fully (using `uvicorn`) will *also* require:
    *   The `.env` file with your actual `SECRET_KEY` and other variables in the correct location.
    *   The `finetuned_model.pth` file accessible at the path specified in the script.

**Conclusion:**

The verification failed due to missing essential libraries (`torch`, `torchvision`). While the initial parts of the script related to FastAPI setup, user models, and JWT utilities appear structurally sound, the application cannot run without its core ML dependencies installed.