Spaces:
Running
Running
| from typing import Any | |
| from diffusers import ( | |
| DiffusionPipeline, | |
| AutoencoderTiny, | |
| LCMScheduler, | |
| UNet2DConditionModel, | |
| ) | |
| from os import path | |
| import torch | |
| from backend.models.lcmdiffusion_setting import LCMDiffusionSetting | |
| import numpy as np | |
| from constants import ( | |
| DEVICE, | |
| LCM_DEFAULT_MODEL, | |
| TAESD_MODEL, | |
| TAESDXL_MODEL, | |
| TAESD_MODEL_OPENVINO, | |
| ) | |
| from huggingface_hub import model_info | |
| from backend.models.lcmdiffusion_setting import LCMLora | |
| from backend.device import is_openvino_device | |
| if is_openvino_device(): | |
| from huggingface_hub import snapshot_download | |
| from optimum.intel.openvino.modeling_diffusion import OVModelVaeDecoder, OVBaseModel | |
| # from optimum.intel.openvino.modeling_diffusion import OVStableDiffusionPipeline | |
| from backend.lcmdiffusion.pipelines.openvino.lcm_ov_pipeline import ( | |
| OVStableDiffusionPipeline, | |
| ) | |
| from backend.lcmdiffusion.pipelines.openvino.lcm_scheduler import ( | |
| LCMScheduler as OpenVinoLCMscheduler, | |
| ) | |
class CustomOVModelVaeDecoder(OVModelVaeDecoder):
    """VAE decoder wrapper used to attach an externally loaded OpenVINO
    decoder model (e.g. a Tiny AutoEncoder decoder) to an existing pipeline.
    """

    def __init__(
        self,
        model,
        parent_model,
        ov_config=None,
        model_dir=None,
    ):
        # Deliberately invokes the *grandparent* initializer:
        # super(OVModelVaeDecoder, self) starts the MRO lookup AFTER
        # OVModelVaeDecoder, so OVModelVaeDecoder.__init__ is skipped and the
        # supplied `model` is installed without the decoder's own setup logic.
        # "vae_decoder" is passed positionally as the part name expected by the
        # base initializer — NOTE(review): confirm the positional signature
        # against the installed optimum-intel version.
        super(OVModelVaeDecoder, self).__init__(
            model,
            parent_model,
            ov_config,
            "vae_decoder",
            model_dir,
        )
class LCMTextToImage:
    """Text-to-image generator built around Latent Consistency Models,
    supporting plain LCM checkpoints, LCM-LoRA fused pipelines, and an
    OpenVINO backend.
    """

    def __init__(
        self,
        device: str = "cpu",
    ) -> None:
        """Set up empty pipeline state and pick the torch dtype.

        Args:
            device: Requested device string; note the instance's ``device``
                attribute is only recorded later by ``init()`` — kept that way
                for backward compatibility.
        """
        self.pipeline = None
        self.use_openvino = False
        self.device = ""
        # Cache of the previously initialised configuration so init() can skip
        # rebuilding the pipeline when nothing changed.  ALL "previous_*"
        # attributes are defined up front — the original code left the two
        # LoRA ids unset and relied on `self.pipeline is None` short-circuiting
        # the `or` chain in init() to avoid an AttributeError.
        self.previous_model_id = None
        self.previous_use_tae_sd = False
        self.previous_use_lcm_lora = False
        self.previous_lcm_lora_base_id = None
        self.previous_lcm_lora_id = None
        # OpenVINO and Apple "mps" backends require float32; otherwise fp16.
        self.torch_data_type = (
            torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
        )
        print(f"Torch datatype : {self.torch_data_type}")
