import gc, time
import numpy as np
import PIL.Image
from diffusers import (
    ControlNetModel,
    DiffusionPipeline,
    StableDiffusionControlNetPipeline,
    StableDiffusionControlNetInpaintPipeline,
    StableDiffusionPipeline,
    AutoencoderKL,
    StableDiffusionXLInpaintPipeline,
    StableDiffusionXLAdapterPipeline,
    T2IAdapter,
    StableDiffusionXLPipeline,
    AutoPipelineForImage2Image,
)
from huggingface_hub import hf_hub_download
import torch, random, json
from controlnet_aux import (
    CannyDetector,
    ContentShuffleDetector,
    HEDdetector,
    LineartAnimeDetector,
    LineartDetector,
    MidasDetector,
    MLSDdetector,
    NormalBaeDetector,
    OpenposeDetector,
    PidiNetDetector,
)
from transformers import pipeline
from controlnet_aux.util import HWC3, ade_palette
from transformers import AutoImageProcessor, UperNetForSemanticSegmentation
import cv2
from diffusers import (
    DPMSolverMultistepScheduler,
    DPMSolverSinglestepScheduler,
    KDPM2DiscreteScheduler,
    EulerDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    HeunDiscreteScheduler,
    LMSDiscreteScheduler,
    DDIMScheduler,
    DEISMultistepScheduler,
    UniPCMultistepScheduler,
    LCMScheduler,
    PNDMScheduler,
    KDPM2AncestralDiscreteScheduler,
    EDMDPMSolverMultistepScheduler,
    EDMEulerScheduler,
)
from .prompt_weights import get_embed_new, add_comma_after_pattern_ti
from .utils import save_pil_image_with_metadata
from .lora_loader import lora_mix_load
from .inpainting_canvas import draw, make_inpaint_condition
from .adetailer import ad_model_process
from ..upscalers.esrgan import UpscalerESRGAN, UpscalerLanczos, UpscalerNearest
from ..logging.logging_setup import logger
from .extra_model_loaders import custom_task_model_loader
from .high_resolution import process_images_high_resolution
from .style_prompt_config import styles_data, STYLE_NAMES, get_json_content, apply_style
import os
from compel import Compel, ReturnedEmbeddingsType
import ipywidgets as widgets, mediapy
from IPython.display import display
from PIL import Image
from typing import Union, Optional, List, Tuple, Dict, Any, Callable
import logging, diffusers, copy, warnings

logging.getLogger("diffusers").setLevel(logging.ERROR)
# logging.getLogger("transformers").setLevel(logging.ERROR)
diffusers.utils.logging.set_verbosity(40)
warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers")
warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers")


# =====================================
# Utils preprocessor
# =====================================
def resize_image(input_image, resolution, interpolation=None):
    H, W, C = input_image.shape
    H = float(H)
    W = float(W)
    k = float(resolution) / max(H, W)
    H *= k
    W *= k
    H = int(np.round(H / 64.0)) * 64
    W = int(np.round(W / 64.0)) * 64
    if interpolation is None:
        interpolation = cv2.INTER_LANCZOS4 if k > 1 else cv2.INTER_AREA
    img = cv2.resize(input_image, (W, H), interpolation=interpolation)
    return img
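
# Illustrative example of the rounding above: a 1080x1920 (HxW) input with
# resolution=512 gives k = 512/1920, so W becomes 512 and H becomes
# int(np.round(288 / 64.0)) * 64 = 256 (NumPy rounds half to even), i.e.
# both sides always end up as multiples of 64.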


class DepthEstimator:
    def __init__(self):
        self.model = pipeline("depth-estimation")

    def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
        detect_resolution = kwargs.pop("detect_resolution", 512)
        image_resolution = kwargs.pop("image_resolution", 512)
        image = np.array(image)
        image = HWC3(image)
        image = resize_image(image, resolution=detect_resolution)
        image = PIL.Image.fromarray(image)
        image = self.model(image)
        image = image["depth"]
        image = np.array(image)
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        return PIL.Image.fromarray(image)


class ImageSegmentor:
    def __init__(self):
        self.image_processor = AutoImageProcessor.from_pretrained(
            "openmmlab/upernet-convnext-small"
        )
        self.image_segmentor = UperNetForSemanticSegmentation.from_pretrained(
            "openmmlab/upernet-convnext-small"
        )

    def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
        detect_resolution = kwargs.pop("detect_resolution", 512)
        image_resolution = kwargs.pop("image_resolution", 512)
        image = HWC3(image)
        image = resize_image(image, resolution=detect_resolution)
        image = PIL.Image.fromarray(image)
        pixel_values = self.image_processor(image, return_tensors="pt").pixel_values
        outputs = self.image_segmentor(pixel_values)
        seg = self.image_processor.post_process_semantic_segmentation(
            outputs, target_sizes=[image.size[::-1]]
        )[0]
        color_seg = np.zeros((seg.shape[0], seg.shape[1], 3), dtype=np.uint8)
        for label, color in enumerate(ade_palette()):
            color_seg[seg == label, :] = color
        color_seg = color_seg.astype(np.uint8)
        color_seg = resize_image(
            color_seg, resolution=image_resolution, interpolation=cv2.INTER_NEAREST
        )
        return PIL.Image.fromarray(color_seg)


class Preprocessor:
    MODEL_ID = "lllyasviel/Annotators"

    def __init__(self):
        self.model = None
        self.name = ""

    def load(self, name: str) -> None:
        if name == self.name:
            return
        if name == "HED":
            self.model = HEDdetector.from_pretrained(self.MODEL_ID)
        elif name == "Midas":
            self.model = MidasDetector.from_pretrained(self.MODEL_ID)
        elif name == "MLSD":
            self.model = MLSDdetector.from_pretrained(self.MODEL_ID)
        elif name == "Openpose":
            self.model = OpenposeDetector.from_pretrained(self.MODEL_ID)
        elif name == "PidiNet":
            self.model = PidiNetDetector.from_pretrained(self.MODEL_ID)
        elif name == "NormalBae":
            self.model = NormalBaeDetector.from_pretrained(self.MODEL_ID)
        elif name == "Lineart":
            self.model = LineartDetector.from_pretrained(self.MODEL_ID)
        elif name == "LineartAnime":
            self.model = LineartAnimeDetector.from_pretrained(self.MODEL_ID)
        elif name == "Canny":
            self.model = CannyDetector()
        elif name == "ContentShuffle":
            self.model = ContentShuffleDetector()
        elif name == "DPT":
            self.model = DepthEstimator()
        elif name == "UPerNet":
            self.model = ImageSegmentor()
        else:
            raise ValueError
        torch.cuda.empty_cache()
        gc.collect()
        self.name = name

    def __call__(self, image: PIL.Image.Image, **kwargs) -> PIL.Image.Image:
        if self.name == "Canny":
            if "detect_resolution" in kwargs:
                detect_resolution = kwargs.pop("detect_resolution")
                image = np.array(image)
                image = HWC3(image)
                image = resize_image(image, resolution=detect_resolution)
            image = self.model(image, **kwargs)
            return PIL.Image.fromarray(image)
        elif self.name == "Midas":
            detect_resolution = kwargs.pop("detect_resolution", 512)
            image_resolution = kwargs.pop("image_resolution", 512)
            image = np.array(image)
            image = HWC3(image)
            image = resize_image(image, resolution=detect_resolution)
            image = self.model(image, **kwargs)
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            return PIL.Image.fromarray(image)
        else:
            return self.model(image, **kwargs)
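
# Minimal usage sketch (illustrative): load() caches the current detector,
# so repeated calls with the same name are no-ops.
#
#   preprocessor = Preprocessor()
#   preprocessor.load("Canny")
#   control_image = preprocessor(
#       image=pil_image,  # hypothetical input
#       low_threshold=100,
#       high_threshold=200,
#       detect_resolution=512,
#       image_resolution=512,
#   )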


# =====================================
# Base Model
# =====================================
CONTROLNET_MODEL_IDS = {
    "openpose": "lllyasviel/control_v11p_sd15_openpose",
    "canny": "lllyasviel/control_v11p_sd15_canny",
    "mlsd": "lllyasviel/control_v11p_sd15_mlsd",
    "scribble": "lllyasviel/control_v11p_sd15_scribble",
    "softedge": "lllyasviel/control_v11p_sd15_softedge",
    "segmentation": "lllyasviel/control_v11p_sd15_seg",
    "depth": "lllyasviel/control_v11f1p_sd15_depth",
    "normalbae": "lllyasviel/control_v11p_sd15_normalbae",
    "lineart": "lllyasviel/control_v11p_sd15_lineart",
    "lineart_anime": "lllyasviel/control_v11p_sd15s2_lineart_anime",
    "shuffle": "lllyasviel/control_v11e_sd15_shuffle",
    "ip2p": "lllyasviel/control_v11e_sd15_ip2p",
    "inpaint": "lllyasviel/control_v11p_sd15_inpaint",
    "txt2img": "Nothinghere",
    "sdxl_canny": "TencentARC/t2i-adapter-canny-sdxl-1.0",
    "sdxl_sketch": "TencentARC/t2i-adapter-sketch-sdxl-1.0",
    "sdxl_lineart": "TencentARC/t2i-adapter-lineart-sdxl-1.0",
    "sdxl_depth-midas": "TencentARC/t2i-adapter-depth-midas-sdxl-1.0",
    "sdxl_openpose": "TencentARC/t2i-adapter-openpose-sdxl-1.0",
    # "sdxl_depth-zoe": "TencentARC/t2i-adapter-depth-zoe-sdxl-1.0",
    # "sdxl_recolor": "TencentARC/t2i-adapter-recolor-sdxl-1.0",
    "img2img": "Nothinghere",
}


# def download_all_controlnet_weights() -> None:
#     for model_id in CONTROLNET_MODEL_IDS.values():
#         ControlNetModel.from_pretrained(model_id)


SCHEDULER_CONFIG_MAP = {
    "DPM++ 2M": (DPMSolverMultistepScheduler, {}),
    "DPM++ 2M Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}),
    "DPM++ 2M SDE": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++"}),
    "DPM++ 2M SDE Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True, "algorithm_type": "sde-dpmsolver++"}),
    "DPM++ SDE": (DPMSolverSinglestepScheduler, {}),
    "DPM++ SDE Karras": (DPMSolverSinglestepScheduler, {"use_karras_sigmas": True}),
    "DPM2": (KDPM2DiscreteScheduler, {}),
    "DPM2 Karras": (KDPM2DiscreteScheduler, {"use_karras_sigmas": True}),
    "DPM2 a": (KDPM2AncestralDiscreteScheduler, {}),
    "DPM2 a Karras": (KDPM2AncestralDiscreteScheduler, {"use_karras_sigmas": True}),
    "Euler": (EulerDiscreteScheduler, {}),
    "Euler a": (EulerAncestralDiscreteScheduler, {}),
    "Heun": (HeunDiscreteScheduler, {}),
    "LMS": (LMSDiscreteScheduler, {}),
    "LMS Karras": (LMSDiscreteScheduler, {"use_karras_sigmas": True}),
    "DDIM": (DDIMScheduler, {}),
    "DEIS": (DEISMultistepScheduler, {}),
    "UniPC": (UniPCMultistepScheduler, {}),
    "PNDM": (PNDMScheduler, {}),
    "DPM++ 2M Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True}),
    "DPM++ 2M Ef": (DPMSolverMultistepScheduler, {"euler_at_final": True}),
    "DPM++ 2M SDE Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True, "algorithm_type": "sde-dpmsolver++"}),
    "DPM++ 2M SDE Ef": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++", "euler_at_final": True}),
    "EDMDPM": (EDMDPMSolverMultistepScheduler, {}),
    "EDMEuler": (EDMEulerScheduler, {}),
    "LCM": (LCMScheduler, {}),
}

scheduler_names = list(SCHEDULER_CONFIG_MAP.keys())
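
# Each entry maps a sampler name to (scheduler_class, extra_config); a sketch
# of how a scheduler can be rebuilt from a pipeline's base config:
#
#   cls, extra = SCHEDULER_CONFIG_MAP["DPM++ 2M Karras"]
#   scheduler = cls.from_config(pipe.scheduler.config, **extra)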


def process_prompts_valid(specific_prompt, specific_negative_prompt, prompt, negative_prompt):
    specific_prompt_empty = (specific_prompt in [None, ""])
    specific_negative_prompt_empty = (specific_negative_prompt in [None, ""])

    prompt_valid = prompt if specific_prompt_empty else specific_prompt
    negative_prompt_valid = negative_prompt if specific_negative_prompt_empty else specific_negative_prompt

    return specific_prompt_empty, specific_negative_prompt_empty, prompt_valid, negative_prompt_valid
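
# For example, a non-empty specific prompt overrides the general one, while an
# empty specific negative prompt falls back to the general negative prompt:
#
#   _, _, p, n = process_prompts_valid("a cat, hires", "", "a cat", "blurry")
#   # p == "a cat, hires", n == "blurry"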


class Model_Diffusers:
    def __init__(
        self,
        base_model_id: str = "runwayml/stable-diffusion-v1-5",
        task_name: str = "txt2img",
        vae_model=None,
        type_model_precision=torch.float16,
        sdxl_safetensors=False,
    ):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.base_model_id = ""
        self.task_name = ""
        self.vae_model = None
        self.type_model_precision = (
            type_model_precision if torch.cuda.is_available() else torch.float32
        )  # For SD 1.5

        self.load_pipe(
            base_model_id, task_name, vae_model, type_model_precision, sdxl_safetensors=sdxl_safetensors
        )
        self.preprocessor = Preprocessor()

        self.styles_data = styles_data
        self.STYLE_NAMES = STYLE_NAMES
        self.style_json_file = ""

    def load_pipe(
        self,
        base_model_id: str,
        task_name="txt2img",
        vae_model=None,
        type_model_precision=torch.float16,
        reload=False,
        sdxl_safetensors=False,
        retain_model_in_memory=True,
    ) -> DiffusionPipeline:
        if (
            base_model_id == self.base_model_id
            and task_name == self.task_name
            and hasattr(self, "pipe")
            and self.vae_model == vae_model
            and self.pipe is not None
            and not reload
        ):
            if self.type_model_precision == type_model_precision or self.device.type == "cpu":
                return

        if hasattr(self, "pipe") and os.path.isfile(base_model_id):
            unload_model = False
            if self.pipe is None:
                unload_model = True
            elif type_model_precision != self.type_model_precision and self.device.type != "cpu":
                unload_model = True
        else:
            if hasattr(self, "pipe"):
                unload_model = False
                if self.pipe is None:
                    unload_model = True
            else:
                unload_model = True
        self.type_model_precision = (
            type_model_precision if torch.cuda.is_available() else torch.float32
        )

        if self.type_model_precision == torch.float32 and os.path.isfile(base_model_id):
            logger.info(f"Working with full precision {str(self.type_model_precision)}")

        # Load model
        if (
            self.base_model_id == base_model_id
            and self.pipe is not None
            and not reload
            and self.vae_model == vae_model
            and not unload_model
        ):
            # logger.info("Previously loaded base model")  # no early return here
            class_name = self.class_name
        else:
            # Unload the previous model and its associated state
            self.pipe = None
            self.model_memory = {}
            self.lora_memory = [None, None, None, None, None]
            self.lora_scale_memory = [1.0, 1.0, 1.0, 1.0, 1.0]
            self.LCMconfig = None
            self.embed_loaded = []
            self.FreeU = False
            torch.cuda.empty_cache()
            gc.collect()

            # Load new model
            if os.path.isfile(base_model_id):  # exists or not same  # if os.path.exists(base_model_id):
                if sdxl_safetensors:
                    logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
                    self.pipe = StableDiffusionXLPipeline.from_single_file(
                        base_model_id,
                        vae=AutoencoderKL.from_pretrained(
                            "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                        ),
                        torch_dtype=self.type_model_precision,
                    )
                    class_name = "StableDiffusionXLPipeline"
                else:
                    self.pipe = StableDiffusionPipeline.from_single_file(
                        base_model_id,
                        # vae=None
                        # if vae_model is None
                        # else AutoencoderKL.from_single_file(
                        #     vae_model
                        # ),
                        torch_dtype=self.type_model_precision,
                    )
                    class_name = "StableDiffusionPipeline"
            else:
                file_config = hf_hub_download(repo_id=base_model_id, filename="model_index.json")

                # Read the pipeline class name from the model's JSON config
                with open(file_config, 'r') as json_config:
                    data_config = json.load(json_config)

                if '_class_name' in data_config:
                    class_name = data_config['_class_name']

                match class_name:
                    case "StableDiffusionPipeline":
                        self.pipe = StableDiffusionPipeline.from_pretrained(
                            base_model_id,
                            torch_dtype=self.type_model_precision,
                        )

                    case "StableDiffusionXLPipeline":
                        logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
                        try:
                            self.pipe = DiffusionPipeline.from_pretrained(
                                base_model_id,
                                vae=AutoencoderKL.from_pretrained(
                                    "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                                ),
                                torch_dtype=torch.float16,
                                use_safetensors=True,
                                variant="fp16",
                                add_watermarker=False,
                            )
                        except Exception as e:
                            logger.debug(e)
                            logger.debug("Loading model without parameter variant=fp16")
                            self.pipe = DiffusionPipeline.from_pretrained(
                                base_model_id,
                                vae=AutoencoderKL.from_pretrained(
                                    "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                                ),
                                torch_dtype=torch.float16,
                                use_safetensors=True,
                                add_watermarker=False,
                            )
            self.base_model_id = base_model_id
            self.class_name = class_name

            # Load the VAE after the model is loaded
            if vae_model is None:
                logger.debug("Default VAE")
            else:
                if os.path.isfile(vae_model):
                    self.pipe.vae = AutoencoderKL.from_single_file(
                        vae_model
                    )
                else:
                    self.pipe.vae = AutoencoderKL.from_pretrained(
                        vae_model,
                        subfolder="vae",
                    )
                try:
                    self.pipe.vae.to(self.type_model_precision)
                except Exception:
                    logger.warning(f"VAE: not in {self.type_model_precision}")
            self.vae_model = vae_model

            # Define the base scheduler
            self.default_scheduler = copy.deepcopy(self.pipe.scheduler)
            logger.debug(f"Base sampler: {self.default_scheduler}")

        if task_name in self.model_memory:
            self.pipe = self.model_memory[task_name]
            # Create new base values
            # self.pipe.to(self.device)
            # torch.cuda.empty_cache()
            # gc.collect()
            self.base_model_id = base_model_id
            self.task_name = task_name
            self.vae_model = vae_model
            self.class_name = class_name
            self.pipe.watermark = None
            return

        # Load task
        model_id = CONTROLNET_MODEL_IDS[task_name]

        if task_name == "inpaint":
            match class_name:
                case "StableDiffusionPipeline":
                    controlnet = ControlNetModel.from_pretrained(
                        model_id, torch_dtype=self.type_model_precision
                    )

                    self.pipe = StableDiffusionControlNetInpaintPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        controlnet=controlnet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )

                case "StableDiffusionXLPipeline":
                    self.pipe = StableDiffusionXLInpaintPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        # controlnet=self.controlnet,
                        scheduler=self.pipe.scheduler,
                    )

        if task_name not in ["txt2img", "inpaint", "img2img"]:
            match class_name:
                case "StableDiffusionPipeline":
                    controlnet = ControlNetModel.from_pretrained(
                        model_id, torch_dtype=self.type_model_precision
                    )

                    self.pipe = StableDiffusionControlNetPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        controlnet=controlnet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )
                    self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config)

                case "StableDiffusionXLPipeline":
                    adapter = T2IAdapter.from_pretrained(
                        model_id,
                        torch_dtype=torch.float16,
                        variant="fp16",
                    ).to(self.device)

                    self.pipe = StableDiffusionXLAdapterPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        adapter=adapter,
                        scheduler=self.pipe.scheduler,
                    ).to(self.device)

        if task_name in ["txt2img", "img2img"]:
            match class_name:
                case "StableDiffusionPipeline":
                    self.pipe = StableDiffusionPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )

                case "StableDiffusionXLPipeline":
                    self.pipe = StableDiffusionXLPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                    )

            if task_name == "img2img":
                self.pipe = AutoPipelineForImage2Image.from_pipe(self.pipe)

        # Create new base values
        self.pipe.to(self.device)
        torch.cuda.empty_cache()
        gc.collect()

        self.base_model_id = base_model_id
        self.task_name = task_name
        self.vae_model = vae_model
        self.class_name = class_name

        if self.class_name == "StableDiffusionXLPipeline":
            self.pipe.enable_vae_slicing()
            self.pipe.enable_vae_tiling()
            self.pipe.watermark = None

        if retain_model_in_memory and task_name not in self.model_memory:
            self.model_memory[task_name] = self.pipe

        return
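
    # Illustrative usage (hypothetical values): the shared components
    # (vae, unet, text encoders) are re-wrapped into the pipeline class
    # matching the requested task, and finished pipelines are cached in
    # self.model_memory for reuse.
    #
    #   model = Model_Diffusers(base_model_id="runwayml/stable-diffusion-v1-5")
    #   model.load_pipe("runwayml/stable-diffusion-v1-5", task_name="canny")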

    def load_controlnet_weight(self, task_name: str) -> None:
        torch.cuda.empty_cache()
        gc.collect()
        model_id = CONTROLNET_MODEL_IDS[task_name]
        controlnet = ControlNetModel.from_pretrained(
            model_id, torch_dtype=self.type_model_precision
        )
        controlnet.to(self.device)
        torch.cuda.empty_cache()
        gc.collect()
        self.pipe.controlnet = controlnet
        # self.task_name = task_name

    def run_pipe(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        control_image: PIL.Image.Image,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        generator,
        controlnet_conditioning_scale,
        control_guidance_start,
        control_guidance_end,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            generator=generator,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            control_guidance_start=control_guidance_start,
            control_guidance_end=control_guidance_end,
            image=control_image,
        ).images

    def run_pipe_SD(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        height: int,
        width: int,
        generator,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        self.preview_handle = None
        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            generator=generator,
            height=height,
            width=width,
            callback=self.callback_pipe if self.image_previews else None,
            callback_steps=10 if self.image_previews else 100,
        ).images

    # @torch.autocast('cuda')
    # def run_pipe_SDXL(
    #     self,
    #     prompt: str,
    #     negative_prompt: str,
    #     prompt_embeds,
    #     negative_prompt_embeds,
    #     num_images: int,
    #     num_steps: int,
    #     guidance_scale: float,
    #     clip_skip: int,
    #     height: int,
    #     width: int,
    #     generator,
    #     seddd,
    #     conditioning,
    #     pooled,
    # ) -> list[PIL.Image.Image]:
    #     # Return PIL images
    #     # generator = torch.Generator("cuda").manual_seed(seddd)  # generator = torch.Generator("cuda").manual_seed(seed),
    #     return self.pipe(
    #         prompt=None,
    #         negative_prompt=None,
    #         prompt_embeds=conditioning[0:1],
    #         pooled_prompt_embeds=pooled[0:1],
    #         negative_prompt_embeds=conditioning[1:2],
    #         negative_pooled_prompt_embeds=pooled[1:2],
    #         height=height,
    #         width=width,
    #         num_inference_steps=num_steps,
    #         guidance_scale=guidance_scale,
    #         clip_skip=clip_skip,
    #         num_images_per_prompt=num_images,
    #         generator=generator,
    #     ).images

    def run_pipe_inpaint(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        control_image: PIL.Image.Image,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        strength: float,
        init_image,
        control_mask,
        controlnet_conditioning_scale,
        control_guidance_start,
        control_guidance_end,
        generator,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        return self.pipe(
            prompt=None,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            eta=1.0,
            strength=strength,
            image=init_image,  # original image
            mask_image=control_mask,  # mask, values of 0 to 255
            control_image=control_image,  # tensor control image
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            generator=generator,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            control_guidance_start=control_guidance_start,
            control_guidance_end=control_guidance_end,
        ).images

    def run_pipe_img2img(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        strength: float,
        init_image,
        generator,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        return self.pipe(
            prompt=None,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            eta=1.0,
            strength=strength,
            image=init_image,  # original image
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            generator=generator,
        ).images

    # The self.process_<task> methods below return the preprocessed control image
    def process_canny(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        low_threshold: int,
        high_threshold: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        self.preprocessor.load("Canny")
        control_image = self.preprocessor(
            image=image,
            low_threshold=low_threshold,
            high_threshold=high_threshold,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
        )
        return control_image

    def process_mlsd(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        value_threshold: float,
        distance_threshold: float,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        self.preprocessor.load("MLSD")
        control_image = self.preprocessor(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
            thr_v=value_threshold,
            thr_d=distance_threshold,
        )
        return control_image

    def process_scribble(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name == "HED":
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                scribble=False,
            )
        elif preprocessor_name == "PidiNet":
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                safe=False,
            )
        else:
            # Guard against an unknown preprocessor leaving control_image unset
            raise ValueError
        return control_image

    def process_scribble_interactive(
        self,
        image_and_mask: dict[str, np.ndarray],
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        if image_and_mask is None:
            raise ValueError
        image = image_and_mask["mask"]
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        control_image = PIL.Image.fromarray(image)
        return control_image

    def process_softedge(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name in ["HED", "HED safe"]:
            safe = "safe" in preprocessor_name
            self.preprocessor.load("HED")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                scribble=safe,
            )
        elif preprocessor_name in ["PidiNet", "PidiNet safe"]:
            safe = "safe" in preprocessor_name
            self.preprocessor.load("PidiNet")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                safe=safe,
            )
        else:
            raise ValueError
        return control_image

    def process_openpose(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load("Openpose")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                hand_and_face=True,
            )
        return control_image

    def process_segmentation(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )
        return control_image

    def process_depth(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )
        return control_image

    def process_normal(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load("NormalBae")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )
        return control_image

    def process_lineart(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name in ["None", "None (anime)"]:
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name in ["Lineart", "Lineart coarse"]:
            coarse = "coarse" in preprocessor_name
            self.preprocessor.load("Lineart")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                coarse=coarse,
            )
        elif preprocessor_name == "Lineart (anime)":
            self.preprocessor.load("LineartAnime")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )
        if self.class_name == "StableDiffusionPipeline":
            if "anime" in preprocessor_name:
                self.load_controlnet_weight("lineart_anime")
                logger.info("Lineart anime")
            else:
                self.load_controlnet_weight("lineart")
        return control_image

    def process_shuffle(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
            )
        return control_image

    def process_ip2p(
        self,
        image: np.ndarray,
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        control_image = PIL.Image.fromarray(image)
        return control_image

    def process_inpaint(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        image_mask: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        init_image = PIL.Image.fromarray(image)

        image_mask = HWC3(image_mask)
        image_mask = resize_image(image_mask, resolution=image_resolution)
        control_mask = PIL.Image.fromarray(image_mask)

        control_image = make_inpaint_condition(init_image, control_mask)
        return init_image, control_mask, control_image

    def process_img2img(
        self,
        image: np.ndarray,
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        init_image = PIL.Image.fromarray(image)
        return init_image

    def get_scheduler(self, name):
        if name in SCHEDULER_CONFIG_MAP:
            scheduler_class, config = SCHEDULER_CONFIG_MAP[name]
            # return scheduler_class.from_config(self.pipe.scheduler.config, **config)
            # beta self.default_scheduler
            return scheduler_class.from_config(self.default_scheduler.config, **config)
        else:
            raise ValueError(f"Scheduler with name {name} not found. Valid schedulers: {', '.join(scheduler_names)}")
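
    # Illustrative: swapping the active sampler before a generation call.
    #
    #   model.pipe.scheduler = model.get_scheduler("DPM++ 2M Karras")
    #
    # The scheduler is rebuilt from self.default_scheduler.config, so switching
    # samplers never accumulates config changes from a previous choice.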

    def create_prompt_embeds(
        self,
        prompt,
        negative_prompt,
        textual_inversion,
        clip_skip,
        syntax_weights,
    ):
        if self.class_name == "StableDiffusionPipeline":
            if self.embed_loaded != textual_inversion and textual_inversion != []:
                # Textual Inversion
                for name, directory_name in textual_inversion:
                    try:
                        if directory_name.endswith(".pt"):
                            model = torch.load(directory_name, map_location=self.device)
                            model_tensors = model.get("string_to_param").get("*")
                            s_model = {"emb_params": model_tensors}
                            # save_file(s_model, directory_name[:-3] + '.safetensors')
                            self.pipe.load_textual_inversion(s_model, token=name)
                        else:
                            # self.pipe.text_encoder.resize_token_embeddings(len(self.pipe.tokenizer), pad_to_multiple_of=128)
                            # self.pipe.load_textual_inversion("./bad_prompt.pt", token="baddd")
                            self.pipe.load_textual_inversion(directory_name, token=name)
                        if not self.gui_active:
                            logger.info(f"Applied : {name}")
                    except Exception as e:
                        exception = str(e)
                        if name in exception:
                            logger.debug(f"Previously loaded embed {name}")
                        else:
                            logger.error(exception)
                            logger.error(f"Can't apply embed {name}")
                self.embed_loaded = textual_inversion

            # Clip skip
            # clip_skip_diffusers = None  # clip_skip - 1  # future update
            if not hasattr(self, "compel"):
                self.compel = Compel(
                    tokenizer=self.pipe.tokenizer,
                    text_encoder=self.pipe.text_encoder,
                    truncate_long_prompts=False,
                    returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NORMALIZED if clip_skip else ReturnedEmbeddingsType.LAST_HIDDEN_STATES_NORMALIZED,
                )

            # Prompt weights for textual inversion
            prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
            negative_prompt_ti = self.pipe.maybe_convert_prompt(
                negative_prompt, self.pipe.tokenizer
            )

            # Separate multi-vector textual inversion tokens by comma
            if self.embed_loaded != []:
                prompt_ti = add_comma_after_pattern_ti(prompt_ti)
                negative_prompt_ti = add_comma_after_pattern_ti(negative_prompt_ti)

            # Syntax weights
            self.pipe.to(self.device)
            if syntax_weights == "Classic":
                prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel)
                negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel)
            else:
                prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel, compel_process_sd=True)
                negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel, compel_process_sd=True)

            # Pad the embeddings to the same shape if they differ
            if prompt_emb.shape != negative_prompt_emb.shape:
                (
                    prompt_emb,
                    negative_prompt_emb,
                ) = self.compel.pad_conditioning_tensors_to_same_length(
                    [prompt_emb, negative_prompt_emb]
                )

            return prompt_emb, negative_prompt_emb
        else:
            # SDXL embed
            if self.embed_loaded != textual_inversion and textual_inversion != []:
                # Textual Inversion
                for name, directory_name in textual_inversion:
                    try:
                        from safetensors.torch import load_file
                        state_dict = load_file(directory_name)
                        self.pipe.load_textual_inversion(state_dict["clip_g"], token=name, text_encoder=self.pipe.text_encoder_2, tokenizer=self.pipe.tokenizer_2)
                        self.pipe.load_textual_inversion(state_dict["clip_l"], token=name, text_encoder=self.pipe.text_encoder, tokenizer=self.pipe.tokenizer)
                        if not self.gui_active:
                            logger.info(f"Applied : {name}")
                    except Exception as e:
                        exception = str(e)
                        if name in exception:
                            logger.debug(f"Previously loaded embed {name}")
                        else:
                            logger.error(exception)
                            logger.error(f"Can't apply embed {name}")
                self.embed_loaded = textual_inversion

            if not hasattr(self, "compel"):
                # Clip skip
                if clip_skip:
                    # clip_skip_diffusers = None  # clip_skip - 1  # future update
                    self.compel = Compel(
                        tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
                        text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
                        returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED,
                        requires_pooled=[False, True],
                        truncate_long_prompts=False,
                    )
                else:
                    # clip_skip_diffusers = None  # clip_skip = None  # future update
                    self.compel = Compel(
                        tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
                        text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
                        requires_pooled=[False, True],
                        truncate_long_prompts=False,
                    )

            # Prompt weights for textual inversion
            try:
                prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
                negative_prompt_ti = self.pipe.maybe_convert_prompt(negative_prompt, self.pipe.tokenizer)
            except Exception:
                prompt_ti = prompt
                negative_prompt_ti = negative_prompt
                logger.error("FAILED: Convert prompt for textual inversion")

            # Prompt syntax in A1111 style
            if syntax_weights == "Classic":
                self.pipe.to("cuda")
                prompt_ti = get_embed_new(prompt_ti, self.pipe, self.compel, only_convert_string=True)
                negative_prompt_ti = get_embed_new(negative_prompt_ti, self.pipe, self.compel, only_convert_string=True)
            else:
                prompt_ti = prompt
                negative_prompt_ti = negative_prompt

            conditioning, pooled = self.compel([prompt_ti, negative_prompt_ti])

            return conditioning, pooled

    def process_lora(self, select_lora, lora_weights_scale, unload=False):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if not unload:
            if select_lora is not None:
                try:
                    self.pipe = lora_mix_load(
                        self.pipe,
                        select_lora,
                        lora_weights_scale,
                        device=device,
                        dtype=self.type_model_precision,
                    )
                    logger.info(select_lora)
                except Exception as e:
                    logger.error(f"ERROR: LoRA not compatible: {select_lora}")
                    logger.debug(f"{str(e)}")
            return self.pipe
        else:
            # Unloading by fusing with the negated scale is numerically
            # unstable, but it is fast and needs less memory
            if select_lora is not None:
                try:
                    self.pipe = lora_mix_load(
                        self.pipe,
                        select_lora,
                        -lora_weights_scale,
                        device=device,
                        dtype=self.type_model_precision,
                    )
                    logger.debug(f"Unload LoRA: {select_lora}")
                except Exception:
                    pass
            return self.pipe

    def load_style_file(self, style_json_file):
        if os.path.exists(style_json_file):
            try:
                file_json_read = get_json_content(style_json_file)
                self.styles_data = {
                    k["name"]: (k["prompt"], k["negative_prompt"]) for k in file_json_read
                }
                self.STYLE_NAMES = list(self.styles_data.keys())
                self.style_json_file = style_json_file
                logger.info(f"Styles json file loaded with {len(self.STYLE_NAMES)} styles")
                logger.debug(str(self.STYLE_NAMES))
            except Exception as e:
                logger.error(str(e))
        else:
            logger.error("Styles JSON file not found in the directory")
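
    # Expected styles JSON shape (inferred from the dict comprehension above):
    # a list of objects, each with "name", "prompt", and "negative_prompt" keys.
    #
    #   [
    #     {"name": "my_style", "prompt": "...", "negative_prompt": "..."}
    #   ]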

    def callback_pipe(self, iter, t, latents):
        # convert latents to image
        with torch.no_grad():
            latents = 1 / 0.18215 * latents
            image = self.pipe.vae.decode(latents).sample

            image = (image / 2 + 0.5).clamp(0, 1)

            # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
            image = image.cpu().permute(0, 2, 3, 1).float().numpy()

            # convert to PIL Images
            image = self.pipe.numpy_to_pil(image)

            # show one image
            # global preview_handle
            if self.preview_handle is None:
                self.preview_handle = display(image[0], display_id=True)
            else:
                self.preview_handle.update(image[0])

    def __call__(
        self,
        prompt: str = "",
        negative_prompt: str = "",
        img_height: int = 512,
        img_width: int = 512,
        num_images: int = 1,
        num_steps: int = 30,
        guidance_scale: float = 7.5,
        clip_skip: Optional[bool] = True,
        seed: int = -1,
        sampler: str = "DPM++ 2M",
        syntax_weights: str = "Classic",
        lora_A: Optional[str] = None,
        lora_scale_A: float = 1.0,
        lora_B: Optional[str] = None,
        lora_scale_B: float = 1.0,
        lora_C: Optional[str] = None,
        lora_scale_C: float = 1.0,
        lora_D: Optional[str] = None,
        lora_scale_D: float = 1.0,
        lora_E: Optional[str] = None,
        lora_scale_E: float = 1.0,
        textual_inversion: List[Tuple[str, str]] = [],
        FreeU: bool = False,
        adetailer_A: bool = False,
        adetailer_A_params: Dict[str, Any] = {},
        adetailer_B: bool = False,
        adetailer_B_params: Dict[str, Any] = {},
        style_prompt: Optional[Any] = [""],
        style_json_file: Optional[Any] = "",
        image: Optional[Any] = None,
        preprocessor_name: Optional[str] = "None",
        preprocess_resolution: int = 512,
        image_resolution: int = 512,
        image_mask: Optional[Any] = None,
        strength: float = 0.35,
        low_threshold: int = 100,
        high_threshold: int = 200,
        value_threshold: float = 0.1,
        distance_threshold: float = 0.1,
        controlnet_conditioning_scale: float = 1.0,
        control_guidance_start: float = 0.0,
        control_guidance_end: float = 1.0,
        t2i_adapter_preprocessor: bool = True,
        t2i_adapter_conditioning_scale: float = 1.0,
        t2i_adapter_conditioning_factor: float = 1.0,
        upscaler_model_path: Optional[str] = None,  # add latent
        upscaler_increases_size: float = 1.5,
        esrgan_tile: int = 100,
        esrgan_tile_overlap: int = 10,
        hires_steps: int = 25,
        hires_denoising_strength: float = 0.35,
        hires_prompt: str = "",
        hires_negative_prompt: str = "",
        hires_sampler: str = "Use same sampler",
        loop_generation: int = 1,
        display_images: bool = False,
        save_generated_images: bool = True,
        image_storage_location: str = "./images",
        generator_in_cpu: bool = False,
        leave_progress_bar: bool = False,
        disable_progress_bar: bool = False,
        hires_before_adetailer: bool = False,
        hires_after_adetailer: bool = True,
        retain_compel_previous_load: bool = False,
        retain_detailfix_model_previous_load: bool = False,
        retain_hires_model_previous_load: bool = False,
        image_previews: bool = False,
        xformers_memory_efficient_attention: bool = False,
        gui_active: bool = False,
    ):
| """ | |
| The call function for the generation. | |
| Args: | |
| prompt (str , optional): | |
| The prompt or prompts to guide image generation. | |
| negative_prompt (str , optional): | |
| The prompt or prompts to guide what to not include in image generation. Ignored when not using guidance (`guidance_scale < 1`). | |
| img_height (int, optional, defaults to 512): | |
| The height in pixels of the generated image. | |
| img_width (int, optional, defaults to 512): | |
| The width in pixels of the generated image. | |
| num_images (int, optional, defaults to 1): | |
| The number of images to generate per prompt. | |
| num_steps (int, optional, defaults to 30): | |
| The number of denoising steps. More denoising steps usually lead to a higher quality image at the | |
| expense of slower inference. | |
| guidance_scale (float, optional, defaults to 7.5): | |
| A higher guidance scale value encourages the model to generate images closely linked to the text | |
| `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`. | |
| clip_skip (bool, optional): | |
| Number of layers to be skipped from CLIP while computing the prompt embeddings. It can be placed on | |
| the penultimate (True) or last layer (False). | |
| seed (int, optional, defaults to -1): | |
| A seed for controlling the randomness of the image generation process. -1 design a random seed. | |
| sampler (str, optional, defaults to "DPM++ 2M"): | |
| The sampler used for the generation process. Available samplers: DPM++ 2M, DPM++ 2M Karras, DPM++ 2M SDE, | |
| DPM++ 2M SDE Karras, DPM++ SDE, DPM++ SDE Karras, DPM2, DPM2 Karras, Euler, Euler a, Heun, LMS, LMS Karras, | |
| DDIM, DEIS, UniPC, DPM2 a, DPM2 a Karras, PNDM, LCM, DPM++ 2M Lu, DPM++ 2M Ef, DPM++ 2M SDE Lu and DPM++ 2M SDE Ef. | |
| syntax_weights (str, optional, defaults to "Classic"): | |
| Specifies the type of syntax weights used during generation. "Classic" is (word:weight), "Compel" is (word)weight | |
| lora_A (str, optional): | |
| Placeholder for lora A parameter. | |
| lora_scale_A (float, optional, defaults to 1.0): | |
| Placeholder for lora scale A parameter. | |
| lora_B (str, optional): | |
| Placeholder for lora B parameter. | |
| lora_scale_B (float, optional, defaults to 1.0): | |
| Placeholder for lora scale B parameter. | |
| lora_C (str, optional): | |
| Placeholder for lora C parameter. | |
| lora_scale_C (float, optional, defaults to 1.0): | |
| Placeholder for lora scale C parameter. | |
| lora_D (str, optional): | |
| Placeholder for lora D parameter. | |
| lora_scale_D (float, optional, defaults to 1.0): | |
| Placeholder for lora scale D parameter. | |
| lora_E (str, optional): | |
| Placeholder for lora E parameter. | |
| lora_scale_E (float, optional, defaults to 1.0): | |
| Placeholder for lora scale E parameter. | |
| textual_inversion (List[Tuple[str, str]], optional, defaults to []): | |
| Placeholder for textual inversion list of tuples. Help the model to adapt to a particular | |
| style. [("<token_activation>","<path_embeding>"),...] | |
| FreeU (bool, optional, defaults to False): | |
| Is a method that substantially improves diffusion model sample quality at no costs. | |
| adetailer_A (bool, optional, defaults to False): | |
| Guided Inpainting to Correct Image, it is preferable to use low values for strength. | |
| adetailer_A_params (Dict[str, Any], optional, defaults to {}): | |
| Placeholder for adetailer_A parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}. | |
| If not specified, default values will be used: | |
| - face_detector_ad (bool): Indicates whether face detection is enabled. Defaults to True. | |
| - person_detector_ad (bool): Indicates whether person detection is enabled. Defaults to True. | |
| - hand_detector_ad (bool): Indicates whether hand detection is enabled. Defaults to False. | |
| - prompt (str): A prompt for the adetailer_A. Defaults to an empty string. | |
| - negative_prompt (str): A negative prompt for the adetailer_A. Defaults to an empty string. | |
| - strength (float): The strength parameter value. Defaults to 0.35. | |
| - mask_dilation (int): The mask dilation value. Defaults to 4. | |
| - mask_blur (int): The mask blur value. Defaults to 4. | |
| - mask_padding (int): The mask padding value. Defaults to 32. | |
| - inpaint_only (bool): Indicates if only inpainting is to be performed. Defaults to True. False is img2img mode | |
| - sampler (str): The sampler type to be used. Defaults to "Use same sampler". | |
| adetailer_B (bool, optional, defaults to False): | |
| Guided Inpainting to Correct Image, it is preferable to use low values for strength. | |
| adetailer_B_params (Dict[str, Any], optional, defaults to {}): | |
| Placeholder for adetailer_B parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}. | |
| If not specified, default values will be used. | |
| style_prompt (str, optional): | |
| If a style that is in STYLE_NAMES is specified, it will be added to the original prompt and negative prompt. | |
| style_json_file (str, optional): | |
| JSON with styles to be applied and used in style_prompt. | |
| upscaler_model_path (str, optional): | |
| Placeholder for upscaler model path. | |
| upscaler_increases_size (float, optional, defaults to 1.5): | |
| Placeholder for upscaler increases size parameter. | |
| esrgan_tile (int, optional, defaults to 100): | |
| Tile if use a ESRGAN model. | |
| esrgan_tile_overlap (int, optional, defaults to 100): | |
| Tile overlap if use a ESRGAN model. | |
| hires_steps (int, optional, defaults to 25): | |
| The number of denoising steps for hires. More denoising steps usually lead to a higher quality image at the | |
| expense of slower inference. | |
| hires_denoising_strength (float, optional, defaults to 0.35): | |
| Strength parameter for the hires. | |
| hires_prompt (str , optional): | |
| The prompt for hires. If not specified, the main prompt will be used. | |
| hires_negative_prompt (str , optional): | |
| The negative prompt for hires. If not specified, the main negative prompt will be used. | |
| hires_sampler (str, optional, defaults to "Use same sampler"): | |
| The sampler used for the hires generation process. If not specified, the main sampler will be used. | |
| image (Any, optional): | |
| The image to be used for the Inpaint, ControlNet, or T2I adapter. | |
| preprocessor_name (str, optional, defaults to "None"): | |
| Preprocessor name for ControlNet. | |
| preprocess_resolution (int, optional, defaults to 512): | |
| Preprocess resolution for the Inpaint, ControlNet, or T2I adapter. | |
| image_resolution (int, optional, defaults to 512): | |
| Image resolution for the Img2Img, Inpaint, ControlNet, or T2I adapter. | |
| image_mask (Any, optional): | |
| Path image mask for the Inpaint. | |
| strength (float, optional, defaults to 0.35): | |
| Strength parameter for the Inpaint and Img2Img. | |
| low_threshold (int, optional, defaults to 100): | |
| Low threshold parameter for ControlNet and T2I Adapter Canny. | |
| high_threshold (int, optional, defaults to 200): | |
| High threshold parameter for ControlNet and T2I Adapter Canny. | |
| value_threshold (float, optional, defaults to 0.1): | |
| Value threshold parameter for ControlNet MLSD. | |
| distance_threshold (float, optional, defaults to 0.1): | |
| Distance threshold parameter for ControlNet MLSD. | |
| controlnet_conditioning_scale (float, optional, defaults to 1.0): | |
| The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added | |
| to the residual in the original `unet`. Used in ControlNet and Inpaint | |
| control_guidance_start (float, optional, defaults to 0.0): | |
| The percentage of total steps at which the ControlNet starts applying. Used in ControlNet and Inpaint | |
| control_guidance_end (float, optional, defaults to 1.0): | |
| The percentage of total steps at which the ControlNet stops applying. Used in ControlNet and Inpaint | |
| t2i_adapter_preprocessor (bool, optional, defaults to True): | |
| Preprocessor for the image in sdxl_canny by default is True. | |
| t2i_adapter_conditioning_scale (float, optional, defaults to 1.0): | |
| The outputs of the adapter are multiplied by `t2i_adapter_conditioning_scale` before they are added to the | |
| residual in the original unet. | |
| t2i_adapter_conditioning_factor (float, optional, defaults to 1.0): | |
| The fraction of timesteps for which adapter should be applied. If `t2i_adapter_conditioning_factor` is | |
| `0.0`, adapter is not applied at all. If `t2i_adapter_conditioning_factor` is `1.0`, adapter is applied for | |
| all timesteps. If `t2i_adapter_conditioning_factor` is `0.5`, adapter is applied for half of the timesteps. | |
| loop_generation (int, optional, defaults to 1): | |
| The number of times the specified `num_images` will be generated. | |
| display_images (bool, optional, defaults to False): | |
| If you use a notebook, you will be able to display the images generated with this parameter. | |
| save_generated_images (bool, optional, defaults to True): | |
| By default, the generated images are saved in the current location within the 'images' folder. You can disable this with this parameter. | |
| image_storage_location (str, optional, defaults to "./images"): | |
| The directory where the generated images are saved. | |
| generator_in_cpu (bool, optional, defaults to False): | |
| By default, the generator runs on the GPU. For more consistent results across environments, it is | |
| preferable to run the generator on the CPU. | |
| leave_progress_bar (bool, optional, defaults to False): | |
| Leave the progress bar after generating the image. | |
| disable_progress_bar (bool, optional, defaults to False): | |
| Do not display the progress bar during image generation. | |
| hires_before_adetailer (bool, optional, defaults to False): | |
| Apply an upscale and high-resolution fix before adetailer. | |
| hires_after_adetailer (bool, optional, defaults to True): | |
| Apply an upscale and high-resolution fix after adetailer. | |
| retain_compel_previous_load (bool, optional, defaults to False): | |
| The previous compel remains preloaded in memory. | |
| retain_detailfix_model_previous_load (bool, optional, defaults to False): | |
| The previous adetailer model remains preloaded in memory. | |
| retain_hires_model_previous_load (bool, optional, defaults to False): | |
| The previous hires model remains preloaded in memory. | |
| image_previews (bool, optional, defaults to False): | |
| Display intermediate images during the denoising process. | |
| xformers_memory_efficient_attention (bool, optional, defaults to False): | |
| Improves generation speed; currently disabled. | |
| gui_active (bool, optional, defaults to False): | |
| Utility flag for GUI integration; it changes behavior, for example by displaying confirmation messages or options. | |
| Specific parameter usage details: | |
| Additional parameters that will be used in Inpaint: | |
| - image | |
| - image_mask | |
| - image_resolution | |
| - strength | |
| for SD 1.5: | |
| - controlnet_conditioning_scale | |
| - control_guidance_start | |
| - control_guidance_end | |
| Additional parameters that will be used in img2img: | |
| - image | |
| - image_resolution | |
| - strength | |
| Additional parameters that will be used in ControlNet for SD 1.5 depending on the task: | |
| - image | |
| - preprocessor_name | |
| - preprocess_resolution | |
| - image_resolution | |
| - controlnet_conditioning_scale | |
| - control_guidance_start | |
| - control_guidance_end | |
| for Canny: | |
| - low_threshold | |
| - high_threshold | |
| for MLSD: | |
| - value_threshold | |
| - distance_threshold | |
| Additional parameters that will be used in T2I adapter for SDXL depending on the task: | |
| - image | |
| - preprocess_resolution | |
| - image_resolution | |
| - t2i_adapter_preprocessor | |
| - t2i_adapter_conditioning_scale | |
| - t2i_adapter_conditioning_factor | |
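| Example: | |
| A minimal usage sketch, assuming this method is the callable entry point of an | |
| already loaded model instance (the variable name `model` and the sample values | |
| are illustrative, not part of this documentation): | |
| images, image_list = model( | |
| prompt="a photo of an astronaut riding a horse on mars", | |
| negative_prompt="low quality, worst quality", | |
| sampler="DPM++ 2M", | |
| num_steps=30, | |
| guidance_scale=7.5, | |
| img_height=768, | |
| img_width=512, | |
| ) | |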
| """ | |
| if self.task_name != "txt2img" and image == None: | |
| raise ValueError( | |
| "You need to specify the <image> for this task." | |
| ) | |
| if img_height % 8 != 0: | |
| img_height = img_height + (8 - img_height % 8) | |
| logger.warning(f"Height must be divisible by 8, changed to {str(img_height)}") | |
| if img_width % 8 != 0: | |
| img_width = img_width + (8 - img_width % 8) | |
| logger.warning(f"Width must be divisible by 8, changed to {str(img_width)}") | |
| if image_resolution % 8 != 0: | |
| image_resolution = image_resolution + (8 - image_resolution % 8) | |
| logger.warning(f"Image resolution must be divisible by 8, changed to {str(image_resolution)}") | |
| if control_guidance_start >= control_guidance_end: | |
| logger.error( | |
| "Control guidance start (ControlNet Start Threshold) cannot be larger or equal to control guidance end (ControlNet Stop Threshold). The default values 0.0 and 1.0 will be used." | |
| ) | |
| control_guidance_start, control_guidance_end = 0.0, 1.0 | |
| self.gui_active = gui_active | |
| self.image_previews = image_previews | |
| if self.pipe is None: | |
| self.load_pipe( | |
| self.base_model_id, | |
| task_name=self.task_name, | |
| vae_model=self.vae_model, | |
| reload=True, | |
| ) | |
| self.pipe.set_progress_bar_config(leave=leave_progress_bar) | |
| self.pipe.set_progress_bar_config(disable=disable_progress_bar) | |
| xformers_memory_efficient_attention = False  # currently force-disabled | |
| if xformers_memory_efficient_attention and torch.cuda.is_available(): | |
| self.pipe.disable_xformers_memory_efficient_attention() | |
| self.pipe.to(self.device) | |
| # Load style prompt file | |
| if style_json_file != "" and style_json_file != self.style_json_file: | |
| self.load_style_file(style_json_file) | |
| # Set style | |
| if isinstance(style_prompt, str): | |
| style_prompt = [style_prompt] | |
| if style_prompt != [""]: | |
| prompt, negative_prompt = apply_style(style_prompt, prompt, negative_prompt, self.styles_data, self.STYLE_NAMES) | |
| # LoRA load | |
| if self.lora_memory == [ | |
| lora_A, | |
| lora_B, | |
| lora_C, | |
| lora_D, | |
| lora_E, | |
| ] and self.lora_scale_memory == [ | |
| lora_scale_A, | |
| lora_scale_B, | |
| lora_scale_C, | |
| lora_scale_D, | |
| lora_scale_E, | |
| ]: | |
| for single_lora in self.lora_memory: | |
| if single_lora is not None: | |
| logger.info(f"LoRA in memory: {single_lora}") | |
| else: | |
| logger.debug("_un, re and load_ lora") | |
| # Unload the previously cached LoRAs, then load the newly requested set. | |
| for prev_lora, prev_scale in zip(self.lora_memory, self.lora_scale_memory): | |
| self.pipe = self.process_lora(prev_lora, prev_scale, unload=True) | |
| for new_lora, new_scale in zip( | |
| [lora_A, lora_B, lora_C, lora_D, lora_E], | |
| [lora_scale_A, lora_scale_B, lora_scale_C, lora_scale_D, lora_scale_E], | |
| ): | |
| self.pipe = self.process_lora(new_lora, new_scale) | |
| self.lora_memory = [lora_A, lora_B, lora_C, lora_D, lora_E] | |
| self.lora_scale_memory = [lora_scale_A, lora_scale_B, lora_scale_C, lora_scale_D, lora_scale_E] | |
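| # The five LoRA slots act as a cache: when both the paths and the scales match | |
| # the previous call, the branch above skips reloading entirely. | |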
| # LCM config | |
| if sampler == "LCM" and self.LCMconfig == None: | |
| if self.class_name == "StableDiffusionPipeline": | |
| adapter_id = "latent-consistency/lcm-lora-sdv1-5" | |
| elif self.class_name == "StableDiffusionXLPipeline": | |
| adapter_id = "latent-consistency/lcm-lora-sdxl" | |
| self.process_lora(adapter_id, 1.0) | |
| self.LCMconfig = adapter_id | |
| logger.info("LCM") | |
| elif sampler != "LCM" and self.LCMconfig != None: | |
| self.process_lora(self.LCMconfig, 1.0, unload=True) | |
| self.LCMconfig = None | |
| elif self.LCMconfig != None: | |
| logger.info("LCM") | |
| # FreeU | |
| if FreeU: | |
| logger.info("FreeU active") | |
| if self.class_name == "StableDiffusionPipeline": | |
| # sd | |
| self.pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.2, b2=1.4) | |
| else: | |
| # sdxl | |
| self.pipe.enable_freeu(s1=0.6, s2=0.4, b1=1.1, b2=1.2) | |
| self.FreeU = True | |
| elif self.FreeU: | |
| self.pipe.disable_freeu() | |
| self.FreeU = False | |
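| # The FreeU scaling factors (s1, s2, b1, b2) intentionally differ between | |
| # SD 1.5 and SDXL; the values above are the presets this library applies. | |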
| # Prompt Optimizations | |
| if hasattr(self, "compel") and not retain_compel_previous_load: | |
| del self.compel | |
| prompt_emb, negative_prompt_emb = self.create_prompt_embeds( | |
| prompt=prompt, | |
| negative_prompt=negative_prompt, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| if self.class_name != "StableDiffusionPipeline": | |
| # Additional prompt for SDXL | |
| conditioning, pooled = prompt_emb.clone(), negative_prompt_emb.clone() | |
| prompt_emb = negative_prompt_emb = None | |
| if torch.cuda.is_available() and xformers_memory_efficient_attention: | |
| self.pipe.enable_xformers_memory_efficient_attention() | |
| try: | |
| # self.pipe.scheduler = DPMSolverSinglestepScheduler()  # fix default params via a random scheduler; not recommended | |
| self.pipe.scheduler = self.get_scheduler(sampler) | |
| except Exception as e: | |
| logger.debug(f"{e}") | |
| logger.warning("Error setting the sampler; please try again with a different one.") | |
| #self.pipe = None | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| return | |
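| # On scheduler failure the call aborts early, freeing GPU memory and returning None. | |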
| self.pipe.safety_checker = None | |
| # Get image Global | |
| if self.task_name != "txt2img": | |
| if isinstance(image, str): | |
| # If the input is a string (file path), open it as an image | |
| image_pil = Image.open(image) | |
| numpy_array = np.array(image_pil, dtype=np.uint8) | |
| elif isinstance(image, Image.Image): | |
| # If the input is already a PIL Image, convert it to a NumPy array | |
| numpy_array = np.array(image, dtype=np.uint8) | |
| elif isinstance(image, np.ndarray): | |
| # If the input is a NumPy array, np.uint8 | |
| numpy_array = image.astype(np.uint8) | |
| else: | |
| if gui_active: | |
| logger.info("Image not found") | |
| return | |
| else: | |
| raise ValueError( | |
| "Unsupported image type or not control image found; Bug report to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive" | |
| ) | |
| # Extract the RGB channels | |
| try: | |
| array_rgb = numpy_array[:, :, :3] | |
| except Exception: | |
| logger.error("Unsupported image type") | |
| raise ValueError( | |
| "Unsupported image type; Bug report to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive" | |
| ) # return | |
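| # array_rgb is now an HxWx3 uint8 array; any alpha channel (RGBA input) is dropped by the slice above. | |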
| # Get params preprocess Global SD 1.5 | |
| preprocess_params_config = {} | |
| if self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| preprocess_params_config["image"] = array_rgb | |
| preprocess_params_config["image_resolution"] = image_resolution | |
| if self.task_name != "ip2p": | |
| if self.task_name != "shuffle": | |
| preprocess_params_config[ | |
| "preprocess_resolution" | |
| ] = preprocess_resolution | |
| if self.task_name != "mlsd" and self.task_name != "canny": | |
| preprocess_params_config["preprocessor_name"] = preprocessor_name | |
| # RUN Preprocess SD 1.5 | |
| if self.task_name == "inpaint": | |
| # Get mask for Inpaint | |
| if gui_active or os.path.exists(str(image_mask)): | |
| # Read image mask from gui | |
| mask_control_img = Image.open(image_mask) | |
| numpy_array_mask = np.array(mask_control_img, dtype=np.uint8) | |
| array_rgb_mask = numpy_array_mask[:, :, :3] | |
| elif not gui_active: | |
| # Convert control image to draw | |
| import base64 | |
| import matplotlib.pyplot as plt | |
| name_without_extension = os.path.splitext(image.split("/")[-1])[0] | |
| with open(image, "rb") as image_file: | |
| image64 = base64.b64encode(image_file.read()).decode("utf-8") | |
| img = np.array(plt.imread(f"{image}")[:, :, :3]) | |
| # Create the mask interactively | |
| logger.info("Draw the mask on this canvas using the mouse. When you finish, press 'Finish' at the bottom of the canvas.") | |
| draw( | |
| image64, | |
| filename=f"./{name_without_extension}_draw.png", | |
| w=img.shape[1], | |
| h=img.shape[0], | |
| line_width=0.04 * img.shape[1], | |
| ) | |
| # Create mask and save | |
| with_mask = np.array( | |
| plt.imread(f"./{name_without_extension}_draw.png")[:, :, :3] | |
| ) | |
| mask = ( | |
| (with_mask[:, :, 0] == 1) | |
| * (with_mask[:, :, 1] == 0) | |
| * (with_mask[:, :, 2] == 0) | |
| ) | |
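| # Pixels drawn in pure red (R=1, G=0, B=0 in float space) become True in the boolean mask. | |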
| plt.imsave(f"./{name_without_extension}_mask.png", mask, cmap="gray") | |
| mask_control = f"./{name_without_extension}_mask.png" | |
| logger.info(f"Mask saved: {mask_control}") | |
| # Read image mask | |
| mask_control_img = Image.open(mask_control) | |
| numpy_array_mask = np.array(mask_control_img, dtype=np.uint8) | |
| array_rgb_mask = numpy_array_mask[:, :, :3] | |
| else: | |
| raise ValueError("No images found") | |
| init_image, control_mask, control_image = self.process_inpaint( | |
| image=array_rgb, | |
| image_resolution=image_resolution, | |
| preprocess_resolution=preprocess_resolution, # Not used | |
| image_mask=array_rgb_mask, | |
| ) | |
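| # init_image and control_mask feed the inpaint pipelines; control_image is the | |
| # ControlNet inpaint condition used by the SD 1.5 path (see the pipe params below). | |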
| elif self.task_name == "openpose": | |
| logger.info("Openpose") | |
| control_image = self.process_openpose(**preprocess_params_config) | |
| elif self.task_name == "canny": | |
| logger.info("Canny") | |
| control_image = self.process_canny( | |
| **preprocess_params_config, | |
| low_threshold=low_threshold, | |
| high_threshold=high_threshold, | |
| ) | |
| elif self.task_name == "mlsd": | |
| logger.info("MLSD") | |
| control_image = self.process_mlsd( | |
| **preprocess_params_config, | |
| value_threshold=value_threshold, | |
| distance_threshold=distance_threshold, | |
| ) | |
| elif self.task_name == "scribble": | |
| logger.info("Scribble") | |
| control_image = self.process_scribble(**preprocess_params_config) | |
| elif self.task_name == "softedge": | |
| logger.info("Softedge") | |
| control_image = self.process_softedge(**preprocess_params_config) | |
| elif self.task_name == "segmentation": | |
| logger.info("Segmentation") | |
| control_image = self.process_segmentation(**preprocess_params_config) | |
| elif self.task_name == "depth": | |
| logger.info("Depth") | |
| control_image = self.process_depth(**preprocess_params_config) | |
| elif self.task_name == "normalbae": | |
| logger.info("NormalBae") | |
| control_image = self.process_normal(**preprocess_params_config) | |
| elif self.task_name == "lineart": | |
| logger.info("Lineart") | |
| control_image = self.process_lineart(**preprocess_params_config) | |
| elif self.task_name == "shuffle": | |
| logger.info("Shuffle") | |
| control_image = self.process_shuffle(**preprocess_params_config) | |
| elif self.task_name == "ip2p": | |
| logger.info("Ip2p") | |
| control_image = self.process_ip2p(**preprocess_params_config) | |
| elif self.task_name == "img2img": | |
| preprocess_params_config["image"] = array_rgb | |
| preprocess_params_config["image_resolution"] = image_resolution | |
| init_image = self.process_img2img(**preprocess_params_config) | |
| # RUN Preprocess T2I for SDXL | |
| if self.class_name == "StableDiffusionXLPipeline": | |
| # Get params preprocess XL | |
| preprocess_params_config_xl = {} | |
| if self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| preprocess_params_config_xl["image"] = array_rgb | |
| preprocess_params_config_xl["preprocess_resolution"] = preprocess_resolution | |
| preprocess_params_config_xl["image_resolution"] = image_resolution | |
| # preprocess_params_config_xl["additional_prompt"] = additional_prompt # "" | |
| if self.task_name == "sdxl_canny": # preprocessor true default | |
| logger.info("SDXL Canny: Preprocessor active by default") | |
| control_image = self.process_canny( | |
| **preprocess_params_config_xl, | |
| low_threshold=low_threshold, | |
| high_threshold=high_threshold, | |
| ) | |
| elif self.task_name == "sdxl_openpose": | |
| logger.info("SDXL Openpose") | |
| control_image = self.process_openpose( | |
| preprocessor_name = "Openpose" if t2i_adapter_preprocessor else "None", | |
| **preprocess_params_config_xl, | |
| ) | |
| elif self.task_name == "sdxl_sketch": | |
| logger.info("SDXL Scribble") | |
| control_image = self.process_scribble( | |
| preprocessor_name = "PidiNet" if t2i_adapter_preprocessor else "None", | |
| **preprocess_params_config_xl, | |
| ) | |
| elif self.task_name == "sdxl_depth-midas": | |
| logger.info("SDXL Depth") | |
| control_image = self.process_depth( | |
| preprocessor_name = "Midas" if t2i_adapter_preprocessor else "None", | |
| **preprocess_params_config_xl, | |
| ) | |
| elif self.task_name == "sdxl_lineart": | |
| logger.info("SDXL Lineart") | |
| control_image = self.process_lineart( | |
| preprocessor_name = "Lineart" if t2i_adapter_preprocessor else "None", | |
| **preprocess_params_config_xl, | |
| ) | |
| # Get general params for TASK | |
| if self.class_name == "StableDiffusionPipeline": | |
| # Base params pipe sd | |
| pipe_params_config = { | |
| "prompt": None, # prompt, | |
| "negative_prompt": None, # negative_prompt, | |
| "prompt_embeds": prompt_emb, | |
| "negative_prompt_embeds": negative_prompt_emb, | |
| "num_images": num_images, | |
| "num_steps": num_steps, | |
| "guidance_scale": guidance_scale, | |
| "clip_skip": None, # clip_skip, because we use clip skip of compel | |
| } | |
| else: | |
| # Base params pipe sdxl | |
| pipe_params_config = { | |
| "prompt" : None, | |
| "negative_prompt" : None, | |
| "num_inference_steps" : num_steps, | |
| "guidance_scale" : guidance_scale, | |
| "clip_skip" : None, | |
| "num_images_per_prompt" : num_images, | |
| } | |
| # New params | |
| if self.class_name == "StableDiffusionXLPipeline": | |
| # pipe sdxl | |
| if self.task_name == "txt2img": | |
| pipe_params_config["height"] = img_height | |
| pipe_params_config["width"] = img_width | |
| elif self.task_name == "inpaint": | |
| pipe_params_config["strength"] = strength | |
| pipe_params_config["image"] = init_image | |
| pipe_params_config["mask_image"] = control_mask | |
| logger.info(f"Image resolution: {str(init_image.size)}") | |
| elif self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| pipe_params_config["image"] = control_image | |
| pipe_params_config["adapter_conditioning_scale"] = t2i_adapter_conditioning_scale | |
| pipe_params_config["adapter_conditioning_factor"] = t2i_adapter_conditioning_factor | |
| logger.info(f"Image resolution: {str(control_image.size)}") | |
| elif self.task_name == "img2img": | |
| pipe_params_config["strength"] = strength | |
| pipe_params_config["image"] = init_image | |
| logger.info(f"Image resolution: {str(init_image.size)}") | |
| elif self.task_name == "txt2img": | |
| pipe_params_config["height"] = img_height | |
| pipe_params_config["width"] = img_width | |
| elif self.task_name == "inpaint": | |
| pipe_params_config["strength"] = strength | |
| pipe_params_config["init_image"] = init_image | |
| pipe_params_config["control_mask"] = control_mask | |
| pipe_params_config["control_image"] = control_image | |
| pipe_params_config["controlnet_conditioning_scale"] = controlnet_conditioning_scale | |
| pipe_params_config["control_guidance_start"] = control_guidance_start | |
| pipe_params_config["control_guidance_end"] = control_guidance_end | |
| logger.info(f"Image resolution: {str(init_image.size)}") | |
| elif self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| pipe_params_config["control_image"] = control_image | |
| pipe_params_config["controlnet_conditioning_scale"] = controlnet_conditioning_scale | |
| pipe_params_config["control_guidance_start"] = control_guidance_start | |
| pipe_params_config["control_guidance_end"] = control_guidance_end | |
| logger.info(f"Image resolution: {str(control_image.size)}") | |
| elif self.task_name == "img2img": | |
| pipe_params_config["strength"] = strength | |
| pipe_params_config["init_image"] = init_image | |
| logger.info(f"Image resolution: {str(init_image.size)}") | |
| # detailfix params and pipe global | |
| if adetailer_A or adetailer_B: | |
| # global params detailfix | |
| default_params_detailfix = { | |
| "face_detector_ad" : True, | |
| "person_detector_ad" : True, | |
| "hand_detector_ad" : False, | |
| "prompt": "", | |
| "negative_prompt" : "", | |
| "strength" : 0.35, | |
| "mask_dilation" : 4, | |
| "mask_blur" : 4, | |
| "mask_padding" : 32, | |
| #"sampler" : "Use same sampler", | |
| #"inpaint_only" : True, | |
| } | |
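| # These defaults only fill keys the caller omitted (or passed with a wrong type); | |
| # see the per-detailer loops below. | |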
| # Pipe detailfix_pipe | |
| if not hasattr(self, "detailfix_pipe") or not retain_detailfix_model_previous_load: | |
| if adetailer_A_params.get("inpaint_only", False) == True or adetailer_B_params.get("inpaint_only", False) == True: | |
| detailfix_pipe = custom_task_model_loader( | |
| pipe=self.pipe, | |
| model_category="detailfix", | |
| task_name=self.task_name, | |
| torch_dtype=self.type_model_precision | |
| ) | |
| else: | |
| detailfix_pipe = custom_task_model_loader( | |
| pipe=self.pipe, | |
| model_category="detailfix_img2img", | |
| task_name=self.task_name, | |
| torch_dtype=self.type_model_precision | |
| ) | |
| if hasattr(self, "detailfix_pipe"): | |
| del self.detailfix_pipe | |
| if retain_detailfix_model_previous_load: | |
| if hasattr(self, "detailfix_pipe"): | |
| detailfix_pipe = self.detailfix_pipe | |
| else: | |
| self.detailfix_pipe = detailfix_pipe | |
| adetailer_A_params.pop("inpaint_only", None) | |
| adetailer_B_params.pop("inpaint_only", None) | |
| # Define base scheduler detailfix | |
| detailfix_pipe.default_scheduler = copy.deepcopy(self.default_scheduler) | |
| if adetailer_A_params.get("sampler", "Use same sampler") != "Use same sampler": | |
| logger.debug("detailfix_pipe will use the sampler from adetailer_A") | |
| detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) | |
| adetailer_A_params.pop("sampler", None) | |
| if adetailer_B_params.get("sampler", "Use same sampler") != "Use same sampler": | |
| logger.debug("detailfix_pipe will use the sampler from adetailer_B") | |
| detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) | |
| adetailer_B_params.pop("sampler", None) | |
| detailfix_pipe.set_progress_bar_config(leave=leave_progress_bar) | |
| detailfix_pipe.set_progress_bar_config(disable=disable_progress_bar) | |
| detailfix_pipe.to(self.device) | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| if adetailer_A: | |
| for key_param, default_value in default_params_detailfix.items(): | |
| if key_param not in adetailer_A_params: | |
| adetailer_A_params[key_param] = default_value | |
| elif type(default_value) is not type(adetailer_A_params[key_param]): | |
| logger.warning(f"DetailFix A: wrong type for param {str(key_param)}; using the default value") | |
| adetailer_A_params[key_param] = default_value | |
| detailfix_params_A = { | |
| "prompt": adetailer_A_params["prompt"], | |
| "negative_prompt" : adetailer_A_params["negative_prompt"], | |
| "strength" : adetailer_A_params["strength"], | |
| "num_inference_steps" : num_steps, | |
| "guidance_scale" : guidance_scale, | |
| } | |
| # clear params yolo | |
| adetailer_A_params.pop('strength', None) | |
| adetailer_A_params.pop('prompt', None) | |
| adetailer_A_params.pop('negative_prompt', None) | |
| # Verify prompt detailfix_params_A and get valid | |
| prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A, prompt_df_A, negative_prompt_df_A = process_prompts_valid( | |
| detailfix_params_A["prompt"], detailfix_params_A["negative_prompt"], prompt, negative_prompt | |
| ) | |
| # Params detailfix | |
| if self.class_name == "StableDiffusionPipeline": | |
| # SD detailfix | |
| # detailfix_params_A["controlnet_conditioning_scale"] = controlnet_conditioning_scale | |
| # detailfix_params_A["control_guidance_start"] = control_guidance_start | |
| # detailfix_params_A["control_guidance_end"] = control_guidance_end | |
| if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A: | |
| detailfix_params_A["prompt_embeds"] = prompt_emb | |
| detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb | |
| else: | |
| prompt_emb_ad, negative_prompt_emb_ad = self.create_prompt_embeds( | |
| prompt=prompt_df_A, | |
| negative_prompt=negative_prompt_df_A, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| detailfix_params_A["prompt_embeds"] = prompt_emb_ad | |
| detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb_ad | |
| detailfix_params_A["prompt"] = None | |
| detailfix_params_A["negative_prompt"] = None | |
| else: | |
| # SDXL detailfix | |
| if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A: | |
| conditioning_detailfix_A, pooled_detailfix_A = conditioning, pooled | |
| else: | |
| conditioning_detailfix_A, pooled_detailfix_A = self.create_prompt_embeds( | |
| prompt=prompt_df_A, | |
| negative_prompt=negative_prompt_df_A, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| detailfix_params_A.pop('prompt', None) | |
| detailfix_params_A.pop('negative_prompt', None) | |
| detailfix_params_A["prompt_embeds"] = conditioning_detailfix_A[0:1] | |
| detailfix_params_A["pooled_prompt_embeds"] = pooled_detailfix_A[0:1] | |
| detailfix_params_A["negative_prompt_embeds"] = conditioning_detailfix_A[1:2] | |
| detailfix_params_A["negative_pooled_prompt_embeds"] = pooled_detailfix_A[1:2] | |
| logger.debug(f"detailfix A prompt empty {prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A}") | |
| if not prompt_empty_detailfix_A or not negative_prompt_empty_detailfix_A: | |
| logger.debug(f"Prompts detailfix A {prompt_df_A, negative_prompt_df_A}") | |
| logger.debug(f"Pipe params detailfix A \n{detailfix_params_A}") | |
| logger.debug(f"Params detailfix A \n{adetailer_A_params}") | |
| if adetailer_B: | |
| for key_param, default_value in default_params_detailfix.items(): | |
| if key_param not in adetailer_B_params: | |
| adetailer_B_params[key_param] = default_value | |
| elif type(default_value) is not type(adetailer_B_params[key_param]): | |
| logger.warning(f"DetailFix B: wrong type for param {str(key_param)}; using the default value") | |
| adetailer_B_params[key_param] = default_value | |
| detailfix_params_B = { | |
| "prompt": adetailer_B_params["prompt"], | |
| "negative_prompt" : adetailer_B_params["negative_prompt"], | |
| "strength" : adetailer_B_params["strength"], | |
| "num_inference_steps" : num_steps, | |
| "guidance_scale" : guidance_scale, | |
| } | |
| # clear params yolo | |
| adetailer_B_params.pop('strength', None) | |
| adetailer_B_params.pop('prompt', None) | |
| adetailer_B_params.pop('negative_prompt', None) | |
| # Verify prompt detailfix_params_B and get valid | |
| prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B, prompt_df_B, negative_prompt_df_B = process_prompts_valid( | |
| detailfix_params_B["prompt"], detailfix_params_B["negative_prompt"], prompt, negative_prompt | |
| ) | |
| # Params detailfix | |
| if self.class_name == "StableDiffusionPipeline": | |
| # SD detailfix | |
| # detailfix_params_B["controlnet_conditioning_scale"] = controlnet_conditioning_scale | |
| # detailfix_params_B["control_guidance_start"] = control_guidance_start | |
| # detailfix_params_B["control_guidance_end"] = control_guidance_end | |
| if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B: | |
| detailfix_params_B["prompt_embeds"] = prompt_emb | |
| detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb | |
| else: | |
| prompt_emb_ad_b, negative_prompt_emb_ad_b = self.create_prompt_embeds( | |
| prompt=prompt_df_B, | |
| negative_prompt=negative_prompt_df_B, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| detailfix_params_B["prompt_embeds"] = prompt_emb_ad_b | |
| detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb_ad_b | |
| detailfix_params_B["prompt"] = None | |
| detailfix_params_B["negative_prompt"] = None | |
| else: | |
| # SDXL detailfix | |
| if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B: | |
| conditioning_detailfix_B, pooled_detailfix_B = conditioning, pooled | |
| else: | |
| conditioning_detailfix_B, pooled_detailfix_B = self.create_prompt_embeds( | |
| prompt=prompt_df_B, | |
| negative_prompt=negative_prompt_df_B, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| detailfix_params_B.pop('prompt', None) | |
| detailfix_params_B.pop('negative_prompt', None) | |
| detailfix_params_B["prompt_embeds"] = conditioning_detailfix_B[0:1] | |
| detailfix_params_B["pooled_prompt_embeds"] = pooled_detailfix_B[0:1] | |
| detailfix_params_B["negative_prompt_embeds"] = conditioning_detailfix_B[1:2] | |
| detailfix_params_B["negative_pooled_prompt_embeds"] = pooled_detailfix_B[1:2] | |
| logger.debug(f"detailfix B prompt empty {prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B}") | |
| if not prompt_empty_detailfix_B or not negative_prompt_empty_detailfix_B: | |
| logger.debug(f"Prompts detailfix B {prompt_df_B, negative_prompt_df_B}") | |
| logger.debug(f"Pipe params detailfix B \n{detailfix_params_B}") | |
| logger.debug(f"Params detailfix B \n{adetailer_B_params}") | |
| if hires_steps > 1 and upscaler_model_path is not None: | |
| # Hires params BASE | |
| hires_params_config = { | |
| "prompt" : None, | |
| "negative_prompt" : None, | |
| "num_inference_steps" : hires_steps, | |
| "guidance_scale" : guidance_scale, | |
| "clip_skip" : None, | |
| "strength" : hires_denoising_strength, | |
| } | |
| if self.class_name == "StableDiffusionPipeline": | |
| hires_params_config["eta"] = 1.0 | |
| # Verify prompt hires and get valid | |
| hires_prompt_empty, hires_negative_prompt_empty, prompt_hires_valid, negative_prompt_hires_valid = process_prompts_valid( | |
| hires_prompt, hires_negative_prompt, prompt, negative_prompt | |
| ) | |
| # Hires embed params | |
| if self.class_name == "StableDiffusionPipeline": | |
| if hires_prompt_empty and hires_negative_prompt_empty: | |
| hires_params_config["prompt_embeds"] = prompt_emb | |
| hires_params_config["negative_prompt_embeds"] = negative_prompt_emb | |
| else: | |
| prompt_emb_hires, negative_prompt_emb_hires = self.create_prompt_embeds( | |
| prompt=prompt_hires_valid, | |
| negative_prompt=negative_prompt_hires_valid, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| hires_params_config["prompt_embeds"] = prompt_emb_hires | |
| hires_params_config["negative_prompt_embeds"] = negative_prompt_emb_hires | |
| else: | |
| if hires_prompt_empty and hires_negative_prompt_empty: | |
| hires_conditioning, hires_pooled = conditioning, pooled | |
| else: | |
| hires_conditioning, hires_pooled = self.create_prompt_embeds( | |
| prompt=prompt_hires_valid, | |
| negative_prompt=negative_prompt_hires_valid, | |
| textual_inversion=textual_inversion, | |
| clip_skip=clip_skip, | |
| syntax_weights=syntax_weights, | |
| ) | |
| hires_params_config.pop('prompt', None) | |
| hires_params_config.pop('negative_prompt', None) | |
| hires_params_config["prompt_embeds"] = hires_conditioning[0:1] | |
| hires_params_config["pooled_prompt_embeds"] = hires_pooled[0:1] | |
| hires_params_config["negative_prompt_embeds"] = hires_conditioning[1:2] | |
| hires_params_config["negative_pooled_prompt_embeds"] = hires_pooled[1:2] | |
| # Hires pipe | |
| if not hasattr(self, "hires_pipe") or not retain_hires_model_previous_load: | |
| hires_pipe = custom_task_model_loader( | |
| pipe=self.pipe, | |
| model_category="hires", | |
| task_name=self.task_name, | |
| torch_dtype=self.type_model_precision | |
| ) | |
| if hasattr(self, "hires_pipe"): | |
| del self.hires_pipe | |
| if retain_hires_model_previous_load: | |
| if hasattr(self, "hires_pipe"): | |
| hires_pipe = self.hires_pipe | |
| else: | |
| self.hires_pipe = hires_pipe | |
| # Hires scheduler | |
| if hires_sampler != "Use same sampler": | |
| logger.debug("New hires sampler") | |
| hires_pipe.scheduler = self.get_scheduler(hires_sampler) | |
| hires_pipe.set_progress_bar_config(leave=leave_progress_bar) | |
| hires_pipe.set_progress_bar_config(disable=disable_progress_bar) | |
| hires_pipe.to(self.device) | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| else: | |
| hires_params_config = {} | |
| hires_pipe = None | |
| # Debug info | |
| try: | |
| logger.debug(f"INFO PIPE: {self.pipe.__class__.__name__}") | |
| logger.debug(f"text_encoder_type: {self.pipe.text_encoder.dtype}") | |
| logger.debug(f"unet_type: {self.pipe.unet.dtype}") | |
| logger.debug(f"vae_type: {self.pipe.vae.dtype}") | |
| logger.debug(f"pipe_type: {self.pipe.dtype}") | |
| logger.debug(f"scheduler_main_pipe: {self.pipe.scheduler}") | |
| if adetailer_A or adetailer_B: | |
| logger.debug(f"scheduler_detailfix: {detailfix_pipe.scheduler}") | |
| if hires_steps > 1 and upscaler_model_path is not None: | |
| logger.debug(f"scheduler_hires: {hires_pipe.scheduler}") | |
| except Exception as e: | |
| logger.debug(f"{str(e)}") | |
| # === RUN PIPE === # | |
| for i in range(loop_generation): | |
| # number seed | |
| if seed == -1: | |
| seeds = [random.randint(0, 2147483647) for _ in range(num_images)] | |
| else: | |
| if num_images == 1: | |
| seeds = [seed] | |
| else: | |
| seeds = [seed] + [random.randint(0, 2147483647) for _ in range(num_images-1)] | |
| # generators | |
| generators = [] # List to store all the generators | |
| for calculate_seed in seeds: | |
| if generator_in_cpu or self.device.type == "cpu": | |
| generator = torch.Generator().manual_seed(calculate_seed) | |
| else: | |
| try: | |
| generator = torch.Generator("cuda").manual_seed(calculate_seed) | |
| except Exception: | |
| logger.warning("Generator in CPU") | |
| generator = torch.Generator().manual_seed(calculate_seed) | |
| generators.append(generator) | |
| # Workaround for an img2img batching bug: the pipeline needs a single generator | |
| # (not a list), so in img2img every image in the batch shares the first seed. | |
| pipe_params_config["generator"] = generators if self.task_name != "img2img" else generators[0]  # no list | |
| seeds = seeds if self.task_name != "img2img" else [seeds[0]] * num_images | |
| try: | |
| if self.class_name == "StableDiffusionXLPipeline": | |
| # sdxl pipe | |
| images = self.pipe( | |
| prompt_embeds=conditioning[0:1], | |
| pooled_prompt_embeds=pooled[0:1], | |
| negative_prompt_embeds=conditioning[1:2], | |
| negative_pooled_prompt_embeds=pooled[1:2], | |
| #generator=pipe_params_config["generator"], | |
| **pipe_params_config, | |
| ).images | |
| if self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| images = [control_image] + images | |
| elif self.task_name == "txt2img": | |
| images = self.run_pipe_SD(**pipe_params_config) | |
| elif self.task_name == "inpaint": | |
| images = self.run_pipe_inpaint(**pipe_params_config) | |
| elif self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| results = self.run_pipe( | |
| **pipe_params_config | |
| ) ## pipe ControlNet add condition_weights | |
| images = [control_image] + results | |
| del results | |
| elif self.task_name == "img2img": | |
| images = self.run_pipe_img2img(**pipe_params_config) | |
| except Exception as e: | |
| e = str(e) | |
| if "Tensor with 2 elements cannot be converted to Scalar" in e: | |
| logger.debug(e) | |
| logger.error("Error in sampler; trying with DDIM sampler") | |
| self.pipe.scheduler = self.default_scheduler | |
| self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config) | |
| if self.class_name == "StableDiffusionXLPipeline": | |
| # sdxl pipe | |
| images = self.pipe( | |
| prompt_embeds=conditioning[0:1], | |
| pooled_prompt_embeds=pooled[0:1], | |
| negative_prompt_embeds=conditioning[1:2], | |
| negative_pooled_prompt_embeds=pooled[1:2], | |
| #generator=pipe_params_config["generator"], | |
| **pipe_params_config, | |
| ).images | |
| if self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| images = [control_image] + images | |
| elif self.task_name == "txt2img": | |
| images = self.run_pipe_SD(**pipe_params_config) | |
| elif self.task_name == "inpaint": | |
| images = self.run_pipe_inpaint(**pipe_params_config) | |
| elif self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| results = self.run_pipe( | |
| **pipe_params_config | |
| ) ## pipe ControlNet add condition_weights | |
| images = [control_image] + results | |
| del results | |
| elif self.task_name == "img2img": | |
| images = self.run_pipe_img2img(**pipe_params_config) | |
| elif "The size of tensor a (0) must match the size of tensor b (3) at non-singleton" in e: | |
| raise ValueError(f"steps / strength too low for the model to produce a satisfactory response") | |
| else: | |
| raise ValueError(e) | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| if hires_before_adetailer and upscaler_model_path is not None: | |
| logger.debug("Hires before adetailer; the same seed is reused for each image (no batch)") | |
| images = process_images_high_resolution( | |
| images, | |
| upscaler_model_path, | |
| upscaler_increases_size, | |
| esrgan_tile, esrgan_tile_overlap, | |
| hires_steps, hires_params_config, | |
| self.task_name, | |
| generators[0], #pipe_params_config["generator"][0], # no generator | |
| hires_pipe, | |
| ) | |
| # Adetailer stuff | |
| if adetailer_A or adetailer_B: | |
| # image_pil_list = [] | |
| # for img_single in images: | |
| # image_ad = img_single.convert("RGB") | |
| # image_pil_list.append(image_ad) | |
| if self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| images = images[1:] | |
| if adetailer_A: | |
| images = ad_model_process( | |
| pipe_params_df=detailfix_params_A, | |
| detailfix_pipe=detailfix_pipe, | |
| image_list_task=images, | |
| **adetailer_A_params, | |
| ) | |
| if adetailer_B: | |
| images = ad_model_process( | |
| pipe_params_df=detailfix_params_B, | |
| detailfix_pipe=detailfix_pipe, | |
| image_list_task=images, | |
| **adetailer_B_params, | |
| ) | |
| if self.task_name not in ["txt2img", "inpaint", "img2img"]: | |
| images = [control_image] + images | |
| # del detailfix_pipe | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| if hires_after_adetailer and upscaler_model_path is not None: | |
| logger.debug("Hires after adetailer; the same seed is reused for each image (no batch)") | |
| images = process_images_high_resolution( | |
| images, | |
| upscaler_model_path, | |
| upscaler_increases_size, | |
| esrgan_tile, esrgan_tile_overlap, | |
| hires_steps, hires_params_config, | |
| self.task_name, | |
| generators[0], #pipe_params_config["generator"][0], # no generator | |
| hires_pipe, | |
| ) | |
| logger.info(f"Seeds: {seeds}") | |
| # Show images if loop | |
| if display_images: | |
| mediapy.show_images(images) | |
| # logger.info(image_list) | |
| # del images | |
| if loop_generation > 1: | |
| time.sleep(0.5) | |
| # List images and save | |
| image_list = [] | |
| metadata = [ | |
| prompt, | |
| negative_prompt, | |
| self.base_model_id, | |
| self.vae_model, | |
| num_steps, | |
| guidance_scale, | |
| sampler, | |
| 0000000000,  # seed placeholder; replaced per image below | |
| img_width, | |
| img_height, | |
| clip_skip, | |
| ] | |
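| # Metadata layout by index: 0 prompt, 1 negative_prompt, 2 model, 3 VAE, 4 steps, | |
| # 5 guidance, 6 sampler, 7 seed (set per image below), 8 width, 9 height, 10 clip_skip. | |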
| valid_seeds = [0] + seeds if self.task_name not in ["txt2img", "inpaint", "img2img"] else seeds | |
| for image_, seed_ in zip(images, valid_seeds): | |
| image_path = "not saved in storage" | |
| if save_generated_images: | |
| metadata[7] = seed_ | |
| image_path = save_pil_image_with_metadata(image_, image_storage_location, metadata) | |
| image_list.append(image_path) | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| if image_list[0] != "not saved in storage": | |
| logger.info(image_list) | |
| if hasattr(self, "compel") and not retain_compel_previous_load: | |
| del self.compel | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| return images, image_list | |