|  | import subprocess | 
					
						
						|  | import os | 
					
						
						|  | from typing import List, Optional, Union | 
					
						
						|  | from PIL import Image | 
					
						
						|  | import numpy as np | 
					
						
						|  | from dataclasses import dataclass | 
					
						
						|  | import re | 
					
						
						|  | from pathlib import Path | 
					
						
						|  |  | 
					
						
						|  | from modules.utils.constants import SOUND_FILE_EXT, VIDEO_FILE_EXT, IMAGE_FILE_EXT | 
					
						
						|  | from modules.utils.paths import TEMP_VIDEO_FRAMES_DIR, TEMP_VIDEO_OUT_FRAMES_DIR, OUTPUTS_VIDEOS_DIR | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @dataclass | 
					
						
						|  | class VideoInfo: | 
					
						
						|  | num_frames: Optional[int] = None | 
					
						
						|  | frame_rate: Optional[int] = None | 
					
						
						|  | duration: Optional[float] = None | 
					
						
						|  | has_sound: Optional[bool] = None | 
					
						
						|  | codec: Optional[str] = None | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def extract_frames( | 
					
						
						|  | vid_input: str, | 
					
						
						|  | output_temp_dir: str = TEMP_VIDEO_FRAMES_DIR, | 
					
						
						|  | start_number: int = 0 | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | Extract frames as jpg files and save them into output_temp_dir. This needs FFmpeg installed. | 
					
						
						|  | """ | 
					
						
						|  | os.makedirs(output_temp_dir, exist_ok=True) | 
					
						
						|  | output_path = os.path.join(output_temp_dir, "%05d.jpg") | 
					
						
						|  |  | 
					
						
						|  | command = [ | 
					
						
						|  | 'ffmpeg', | 
					
						
						|  | '-y', | 
					
						
						|  | '-i', vid_input, | 
					
						
						|  | '-qscale:v', '2', | 
					
						
						|  | '-vf', f'scale=iw:ih', | 
					
						
						|  | '-start_number', str(start_number), | 
					
						
						|  | f'{output_path}' | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | subprocess.run(command, check=True) | 
					
						
						|  | except subprocess.CalledProcessError as e: | 
					
						
						|  | print("Error occurred while extracting frames from the video") | 
					
						
						|  | raise RuntimeError(f"An error occurred: {str(e)}") | 
					
						
						|  |  | 
					
						
						|  | return get_frames_from_dir(output_temp_dir) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def extract_sound( | 
					
						
						|  | vid_input: str, | 
					
						
						|  | output_temp_dir: str = TEMP_VIDEO_FRAMES_DIR, | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | Extract audio from a video file and save it as a separate sound file. This needs FFmpeg installed. | 
					
						
						|  | """ | 
					
						
						|  | if Path(vid_input).suffix == ".gif": | 
					
						
						|  | print("Sound extracting process has passed because gif has no sound") | 
					
						
						|  | return None | 
					
						
						|  |  | 
					
						
						|  | os.makedirs(output_temp_dir, exist_ok=True) | 
					
						
						|  | output_path = os.path.join(output_temp_dir, "sound.mp3") | 
					
						
						|  |  | 
					
						
						|  | command = [ | 
					
						
						|  | 'ffmpeg', | 
					
						
						|  | '-y', | 
					
						
						|  | '-i', vid_input, | 
					
						
						|  | '-vn', | 
					
						
						|  | output_path | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | subprocess.run(command, check=True) | 
					
						
						|  | except subprocess.CalledProcessError as e: | 
					
						
						|  | print(f"Warning: Failed to extract sound from the video: {e}") | 
					
						
						|  |  | 
					
						
						|  | return output_path | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_video_info(vid_input: str) -> VideoInfo: | 
					
						
						|  | """ | 
					
						
						|  | Extract video information using ffmpeg. | 
					
						
						|  | """ | 
					
						
						|  | command = [ | 
					
						
						|  | 'ffmpeg', | 
					
						
						|  | '-i', vid_input, | 
					
						
						|  | '-map', '0:v:0', | 
					
						
						|  | '-c', 'copy', | 
					
						
						|  | '-f', 'null', | 
					
						
						|  | '-' | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, | 
					
						
						|  | encoding='utf-8', errors='replace', check=True) | 
					
						
						|  | output = result.stderr | 
					
						
						|  |  | 
					
						
						|  | num_frames = None | 
					
						
						|  | frame_rate = None | 
					
						
						|  | duration = None | 
					
						
						|  | has_sound = False | 
					
						
						|  | codec = None | 
					
						
						|  |  | 
					
						
						|  | for line in output.splitlines(): | 
					
						
						|  | if 'Stream #0:0' in line and 'Video:' in line: | 
					
						
						|  | fps_match = re.search(r'(\d+(?:\.\d+)?) fps', line) | 
					
						
						|  | if fps_match: | 
					
						
						|  | frame_rate = float(fps_match.group(1)) | 
					
						
						|  |  | 
					
						
						|  | codec_match = re.search(r'Video: (\w+)', line) | 
					
						
						|  | if codec_match: | 
					
						
						|  | codec = codec_match.group(1) | 
					
						
						|  |  | 
					
						
						|  | elif 'Duration:' in line: | 
					
						
						|  | duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})', line) | 
					
						
						|  | if duration_match: | 
					
						
						|  | h, m, s = map(float, duration_match.groups()) | 
					
						
						|  | duration = h * 3600 + m * 60 + s | 
					
						
						|  |  | 
					
						
						|  | elif 'Stream' in line and 'Audio:' in line: | 
					
						
						|  | has_sound = True | 
					
						
						|  |  | 
					
						
						|  | if frame_rate and duration: | 
					
						
						|  | num_frames = int(frame_rate * duration) | 
					
						
						|  |  | 
					
						
						|  | return VideoInfo( | 
					
						
						|  | num_frames=num_frames, | 
					
						
						|  | frame_rate=frame_rate, | 
					
						
						|  | duration=duration, | 
					
						
						|  | has_sound=has_sound, | 
					
						
						|  | codec=codec | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | except subprocess.CalledProcessError as e: | 
					
						
						|  | print("Error occurred while getting info from the video") | 
					
						
						|  | return VideoInfo() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def create_video_from_frames( | 
					
						
						|  | frames_dir: str, | 
					
						
						|  | frame_rate: Optional[int] = None, | 
					
						
						|  | sound_path: Optional[str] = None, | 
					
						
						|  | output_dir: Optional[str] = None, | 
					
						
						|  | output_mime_type: Optional[str] = None, | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | Create a video from frames and save it to the output_path. This needs FFmpeg installed. | 
					
						
						|  | """ | 
					
						
						|  | if not os.path.exists(frames_dir): | 
					
						
						|  | raise "frames_dir does not exist" | 
					
						
						|  |  | 
					
						
						|  | if output_dir is None: | 
					
						
						|  | output_dir = OUTPUTS_VIDEOS_DIR | 
					
						
						|  | os.makedirs(output_dir, exist_ok=True) | 
					
						
						|  |  | 
					
						
						|  | frame_img_mime_type = ".png" | 
					
						
						|  | pix_format = "yuv420p" | 
					
						
						|  | vid_codec, audio_codec = "libx264", "aac" | 
					
						
						|  |  | 
					
						
						|  | if output_mime_type is None: | 
					
						
						|  | output_mime_type = ".mp4" | 
					
						
						|  |  | 
					
						
						|  | output_mime_type = output_mime_type.lower() | 
					
						
						|  | if output_mime_type == ".mov": | 
					
						
						|  | pix_format = "yuva444p10le" | 
					
						
						|  | vid_codec, audio_codec = "prores_ks", "aac" | 
					
						
						|  |  | 
					
						
						|  | elif output_mime_type == ".webm": | 
					
						
						|  | pix_format = "yuva420p" | 
					
						
						|  | vid_codec, audio_codec = "libvpx-vp9", "libvorbis" | 
					
						
						|  |  | 
					
						
						|  | elif output_mime_type == ".gif": | 
					
						
						|  | pix_format = None | 
					
						
						|  | vid_codec, audio_codec = "gif", None | 
					
						
						|  |  | 
					
						
						|  | num_files = len(os.listdir(output_dir)) | 
					
						
						|  | filename = f"{num_files:05d}{output_mime_type}" | 
					
						
						|  | output_path = os.path.join(output_dir, filename) | 
					
						
						|  |  | 
					
						
						|  | if sound_path is None: | 
					
						
						|  | temp_sound = os.path.join(TEMP_VIDEO_FRAMES_DIR, "sound.mp3") | 
					
						
						|  | if os.path.exists(temp_sound): | 
					
						
						|  | sound_path = temp_sound | 
					
						
						|  |  | 
					
						
						|  | if frame_rate is None: | 
					
						
						|  | frame_rate = 25 | 
					
						
						|  |  | 
					
						
						|  | command = [ | 
					
						
						|  | 'ffmpeg', | 
					
						
						|  | '-y', | 
					
						
						|  | '-framerate', str(frame_rate), | 
					
						
						|  | '-i', os.path.join(frames_dir, f"%05d{frame_img_mime_type}"), | 
					
						
						|  | '-c:v', vid_codec, | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | if output_mime_type == ".gif": | 
					
						
						|  | command += [ | 
					
						
						|  | "-filter_complex", "[0:v] palettegen=reserve_transparent=on [p]; [0:v][p] paletteuse", | 
					
						
						|  | "-loop", "0" | 
					
						
						|  | ] | 
					
						
						|  | else: | 
					
						
						|  | command += [ | 
					
						
						|  | '-pix_fmt', pix_format | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | command += [output_path] | 
					
						
						|  |  | 
					
						
						|  | if output_mime_type != ".gif" and sound_path is not None: | 
					
						
						|  | command += [ | 
					
						
						|  | '-i', sound_path, | 
					
						
						|  | '-c:a', audio_codec, | 
					
						
						|  | '-strict', 'experimental', | 
					
						
						|  | '-b:a', '192k', | 
					
						
						|  | '-shortest' | 
					
						
						|  | ] | 
					
						
						|  | try: | 
					
						
						|  | subprocess.run(command, check=True) | 
					
						
						|  | except subprocess.CalledProcessError as e: | 
					
						
						|  | print("Error occurred while creating video from frames") | 
					
						
						|  | return output_path | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_frames_from_dir(vid_dir: str, | 
					
						
						|  | available_extensions: Optional[Union[List, str]] = None, | 
					
						
						|  | as_numpy: bool = False) -> List: | 
					
						
						|  | """Get image file paths list from the dir""" | 
					
						
						|  | if available_extensions is None: | 
					
						
						|  | available_extensions = [".jpg", ".jpeg", ".JPG", ".JPEG"] | 
					
						
						|  |  | 
					
						
						|  | if isinstance(available_extensions, str): | 
					
						
						|  | available_extensions = [available_extensions] | 
					
						
						|  |  | 
					
						
						|  | frame_names = [ | 
					
						
						|  | p for p in os.listdir(vid_dir) | 
					
						
						|  | if os.path.splitext(p)[-1] in available_extensions | 
					
						
						|  | ] | 
					
						
						|  | if not frame_names: | 
					
						
						|  | return [] | 
					
						
						|  | frame_names.sort(key=lambda x: int(os.path.splitext(x)[0])) | 
					
						
						|  |  | 
					
						
						|  | frames = [os.path.join(vid_dir, name) for name in frame_names] | 
					
						
						|  | if as_numpy: | 
					
						
						|  | frames = [np.array(Image.open(frame)) for frame in frames] | 
					
						
						|  |  | 
					
						
						|  | return frames | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def clean_temp_dir(temp_dir: Optional[str] = None): | 
					
						
						|  | """Removes media files from the video frames directory.""" | 
					
						
						|  | if temp_dir is None: | 
					
						
						|  | temp_dir = TEMP_VIDEO_FRAMES_DIR | 
					
						
						|  | temp_out_dir = TEMP_VIDEO_OUT_FRAMES_DIR | 
					
						
						|  | else: | 
					
						
						|  | temp_out_dir = os.path.join(temp_dir, "out") | 
					
						
						|  |  | 
					
						
						|  | clean_files_with_extension(temp_dir, SOUND_FILE_EXT) | 
					
						
						|  | clean_files_with_extension(temp_dir, IMAGE_FILE_EXT) | 
					
						
						|  | clean_files_with_extension(temp_out_dir, IMAGE_FILE_EXT) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def clean_files_with_extension(dir_path: str, extensions: List): | 
					
						
						|  | """Remove files with the given extensions from the directory.""" | 
					
						
						|  | for filename in os.listdir(dir_path): | 
					
						
						|  | if filename.lower().endswith(tuple(extensions)): | 
					
						
						|  | file_path = os.path.join(dir_path, filename) | 
					
						
						|  | try: | 
					
						
						|  | os.remove(file_path) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print("Error while removing image files") | 
					
						
						|  |  |