# coding=utf-8 # Copyright 2024 The Emu team, BAAI and The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Image processor class for Emu3VisionVQ.""" import math from typing import Dict, List, Optional, Union import numpy as np from transformers.image_processing_utils import BaseImageProcessor, BatchFeature from transformers.image_transforms import ( convert_to_rgb, resize, to_channel_dimension_format, ) from transformers.image_utils import ( IMAGENET_STANDARD_MEAN, IMAGENET_STANDARD_STD, ChannelDimension, ImageInput, PILImageResampling, get_image_size, infer_channel_dimension_format, is_scaled_image, make_list_of_images, to_numpy_array, valid_images, validate_preprocess_arguments, ) from transformers.utils import TensorType, is_vision_available, logging logger = logging.get_logger(__name__) if is_vision_available(): from PIL import Image def smart_resize( height: int, width: int, factor: int = 8, min_pixels: int = 512 * 512, max_pixels: int = 1024 * 1024 ): """Rescales the image so that the following conditions are met: 1. Both dimensions (height and width) are divisible by 'factor'. 2. The total number of pixels is within the range ['min_pixels', 'max_pixels']. 3. The aspect ratio of the image is maintained as closely as possible. """ if height < factor or width < factor: raise ValueError(f"height:{height} or width:{width} must be larger than factor:{factor}") elif max(height, width) / min(height, width) > 5: raise ValueError( f"absolute aspect ratio must be smaller than 5, got {max(height, width) / min(height, width)}" ) h_bar = round(height / factor) * factor w_bar = round(width / factor) * factor if h_bar * w_bar > max_pixels: beta = math.sqrt((height * width) / max_pixels) h_bar = math.floor(height / beta / factor) * factor w_bar = math.floor(width / beta / factor) * factor elif h_bar * w_bar < min_pixels: beta = math.sqrt(min_pixels / (height * width)) h_bar = math.ceil(height * beta / factor) * factor w_bar = math.ceil(width * beta / factor) * factor return h_bar, w_bar class Emu3VisionVQImageProcessor(BaseImageProcessor): r""" Constructs a Emu3VisionVQ image processor that dynamically resizes images based on the original images. Args: do_resize (`bool`, *optional*, defaults to `True`): Whether to resize the image's (height, width) dimensions. resample (`PILImageResampling`, *optional*, defaults to `Resampling.BICUBIC`): Resampling filter to use when resizing the image. do_rescale (`bool`, *optional*, defaults to `True`): Whether to rescale the image by the specified scale `rescale_factor`. rescale_factor (`int` or `float`, *optional*, defaults to `1/255`): Scale factor to use if rescaling the image. do_normalize (`bool`, *optional*, defaults to `True`): Whether to normalize the image. image_mean (`float` or `List[float]`, *optional*, defaults to `[0.5, 0.5, 0.5]`): Mean to use if normalizing the image. This is a float or list of floats for each channel in the image. image_std (`float` or `List[float]`, *optional*, defaults to `[0.5, 0.5, 0.5]`): Standard deviation to use if normalizing the image. This is a float or list of floats for each channel in the image. do_convert_rgb (`bool`, *optional*, defaults to `True`): Whether to convert the image to RGB. min_pixels (`int`, *optional*, defaults to `512 * 512`): The min pixels of the image to resize the image. max_pixels (`int`, *optional*, defaults to `1024 * 1024`): The max pixels of the image to resize the image. spatial_factor (`int`, *optional*, defautls to 8): The spatial downsample factor the image will be downsampled in feature extracting phase """ model_input_names = ["pixel_values"] def __init__( self, do_resize: bool = True, resample: PILImageResampling = PILImageResampling.BICUBIC, do_rescale: bool = True, rescale_factor: Union[int, float] = 1 / 255, do_normalize: bool = True, image_mean: Optional[Union[float, List[float]]] = None, image_std: Optional[Union[float, List[float]]] = None, do_convert_rgb: bool = True, min_pixels: int = 512 * 512, max_pixels: int = 1024 * 1024, spatial_factor: int = 8, **kwargs, ) -> None: super().__init__(**kwargs) self.do_resize = do_resize self.resample = resample self.do_rescale = do_rescale self.rescale_factor = rescale_factor self.do_normalize = do_normalize self.image_mean = image_mean if image_mean is not None else IMAGENET_STANDARD_MEAN self.image_std = image_std if image_std is not None else IMAGENET_STANDARD_STD self.min_pixels = min_pixels self.max_pixels = max_pixels self.size = {"min_pixels": min_pixels, "max_pixels": max_pixels} self.do_convert_rgb = do_convert_rgb self.spatial_factor = spatial_factor def _preprocess( self, images: ImageInput, do_resize: Optional[bool] = None, resample: PILImageResampling = None, do_rescale: Optional[bool] = None, rescale_factor: Optional[float] = None, do_normalize: Optional[bool] = None, image_mean: Optional[Union[float, List[float]]] = None, image_std: Optional[Union[float, List[float]]] = None, do_convert_rgb: Optional[bool] = None, spatial_factor: Optional[int] = None, input_data_format: Optional[Union[str, ChannelDimension]] = None, output_data_format: Optional[Union[str, ChannelDimension]] = ChannelDimension.FIRST, ): """ Preprocess an image or batch of images. Copy of the `preprocess` method from `CLIPImageProcessor`. Args: images (`ImageInput`): Image or batch of images to preprocess. Expects pixel values ranging from 0 to 255. If pixel values range from 0 to 1, set `do_rescale=False`. do_resize (`bool`, *optional*, defaults to `self.do_resize`): Whether to resize the image. resample (`PILImageResampling`, *optional*, defaults to `self.resample`): Resampling filter to use if resizing the image. This can be one of the `PILImageResampling` enums. do_rescale (`bool`, *optional*, defaults to `self.do_rescale`): Whether to rescale the image. rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`): Scale factor to use if rescaling the image. do_normalize (`bool`, *optional*, defaults to `self.do_normalize`): Whether to normalize the image. image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`): Mean to use if normalizing the image. Can be a float or a list of floats corresponding to the number of channels in the image. image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`): Standard deviation to use if normalizing the image. Can be a float or a list of floats corresponding to the number of channels in the image. do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`): Whether to convert the image to RGB. spatial_factor (`int`, *optional*, defaults to `self.spatial_factor`): The spatial downsample factor the image will be downsampled in feature extracting phase input_data_format (`ChannelDimension` or `str`, *optional*): The channel dimension format for the input image. Can be one of: - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. output_data_format (`ChannelDimension`, *optional*, defaults to `ChannelDimension.FIRST`): The channel dimension format for the output image. Can be one of: - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. - Unset: Use the channel dimension format of the input image. """ spatial_factor = spatial_factor if spatial_factor is not None else self.spatial_factor images = make_list_of_images(images) if do_convert_rgb: images = [convert_to_rgb(image) for image in images] # All transformations expect numpy arrays. images = [to_numpy_array(image) for image in images] if is_scaled_image(images[0]) and do_rescale: logger.warning_once( "It looks like you are trying to rescale already rescaled images. If the input" "pixel_values.append()images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again." ) if input_data_format is None: # We assume that all images have the same channel dimension format. input_data_format = infer_channel_dimension_format(images[0]) height, width = get_image_size(images[0], channel_dim=input_data_format) resized_height, resized_width = height, width processed_images = [] for image in images: if do_resize: resized_height, resized_width = smart_resize( height, width, factor=spatial_factor, min_pixels=self.min_pixels, max_pixels=self.max_pixels, ) image = resize( image, size=(resized_height, resized_width), resample=resample, input_data_format=input_data_format ) if do_rescale: image = self.rescale(image, scale=rescale_factor, input_data_format=input_data_format) if do_normalize: image = self.normalize( image=image, mean=image_mean, std=image_std, input_data_format=input_data_format ) image = to_channel_dimension_format(image, output_data_format, input_channel_dim=input_data_format) processed_images.append(image) image = np.array(processed_images) return image def preprocess( self, images: ImageInput, do_resize: Optional[bool] = None, resample: PILImageResampling = None, do_rescale: Optional[bool] = None, rescale_factor: Optional[float] = None, do_normalize: Optional[bool] = None, image_mean: Optional[Union[float, List[float]]] = None, image_std: Optional[Union[float, List[float]]] = None, do_convert_rgb: Optional[bool] = None, spatial_factor: Optional[int] = None, return_tensors: Optional[Union[str, TensorType]] = None, input_data_format: Optional[Union[str, ChannelDimension]] = None, output_data_format: Optional[Union[str, ChannelDimension]] = ChannelDimension.FIRST, ): """ Args: images (`ImageInput`): Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If passing in images with pixel values between 0 and 1, set `do_rescale=False`. do_resize (`bool`, *optional*, defaults to `self.do_resize`): Whether to resize the image. resample (`int`, *optional*, defaults to `self.resample`): Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`. Only has an effect if `do_resize` is set to `True`. do_rescale (`bool`, *optional*, defaults to `self.do_rescale`): Whether to rescale the image. rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`): Rescale factor to rescale the image by if `do_rescale` is set to `True`. do_normalize (`bool`, *optional*, defaults to `self.do_normalize`): Whether to normalize the image. image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`): Image mean to use for normalization. Only has an effect if `do_normalize` is set to `True`. image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`): Image standard deviation to use for normalization. Only has an effect if `do_normalize` is set to `True`. do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`): Whether to convert the image to RGB. spatial_factor (`int`, *optional*, defaults to `self.spatial_factor`): The spatial downsample factor the image will be downsampled in feature extracting phase return_tensors (`str` or `TensorType`, *optional*): The type of tensors to return. Can be one of: - Unset: Return a list of `np.ndarray`. - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`. - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`. input_data_format (`ChannelDimension` or `str`, *optional*): The channel dimension format for the input image. If unset, the channel dimension format is inferred from the input image. Can be one of: - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. output_data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`): The channel dimension format for the output image. Can be one of: - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. - Unset: Use the channel dimension format of the input image. """ do_resize = do_resize if do_resize is not None else self.do_resize resample = resample if resample is not None else self.resample do_rescale = do_rescale if do_rescale is not None else self.do_rescale rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor do_normalize = do_normalize if do_normalize is not None else self.do_normalize image_mean = image_mean if image_mean is not None else self.image_mean image_std = image_std if image_std is not None else self.image_std do_convert_rgb = do_convert_rgb if do_convert_rgb is not None else self.do_convert_rgb spatial_factor = spatial_factor if spatial_factor is not None else self.spatial_factor images = make_list_of_images(images) if images is None or not valid_images(images): raise ValueError( "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, " "torch.Tensor, tf.Tensor or jax.ndarray." ) validate_preprocess_arguments( rescale_factor=rescale_factor, do_normalize=do_normalize, image_mean=image_mean, image_std=image_std, do_resize=do_resize, size=self.size, resample=resample, ) pixel_values = [] for image in images: norm_image = self._preprocess( image, do_resize=do_resize, resample=resample, do_rescale=do_rescale, rescale_factor=rescale_factor, do_normalize=do_normalize, image_mean=image_mean, image_std=image_std, do_convert_rgb=do_convert_rgb, spatial_factor=spatial_factor, input_data_format=input_data_format, output_data_format=output_data_format, ) pixel_values.extend(norm_image) pixel_values = np.array(pixel_values) data = {"pixel_values": pixel_values} return BatchFeature(data=data, tensor_type=return_tensors) def postprocess( self, images: ImageInput, do_rescale: Optional[bool] = None, rescale_factor: Optional[float] = None, do_normalize: Optional[bool] = None, image_mean: Optional[Union[float, List[float]]] = None, image_std: Optional[Union[float, List[float]]] = None, return_tensors: str | TensorType = "PIL.Image.Image", input_data_format: Optional[Union[str, ChannelDimension]] = None, ): """ Postprocess an image or batch of images tensor. Postprocess is the reverse process of preprocess. The parameters should be same as in preprocess. Args: images (`ImageInput`): Image to postprocess. Expects a single or batch of images with pixel values ranging from -1 to 1. do_rescale (`bool`, *optional*, defaults to `self.do_rescale`): Whether to rescale the image. rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`): Rescale factor to rescale the image by if `do_rescale` is set to `True`. do_normalize (`bool`, *optional*, defaults to `self.do_normalize`): Whether to normalize the image. image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`): Image mean to use for normalization. Only has an effect if `do_normalize` is set to `True`. image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`): Image standard deviation to use for normalization. Only has an effect if `do_normalize` is set to `True`. return_tensors (`str` or `TensorType`, *optional*): The type of tensors to return. Can be one of: - Unset: Return a list of `np.ndarray`. - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`. - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`. input_data_format (`ChannelDimension` or `str`, *optional*): The channel dimension format for the input image. If unset, the channel dimension format is inferred from the input image. Can be one of: - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format. - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format. - `"none"` or `ChannelDimension.NONE`: image in (height, width) format. """ do_rescale = do_rescale if do_rescale is not None else self.do_rescale rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor rescale_factor = 1 / rescale_factor do_normalize = do_normalize if do_normalize is not None else self.do_normalize image_mean = image_mean if image_mean is not None else self.image_mean image_std = image_std if image_std is not None else self.image_std image_mean, image_std = self.inverse_meanstd(image_mean, image_std) images = make_list_of_images(images) if isinstance(images[0], Image.Image): return images if len(images) > 1 else images[0] if input_data_format is None: # We assume that all images have the same channel dimension format. input_data_format = infer_channel_dimension_format(images[0]) pixel_values = [] for image in images: image = to_numpy_array(image) if do_normalize: image = self.normalize(image=image, mean=image_mean, std=image_std, input_data_format=input_data_format) if do_rescale: image = self.rescale(image, scale=rescale_factor, input_data_format=input_data_format) image = image.clip(0, 255).astype(np.uint8) if do_normalize and do_rescale and return_tensors == "PIL.Image.Image": image = to_channel_dimension_format(image, ChannelDimension.LAST, input_channel_dim=input_data_format) pixel_values.append(Image.fromarray(image)) else: pixel_values.extend(image) data = {"pixel_values": pixel_values} return_tensors = return_tensors if return_tensors != "PIL.Image.Image" else None return BatchFeature(data=data, tensor_type=return_tensors) def inverse_meanstd(self, image_mean, image_std): image_mean = self.to_tuple(image_mean) image_std = self.to_tuple(image_std) rev_image_mean = tuple(-m / s for m, s in zip(image_mean, image_std)) rev_image_std = tuple(1 / s for s in image_std) return rev_image_mean, rev_image_std def to_tuple(self, value, dim=3): if isinstance(value, int | float): return (value,) * dim return tuple(value)