| def _get_lcm_pipeline( | |
| self, | |
| lcm_model_id: str, | |
| base_model_id: str, | |
| use_local_model: bool, | |
| ): | |
| pipeline = None | |
| unet = UNet2DConditionModel.from_pretrained( | |
| lcm_model_id, | |
| torch_dtype=torch.float32, | |
| local_files_only=use_local_model | |
| # resume_download=True, | |
| ) | |
| pipeline = DiffusionPipeline.from_pretrained( | |
| base_model_id, | |
| unet=unet, | |
| torch_dtype=torch.float32, | |
| local_files_only=use_local_model | |
| # resume_download=True, | |
| ) | |
| pipeline.scheduler = LCMScheduler.from_config(pipeline.scheduler.config) | |
| return pipeline | |
| def get_tiny_decoder_vae_model(self) -> str: | |
| pipeline_class = self.pipeline.__class__.__name__ | |
| print(f"Pipeline class : {pipeline_class}") | |
| if ( | |
| pipeline_class == "LatentConsistencyModelPipeline" | |
| or pipeline_class == "StableDiffusionPipeline" | |
| ): | |
| return TAESD_MODEL | |
| elif pipeline_class == "StableDiffusionXLPipeline": | |
| return TAESDXL_MODEL | |
| elif pipeline_class == "OVStableDiffusionPipeline": | |
| return TAESD_MODEL_OPENVINO | |
| def _get_lcm_model_pipeline( | |
| self, | |
| model_id: str, | |
| use_local_model, | |
| ): | |
| pipeline = None | |
| if model_id == LCM_DEFAULT_MODEL: | |
| pipeline = DiffusionPipeline.from_pretrained( | |
| model_id, | |
| local_files_only=use_local_model, | |
| ) | |
| elif model_id == "latent-consistency/lcm-sdxl": | |
| pipeline = self._get_lcm_pipeline( | |
| model_id, | |
| "stabilityai/stable-diffusion-xl-base-1.0", | |
| use_local_model, | |
| ) | |
| elif model_id == "latent-consistency/lcm-ssd-1b": | |
| pipeline = self._get_lcm_pipeline( | |
| model_id, | |
| "segmind/SSD-1B", | |
| use_local_model, | |
| ) | |
| return pipeline | |
| def _get_lcm_lora_pipeline( | |
| self, | |
| base_model_id: str, | |
| lcm_lora_id: str, | |
| use_local_model: bool, | |
| ): | |
| pipeline = DiffusionPipeline.from_pretrained( | |
| base_model_id, | |
| torch_dtype=self.torch_data_type, | |
| local_files_only=use_local_model, | |
| ) | |
| pipeline.load_lora_weights( | |
| lcm_lora_id, | |
| local_files_only=use_local_model, | |
| ) | |
| pipeline.scheduler = LCMScheduler.from_config(pipeline.scheduler.config) | |
| pipeline.fuse_lora() | |
| pipeline.unet.to(memory_format=torch.channels_last) | |
| return pipeline | |
    def _pipeline_to_device(self):
        """Move the current pipeline to the configured device and dtype.

        Uses the module-level ``DEVICE`` constant (not ``self.device``) and the
        dtype selected in ``__init__``.
        """
        print(f"Pipeline device : {DEVICE}")
        print(f"Pipeline dtype : {self.torch_data_type}")
        self.pipeline.to(
            torch_device=DEVICE,
            torch_dtype=self.torch_data_type,
        )
| def _add_freeu(self): | |
| pipeline_class = self.pipeline.__class__.__name__ | |
| if pipeline_class == "StableDiffusionPipeline": | |
| print("Add FreeU - SD") | |
| self.pipeline.enable_freeu( | |
| s1=0.9, | |
| s2=0.2, | |
| b1=1.2, | |
| b2=1.4, | |
| ) | |
| elif pipeline_class == "StableDiffusionXLPipeline": | |
| print("Add FreeU - SDXL") | |
| self.pipeline.enable_freeu( | |
| s1=0.6, | |
| s2=0.4, | |
| b1=1.1, | |
| b2=1.2, | |
| ) | |
    def init(
        self,
        model_id: str,
        use_openvino: bool = False,
        device: str = "cpu",
        use_local_model: bool = False,
        use_tiny_auto_encoder: bool = False,
        use_lora: bool = False,
        lcm_lora: LCMLora = LCMLora(),
    ) -> None:
        """Create or reuse the generation pipeline for the requested settings.

        The pipeline is rebuilt only when any relevant setting differs from the
        previous call; otherwise the cached ``self.pipeline`` is kept as-is.

        Args:
            model_id: Hugging Face model id (or local path) of the LCM model.
            use_openvino: Use the OpenVINO pipeline (only honoured when an
                OpenVINO device is actually available).
            device: Device string recorded on the instance.
            use_local_model: Forwarded as ``local_files_only`` to all loaders.
            use_tiny_auto_encoder: Replace the VAE with a Tiny AutoEncoder.
            use_lora: Build an LCM-LoRA pipeline instead of a plain LCM model.
            lcm_lora: LoRA ids (base model + LCM-LoRA weights).

        NOTE(review): ``lcm_lora: LCMLora = LCMLora()`` is a shared mutable
        default argument — safe only while it is treated as read-only here.
        """
        self.device = device
        self.use_openvino = use_openvino
        # NOTE(review): previous_lcm_lora_base_id / previous_lcm_lora_id are
        # never set in __init__; on the very first call this relies on
        # `self.pipeline is None` short-circuiting the `or` chain before the
        # missing attributes are read.
        if (
            self.pipeline is None
            or self.previous_model_id != model_id
            or self.previous_use_tae_sd != use_tiny_auto_encoder
            or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
            or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
            or self.previous_use_lcm_lora != use_lora
        ):
            if self.use_openvino and is_openvino_device():
                # Release the old pipeline before loading a new one to keep
                # peak memory down.
                if self.pipeline:
                    del self.pipeline
                    self.pipeline = None
                self.pipeline = OVStableDiffusionPipeline.from_pretrained(
                    model_id,
                    local_files_only=use_local_model,
                    ov_config={"CACHE_DIR": ""},
                    device=DEVICE.upper(),
                )
                if use_tiny_auto_encoder:
                    print("Using Tiny Auto Encoder (OpenVINO)")
                    taesd_dir = snapshot_download(
                        repo_id=self.get_tiny_decoder_vae_model(),
                        local_files_only=use_local_model,
                    )
                    # Swap in the TAESD decoder through the custom wrapper that
                    # bypasses OVModelVaeDecoder.__init__ (see class above).
                    self.pipeline.vae_decoder = CustomOVModelVaeDecoder(
                        model=OVBaseModel.load_model(
                            f"{taesd_dir}/vae_decoder/openvino_model.xml"
                        ),
                        parent_model=self.pipeline,
                        model_dir=taesd_dir,
                    )
            else:
                if self.pipeline:
                    del self.pipeline
                    self.pipeline = None
                if use_lora:
                    print("Init LCM-LoRA pipeline")
                    self.pipeline = self._get_lcm_lora_pipeline(
                        lcm_lora.base_model_id,
                        lcm_lora.lcm_lora_id,
                        use_local_model,
                    )
                else:
                    print("Init LCM Model pipeline")
                    self.pipeline = self._get_lcm_model_pipeline(
                        model_id,
                        use_local_model,
                    )
                if use_tiny_auto_encoder:
                    vae_model = self.get_tiny_decoder_vae_model()
                    print(f"Using Tiny Auto Encoder {vae_model}")
                    # TAESD weights are loaded in float32 regardless of the
                    # pipeline dtype chosen in __init__.
                    self.pipeline.vae = AutoencoderTiny.from_pretrained(
                        vae_model,
                        torch_dtype=torch.float32,
                        local_files_only=use_local_model,
                    )
                # Torch pipelines are moved explicitly; the OpenVINO pipeline
                # received its device at construction time.
                self._pipeline_to_device()
            # Remember this configuration for the next call's comparison.
            self.previous_model_id = model_id
            self.previous_use_tae_sd = use_tiny_auto_encoder
            self.previous_lcm_lora_base_id = lcm_lora.base_model_id
            self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
            self.previous_use_lcm_lora = use_lora
            print(f"Model :{model_id}")
            print(f"Pipeline : {self.pipeline}")
            # Re-create the scheduler with explicit beta_start/beta_end
            # overrides — presumably matching the LCM training schedule;
            # confirm against the model card before changing.
            self.pipeline.scheduler = LCMScheduler.from_config(
                self.pipeline.scheduler.config,
                beta_start=0.001,
                beta_end=0.01,
            )
            if use_lora:
                self._add_freeu()
