# Hosting-page status banner captured together with this file: "Spaces: Runtime error".
# It is not part of the program.
import os
import shutil
import subprocess
import sys
import tempfile
import threading

import cv2
import gradio as gr
import huggingface_hub
import yaml
def stream_output(pipe):
    """Echo every line read from ``pipe`` to stdout, then close the pipe.

    Args:
        pipe: A text-mode file-like object exposing ``readline()`` and
            ``close()``. Reading stops when ``readline()`` returns ``''``.

    The pipe is closed even when reading or printing raises, so the
    underlying file descriptor is never left open by this function.
    """
    try:
        for line in iter(pipe.readline, ''):
            # Lines already carry their own newline; do not add another.
            print(line, end='')
    finally:
        pipe.close()
# Log in to the Hugging Face Hub with the token taken from the
# GATED_HF_TOKEN environment variable (None when the variable is unset).
HF_TKN = os.environ.get("GATED_HF_TOKEN")
huggingface_hub.login(token=HF_TKN)

# Checkpoints fetched when the module is loaded, as
# (repo_id, filename, local_dir) triples, downloaded in the listed order.
for _repo_id, _filename, _local_dir in (
    ('yzd-v/DWPose', 'yolox_l.onnx', './models/DWPose'),
    ('yzd-v/DWPose', 'dw-ll_ucoco_384.onnx', './models/DWPose'),
    ('ixaac/MimicMotion', 'MimicMotion_1.pth', './models'),
):
    huggingface_hub.hf_hub_download(
        repo_id=_repo_id,
        filename=_filename,
        local_dir=_local_dir
    )
def print_directory_contents(path):
    """Print the directory tree rooted at ``path`` as an indented listing.

    Each directory is printed as ``name/`` and each file beneath it is
    printed one level deeper, using four spaces per level. Nothing is
    printed when ``path`` does not exist (``os.walk`` yields nothing).

    Args:
        path: Root directory to list. A trailing path separator is accepted.
    """
    for root, _dirs, files in os.walk(path):
        # Depth comes from the path relative to the walk root. Textually
        # stripping ``path`` from ``root`` miscounts when ``path`` ends with a
        # separator or when its text occurs again deeper in ``root``.
        relative = os.path.relpath(root, path)
        level = 0 if relative == os.curdir else relative.count(os.sep) + 1
        indent = ' ' * 4 * level
        # normpath drops a trailing separator so the root keeps its name
        # instead of printing as a bare "/".
        print(f"{indent}{os.path.basename(os.path.normpath(root))}/")
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f"{subindent}{f}")
def check_outputs_folder(folder_path):
    """Empty ``folder_path`` in place, keeping the folder itself.

    Files and symlinks are unlinked and sub-directories are removed
    recursively. A failure on one entry is reported on stdout and the
    remaining entries are still processed. When ``folder_path`` is not an
    existing directory, a message is printed and nothing else happens.

    Args:
        folder_path: Directory whose contents should be deleted.
    """
    # isdir() is already False for a missing path, so it covers both checks.
    if not os.path.isdir(folder_path):
        print(f'The folder {folder_path} does not exist.')
        return

    for entry in os.listdir(folder_path):
        file_path = os.path.join(folder_path, entry)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                # Regular file or symlink (including a link to a directory).
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            # Best effort: report and carry on with the next entry.
            print(f'Failed to delete {file_path}. Reason: {e}')
def check_for_mp4_in_outputs(outputs_folder='./outputs'):
    """Return the path of an ``.mp4`` file found in ``outputs_folder``.

    Args:
        outputs_folder: Directory to search (top level only). Defaults to
            ``'./outputs'``.

    Returns:
        ``outputs_folder`` joined with the alphabetically first file name
        ending in ``.mp4``, or ``None`` when ``outputs_folder`` is not an
        existing directory or holds no such file.
    """
    # isdir() also rejects a path that exists but is a regular file, which
    # would otherwise make os.listdir raise.
    if not os.path.isdir(outputs_folder):
        return None

    # Sort so the choice does not depend on the file system's listing order.
    mp4_files = sorted(
        f for f in os.listdir(outputs_folder) if f.endswith('.mp4')
    )
    if not mp4_files:
        return None
    return os.path.join(outputs_folder, mp4_files[0])
