Spaces:
Runtime error
Runtime error
user
5fb352c
| import torch | |
| from torch.nn.functional import silu | |
| from types import MethodType | |
| import modules.textual_inversion.textual_inversion | |
| from modules import devices, sd_hijack_optimizations, shared, sd_hijack_checkpoint | |
| from modules.hypernetworks import hypernetwork | |
| from modules.shared import cmd_opts | |
| from modules import sd_hijack_clip, sd_hijack_open_clip, sd_hijack_unet, sd_hijack_xlmr, xlmr | |
| import ldm.modules.attention | |
| import ldm.modules.diffusionmodules.model | |
| import ldm.modules.diffusionmodules.openaimodel | |
| import ldm.models.diffusion.ddim | |
| import ldm.models.diffusion.plms | |
| import ldm.modules.encoders.modules | |
| attention_CrossAttention_forward = ldm.modules.attention.CrossAttention.forward | |
| diffusionmodules_model_nonlinearity = ldm.modules.diffusionmodules.model.nonlinearity | |
| diffusionmodules_model_AttnBlock_forward = ldm.modules.diffusionmodules.model.AttnBlock.forward | |
| # new memory efficient cross attention blocks do not support hypernets and we already | |
| # have memory efficient cross attention anyway, so this disables SD2.0's memory efficient cross attention | |
| ldm.modules.attention.MemoryEfficientCrossAttention = ldm.modules.attention.CrossAttention | |
| ldm.modules.attention.BasicTransformerBlock.ATTENTION_MODES["softmax-xformers"] = ldm.modules.attention.CrossAttention | |
| # silence new console spam from SD2 | |
| ldm.modules.attention.print = lambda *args: None | |
| ldm.modules.diffusionmodules.model.print = lambda *args: None | |
| def apply_optimizations(): | |
| undo_optimizations() | |
| ldm.modules.diffusionmodules.model.nonlinearity = silu | |
| ldm.modules.diffusionmodules.openaimodel.th = sd_hijack_unet.th | |
| optimization_method = None | |
| if cmd_opts.force_enable_xformers or (cmd_opts.xformers and shared.xformers_available and torch.version.cuda and (6, 0) <= torch.cuda.get_device_capability(shared.device) <= (9, 0)): | |
| print("Applying xformers cross attention optimization.") | |
| ldm.modules.attention.CrossAttention.forward = sd_hijack_optimizations.xformers_attention_forward | |
| ldm.modules.diffusionmodules.model.AttnBlock.forward = sd_hijack_optimizations.xformers_attnblock_forward | |
| optimization_method = 'xformers' | |
| elif cmd_opts.opt_sub_quad_attention: | |
| print("Applying sub-quadratic cross attention optimization.") | |
| ldm.modules.attention.CrossAttention.forward = sd_hijack_optimizations.sub_quad_attention_forward | |
| ldm.modules.diffusionmodules.model.AttnBlock.forward = sd_hijack_optimizations.sub_quad_attnblock_forward | |
| optimization_method = 'sub-quadratic' | |
| elif cmd_opts.opt_split_attention_v1: | |
| print("Applying v1 cross attention optimization.") | |
| ldm.modules.attention.CrossAttention.forward = sd_hijack_optimizations.split_cross_attention_forward_v1 | |
| optimization_method = 'V1' | |
| elif not cmd_opts.disable_opt_split_attention and (cmd_opts.opt_split_attention_invokeai or not cmd_opts.opt_split_attention and not torch.cuda.is_available()): | |
| print("Applying cross attention optimization (InvokeAI).") | |
| ldm.modules.attention.CrossAttention.forward = sd_hijack_optimizations.split_cross_attention_forward_invokeAI | |
| optimization_method = 'InvokeAI' | |
| elif not cmd_opts.disable_opt_split_attention and (cmd_opts.opt_split_attention or torch.cuda.is_available()): | |
| print("Applying cross attention optimization (Doggettx).") | |
| ldm.modules.attention.CrossAttention.forward = sd_hijack_optimizations.split_cross_attention_forward | |
| ldm.modules.diffusionmodules.model.AttnBlock.forward = sd_hijack_optimizations.cross_attention_attnblock_forward | |
| optimization_method = 'Doggettx' | |
| return optimization_method | |
| def undo_optimizations(): | |
| ldm.modules.attention.CrossAttention.forward = hypernetwork.attention_CrossAttention_forward | |
| ldm.modules.diffusionmodules.model.nonlinearity = diffusionmodules_model_nonlinearity | |
| ldm.modules.diffusionmodules.model.AttnBlock.forward = diffusionmodules_model_AttnBlock_forward | |
| def fix_checkpoint(): | |
| """checkpoints are now added and removed in embedding/hypernet code, since torch doesn't want | |
| checkpoints to be added when not training (there's a warning)""" | |
| pass | |
| def weighted_loss(sd_model, pred, target, mean=True): | |
| #Calculate the weight normally, but ignore the mean | |
| loss = sd_model._old_get_loss(pred, target, mean=False) | |
| #Check if we have weights available | |
| weight = getattr(sd_model, '_custom_loss_weight', None) | |
| if weight is not None: | |
| loss *= weight | |
| #Return the loss, as mean if specified | |
| return loss.mean() if mean else loss | |
| def weighted_forward(sd_model, x, c, w, *args, **kwargs): | |
| try: | |
| #Temporarily append weights to a place accessible during loss calc | |
| sd_model._custom_loss_weight = w | |
| #Replace 'get_loss' with a weight-aware one. Otherwise we need to reimplement 'forward' completely | |
| #Keep 'get_loss', but don't overwrite the previous old_get_loss if it's already set | |
| if not hasattr(sd_model, '_old_get_loss'): | |
| sd_model._old_get_loss = sd_model.get_loss | |
| sd_model.get_loss = MethodType(weighted_loss, sd_model) | |
| #Run the standard forward function, but with the patched 'get_loss' | |
| return sd_model.forward(x, c, *args, **kwargs) | |
| finally: | |
| try: | |
| #Delete temporary weights if appended | |
| del sd_model._custom_loss_weight | |
| except AttributeError as e: | |
| pass | |
| #If we have an old loss function, reset the loss function to the original one | |
| if hasattr(sd_model, '_old_get_loss'): | |
| sd_model.get_loss = sd_model._old_get_loss | |
| del sd_model._old_get_loss | |
| def apply_weighted_forward(sd_model): | |
| #Add new function 'weighted_forward' that can be called to calc weighted loss | |
| sd_model.weighted_forward = MethodType(weighted_forward, sd_model) | |
| def undo_weighted_forward(sd_model): | |
| try: | |
| del sd_model.weighted_forward | |
| except AttributeError as e: | |
| pass | |
| class StableDiffusionModelHijack: | |
| fixes = None | |
| comments = [] | |
| layers = None | |
| circular_enabled = False | |
| clip = None | |
| optimization_method = None | |
| embedding_db = modules.textual_inversion.textual_inversion.EmbeddingDatabase() | |
| def __init__(self): | |
| self.embedding_db.add_embedding_dir(cmd_opts.embeddings_dir) | |
| def hijack(self, m): | |
| if type(m.cond_stage_model) == xlmr.BertSeriesModelWithTransformation: | |
| model_embeddings = m.cond_stage_model.roberta.embeddings | |
| model_embeddings.token_embedding = EmbeddingsWithFixes(model_embeddings.word_embeddings, self) | |
| m.cond_stage_model = sd_hijack_xlmr.FrozenXLMREmbedderWithCustomWords(m.cond_stage_model, self) | |
| elif type(m.cond_stage_model) == ldm.modules.encoders.modules.FrozenCLIPEmbedder: | |
| model_embeddings = m.cond_stage_model.transformer.text_model.embeddings | |
| model_embeddings.token_embedding = EmbeddingsWithFixes(model_embeddings.token_embedding, self) | |
| m.cond_stage_model = sd_hijack_clip.FrozenCLIPEmbedderWithCustomWords(m.cond_stage_model, self) | |
| elif type(m.cond_stage_model) == ldm.modules.encoders.modules.FrozenOpenCLIPEmbedder: | |
| m.cond_stage_model.model.token_embedding = EmbeddingsWithFixes(m.cond_stage_model.model.token_embedding, self) | |
| m.cond_stage_model = sd_hijack_open_clip.FrozenOpenCLIPEmbedderWithCustomWords(m.cond_stage_model, self) | |
| apply_weighted_forward(m) | |
| if m.cond_stage_key == "edit": | |
| sd_hijack_unet.hijack_ddpm_edit() | |
| self.optimization_method = apply_optimizations() | |
| self.clip = m.cond_stage_model | |
| def flatten(el): | |
| flattened = [flatten(children) for children in el.children()] | |
| res = [el] | |
| for c in flattened: | |
| res += c | |
| return res | |
| self.layers = flatten(m) | |
| def undo_hijack(self, m): | |
| if type(m.cond_stage_model) == xlmr.BertSeriesModelWithTransformation: | |
| m.cond_stage_model = m.cond_stage_model.wrapped | |
| elif type(m.cond_stage_model) == sd_hijack_clip.FrozenCLIPEmbedderWithCustomWords: | |
| m.cond_stage_model = m.cond_stage_model.wrapped | |
| model_embeddings = m.cond_stage_model.transformer.text_model.embeddings | |
| if type(model_embeddings.token_embedding) == EmbeddingsWithFixes: | |
| model_embeddings.token_embedding = model_embeddings.token_embedding.wrapped | |
| elif type(m.cond_stage_model) == sd_hijack_open_clip.FrozenOpenCLIPEmbedderWithCustomWords: | |
| m.cond_stage_model.wrapped.model.token_embedding = m.cond_stage_model.wrapped.model.token_embedding.wrapped | |
| m.cond_stage_model = m.cond_stage_model.wrapped | |
| undo_optimizations() | |
| undo_weighted_forward(m) | |
| self.apply_circular(False) | |
| self.layers = None | |
| self.clip = None | |
| def apply_circular(self, enable): | |
| if self.circular_enabled == enable: | |
| return | |
| self.circular_enabled = enable | |
| for layer in [layer for layer in self.layers if type(layer) == torch.nn.Conv2d]: | |
| layer.padding_mode = 'circular' if enable else 'zeros' | |
| def clear_comments(self): | |
| self.comments = [] | |
| def get_prompt_lengths(self, text): | |
| _, token_count = self.clip.process_texts([text]) | |
| return token_count, self.clip.get_target_prompt_token_count(token_count) | |
| class EmbeddingsWithFixes(torch.nn.Module): | |
| def __init__(self, wrapped, embeddings): | |
| super().__init__() | |
| self.wrapped = wrapped | |
| self.embeddings = embeddings | |
| def forward(self, input_ids): | |
| batch_fixes = self.embeddings.fixes | |
| self.embeddings.fixes = None | |
| inputs_embeds = self.wrapped(input_ids) | |
| if batch_fixes is None or len(batch_fixes) == 0 or max([len(x) for x in batch_fixes]) == 0: | |
| return inputs_embeds | |
| vecs = [] | |
| for fixes, tensor in zip(batch_fixes, inputs_embeds): | |
| for offset, embedding in fixes: | |
| emb = devices.cond_cast_unet(embedding.vec) | |
| emb_len = min(tensor.shape[0] - offset - 1, emb.shape[0]) | |
| tensor = torch.cat([tensor[0:offset + 1], emb[0:emb_len], tensor[offset + 1 + emb_len:]]) | |
| vecs.append(tensor) | |
| return torch.stack(vecs) | |
| def add_circular_option_to_conv_2d(): | |
| conv2d_constructor = torch.nn.Conv2d.__init__ | |
| def conv2d_constructor_circular(self, *args, **kwargs): | |
| return conv2d_constructor(self, *args, padding_mode='circular', **kwargs) | |
| torch.nn.Conv2d.__init__ = conv2d_constructor_circular | |
| model_hijack = StableDiffusionModelHijack() | |
| def register_buffer(self, name, attr): | |
| """ | |
| Fix register buffer bug for Mac OS. | |
| """ | |
| if type(attr) == torch.Tensor: | |
| if attr.device != devices.device: | |
| attr = attr.to(device=devices.device, dtype=(torch.float32 if devices.device.type == 'mps' else None)) | |
| setattr(self, name, attr) | |
| ldm.models.diffusion.ddim.DDIMSampler.register_buffer = register_buffer | |
| ldm.models.diffusion.plms.PLMSSampler.register_buffer = register_buffer | |