| def generate( | |
| self, | |
| lcm_diffusion_setting: LCMDiffusionSetting, | |
| reshape: bool = False, | |
| ) -> Any: | |
| guidance_scale = lcm_diffusion_setting.guidance_scale | |
| if lcm_diffusion_setting.use_seed: | |
| cur_seed = lcm_diffusion_setting.seed | |
| if self.use_openvino: | |
| np.random.seed(cur_seed) | |
| else: | |
| torch.manual_seed(cur_seed) | |
| if lcm_diffusion_setting.use_openvino and is_openvino_device(): | |
| print("Using OpenVINO") | |
| if reshape: | |
| print("Reshape and compile") | |
| self.pipeline.reshape( | |
| batch_size=-1, | |
| height=lcm_diffusion_setting.image_height, | |
| width=lcm_diffusion_setting.image_width, | |
| num_images_per_prompt=lcm_diffusion_setting.number_of_images, | |
| ) | |
| self.pipeline.compile() | |
| if not lcm_diffusion_setting.use_safety_checker: | |
| self.pipeline.safety_checker = None | |
| if ( | |
| not lcm_diffusion_setting.use_lcm_lora | |
| and not lcm_diffusion_setting.use_openvino | |
| and lcm_diffusion_setting.guidance_scale != 1.0 | |
| ): | |
| print("Not using LCM-LoRA so setting guidance_scale 1.0") | |
| guidance_scale = 1.0 | |
| if lcm_diffusion_setting.use_openvino: | |
| result_images = self.pipeline( | |
| prompt=lcm_diffusion_setting.prompt, | |
| negative_prompt=lcm_diffusion_setting.negative_prompt, | |
| num_inference_steps=lcm_diffusion_setting.inference_steps, | |
| guidance_scale=guidance_scale, | |
| width=lcm_diffusion_setting.image_width, | |
| height=lcm_diffusion_setting.image_height, | |
| num_images_per_prompt=lcm_diffusion_setting.number_of_images, | |
| ).images | |
| else: | |
| result_images = self.pipeline( | |
| prompt=lcm_diffusion_setting.prompt, | |
| negative_prompt=lcm_diffusion_setting.negative_prompt, | |
| num_inference_steps=lcm_diffusion_setting.inference_steps, | |
| guidance_scale=guidance_scale, | |
| width=lcm_diffusion_setting.image_width, | |
| height=lcm_diffusion_setting.image_height, | |
| num_images_per_prompt=lcm_diffusion_setting.number_of_images, | |
| ).images | |
| return result_images | |