Spaces:
Running
on
Zero
Running
on
Zero
| import torch | |
| import argparse | |
| import random | |
| import re | |
| from typing import List, Optional, Union | |
| from .utils import setup_logging | |
| setup_logging() | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| def prepare_scheduler_for_custom_training(noise_scheduler, device): | |
| if hasattr(noise_scheduler, "all_snr"): | |
| return | |
| alphas_cumprod = noise_scheduler.alphas_cumprod | |
| sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod) | |
| sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - alphas_cumprod) | |
| alpha = sqrt_alphas_cumprod | |
| sigma = sqrt_one_minus_alphas_cumprod | |
| all_snr = (alpha / sigma) ** 2 | |
| noise_scheduler.all_snr = all_snr.to(device) | |
| def fix_noise_scheduler_betas_for_zero_terminal_snr(noise_scheduler): | |
| # fix beta: zero terminal SNR | |
| logger.info(f"fix noise scheduler betas: https://arxiv.org/abs/2305.08891") | |
| def enforce_zero_terminal_snr(betas): | |
| # Convert betas to alphas_bar_sqrt | |
| alphas = 1 - betas | |
| alphas_bar = alphas.cumprod(0) | |
| alphas_bar_sqrt = alphas_bar.sqrt() | |
| # Store old values. | |
| alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() | |
| alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() | |
| # Shift so last timestep is zero. | |
| alphas_bar_sqrt -= alphas_bar_sqrt_T | |
| # Scale so first timestep is back to old value. | |
| alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) | |
| # Convert alphas_bar_sqrt to betas | |
| alphas_bar = alphas_bar_sqrt**2 | |
| alphas = alphas_bar[1:] / alphas_bar[:-1] | |
| alphas = torch.cat([alphas_bar[0:1], alphas]) | |
| betas = 1 - alphas | |
| return betas | |
| betas = noise_scheduler.betas | |
| betas = enforce_zero_terminal_snr(betas) | |
| alphas = 1.0 - betas | |
| alphas_cumprod = torch.cumprod(alphas, dim=0) | |
| # logger.info(f"original: {noise_scheduler.betas}") | |
| # logger.info(f"fixed: {betas}") | |
| noise_scheduler.betas = betas | |
| noise_scheduler.alphas = alphas | |
| noise_scheduler.alphas_cumprod = alphas_cumprod | |
| def apply_snr_weight(loss, timesteps, noise_scheduler, gamma, v_prediction=False): | |
| snr = torch.stack([noise_scheduler.all_snr[t] for t in timesteps]) | |
| min_snr_gamma = torch.minimum(snr, torch.full_like(snr, gamma)) | |
| if v_prediction: | |
| snr_weight = torch.div(min_snr_gamma, snr + 1).float().to(loss.device) | |
| else: | |
| snr_weight = torch.div(min_snr_gamma, snr).float().to(loss.device) | |
| loss = loss * snr_weight | |
| return loss | |
| def scale_v_prediction_loss_like_noise_prediction(loss, timesteps, noise_scheduler): | |
| scale = get_snr_scale(timesteps, noise_scheduler) | |
| loss = loss * scale | |
| return loss | |
| def get_snr_scale(timesteps, noise_scheduler): | |
| snr_t = torch.stack([noise_scheduler.all_snr[t] for t in timesteps]) # batch_size | |
| snr_t = torch.minimum(snr_t, torch.ones_like(snr_t) * 1000) # if timestep is 0, snr_t is inf, so limit it to 1000 | |
| scale = snr_t / (snr_t + 1) | |
| # # show debug info | |
| # logger.info(f"timesteps: {timesteps}, snr_t: {snr_t}, scale: {scale}") | |
| return scale | |
| def add_v_prediction_like_loss(loss, timesteps, noise_scheduler, v_pred_like_loss): | |
| scale = get_snr_scale(timesteps, noise_scheduler) | |
| # logger.info(f"add v-prediction like loss: {v_pred_like_loss}, scale: {scale}, loss: {loss}, time: {timesteps}") | |
| loss = loss + loss / scale * v_pred_like_loss | |
| return loss | |
| def apply_debiased_estimation(loss, timesteps, noise_scheduler, v_prediction=False): | |
| snr_t = torch.stack([noise_scheduler.all_snr[t] for t in timesteps]) # batch_size | |
| snr_t = torch.minimum(snr_t, torch.ones_like(snr_t) * 1000) # if timestep is 0, snr_t is inf, so limit it to 1000 | |
| if v_prediction: | |
| weight = 1 / (snr_t + 1) | |
| else: | |
| weight = 1 / torch.sqrt(snr_t) | |
| loss = weight * loss | |
| return loss | |
| # TODO train_utilと分散しているのでどちらかに寄せる | |
| def add_custom_train_arguments(parser: argparse.ArgumentParser, support_weighted_captions: bool = True): | |
| parser.add_argument( | |
| "--min_snr_gamma", | |
| type=float, | |
| default=None, | |
| help="gamma for reducing the weight of high loss timesteps. Lower numbers have stronger effect. 5 is recommended by paper. / 低いタイムステップでの高いlossに対して重みを減らすためのgamma値、低いほど効果が強く、論文では5が推奨", | |
| ) | |
| parser.add_argument( | |
| "--scale_v_pred_loss_like_noise_pred", | |
| action="store_true", | |
| help="scale v-prediction loss like noise prediction loss / v-prediction lossをnoise prediction lossと同じようにスケーリングする", | |
| ) | |
| parser.add_argument( | |
| "--v_pred_like_loss", | |
| type=float, | |
| default=None, | |
| help="add v-prediction like loss multiplied by this value / v-prediction lossをこの値をかけたものをlossに加算する", | |
| ) | |
| parser.add_argument( | |
| "--debiased_estimation_loss", | |
| action="store_true", | |
| help="debiased estimation loss / debiased estimation loss", | |
| ) | |
| if support_weighted_captions: | |
| parser.add_argument( | |
| "--weighted_captions", | |
| action="store_true", | |
| default=False, | |
| help="Enable weighted captions in the standard style (token:1.3). No commas inside parens, or shuffle/dropout may break the decoder. / 「[token]」、「(token)」「(token:1.3)」のような重み付きキャプションを有効にする。カンマを括弧内に入れるとシャッフルやdropoutで重みづけがおかしくなるので注意", | |
| ) | |
| re_attention = re.compile( | |
| r""" | |
| \\\(| | |
| \\\)| | |
| \\\[| | |
| \\]| | |
| \\\\| | |
| \\| | |
| \(| | |
| \[| | |
| :([+-]?[.\d]+)\)| | |
| \)| | |
| ]| | |
| [^\\()\[\]:]+| | |
| : | |
| """, | |
| re.X, | |
| ) | |
| def parse_prompt_attention(text): | |
| """ | |
| Parses a string with attention tokens and returns a list of pairs: text and its associated weight. | |
| Accepted tokens are: | |
| (abc) - increases attention to abc by a multiplier of 1.1 | |
| (abc:3.12) - increases attention to abc by a multiplier of 3.12 | |
| [abc] - decreases attention to abc by a multiplier of 1.1 | |
| \( - literal character '(' | |
| \[ - literal character '[' | |
| \) - literal character ')' | |
| \] - literal character ']' | |
| \\ - literal character '\' | |
| anything else - just text | |
| >>> parse_prompt_attention('normal text') | |
| [['normal text', 1.0]] | |
| >>> parse_prompt_attention('an (important) word') | |
| [['an ', 1.0], ['important', 1.1], [' word', 1.0]] | |
| >>> parse_prompt_attention('(unbalanced') | |
| [['unbalanced', 1.1]] | |
| >>> parse_prompt_attention('\(literal\]') | |
| [['(literal]', 1.0]] | |
| >>> parse_prompt_attention('(unnecessary)(parens)') | |
| [['unnecessaryparens', 1.1]] | |
| >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).') | |
| [['a ', 1.0], | |
| ['house', 1.5730000000000004], | |
| [' ', 1.1], | |
| ['on', 1.0], | |
| [' a ', 1.1], | |
| ['hill', 0.55], | |
| [', sun, ', 1.1], | |
| ['sky', 1.4641000000000006], | |
| ['.', 1.1]] | |
| """ | |
| res = [] | |
| round_brackets = [] | |
| square_brackets = [] | |
| round_bracket_multiplier = 1.1 | |
| square_bracket_multiplier = 1 / 1.1 | |
| def multiply_range(start_position, multiplier): | |
| for p in range(start_position, len(res)): | |
| res[p][1] *= multiplier | |
| for m in re_attention.finditer(text): | |
| text = m.group(0) | |
| weight = m.group(1) | |
| if text.startswith("\\"): | |
| res.append([text[1:], 1.0]) | |
| elif text == "(": | |
| round_brackets.append(len(res)) | |
| elif text == "[": | |
| square_brackets.append(len(res)) | |
| elif weight is not None and len(round_brackets) > 0: | |
| multiply_range(round_brackets.pop(), float(weight)) | |
| elif text == ")" and len(round_brackets) > 0: | |
| multiply_range(round_brackets.pop(), round_bracket_multiplier) | |
| elif text == "]" and len(square_brackets) > 0: | |
| multiply_range(square_brackets.pop(), square_bracket_multiplier) | |
| else: | |
| res.append([text, 1.0]) | |
| for pos in round_brackets: | |
| multiply_range(pos, round_bracket_multiplier) | |
| for pos in square_brackets: | |
| multiply_range(pos, square_bracket_multiplier) | |
| if len(res) == 0: | |
| res = [["", 1.0]] | |
| # merge runs of identical weights | |
| i = 0 | |
| while i + 1 < len(res): | |
| if res[i][1] == res[i + 1][1]: | |
| res[i][0] += res[i + 1][0] | |
| res.pop(i + 1) | |
| else: | |
| i += 1 | |
| return res | |
| def get_prompts_with_weights(tokenizer, prompt: List[str], max_length: int): | |
| r""" | |
| Tokenize a list of prompts and return its tokens with weights of each token. | |
| No padding, starting or ending token is included. | |
| """ | |
| tokens = [] | |
| weights = [] | |
| truncated = False | |
| for text in prompt: | |
| texts_and_weights = parse_prompt_attention(text) | |
| text_token = [] | |
| text_weight = [] | |
| for word, weight in texts_and_weights: | |
| # tokenize and discard the starting and the ending token | |
| token = tokenizer(word).input_ids[1:-1] | |
| text_token += token | |
| # copy the weight by length of token | |
| text_weight += [weight] * len(token) | |
| # stop if the text is too long (longer than truncation limit) | |
| if len(text_token) > max_length: | |
| truncated = True | |
| break | |
| # truncate | |
| if len(text_token) > max_length: | |
| truncated = True | |
| text_token = text_token[:max_length] | |
| text_weight = text_weight[:max_length] | |
| tokens.append(text_token) | |
| weights.append(text_weight) | |
| if truncated: | |
| logger.warning("Prompt was truncated. Try to shorten the prompt or increase max_embeddings_multiples") | |
| return tokens, weights | |
| def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, no_boseos_middle=True, chunk_length=77): | |
| r""" | |
| Pad the tokens (with starting and ending tokens) and weights (with 1.0) to max_length. | |
| """ | |
| max_embeddings_multiples = (max_length - 2) // (chunk_length - 2) | |
| weights_length = max_length if no_boseos_middle else max_embeddings_multiples * chunk_length | |
| for i in range(len(tokens)): | |
| tokens[i] = [bos] + tokens[i] + [eos] * (max_length - 1 - len(tokens[i])) | |
| if no_boseos_middle: | |
| weights[i] = [1.0] + weights[i] + [1.0] * (max_length - 1 - len(weights[i])) | |
| else: | |
| w = [] | |
| if len(weights[i]) == 0: | |
| w = [1.0] * weights_length | |
| else: | |
| for j in range(max_embeddings_multiples): | |
| w.append(1.0) # weight for starting token in this chunk | |
| w += weights[i][j * (chunk_length - 2) : min(len(weights[i]), (j + 1) * (chunk_length - 2))] | |
| w.append(1.0) # weight for ending token in this chunk | |
| w += [1.0] * (weights_length - len(w)) | |
| weights[i] = w[:] | |
| return tokens, weights | |
| def get_unweighted_text_embeddings( | |
| tokenizer, | |
| text_encoder, | |
| text_input: torch.Tensor, | |
| chunk_length: int, | |
| clip_skip: int, | |
| eos: int, | |
| pad: int, | |
| no_boseos_middle: Optional[bool] = True, | |
| ): | |
| """ | |
| When the length of tokens is a multiple of the capacity of the text encoder, | |
| it should be split into chunks and sent to the text encoder individually. | |
| """ | |
| max_embeddings_multiples = (text_input.shape[1] - 2) // (chunk_length - 2) | |
| if max_embeddings_multiples > 1: | |
| text_embeddings = [] | |
| for i in range(max_embeddings_multiples): | |
| # extract the i-th chunk | |
| text_input_chunk = text_input[:, i * (chunk_length - 2) : (i + 1) * (chunk_length - 2) + 2].clone() | |
| # cover the head and the tail by the starting and the ending tokens | |
| text_input_chunk[:, 0] = text_input[0, 0] | |
| if pad == eos: # v1 | |
| text_input_chunk[:, -1] = text_input[0, -1] | |
| else: # v2 | |
| for j in range(len(text_input_chunk)): | |
| if text_input_chunk[j, -1] != eos and text_input_chunk[j, -1] != pad: # 最後に普通の文字がある | |
| text_input_chunk[j, -1] = eos | |
| if text_input_chunk[j, 1] == pad: # BOSだけであとはPAD | |
| text_input_chunk[j, 1] = eos | |
| if clip_skip is None or clip_skip == 1: | |
| text_embedding = text_encoder(text_input_chunk)[0] | |
| else: | |
| enc_out = text_encoder(text_input_chunk, output_hidden_states=True, return_dict=True) | |
| text_embedding = enc_out["hidden_states"][-clip_skip] | |
| text_embedding = text_encoder.text_model.final_layer_norm(text_embedding) | |
| if no_boseos_middle: | |
| if i == 0: | |
| # discard the ending token | |
| text_embedding = text_embedding[:, :-1] | |
| elif i == max_embeddings_multiples - 1: | |
| # discard the starting token | |
| text_embedding = text_embedding[:, 1:] | |
| else: | |
| # discard both starting and ending tokens | |
| text_embedding = text_embedding[:, 1:-1] | |
| text_embeddings.append(text_embedding) | |
| text_embeddings = torch.concat(text_embeddings, axis=1) | |
| else: | |
| if clip_skip is None or clip_skip == 1: | |
| text_embeddings = text_encoder(text_input)[0] | |
| else: | |
| enc_out = text_encoder(text_input, output_hidden_states=True, return_dict=True) | |
| text_embeddings = enc_out["hidden_states"][-clip_skip] | |
| text_embeddings = text_encoder.text_model.final_layer_norm(text_embeddings) | |
| return text_embeddings | |
| def get_weighted_text_embeddings( | |
| tokenizer, | |
| text_encoder, | |
| prompt: Union[str, List[str]], | |
| device, | |
| max_embeddings_multiples: Optional[int] = 3, | |
| no_boseos_middle: Optional[bool] = False, | |
| clip_skip=None, | |
| ): | |
| r""" | |
| Prompts can be assigned with local weights using brackets. For example, | |
| prompt 'A (very beautiful) masterpiece' highlights the words 'very beautiful', | |
| and the embedding tokens corresponding to the words get multiplied by a constant, 1.1. | |
| Also, to regularize of the embedding, the weighted embedding would be scaled to preserve the original mean. | |
| Args: | |
| prompt (`str` or `List[str]`): | |
| The prompt or prompts to guide the image generation. | |
| max_embeddings_multiples (`int`, *optional*, defaults to `3`): | |
| The max multiple length of prompt embeddings compared to the max output length of text encoder. | |
| no_boseos_middle (`bool`, *optional*, defaults to `False`): | |
| If the length of text token is multiples of the capacity of text encoder, whether reserve the starting and | |
| ending token in each of the chunk in the middle. | |
| skip_parsing (`bool`, *optional*, defaults to `False`): | |
| Skip the parsing of brackets. | |
| skip_weighting (`bool`, *optional*, defaults to `False`): | |
| Skip the weighting. When the parsing is skipped, it is forced True. | |
| """ | |
| max_length = (tokenizer.model_max_length - 2) * max_embeddings_multiples + 2 | |
| if isinstance(prompt, str): | |
| prompt = [prompt] | |
| prompt_tokens, prompt_weights = get_prompts_with_weights(tokenizer, prompt, max_length - 2) | |
| # round up the longest length of tokens to a multiple of (model_max_length - 2) | |
| max_length = max([len(token) for token in prompt_tokens]) | |
| max_embeddings_multiples = min( | |
| max_embeddings_multiples, | |
| (max_length - 1) // (tokenizer.model_max_length - 2) + 1, | |
| ) | |
| max_embeddings_multiples = max(1, max_embeddings_multiples) | |
| max_length = (tokenizer.model_max_length - 2) * max_embeddings_multiples + 2 | |
| # pad the length of tokens and weights | |
| bos = tokenizer.bos_token_id | |
| eos = tokenizer.eos_token_id | |
| pad = tokenizer.pad_token_id | |
| prompt_tokens, prompt_weights = pad_tokens_and_weights( | |
| prompt_tokens, | |
| prompt_weights, | |
| max_length, | |
| bos, | |
| eos, | |
| no_boseos_middle=no_boseos_middle, | |
| chunk_length=tokenizer.model_max_length, | |
| ) | |
| prompt_tokens = torch.tensor(prompt_tokens, dtype=torch.long, device=device) | |
| # get the embeddings | |
| text_embeddings = get_unweighted_text_embeddings( | |
| tokenizer, | |
| text_encoder, | |
| prompt_tokens, | |
| tokenizer.model_max_length, | |
| clip_skip, | |
| eos, | |
| pad, | |
| no_boseos_middle=no_boseos_middle, | |
| ) | |
| prompt_weights = torch.tensor(prompt_weights, dtype=text_embeddings.dtype, device=device) | |
| # assign weights to the prompts and normalize in the sense of mean | |
| previous_mean = text_embeddings.float().mean(axis=[-2, -1]).to(text_embeddings.dtype) | |
| text_embeddings = text_embeddings * prompt_weights.unsqueeze(-1) | |
| current_mean = text_embeddings.float().mean(axis=[-2, -1]).to(text_embeddings.dtype) | |
| text_embeddings = text_embeddings * (previous_mean / current_mean).unsqueeze(-1).unsqueeze(-1) | |
| return text_embeddings | |
| # https://wandb.ai/johnowhitaker/multires_noise/reports/Multi-Resolution-Noise-for-Diffusion-Model-Training--VmlldzozNjYyOTU2 | |
| def pyramid_noise_like(noise, device, iterations=6, discount=0.4): | |
| b, c, w, h = noise.shape # EDIT: w and h get over-written, rename for a different variant! | |
| u = torch.nn.Upsample(size=(w, h), mode="bilinear").to(device) | |
| for i in range(iterations): | |
| r = random.random() * 2 + 2 # Rather than always going 2x, | |
| wn, hn = max(1, int(w / (r**i))), max(1, int(h / (r**i))) | |
| noise += u(torch.randn(b, c, wn, hn).to(device)) * discount**i | |
| if wn == 1 or hn == 1: | |
| break # Lowest resolution is 1x1 | |
| return noise / noise.std() # Scaled back to roughly unit variance | |
| # https://www.crosslabs.org//blog/diffusion-with-offset-noise | |
| def apply_noise_offset(latents, noise, noise_offset, adaptive_noise_scale): | |
| if noise_offset is None: | |
| return noise | |
| if adaptive_noise_scale is not None: | |
| # latent shape: (batch_size, channels, height, width) | |
| # abs mean value for each channel | |
| latent_mean = torch.abs(latents.mean(dim=(2, 3), keepdim=True)) | |
| # multiply adaptive noise scale to the mean value and add it to the noise offset | |
| noise_offset = noise_offset + adaptive_noise_scale * latent_mean | |
| noise_offset = torch.clamp(noise_offset, 0.0, None) # in case of adaptive noise scale is negative | |
| noise = noise + noise_offset * torch.randn((latents.shape[0], latents.shape[1], 1, 1), device=latents.device) | |
| return noise | |
| def apply_masked_loss(loss, batch): | |
| if "conditioning_images" in batch: | |
| # conditioning image is -1 to 1. we need to convert it to 0 to 1 | |
| mask_image = batch["conditioning_images"].to(dtype=loss.dtype)[:, 0].unsqueeze(1) # use R channel | |
| mask_image = mask_image / 2 + 0.5 | |
| # print(f"conditioning_image: {mask_image.shape}") | |
| elif "alpha_masks" in batch and batch["alpha_masks"] is not None: | |
| # alpha mask is 0 to 1 | |
| mask_image = batch["alpha_masks"].to(dtype=loss.dtype).unsqueeze(1) # add channel dimension | |
| # print(f"mask_image: {mask_image.shape}, {mask_image.mean()}") | |
| else: | |
| return loss | |
| # resize to the same size as the loss | |
| mask_image = torch.nn.functional.interpolate(mask_image, size=loss.shape[2:], mode="area") | |
| loss = loss * mask_image | |
| return loss | |
| """ | |
| ########################################## | |
| # Perlin Noise | |
| def rand_perlin_2d(device, shape, res, fade=lambda t: 6 * t**5 - 15 * t**4 + 10 * t**3): | |
| delta = (res[0] / shape[0], res[1] / shape[1]) | |
| d = (shape[0] // res[0], shape[1] // res[1]) | |
| grid = ( | |
| torch.stack( | |
| torch.meshgrid(torch.arange(0, res[0], delta[0], device=device), torch.arange(0, res[1], delta[1], device=device)), | |
| dim=-1, | |
| ) | |
| % 1 | |
| ) | |
| angles = 2 * torch.pi * torch.rand(res[0] + 1, res[1] + 1, device=device) | |
| gradients = torch.stack((torch.cos(angles), torch.sin(angles)), dim=-1) | |
| tile_grads = ( | |
| lambda slice1, slice2: gradients[slice1[0] : slice1[1], slice2[0] : slice2[1]] | |
| .repeat_interleave(d[0], 0) | |
| .repeat_interleave(d[1], 1) | |
| ) | |
| dot = lambda grad, shift: ( | |
| torch.stack((grid[: shape[0], : shape[1], 0] + shift[0], grid[: shape[0], : shape[1], 1] + shift[1]), dim=-1) | |
| * grad[: shape[0], : shape[1]] | |
| ).sum(dim=-1) | |
| n00 = dot(tile_grads([0, -1], [0, -1]), [0, 0]) | |
| n10 = dot(tile_grads([1, None], [0, -1]), [-1, 0]) | |
| n01 = dot(tile_grads([0, -1], [1, None]), [0, -1]) | |
| n11 = dot(tile_grads([1, None], [1, None]), [-1, -1]) | |
| t = fade(grid[: shape[0], : shape[1]]) | |
| return 1.414 * torch.lerp(torch.lerp(n00, n10, t[..., 0]), torch.lerp(n01, n11, t[..., 0]), t[..., 1]) | |
| def rand_perlin_2d_octaves(device, shape, res, octaves=1, persistence=0.5): | |
| noise = torch.zeros(shape, device=device) | |
| frequency = 1 | |
| amplitude = 1 | |
| for _ in range(octaves): | |
| noise += amplitude * rand_perlin_2d(device, shape, (frequency * res[0], frequency * res[1])) | |
| frequency *= 2 | |
| amplitude *= persistence | |
| return noise | |
| def perlin_noise(noise, device, octaves): | |
| _, c, w, h = noise.shape | |
| perlin = lambda: rand_perlin_2d_octaves(device, (w, h), (4, 4), octaves) | |
| noise_perlin = [] | |
| for _ in range(c): | |
| noise_perlin.append(perlin()) | |
| noise_perlin = torch.stack(noise_perlin).unsqueeze(0) # (1, c, w, h) | |
| noise += noise_perlin # broadcast for each batch | |
| return noise / noise.std() # Scaled back to roughly unit variance | |
| """ | |