import copy
import gc
import unittest

import numpy as np
import torch
from transformers import CLIPTextConfig, CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer

from diffusers import (
    AutoencoderKL,
    ControlNetModel,
    EulerDiscreteScheduler,
    HeunDiscreteScheduler,
    LCMScheduler,
    StableDiffusionXLControlNetPipeline,
    StableDiffusionXLImg2ImgPipeline,
    UNet2DConditionModel,
)
from diffusers.models.unets.unet_2d_blocks import UNetMidBlock2D
from diffusers.pipelines.controlnet.pipeline_controlnet import MultiControlNetModel
from diffusers.utils.import_utils import is_xformers_available
from diffusers.utils.testing_utils import (
    enable_full_determinism,
    load_image,
    require_torch_gpu,
    slow,
    torch_device,
)
from diffusers.utils.torch_utils import randn_tensor

from ..pipeline_params import (
    IMAGE_TO_IMAGE_IMAGE_PARAMS,
    TEXT_TO_IMAGE_BATCH_PARAMS,
    TEXT_TO_IMAGE_IMAGE_PARAMS,
    TEXT_TO_IMAGE_PARAMS,
)
from ..test_pipelines_common import (
    IPAdapterTesterMixin,
    PipelineKarrasSchedulerTesterMixin,
    PipelineLatentTesterMixin,
    PipelineTesterMixin,
    SDXLOptionalComponentsTesterMixin,
)


enable_full_determinism()
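
# The fast tests below build the SDXL ControlNet pipelines from tiny, randomly initialized
# components so they can run on CPU CI; the @slow class near the end of the module loads the
# real SDXL and ControlNet checkpoints and therefore requires a GPU.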
					
						
class StableDiffusionXLControlNetPipelineFastTests(
    IPAdapterTesterMixin,
    PipelineLatentTesterMixin,
    PipelineKarrasSchedulerTesterMixin,
    PipelineTesterMixin,
    SDXLOptionalComponentsTesterMixin,
    unittest.TestCase,
):
    pipeline_class = StableDiffusionXLControlNetPipeline
    params = TEXT_TO_IMAGE_PARAMS
    batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
    image_params = IMAGE_TO_IMAGE_IMAGE_PARAMS
    image_latents_params = TEXT_TO_IMAGE_IMAGE_PARAMS

    def get_dummy_components(self, time_cond_proj_dim=None):
        torch.manual_seed(0)
        unet = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
            time_cond_proj_dim=time_cond_proj_dim,
        )
        torch.manual_seed(0)
        controlnet = ControlNetModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            in_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            conditioning_embedding_out_channels=(16, 32),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        torch.manual_seed(0)
        scheduler = EulerDiscreteScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            steps_offset=1,
            beta_schedule="scaled_linear",
            timestep_spacing="leading",
        )
        torch.manual_seed(0)
        vae = AutoencoderKL(
            block_out_channels=[32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
        )
        torch.manual_seed(0)
        text_encoder_config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            hidden_act="gelu",
            projection_dim=32,
        )
        text_encoder = CLIPTextModel(text_encoder_config)
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
        tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        components = {
            "unet": unet,
            "controlnet": controlnet,
            "scheduler": scheduler,
            "vae": vae,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
            "text_encoder_2": text_encoder_2,
            "tokenizer_2": tokenizer_2,
            "feature_extractor": None,
            "image_encoder": None,
        }
        return components

    def get_dummy_inputs(self, device, seed=0):
        if str(device).startswith("mps"):
            generator = torch.manual_seed(seed)
        else:
            generator = torch.Generator(device=device).manual_seed(seed)

        controlnet_embedder_scale_factor = 2
        image = randn_tensor(
            (1, 3, 32 * controlnet_embedder_scale_factor, 32 * controlnet_embedder_scale_factor),
            generator=generator,
            device=torch.device(device),
        )

        inputs = {
            "prompt": "A painting of a squirrel eating a burger",
            "generator": generator,
            "num_inference_steps": 2,
            "guidance_scale": 6.0,
            "output_type": "np",
            "image": image,
        }

        return inputs

    def test_attention_slicing_forward_pass(self):
        return self._test_attention_slicing_forward_pass(expected_max_diff=2e-3)

    def test_ip_adapter_single(self, from_ssd1b=False, expected_pipe_slice=None):
        if not from_ssd1b:
            expected_pipe_slice = None
            if torch_device == "cpu":
                expected_pipe_slice = np.array(
                    [0.7331, 0.5907, 0.5667, 0.6029, 0.5679, 0.5968, 0.4033, 0.4761, 0.5090]
                )
        return super().test_ip_adapter_single(expected_pipe_slice=expected_pipe_slice)

    @unittest.skipIf(
        torch_device != "cuda" or not is_xformers_available(),
        reason="XFormers attention is only available with CUDA and `xformers` installed",
    )
    def test_xformers_attention_forwardGenerator_pass(self):
        self._test_xformers_attention_forwardGenerator_pass(expected_max_diff=2e-3)

    def test_inference_batch_single_identical(self):
        self._test_inference_batch_single_identical(expected_max_diff=2e-3)

    def test_save_load_optional_components(self):
        self._test_save_load_optional_components()

    @require_torch_gpu
    def test_stable_diffusion_xl_offloads(self):
        pipes = []
        components = self.get_dummy_components()
        sd_pipe = self.pipeline_class(**components).to(torch_device)
        pipes.append(sd_pipe)

        components = self.get_dummy_components()
        sd_pipe = self.pipeline_class(**components)
        sd_pipe.enable_model_cpu_offload()
        pipes.append(sd_pipe)

        components = self.get_dummy_components()
        sd_pipe = self.pipeline_class(**components)
        sd_pipe.enable_sequential_cpu_offload()
        pipes.append(sd_pipe)

        image_slices = []
        for pipe in pipes:
            pipe.unet.set_default_attn_processor()

            inputs = self.get_dummy_inputs(torch_device)
            image = pipe(**inputs).images

            image_slices.append(image[0, -3:, -3:, -1].flatten())

        assert np.abs(image_slices[0] - image_slices[1]).max() < 1e-3
        assert np.abs(image_slices[0] - image_slices[2]).max() < 1e-3

    def test_stable_diffusion_xl_multi_prompts(self):
        components = self.get_dummy_components()
        sd_pipe = self.pipeline_class(**components).to(torch_device)

        # forward with a single prompt
        inputs = self.get_dummy_inputs(torch_device)
        output = sd_pipe(**inputs)
        image_slice_1 = output.images[0, -3:, -3:, -1]

        # passing the same text as prompt_2 must not change the result
        inputs = self.get_dummy_inputs(torch_device)
        inputs["prompt_2"] = inputs["prompt"]
        output = sd_pipe(**inputs)
        image_slice_2 = output.images[0, -3:, -3:, -1]

        assert np.abs(image_slice_1.flatten() - image_slice_2.flatten()).max() < 1e-4

        # a different prompt_2 must change the result
        inputs = self.get_dummy_inputs(torch_device)
        inputs["prompt_2"] = "different prompt"
        output = sd_pipe(**inputs)
        image_slice_3 = output.images[0, -3:, -3:, -1]

        assert np.abs(image_slice_1.flatten() - image_slice_3.flatten()).max() > 1e-4

        # same checks for negative_prompt / negative_prompt_2
        inputs = self.get_dummy_inputs(torch_device)
        inputs["negative_prompt"] = "negative prompt"
        output = sd_pipe(**inputs)
        image_slice_1 = output.images[0, -3:, -3:, -1]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["negative_prompt"] = "negative prompt"
        inputs["negative_prompt_2"] = inputs["negative_prompt"]
        output = sd_pipe(**inputs)
        image_slice_2 = output.images[0, -3:, -3:, -1]

        assert np.abs(image_slice_1.flatten() - image_slice_2.flatten()).max() < 1e-4

        inputs = self.get_dummy_inputs(torch_device)
        inputs["negative_prompt"] = "negative prompt"
        inputs["negative_prompt_2"] = "different negative prompt"
        output = sd_pipe(**inputs)
        image_slice_3 = output.images[0, -3:, -3:, -1]

        assert np.abs(image_slice_1.flatten() - image_slice_3.flatten()).max() > 1e-4

    def test_stable_diffusion_xl_prompt_embeds(self):
        components = self.get_dummy_components()
        sd_pipe = self.pipeline_class(**components)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        # forward with the prompts passed as strings
        inputs = self.get_dummy_inputs(torch_device)
        inputs["prompt"] = 2 * [inputs["prompt"]]
        inputs["num_images_per_prompt"] = 2

        output = sd_pipe(**inputs)
        image_slice_1 = output.images[0, -3:, -3:, -1]

        # forward with pre-computed prompt embeddings must give the same result
        inputs = self.get_dummy_inputs(torch_device)
        prompt = 2 * [inputs.pop("prompt")]

        (
            prompt_embeds,
            negative_prompt_embeds,
            pooled_prompt_embeds,
            negative_pooled_prompt_embeds,
        ) = sd_pipe.encode_prompt(prompt)

        output = sd_pipe(
            **inputs,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            pooled_prompt_embeds=pooled_prompt_embeds,
            negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
        )
        image_slice_2 = output.images[0, -3:, -3:, -1]

        assert np.abs(image_slice_1.flatten() - image_slice_2.flatten()).max() < 1e-4

    def test_controlnet_sdxl_guess(self):
        device = "cpu"

        components = self.get_dummy_components()

        sd_pipe = self.pipeline_class(**components)
        sd_pipe = sd_pipe.to(device)

        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        inputs["guess_mode"] = True

        output = sd_pipe(**inputs)
        image_slice = output.images[0, -3:, -3:, -1]
        expected_slice = np.array(
            [0.7330834, 0.590667, 0.5667336, 0.6029023, 0.5679491, 0.5968194, 0.4032986, 0.47612396, 0.5089609]
        )

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-4

    def test_controlnet_sdxl_lcm(self):
        device = "cpu"

        components = self.get_dummy_components(time_cond_proj_dim=256)
        sd_pipe = StableDiffusionXLControlNetPipeline(**components)
        sd_pipe.scheduler = LCMScheduler.from_config(sd_pipe.scheduler.config)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        output = sd_pipe(**inputs)
        image = output.images

        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array([0.7799, 0.614, 0.6162, 0.7082, 0.6662, 0.5833, 0.4148, 0.5182, 0.4866])

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_controlnet_sdxl_two_mixture_of_denoiser_fast(self):
        # Runs the text-to-image ControlNet pipeline for the first part of the schedule
        # (via `denoising_end`) and hands its latents to the img2img pipeline, which
        # finishes denoising from `denoising_start`.
        components = self.get_dummy_components()
        pipe_1 = StableDiffusionXLControlNetPipeline(**components).to(torch_device)
        pipe_1.unet.set_default_attn_processor()

        components_without_controlnet = {k: v for k, v in components.items() if k != "controlnet"}
        pipe_2 = StableDiffusionXLImg2ImgPipeline(**components_without_controlnet).to(torch_device)
        pipe_2.unet.set_default_attn_processor()

        def assert_run_mixture(
            num_steps,
            split,
            scheduler_cls_orig,
            expected_tss,
            num_train_timesteps=pipe_1.scheduler.config.num_train_timesteps,
        ):
            inputs = self.get_dummy_inputs(torch_device)
            inputs["num_inference_steps"] = num_steps

            class scheduler_cls(scheduler_cls_orig):
                pass

            pipe_1.scheduler = scheduler_cls.from_config(pipe_1.scheduler.config)
            pipe_2.scheduler = scheduler_cls.from_config(pipe_2.scheduler.config)

            pipe_1.scheduler.set_timesteps(num_steps)
            expected_steps = pipe_1.scheduler.timesteps.tolist()

            if pipe_1.scheduler.order == 2:
                # second-order schedulers re-run the split timestep in the second pipeline
                expected_steps_1 = list(filter(lambda ts: ts >= split, expected_tss))
                expected_steps_2 = expected_steps_1[-1:] + list(filter(lambda ts: ts < split, expected_tss))
                expected_steps = expected_steps_1 + expected_steps_2
            else:
                expected_steps_1 = list(filter(lambda ts: ts >= split, expected_tss))
                expected_steps_2 = list(filter(lambda ts: ts < split, expected_tss))

            # monkey-patch `step` so every timestep that is actually run gets recorded
            done_steps = []
            old_step = copy.copy(scheduler_cls.step)

            def new_step(self, *args, **kwargs):
                done_steps.append(args[1].cpu().item())
                return old_step(self, *args, **kwargs)

            scheduler_cls.step = new_step

            inputs_1 = {
                **inputs,
                **{
                    "denoising_end": 1.0 - (split / num_train_timesteps),
                    "output_type": "latent",
                },
            }
            latents = pipe_1(**inputs_1).images[0]

            assert expected_steps_1 == done_steps, f"Failure with {scheduler_cls.__name__} and {num_steps} and {split}"

            inputs_2 = {
                **inputs,
                **{
                    "denoising_start": 1.0 - (split / num_train_timesteps),
                    "image": latents,
                },
            }
            pipe_2(**inputs_2).images[0]

            assert expected_steps_2 == done_steps[len(expected_steps_1) :]
            assert expected_steps == done_steps, f"Failure with {scheduler_cls.__name__} and {num_steps} and {split}"

        steps = 10
        for split in [300, 700]:
            for scheduler_cls_timesteps in [
                (EulerDiscreteScheduler, [901, 801, 701, 601, 501, 401, 301, 201, 101, 1]),
                (
                    HeunDiscreteScheduler,
                    [
                        901.0, 801.0, 801.0, 701.0, 701.0, 601.0, 601.0, 501.0, 501.0, 401.0,
                        401.0, 301.0, 301.0, 201.0, 201.0, 101.0, 101.0, 1.0, 1.0,
                    ],
                ),
            ]:
                assert_run_mixture(steps, split, scheduler_cls_timesteps[0], scheduler_cls_timesteps[1])
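
# The multi-ControlNet suite below repeats the fast tests with a MultiControlNetModel that
# wraps two randomly initialized ControlNets and feeds the pipeline a list of two
# conditioning images.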
					
						
class StableDiffusionXLMultiControlNetPipelineFastTests(
    PipelineTesterMixin, PipelineKarrasSchedulerTesterMixin, SDXLOptionalComponentsTesterMixin, unittest.TestCase
):
    pipeline_class = StableDiffusionXLControlNetPipeline
    params = TEXT_TO_IMAGE_PARAMS
    batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
    image_params = frozenset([])

    def get_dummy_components(self):
        torch.manual_seed(0)
        unet = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        torch.manual_seed(0)

        def init_weights(m):
            if isinstance(m, torch.nn.Conv2d):
                torch.nn.init.normal_(m.weight)
                m.bias.data.fill_(1.0)

        controlnet1 = ControlNetModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            in_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            conditioning_embedding_out_channels=(16, 32),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        controlnet1.controlnet_down_blocks.apply(init_weights)

        torch.manual_seed(0)
        controlnet2 = ControlNetModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            in_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            conditioning_embedding_out_channels=(16, 32),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        controlnet2.controlnet_down_blocks.apply(init_weights)

        torch.manual_seed(0)
        scheduler = EulerDiscreteScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            steps_offset=1,
            beta_schedule="scaled_linear",
            timestep_spacing="leading",
        )
        torch.manual_seed(0)
        vae = AutoencoderKL(
            block_out_channels=[32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
        )
        torch.manual_seed(0)
        text_encoder_config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            hidden_act="gelu",
            projection_dim=32,
        )
        text_encoder = CLIPTextModel(text_encoder_config)
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
        tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        controlnet = MultiControlNetModel([controlnet1, controlnet2])

        components = {
            "unet": unet,
            "controlnet": controlnet,
            "scheduler": scheduler,
            "vae": vae,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
            "text_encoder_2": text_encoder_2,
            "tokenizer_2": tokenizer_2,
            "feature_extractor": None,
            "image_encoder": None,
        }
        return components

    def get_dummy_inputs(self, device, seed=0):
        if str(device).startswith("mps"):
            generator = torch.manual_seed(seed)
        else:
            generator = torch.Generator(device=device).manual_seed(seed)

        controlnet_embedder_scale_factor = 2

        images = [
            randn_tensor(
                (1, 3, 32 * controlnet_embedder_scale_factor, 32 * controlnet_embedder_scale_factor),
                generator=generator,
                device=torch.device(device),
            ),
            randn_tensor(
                (1, 3, 32 * controlnet_embedder_scale_factor, 32 * controlnet_embedder_scale_factor),
                generator=generator,
                device=torch.device(device),
            ),
        ]

        inputs = {
            "prompt": "A painting of a squirrel eating a burger",
            "generator": generator,
            "num_inference_steps": 2,
            "guidance_scale": 6.0,
            "output_type": "np",
            "image": images,
        }

        return inputs

    def test_control_guidance_switch(self):
        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe.to(torch_device)

        scale = 10.0
        steps = 4

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_1 = pipe(**inputs)[0]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_2 = pipe(**inputs, control_guidance_start=0.1, control_guidance_end=0.2)[0]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_3 = pipe(**inputs, control_guidance_start=[0.1, 0.3], control_guidance_end=[0.2, 0.7])[0]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_4 = pipe(**inputs, control_guidance_start=0.4, control_guidance_end=[0.5, 0.8])[0]

        # restricting the control guidance window must change the output
        assert np.sum(np.abs(output_1 - output_2)) > 1e-3
        assert np.sum(np.abs(output_1 - output_3)) > 1e-3
        assert np.sum(np.abs(output_1 - output_4)) > 1e-3

    def test_attention_slicing_forward_pass(self):
        return self._test_attention_slicing_forward_pass(expected_max_diff=2e-3)

    @unittest.skipIf(
        torch_device != "cuda" or not is_xformers_available(),
        reason="XFormers attention is only available with CUDA and `xformers` installed",
    )
    def test_xformers_attention_forwardGenerator_pass(self):
        self._test_xformers_attention_forwardGenerator_pass(expected_max_diff=2e-3)

    def test_inference_batch_single_identical(self):
        self._test_inference_batch_single_identical(expected_max_diff=2e-3)

    def test_save_load_optional_components(self):
        return self._test_save_load_optional_components()
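
# Same pipeline exercised with a MultiControlNetModel that wraps a single ControlNet, so the
# list-based conditioning-image and guidance arguments are covered for the one-model case too.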
					
						
class StableDiffusionXLMultiControlNetOneModelPipelineFastTests(
    PipelineKarrasSchedulerTesterMixin, PipelineTesterMixin, SDXLOptionalComponentsTesterMixin, unittest.TestCase
):
    pipeline_class = StableDiffusionXLControlNetPipeline
    params = TEXT_TO_IMAGE_PARAMS
    batch_params = TEXT_TO_IMAGE_BATCH_PARAMS
    image_params = frozenset([])

    def get_dummy_components(self):
        torch.manual_seed(0)
        unet = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        torch.manual_seed(0)

        def init_weights(m):
            if isinstance(m, torch.nn.Conv2d):
                torch.nn.init.normal_(m.weight)
                m.bias.data.fill_(1.0)

        controlnet = ControlNetModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            in_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            conditioning_embedding_out_channels=(16, 32),
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        controlnet.controlnet_down_blocks.apply(init_weights)

        torch.manual_seed(0)
        scheduler = EulerDiscreteScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            steps_offset=1,
            beta_schedule="scaled_linear",
            timestep_spacing="leading",
        )
        torch.manual_seed(0)
        vae = AutoencoderKL(
            block_out_channels=[32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
        )
        torch.manual_seed(0)
        text_encoder_config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            hidden_act="gelu",
            projection_dim=32,
        )
        text_encoder = CLIPTextModel(text_encoder_config)
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
        tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        controlnet = MultiControlNetModel([controlnet])

        components = {
            "unet": unet,
            "controlnet": controlnet,
            "scheduler": scheduler,
            "vae": vae,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
            "text_encoder_2": text_encoder_2,
            "tokenizer_2": tokenizer_2,
            "feature_extractor": None,
            "image_encoder": None,
        }
        return components

    def get_dummy_inputs(self, device, seed=0):
        if str(device).startswith("mps"):
            generator = torch.manual_seed(seed)
        else:
            generator = torch.Generator(device=device).manual_seed(seed)

        controlnet_embedder_scale_factor = 2
        images = [
            randn_tensor(
                (1, 3, 32 * controlnet_embedder_scale_factor, 32 * controlnet_embedder_scale_factor),
                generator=generator,
                device=torch.device(device),
            ),
        ]

        inputs = {
            "prompt": "A painting of a squirrel eating a burger",
            "generator": generator,
            "num_inference_steps": 2,
            "guidance_scale": 6.0,
            "output_type": "np",
            "image": images,
        }

        return inputs

    def test_control_guidance_switch(self):
        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe.to(torch_device)

        scale = 10.0
        steps = 4

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_1 = pipe(**inputs)[0]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_2 = pipe(**inputs, control_guidance_start=0.1, control_guidance_end=0.2)[0]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_3 = pipe(
            **inputs,
            control_guidance_start=[0.1],
            control_guidance_end=[0.2],
        )[0]

        inputs = self.get_dummy_inputs(torch_device)
        inputs["num_inference_steps"] = steps
        inputs["controlnet_conditioning_scale"] = scale
        output_4 = pipe(**inputs, control_guidance_start=0.4, control_guidance_end=[0.5])[0]

        assert np.sum(np.abs(output_1 - output_2)) > 1e-3
        assert np.sum(np.abs(output_1 - output_3)) > 1e-3
        assert np.sum(np.abs(output_1 - output_4)) > 1e-3

    def test_attention_slicing_forward_pass(self):
        return self._test_attention_slicing_forward_pass(expected_max_diff=2e-3)

    @unittest.skipIf(
        torch_device != "cuda" or not is_xformers_available(),
        reason="XFormers attention is only available with CUDA and `xformers` installed",
    )
    def test_xformers_attention_forwardGenerator_pass(self):
        self._test_xformers_attention_forwardGenerator_pass(expected_max_diff=2e-3)

    def test_inference_batch_single_identical(self):
        self._test_inference_batch_single_identical(expected_max_diff=2e-3)

    def test_save_load_optional_components(self):
        self._test_save_load_optional_components()

    def test_negative_conditions(self):
        components = self.get_dummy_components()
        pipe = self.pipeline_class(**components)
        pipe.to(torch_device)

        inputs = self.get_dummy_inputs(torch_device)
        image = pipe(**inputs).images
        image_slice_without_neg_cond = image[0, -3:, -3:, -1]

        image = pipe(
            **inputs,
            negative_original_size=(512, 512),
            negative_crops_coords_top_left=(0, 0),
            negative_target_size=(1024, 1024),
        ).images
        image_slice_with_neg_cond = image[0, -3:, -3:, -1]

        self.assertTrue(np.abs(image_slice_without_neg_cond - image_slice_with_neg_cond).max() > 1e-2)
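
# The tests below are integration tests: they download the public SDXL base and ControlNet
# checkpoints, run a few denoising steps, and compare output slices against reference values,
# so they only run when marked slow and a GPU is available.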
					
						
@slow
@require_torch_gpu
class ControlNetSDXLPipelineSlowTests(unittest.TestCase):
    def setUp(self):
        super().setUp()
        gc.collect()
        torch.cuda.empty_cache()

    def tearDown(self):
        super().tearDown()
        gc.collect()
        torch.cuda.empty_cache()

    def test_canny(self):
        controlnet = ControlNetModel.from_pretrained("diffusers/controlnet-canny-sdxl-1.0")

        pipe = StableDiffusionXLControlNetPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-base-1.0", controlnet=controlnet
        )
        pipe.enable_sequential_cpu_offload()
        pipe.set_progress_bar_config(disable=None)

        generator = torch.Generator(device="cpu").manual_seed(0)
        prompt = "bird"
        image = load_image(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/bird_canny.png"
        )

        images = pipe(prompt, image=image, generator=generator, output_type="np", num_inference_steps=3).images

        assert images[0].shape == (768, 512, 3)

        original_image = images[0, -3:, -3:, -1].flatten()
        expected_image = np.array([0.4185, 0.4127, 0.4089, 0.4046, 0.4115, 0.4096, 0.4081, 0.4112, 0.3913])
        assert np.allclose(original_image, expected_image, atol=1e-04)

    def test_depth(self):
        controlnet = ControlNetModel.from_pretrained("diffusers/controlnet-depth-sdxl-1.0")

        pipe = StableDiffusionXLControlNetPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-base-1.0", controlnet=controlnet
        )
        pipe.enable_sequential_cpu_offload()
        pipe.set_progress_bar_config(disable=None)

        generator = torch.Generator(device="cpu").manual_seed(0)
        prompt = "Stormtrooper's lecture"
        image = load_image(
            "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/stormtrooper_depth.png"
        )

        images = pipe(prompt, image=image, generator=generator, output_type="np", num_inference_steps=3).images

        assert images[0].shape == (512, 512, 3)

        original_image = images[0, -3:, -3:, -1].flatten()
        expected_image = np.array([0.4399, 0.5112, 0.5478, 0.4314, 0.472, 0.4823, 0.4647, 0.4957, 0.4853])
        assert np.allclose(original_image, expected_image, atol=1e-04)
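
# Variant of the fast tests that swaps in an SSD-1B-style configuration: the dummy UNet and
# ControlNet use a plain `UNetMidBlock2D` mid block instead of the cross-attention mid block,
# and the expected output slices are updated accordingly.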
					
						
class StableDiffusionSSD1BControlNetPipelineFastTests(StableDiffusionXLControlNetPipelineFastTests):
    def test_controlnet_sdxl_guess(self):
        device = "cpu"

        components = self.get_dummy_components()

        sd_pipe = self.pipeline_class(**components)
        sd_pipe = sd_pipe.to(device)

        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        inputs["guess_mode"] = True

        output = sd_pipe(**inputs)
        image_slice = output.images[0, -3:, -3:, -1]
        expected_slice = np.array(
            [0.6831671, 0.5702532, 0.5459845, 0.6299793, 0.58563006, 0.6033695, 0.4493941, 0.46132287, 0.5035841]
        )

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-4

    def test_ip_adapter_single(self):
        expected_pipe_slice = None
        if torch_device == "cpu":
            expected_pipe_slice = np.array([0.6832, 0.5703, 0.5460, 0.6300, 0.5856, 0.6034, 0.4494, 0.4613, 0.5036])
        return super().test_ip_adapter_single(from_ssd1b=True, expected_pipe_slice=expected_pipe_slice)

    def test_controlnet_sdxl_lcm(self):
        device = "cpu"

        components = self.get_dummy_components(time_cond_proj_dim=256)
        sd_pipe = StableDiffusionXLControlNetPipeline(**components)
        sd_pipe.scheduler = LCMScheduler.from_config(sd_pipe.scheduler.config)
        sd_pipe = sd_pipe.to(torch_device)
        sd_pipe.set_progress_bar_config(disable=None)

        inputs = self.get_dummy_inputs(device)
        output = sd_pipe(**inputs)
        image = output.images

        image_slice = image[0, -3:, -3:, -1]

        assert image.shape == (1, 64, 64, 3)
        expected_slice = np.array([0.6850, 0.5135, 0.5545, 0.7033, 0.6617, 0.5971, 0.4165, 0.5480, 0.5070])

        assert np.abs(image_slice.flatten() - expected_slice).max() < 1e-2

    def test_conditioning_channels(self):
        unet = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            mid_block_type="UNetMidBlock2D",
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
            time_cond_proj_dim=None,
        )

        controlnet = ControlNetModel.from_unet(unet, conditioning_channels=4)
        assert type(controlnet.mid_block) == UNetMidBlock2D
        assert controlnet.conditioning_channels == 4

    def get_dummy_components(self, time_cond_proj_dim=None):
        torch.manual_seed(0)
        unet = UNet2DConditionModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            sample_size=32,
            in_channels=4,
            out_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
            mid_block_type="UNetMidBlock2D",
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
            time_cond_proj_dim=time_cond_proj_dim,
        )
        torch.manual_seed(0)
        controlnet = ControlNetModel(
            block_out_channels=(32, 64),
            layers_per_block=2,
            in_channels=4,
            down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
            conditioning_embedding_out_channels=(16, 32),
            mid_block_type="UNetMidBlock2D",
            attention_head_dim=(2, 4),
            use_linear_projection=True,
            addition_embed_type="text_time",
            addition_time_embed_dim=8,
            transformer_layers_per_block=(1, 2),
            projection_class_embeddings_input_dim=80,
            cross_attention_dim=64,
        )
        torch.manual_seed(0)
        scheduler = EulerDiscreteScheduler(
            beta_start=0.00085,
            beta_end=0.012,
            steps_offset=1,
            beta_schedule="scaled_linear",
            timestep_spacing="leading",
        )
        torch.manual_seed(0)
        vae = AutoencoderKL(
            block_out_channels=[32, 64],
            in_channels=3,
            out_channels=3,
            down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
            up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
            latent_channels=4,
        )
        torch.manual_seed(0)
        text_encoder_config = CLIPTextConfig(
            bos_token_id=0,
            eos_token_id=2,
            hidden_size=32,
            intermediate_size=37,
            layer_norm_eps=1e-05,
            num_attention_heads=4,
            num_hidden_layers=5,
            pad_token_id=1,
            vocab_size=1000,
            hidden_act="gelu",
            projection_dim=32,
        )
        text_encoder = CLIPTextModel(text_encoder_config)
        tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        text_encoder_2 = CLIPTextModelWithProjection(text_encoder_config)
        tokenizer_2 = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")

        components = {
            "unet": unet,
            "controlnet": controlnet,
            "scheduler": scheduler,
            "vae": vae,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
            "text_encoder_2": text_encoder_2,
            "tokenizer_2": tokenizer_2,
            "feature_extractor": None,
            "image_encoder": None,
        }
        return components