import os
import io
from io import BytesIO
from typing import Callable, List, Optional, Tuple, Union

import cv2
import numpy as np
import requests
import torch
import safetensors.torch
import torchvision.transforms.v2 as transforms
import gradio as gr
from PIL import Image

from diffusers import HunyuanVideoPipeline
from diffusers.models import AutoencoderKLHunyuanVideo, HunyuanVideoTransformer3DModel
from diffusers.models.transformers.transformer_hunyuan_video import HunyuanVideoPatchEmbed
from diffusers.models.attention import Attention
from diffusers.models.embeddings import apply_rotary_emb
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from diffusers.callbacks import MultiPipelineCallbacks, PipelineCallback
from diffusers.loaders import HunyuanVideoLoraLoaderMixin
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.hunyuan_video.pipeline_output import HunyuanVideoPipelineOutput
from diffusers.pipelines.hunyuan_video.pipeline_hunyuan_video import retrieve_timesteps, DEFAULT_PROMPT_TEMPLATE
from diffusers.utils import export_to_video, load_image, is_torch_xla_available, logging, replace_example_docstring
from diffusers.utils.state_dict_utils import convert_state_dict_to_diffusers, convert_unet_state_dict_to_peft
from diffusers.utils.torch_utils import randn_tensor
from diffusers.video_processor import VideoProcessor
from peft import LoraConfig, get_peft_model_state_dict, set_peft_model_state_dict
from huggingface_hub import hf_hub_download
device = "cuda" if torch.cuda.is_available() else "cpu"

# Define video transformations
video_transforms = transforms.Compose(
    [
        transforms.Lambda(lambda x: x / 255.0),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], inplace=True),
    ]
)
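# The two transforms above map uint8 frames in [0, 255] to roughly [-1, 1]
# (divide by 255, then shift/scale with mean = std = 0.5), which is the input
# range the HunyuanVideo VAE expects.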
model_id = "hunyuanvideo-community/HunyuanVideo"
lora_path = hf_hub_download("dashtoon/hunyuan-video-keyframe-control-lora", "i2v.sft")

transformer = HunyuanVideoTransformer3DModel.from_pretrained(model_id, subfolder="transformer", torch_dtype=torch.float16)
pipe = HunyuanVideoPipeline.from_pretrained(model_id, transformer=transformer, torch_dtype=torch.float16)

# Enable memory savings
pipe.vae.enable_tiling()
pipe.enable_model_cpu_offload()
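# VAE tiling decodes the video in spatial tiles instead of one large tensor,
# and model CPU offload keeps each sub-model on the GPU only while it runs;
# together these should let the fp16 pipeline fit on a single GPU, though the
# exact VRAM needed depends on resolution and frame count.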
with torch.no_grad():  # enable image inputs
    initial_input_channels = pipe.transformer.config.in_channels
    new_img_in = HunyuanVideoPatchEmbed(
        patch_size=(pipe.transformer.config.patch_size_t, pipe.transformer.config.patch_size, pipe.transformer.config.patch_size),
        in_chans=pipe.transformer.config.in_channels * 2,
        embed_dim=pipe.transformer.config.num_attention_heads * pipe.transformer.config.attention_head_dim,
    )
    new_img_in = new_img_in.to(pipe.device, dtype=pipe.dtype)
    new_img_in.proj.weight.zero_()
    new_img_in.proj.weight[:, :initial_input_channels].copy_(pipe.transformer.x_embedder.proj.weight)
    if pipe.transformer.x_embedder.proj.bias is not None:
        new_img_in.proj.bias.copy_(pipe.transformer.x_embedder.proj.bias)
    pipe.transformer.x_embedder = new_img_in

lora_state_dict = safetensors.torch.load_file(lora_path, device="cuda")
transformer_lora_state_dict = {f'{k.replace("transformer.", "")}': v for k, v in lora_state_dict.items() if k.startswith("transformer.") and "lora" in k}
pipe.load_lora_into_transformer(transformer_lora_state_dict, transformer=pipe.transformer, adapter_name="i2v", _pipeline=pipe)
pipe.set_adapters(["i2v"], adapter_weights=[1.0])
pipe.fuse_lora(components=["transformer"], lora_scale=1.0, adapter_names=["i2v"])
pipe.unload_lora_weights()
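# Note: the replacement x_embedder above doubles the patch-embed input channels
# (noise latents and keyframe latents are concatenated along the channel axis
# in call_pipe below), copying the pretrained weights into the first half and
# zero-initialising the rest. The LoRA is fused into the transformer weights
# and then unloaded, so later calls run without any adapter bookkeeping.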
def resize_image_to_bucket(image: Union[Image.Image, np.ndarray], bucket_reso: Tuple[int, int]) -> np.ndarray:
    """
    Resize the image to the bucket resolution.
    """
    if isinstance(image, Image.Image):
        image = np.array(image)
    elif not isinstance(image, np.ndarray):
        raise ValueError("Image must be a PIL Image or NumPy array")

    image_height, image_width = image.shape[:2]
    if bucket_reso == (image_width, image_height):
        return image

    bucket_width, bucket_height = bucket_reso
    scale_width = bucket_width / image_width
    scale_height = bucket_height / image_height
    scale = max(scale_width, scale_height)
    image_width = int(image_width * scale + 0.5)
    image_height = int(image_height * scale + 0.5)

    if scale > 1:
        image = Image.fromarray(image)
        image = image.resize((image_width, image_height), Image.LANCZOS)
        image = np.array(image)
    else:
        image = cv2.resize(image, (image_width, image_height), interpolation=cv2.INTER_AREA)

    # crop the image to the bucket resolution
    crop_left = (image_width - bucket_width) // 2
    crop_top = (image_height - bucket_height) // 2
    image = image[crop_top:crop_top + bucket_height, crop_left:crop_left + bucket_width]
    return image
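# Example: a 1920x1080 frame resized to bucket (960, 544):
#   scale = max(960/1920, 544/1080) ≈ 0.5037  ->  resized to 967x544,
#   then center-cropped from 967x544 down to exactly 960x544.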
def generate_video(prompt: str, frame1: Image.Image, frame2: Image.Image, resolution: str, guidance_scale: float, num_frames: int, num_inference_steps: int, fps: int) -> str:
    # Debugging print statements
    print(f"Frame 1 Type: {type(frame1)}")
    print(f"Frame 2 Type: {type(frame2)}")
    print(f"Resolution: {resolution}")

    # Parse resolution
    width, height = map(int, resolution.split('x'))

    # Load and preprocess frames
    cond_frame1 = np.array(frame1)
    cond_frame2 = np.array(frame2)
    cond_frame1 = resize_image_to_bucket(cond_frame1, bucket_reso=(width, height))
    cond_frame2 = resize_image_to_bucket(cond_frame2, bucket_reso=(width, height))

    # Conditioning video: first and last frames are the keyframes, all frames in between are zeros
    cond_video = np.zeros(shape=(num_frames, height, width, 3))
    cond_video[0], cond_video[-1] = cond_frame1, cond_frame2
    cond_video = torch.from_numpy(cond_video.copy()).permute(0, 3, 1, 2)
    cond_video = torch.stack([video_transforms(x) for x in cond_video], dim=0).unsqueeze(0)

    # Encode the conditioning video into VAE latents
    with torch.no_grad():
        image_or_video = cond_video.to(device="cuda", dtype=pipe.dtype)
        image_or_video = image_or_video.permute(0, 2, 1, 3, 4).contiguous()  # [B, F, C, H, W] -> [B, C, F, H, W]
        cond_latents = pipe.vae.encode(image_or_video).latent_dist.sample()
        cond_latents = cond_latents * pipe.vae.config.scaling_factor
        cond_latents = cond_latents.to(device=device, dtype=pipe.dtype)
        assert not torch.any(torch.isnan(cond_latents))

    # Generate video
    video = call_pipe(
        pipe,
        prompt=prompt,
        num_frames=num_frames,
        num_inference_steps=num_inference_steps,
        image_latents=cond_latents,
        width=width,
        height=height,
        guidance_scale=guidance_scale,
        generator=torch.Generator(device="cuda").manual_seed(0),
    ).frames[0]

    # Export to an MP4 file and return its path for the gr.Video output
    video_path = "output.mp4"
    export_to_video(video, video_path, fps=fps)
    torch.cuda.empty_cache()
    return video_path
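# call_pipe below is a lightly modified copy of diffusers'
# HunyuanVideoPipeline.__call__: the only functional change is the extra
# image_latents argument, which is concatenated with the noise latents along
# the channel dimension before each transformer forward pass, matching the
# conditioning scheme the keyframe-control LoRA expects.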
def call_pipe(
    pipe,
    prompt: Union[str, List[str]] = None,
    prompt_2: Union[str, List[str]] = None,
    height: int = 720,
    width: int = 1280,
    num_frames: int = 129,
    num_inference_steps: int = 50,
    sigmas: Optional[List[float]] = None,
    guidance_scale: float = 6.0,
    num_videos_per_prompt: Optional[int] = 1,
    generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
    latents: Optional[torch.Tensor] = None,
    prompt_embeds: Optional[torch.Tensor] = None,
    pooled_prompt_embeds: Optional[torch.Tensor] = None,
    prompt_attention_mask: Optional[torch.Tensor] = None,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    attention_kwargs: Optional[dict] = None,
    callback_on_step_end: Optional[Union[Callable, PipelineCallback, MultiPipelineCallbacks]] = None,
    callback_on_step_end_tensor_inputs: Optional[List[str]] = None,
    prompt_template: Optional[dict] = DEFAULT_PROMPT_TEMPLATE,
    max_sequence_length: int = 256,
    image_latents: Optional[torch.Tensor] = None,
):
    if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
        callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

    # 1. Check inputs. Raise error if not correct
    pipe.check_inputs(
        prompt,
        prompt_2,
        height,
        width,
        prompt_embeds,
        callback_on_step_end_tensor_inputs,
        prompt_template,
    )
    pipe._guidance_scale = guidance_scale
    pipe._attention_kwargs = attention_kwargs
    pipe._current_timestep = None
    pipe._interrupt = False
    device = pipe._execution_device

    # 2. Define call parameters
    if prompt is not None and isinstance(prompt, str):
        batch_size = 1
    elif prompt is not None and isinstance(prompt, list):
        batch_size = len(prompt)
    else:
        batch_size = prompt_embeds.shape[0]

    # 3. Encode input prompt
    prompt_embeds, pooled_prompt_embeds, prompt_attention_mask = pipe.encode_prompt(
        prompt=prompt,
        prompt_2=prompt_2,
        prompt_template=prompt_template,
        num_videos_per_prompt=num_videos_per_prompt,
        prompt_embeds=prompt_embeds,
        pooled_prompt_embeds=pooled_prompt_embeds,
        prompt_attention_mask=prompt_attention_mask,
        device=device,
        max_sequence_length=max_sequence_length,
    )
    transformer_dtype = pipe.transformer.dtype
    prompt_embeds = prompt_embeds.to(transformer_dtype)
    prompt_attention_mask = prompt_attention_mask.to(transformer_dtype)
    if pooled_prompt_embeds is not None:
        pooled_prompt_embeds = pooled_prompt_embeds.to(transformer_dtype)

    # 4. Prepare timesteps
    sigmas = np.linspace(1.0, 0.0, num_inference_steps + 1)[:-1] if sigmas is None else sigmas
    timesteps, num_inference_steps = retrieve_timesteps(
        pipe.scheduler,
        num_inference_steps,
        device,
        sigmas=sigmas,
    )

    # 5. Prepare latent variables
    num_channels_latents = pipe.transformer.config.in_channels
    num_latent_frames = (num_frames - 1) // pipe.vae_scale_factor_temporal + 1
    latents = pipe.prepare_latents(
        batch_size * num_videos_per_prompt,
        num_channels_latents,
        height,
        width,
        num_latent_frames,
        torch.float32,
        device,
        generator,
        latents,
    )

    # 6. Prepare guidance condition
    guidance = torch.tensor([guidance_scale] * latents.shape[0], dtype=transformer_dtype, device=device) * 1000.0
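    # HunyuanVideo is guidance-distilled: guidance_scale is not applied via
    # classifier-free guidance with a negative prompt, but passed to the
    # transformer as an embedded condition (scaled by 1000, matching the
    # convention in the stock diffusers pipeline).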
    # 7. Denoising loop
    num_warmup_steps = len(timesteps) - num_inference_steps * pipe.scheduler.order
    pipe._num_timesteps = len(timesteps)

    with pipe.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            if pipe.interrupt:
                continue

            pipe._current_timestep = t
            latent_model_input = latents.to(transformer_dtype)
            timestep = t.expand(latents.shape[0]).to(latents.dtype)

            noise_pred = pipe.transformer(
                hidden_states=torch.cat([latent_model_input, image_latents], dim=1),
                timestep=timestep,
                encoder_hidden_states=prompt_embeds,
                encoder_attention_mask=prompt_attention_mask,
                pooled_projections=pooled_prompt_embeds,
                guidance=guidance,
                attention_kwargs=attention_kwargs,
                return_dict=False,
            )[0]

            # compute the previous noisy sample x_t -> x_t-1
            latents = pipe.scheduler.step(noise_pred, t, latents, return_dict=False)[0]

            # call the callback, if provided
            if callback_on_step_end is not None:
                callback_kwargs = {}
                for k in callback_on_step_end_tensor_inputs:
                    callback_kwargs[k] = locals()[k]
                callback_outputs = callback_on_step_end(pipe, i, t, callback_kwargs)
                latents = callback_outputs.pop("latents", latents)
                prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)

            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % pipe.scheduler.order == 0):
                progress_bar.update()

    pipe._current_timestep = None

    if not output_type == "latent":
        latents = latents.to(pipe.vae.dtype) / pipe.vae.config.scaling_factor
        video = pipe.vae.decode(latents, return_dict=False)[0]
        video = pipe.video_processor.postprocess_video(video, output_type=output_type)
    else:
        video = latents

    # Offload all models
    pipe.maybe_free_model_hooks()

    if not return_dict:
        return (video,)
    return HunyuanVideoPipelineOutput(frames=video)
def main():
    # Define the interface inputs
    inputs = [
        gr.Textbox(label="Prompt", value="a woman"),
        gr.Image(label="Frame 1", type="pil"),
        gr.Image(label="Frame 2", type="pil"),
        gr.Dropdown(
            label="Resolution",
            choices=["720x1280", "544x960", "1280x720", "960x544", "720x720"],
            value="544x960",
        ),
        # gr.Textbox(label="Frame 1 URL", value="https://i-bacon.bunkr.ru/11b45aa7-630b-4189-996f-a6b37a697786.png"),
        # gr.Textbox(label="Frame 2 URL", value="https://i-bacon.bunkr.ru/2382224f-120e-482d-a75d-f1a1bf13038c.png"),
        gr.Slider(minimum=0.1, maximum=20, step=0.1, label="Guidance Scale", value=6.0),
        gr.Slider(minimum=1, maximum=129, step=1, label="Number of Frames", value=49),
        gr.Slider(minimum=1, maximum=100, step=1, label="Number of Inference Steps", value=30),
        gr.Slider(minimum=1, maximum=60, step=1, label="FPS", value=16),
    ]

    # Define the interface outputs
    outputs = [
        gr.Video(label="Generated Video"),
    ]

    # Create the Gradio interface
    iface = gr.Interface(
        fn=generate_video,
        inputs=inputs,
        outputs=outputs,
        title="Hunyuan Video Generator",
        description="Generate videos using the HunyuanVideo model with a prompt and two frames as conditions.",
    )

    # Launch the Gradio app
    iface.launch(show_error=True)


if __name__ == "__main__":
    main()