""" This module handles the endpoint for image upscaling using the Real-ESRGAN model. Required Environment Variables: - TILING_SIZE: The size of the tiles for processing images. Set to 0 to disable tiling. - AWS_ACCESS_KEY_ID: AWS access key for S3 access. - AWS_SECRET_ACCESS_KEY: AWS secret key for S3 access. - BUCKET_NAME: The name of the S3 bucket where images will be uploaded. """ import torch from PIL import Image from io import BytesIO from realesrgan import RealESRGANer from typing import Dict, List, Any import os from pathlib import Path from basicsr.archs.rrdbnet_arch import RRDBNet import numpy as np import cv2 import PIL import boto3 import uuid, io import torch import base64 import requests import logging import time class EndpointHandler: def __init__(self, path=""): """ Initializes the EndpointHandler class, setting up the Real-ESRGAN model and S3 client. Args: path (str): Optional path to the model weights. Defaults to an empty string. This constructor performs the following actions: - Configures logging based on environment variables. - Retrieves the tiling size from environment variables. - Initializes the Real-ESRGAN model with specified parameters, including scale, model path, and architecture. - Sets up the S3 client using AWS credentials from environment variables. - Logs the initialization process and any errors encountered during setup. """ # get the logging level from environment variables logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') self.logger = logging.getLogger(__name__) self.tiling_size = int(os.environ["TILING_SIZE"]) self.model_path = f"/repository/weights/Real-ESRGAN-x4plus.pth" self.max_image_size = 1400 * 1400 # self.model_path = f"/workspace/real-esrgan/weights/Real-ESRGAN-x4plus.pth" # log model path and tiling size self.logger.info(f"model_path: {self.model_path}") if self.tiling_size == 0: self.logger.info("TILING_SIZE is 0, not using tiling") else: self.logger.info(f"TILING_SIZE is {self.tiling_size}, using tiling") # Initialize the Real-ESRGAN model with specified parameters start_time = time.time() self.logger.info(f"initializing model") try: self.model = RealESRGANer( scale=4, # Scale factor for the model # Path to the pre-trained model weights model_path=self.model_path, # Initialize the RRDBNet model architecture with specified parameters model= RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4 ), tile=self.tiling_size, tile_pad=0, half=True, ) self.logger.info(f"model initialized in {time.time() - start_time} seconds") except Exception as e: self.logger.error(f"Error initializing model: {e}") raise e try: # Initialize the S3 client with AWS credentials from environment variables self.s3 = boto3.client('s3', aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'], ) # Get the S3 bucket name from environment variables self.bucket_name = os.environ["S3_BUCKET_NAME"] except Exception as e: self.logger.error(f"Error initializing S3 client: {e}") raise e def __call__(self, data: Any) -> Dict[str, List[float]]: """ Processes the input data to upscale an image using the Real-ESRGAN model. Args: data (Any): A dictionary containing the input data. It should include: - 'inputs': A dictionary with the following keys: - 'image_url' (str): The URL of the image to be upscaled. - 'outscale' (float): The scaling factor for the upscaling process. - 'apply_thresholding' (bool): Whether to apply thresholding to the upscaled image. 

        Returns:
            Dict[str, Any]: A dictionary containing the results of the upscaling process:
                - 'image_url' (str | None): The URL of the upscaled image, or None if an error occurred.
                - 'image_key' (str | None): The key of the uploaded image in S3, or None if an error occurred.
                - 'error' (str | None): An error message if an error occurred, otherwise None.
        """
        ############################################################
        # get inputs and download image
        ############################################################
        self.logger.info(">>> 1/8: GETTING INPUTS....")
        try:
            inputs = data.pop("inputs", data)
            outscale = float(inputs.pop("outscale", 3))
            apply_thresholding = inputs.pop("apply_thresholding", False)
            self.logger.info(f"outscale: {outscale}")
            self.logger.info(f"apply_thresholding: {apply_thresholding}")
            image_url = inputs["image_url"]
        except Exception as e:
            self.logger.error(f"Error getting inputs: {e}")
            return {"image_url": None, "image_key": None, "error": f"Failed to get inputs: {e}"}

        # download image
        try:
            self.logger.info(f"downloading image from URL: {image_url}")
            image = self.download_image_url(image_url)
        except Exception as e:
            self.logger.error(f"Error downloading image from URL: {image_url}. Exception: {e}")
            return {"image_url": None, "image_key": None, "error": f"Failed to download image: {e}"}

        ############################################################
        # run assertions
        ############################################################
        self.logger.info(">>> 2/8: RUNNING ASSERTIONS ON IMAGE....")
        # get image size and mode
        in_size, in_mode = image.size, image.mode
        self.logger.info(f"image.size: {image.size}, image.mode: {image.mode}")

        # validate image mode, size, and outscale
        try:
            assert in_mode in Image.MODES, f"Unsupported image mode: {in_mode}"
            if self.tiling_size == 0:
                assert in_size[0] * in_size[1] < self.max_image_size, \
                    f"Image is too large: {in_size}: {in_size[0] * in_size[1]} is greater than {self.max_image_size}"
            assert 1 < outscale <= 10, f"Outscale must be greater than 1 and at most 10: {outscale}"
        except AssertionError as e:
            self.logger.error(f"Assertion error: {e}")
            return {"image_url": None, "image_key": None, "error": str(e)}

        ############################################################
        # Convert RGB to BGR (PIL uses RGB, OpenCV expects BGR)
        ############################################################
        self.logger.info(">>> 3/8: CONVERTING IMAGE TO OPENCV BGR/BGRA FORMAT....")
        try:
            opencv_image = np.array(image)
        except Exception as e:
            self.logger.error(f"Error converting image to opencv format: {e}")
            return {"image_url": None, "image_key": None, "error": f"Failed to convert image to opencv format: {e}"}

        # convert the array to OpenCV channel order
        if in_mode == "RGB":
            self.logger.info("converting RGB image to BGR")
            opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_RGB2BGR)
        elif in_mode == "RGBA":
            self.logger.info("converting RGBA image to BGRA")
            opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_RGBA2BGRA)
        elif in_mode == "L":
            self.logger.info("converting grayscale image to BGR")
            opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_GRAY2BGR)
        else:
            self.logger.error(f"Unsupported image mode: {in_mode}")
            return {"image_url": None, "image_key": None, "error": f"Unsupported image mode: {in_mode}"}

        ############################################################
        # upscale image
        ############################################################
        self.logger.info(">>> 4/8: UPSCALING IMAGE....")
        try:
            output, _ = self.model.enhance(opencv_image, outscale=outscale)
        except Exception as e:
self.logger.error(f"Error enhancing image: {e}") return {"image_url": None, "image_key": None, "error": "Image enhancement failed."} # debug self.logger.info(f"output.shape: {output.shape}") ############################################################ # convert to RGB/RGBA format ############################################################ self.logger.info(f">>> 5/8: CONVERTING IMAGE TO RGB/RGBA FORMAT....") out_shape = output.shape if len(out_shape) == 3: if out_shape[2] == 3: output = cv2.cvtColor(output, cv2.COLOR_BGR2RGB) elif out_shape[2] == 4: output = cv2.cvtColor(output, cv2.COLOR_BGRA2RGBA) else: output = cv2.cvtColor(output, cv2.COLOR_GRAY2RGB) ########################################################### # convert to PIL image ############################################################ self.logger.info(f">>> 6/8: CONVERTING IMAGE TO PIL....") try:# output = Image.fromarray(output) except Exception as e: self.logger.error(f"Error converting upscaled image to PIL: {e}") return {"image_url": None, "image_key": None, "error": f"Failed to convert upscaled image to PIL: {e}"} ############################################################ # apply thresholding ############################################################ self.logger.info(f">>> 7/8: APPLYING THRESHOLDING....") if apply_thresholding: try: if self.has_alpha_channel(image): self.logger.info(f"input image mode: {image.mode}, it has alpha channel, applying thresholding") output = self.apply_thresholding(image, output) else: self.logger.info(f"input image mode: {image.mode}, it does not have alpha channel, skipping thresholding") except Exception as e: self.logger.error(f"Error applying thresholding: {e}") return {"image_url": None, "image_key": None, "error": f"Failed to apply thresholding: {e}"} else: self.logger.info(f"thresholding is not enabled, skipping thresholding") ############################################################ # upload to s3 ############################################################ self.logger.info(f">>> 8/8: UPLOADING IMAGE TO S3....") try: image_url, key = self.upload_to_s3(output) self.logger.info(f"image uploaded to s3: {image_url}") except Exception as e: self.logger.error(f"Error uploading image to s3: {e}") return {"image_url": None, "image_key": None, "error": f"Failed to upload image to s3: {e}"} return {"image_url": image_url, "image_key": key, "error": None } def upload_to_s3(self, image: Image.Image) -> tuple[str, str]: """ Upload the image to S3 and return the URL and key. Args: image (Image.Image): The image to upload. Returns: tuple[str, str]: A tuple containing the image URL and the S3 key. """ prefix = str(uuid.uuid4()) # Save the image to an in-memory file in_mem_file = io.BytesIO() image.save(in_mem_file, format='PNG') in_mem_file.seek(0) # Upload the image to S3 key = f"{prefix}.png" self.s3.upload_fileobj(in_mem_file, Bucket=self.bucket_name, Key=key) image_url = f"https://{self.bucket_name}.s3.amazonaws.com/{key}" # Return the URL and the key return image_url, key def download_image_url(self, image_url: str) -> Image.Image: """ Downloads an image from the specified URL and returns it as a PIL Image. Args: image_url (str): The URL of the image to download. Returns: Image.Image: The downloaded image as a PIL Image. """ response = requests.get(image_url) image = Image.open(BytesIO(response.content)) return image def has_alpha_channel(self, image: Image.Image) -> bool: """ Check if the image has an alpha channel. 
""" return image.mode in ("RGBA", "LA") or (image.mode == "P" and "transparency" in image.info) def replace_mask_with_edge_aware_clamping( self, original_img: Image.Image, esrgan_img: Image.Image, edge_buffer: int = 2, smooth_buffer: int = 10, default_lower: int = 10, default_upper: int = 245 ) -> Image.Image: """ Apply edge-aware alpha channel clamping to merge original and ESRGAN images. Args: original_img: Original image as PIL Image esrgan_img: ESRGAN upscaled image as PIL Image edge_buffer: Buffer for edge regions smooth_buffer: Buffer for smooth regions default_lower: Default lower threshold default_upper: Default upper threshold Returns: PIL Image with merged alpha channel """ # Convert images to RGBA if needed original_img = original_img.convert("RGBA") esrgan_img = esrgan_img.convert("RGBA") esr_w, esr_h = esrgan_img.size # Upscale original alpha with bicubic resampling orig_alpha = original_img.getchannel("A") upscaled_alpha = orig_alpha.resize((esr_w, esr_h), resample=Image.Resampling.BICUBIC) alpha_arr = np.array(upscaled_alpha, dtype=np.uint8) # Edge detection on original alpha (for alignment) edge_mask = cv2.Canny(alpha_arr, threshold1=50, threshold2=150) edge_mask = (edge_mask > 0).astype(np.uint8) * 255 # Determine dynamic cutoffs for edge vs. smooth regions semi_transparent_mask = (alpha_arr > 0) & (alpha_arr < 255) if np.any(semi_transparent_mask): min_alpha = int(np.min(alpha_arr[semi_transparent_mask])) max_alpha = int(np.max(alpha_arr[semi_transparent_mask])) lower_edge = max(0, min_alpha - edge_buffer) upper_edge = min(255, max_alpha + edge_buffer) lower_smooth = max(0, min_alpha - smooth_buffer) upper_smooth = min(255, max_alpha + smooth_buffer) else: lower_edge = lower_smooth = default_lower upper_edge = upper_smooth = default_upper # Apply selective clamping based on edge regions clamped_alpha = alpha_arr.copy() clamped_alpha[edge_mask == 255] = np.clip(clamped_alpha[edge_mask == 255], lower_edge, upper_edge) clamped_alpha[edge_mask == 0] = np.clip(clamped_alpha[edge_mask == 0], lower_smooth, upper_smooth) # Blend with ESRGAN's alpha (50% weight to original in edges) esrgan_alpha = np.array(esrgan_img.getchannel("A"), dtype=np.uint8) blended_alpha = np.where(edge_mask == 255, (clamped_alpha * 0.5 + esrgan_alpha * 0.5).astype(np.uint8), clamped_alpha) # Replace and return esrgan_array = np.array(esrgan_img, dtype=np.uint8) esrgan_array[..., 3] = blended_alpha return Image.fromarray(esrgan_array, mode="RGBA") def apply_thresholding(self, input_image: Image.Image, esrgan_image: Image.Image): """ Apply thresholding to the image to account for ghost pixels Args: input_image (Image.Image): The input image. esrgan_image (Image.Image): The ESRGAN image. """ self.logger.info(f"Applying edge-aware clamping") # Using method 3: Edge-aware clamping self.logger.info(f"Input image size: {input_image.size}") edge_aware_image = self.replace_mask_with_edge_aware_clamping( input_image, esrgan_image, edge_buffer=2, smooth_buffer=10 ) self.logger.info(f"Edge-aware clamping applied, image size: {edge_aware_image.size}") return edge_aware_image