def get_video_fps(video_path):
    """Return the frame rate OpenCV reports for the video at ``video_path``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        The value of ``cv2.CAP_PROP_FPS`` for the opened capture.

    Raises:
        ValueError: If the capture could not be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        # Release the capture before handing the value back.
        capture.release()
        return frame_rate
    raise ValueError("Error opening video file")
def infer(ref_image_in, ref_video_in):
    """Run ``inference.py`` on a reference image and video; return the result.

    The function empties ``./outputs``, writes a one-test-case YAML config
    into a temporary directory, runs ``inference.py`` on it in a subprocess
    while echoing the child's stdout and stderr, and then looks for an
    ``.mp4`` file in ``./outputs``.

    Args:
        ref_image_in: File path of the reference image.
        ref_video_in: File path of the reference video; its frame rate is
            read and written into the config.

    Returns:
        Path of the produced ``.mp4`` file, or ``None`` when the script
        exited successfully but no ``.mp4`` file was found.

    Raises:
        gr.Error: If either input is missing, or if the inference script
            exits with a non-zero code without producing an ``.mp4`` file.
        ValueError: Propagated from ``get_video_fps`` when the video cannot
            be opened.
    """
    # Fail early with a readable message instead of crashing deeper down
    # when the user submits without uploading both inputs.
    if not ref_image_in or not ref_video_in:
        raise gr.Error("Please provide both a reference image and a reference video.")

    # Start from an empty 'outputs' directory so that any mp4 found later
    # belongs to this run.
    check_outputs_folder('./outputs')

    with tempfile.TemporaryDirectory() as temp_dir:
        print("Temporary directory created:", temp_dir)

        # Inference configuration: model locations plus a single test case.
        data = {
            'base_model_path': 'stabilityai/stable-video-diffusion-img2vid-xt-1-1',
            'ckpt_path': 'models/MimicMotion_1.pth',
            'test_case': [
                {
                    'ref_video_path': ref_video_in,
                    'ref_image_path': ref_image_in,
                    'num_frames': 16,
                    'resolution': 576,
                    'frames_overlap': 6,
                    'num_inference_steps': 25,
                    'noise_aug_strength': 0,
                    'guidance_scale': 2.0,
                    'sample_stride': 2,
                    'fps': get_video_fps(ref_video_in),
                    'seed': 42
                }
            ]
        }

        file_path = os.path.join(temp_dir, 'config.yaml')
        with open(file_path, 'w') as file:
            yaml.dump(data, file, default_flow_style=False)
        print("YAML file 'config.yaml' created successfully in", file_path)

        # Use the interpreter running this app so the child sees the same
        # installed packages; a bare 'python' may resolve to another one.
        command = [sys.executable or 'python', 'inference.py', '--inference_config', file_path]
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1)

        # Drain stdout and stderr on separate threads so neither pipe can
        # fill up and block the child while the other is being read.
        stdout_thread = threading.Thread(target=stream_output, args=(process.stdout,))
        stderr_thread = threading.Thread(target=stream_output, args=(process.stderr,))
        stdout_thread.start()
        stderr_thread.start()

        # Wait for the process to complete and the reader threads to finish.
        process.wait()
        stdout_thread.join()
        stderr_thread.join()
        print("Inference script finished with return code:", process.returncode)

    print_directory_contents('./outputs')

    mp4_file_path = check_for_mp4_in_outputs()
    print(mp4_file_path)

    # Surface a failed run to the user instead of returning an empty result.
    if mp4_file_path is None and process.returncode != 0:
        raise gr.Error(
            f"Inference failed (exit code {process.returncode}); see the logs for details."
        )
    return mp4_file_path
# Gradio UI: an image input, a video input, a submit button and a video
# output; clicking the button calls infer() with the two inputs.
# NOTE(review): the indentation below is reconstructed from a copy of the file
# that lost its leading whitespace — confirm which container each component
# belongs to (inputs/button column vs. output video) against the original.
with gr.Blocks() as demo:
    with gr.Column():
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    # type="filepath": infer() receives the image as a path.
                    ref_image_in = gr.Image(type="filepath")
                    ref_video_in = gr.Video()
                submit_btn = gr.Button("Submit")
            output_video = gr.Video()
    # Wire the button to infer(); its return value feeds output_video.
    submit_btn.click(
        fn = infer,
        inputs = [ref_image_in, ref_video_in],
        outputs = [output_video]
    )
# Start the app when the module is run.
demo.launch()