Spaces:
Runtime error
Runtime error
| import torch | |
| from typing import List, Optional, Tuple, Union | |
| from diffusers.utils import ( | |
| USE_PEFT_BACKEND, | |
| BaseOutput, | |
| logging, | |
| replace_example_docstring, | |
| scale_lora_layers, | |
| unscale_lora_layers, | |
| ) | |
| from diffusers.utils.torch_utils import randn_tensor | |
| class EulerAncestralDiscreteSchedulerOutput(BaseOutput): | |
| prev_sample: torch.FloatTensor | |
| pred_original_sample: Optional[torch.FloatTensor] = None | |
| def eul_step( | |
| self, | |
| model_output: torch.FloatTensor, | |
| timestep: Union[float, torch.FloatTensor], | |
| sample: torch.FloatTensor, | |
| fusion_latent, | |
| pipe, | |
| generator: Optional[torch.Generator] = None, | |
| return_dict: bool = True, | |
| ) -> Union[EulerAncestralDiscreteSchedulerOutput, Tuple]: | |
| if ( | |
| isinstance(timestep, int) | |
| or isinstance(timestep, torch.IntTensor) | |
| or isinstance(timestep, torch.LongTensor) | |
| ): | |
| raise ValueError( | |
| ( | |
| "Passing integer indices (e.g. from `enumerate(timesteps)`) as timesteps to" | |
| " `EulerDiscreteScheduler.step()` is not supported. Make sure to pass" | |
| " one of the `scheduler.timesteps` as a timestep." | |
| ), | |
| ) | |
| if self.step_index is None: | |
| self._init_step_index(timestep) | |
| sigma = self.sigmas[self.step_index] | |
| # Upcast to avoid precision issues when computing prev_sample | |
| sample = sample.to(torch.float32) | |
| # 1. compute predicted original sample (x_0) from sigma-scaled predicted noise | |
| if self.config.prediction_type == "epsilon": ## True, 计算x_0 | |
| pred_original_sample = sample - sigma * model_output | |
| elif self.config.prediction_type == "v_prediction": | |
| # * c_out + input * c_skip | |
| pred_original_sample = model_output * (-sigma / (sigma**2 + 1) ** 0.5) + (sample / (sigma**2 + 1)) | |
| elif self.config.prediction_type == "sample": | |
| raise NotImplementedError("prediction_type not implemented yet: sample") | |
| else: | |
| raise ValueError( | |
| f"prediction_type given as {self.config.prediction_type} must be one of `epsilon`, or `v_prediction`" | |
| ) | |
| ## fusion latent | |
| pred_original_sample = fusion_latent | |
| sigma_from = self.sigmas[self.step_index] | |
| sigma_to = self.sigmas[self.step_index + 1] | |
| sigma_up = (sigma_to**2 * (sigma_from**2 - sigma_to**2) / sigma_from**2) ** 0.5 | |
| sigma_down = (sigma_to**2 - sigma_up**2) ** 0.5 | |
| # 2. Convert to an ODE derivative | |
| derivative = (sample - pred_original_sample) / sigma | |
| dt = sigma_down - sigma | |
| prev_sample = sample + derivative * dt | |
| device = model_output.device | |
| noise = randn_tensor(model_output.shape, dtype=model_output.dtype, device=device, generator=generator) | |
| prev_sample = prev_sample + noise * sigma_up | |
| # Cast sample back to model compatible dtype | |
| prev_sample = prev_sample.to(model_output.dtype) | |
| # upon completion increase step index by one | |
| self._step_index += 1 | |
| if not return_dict: | |
| return (prev_sample,) | |
| return EulerAncestralDiscreteSchedulerOutput( | |
| prev_sample=prev_sample, pred_original_sample=pred_original_sample | |
| ) |