from spandrel import ModelLoader
import torch
from pathlib import Path
from PIL import Image
import gradio as App
import numpy as np
import subprocess
import logging
import spaces
import time
import os
import gc
import io
import cv2
from gradio import themes
from rich.console import Console
from rich.logging import RichHandler

# ============================== #
#          Core Settings         #
# ============================== #

Theme = themes.Citrus(primary_hue='blue', radius_size=themes.sizes.radius_xxl)
ModelDir = Path('./Models')
TempDir = Path('./Temp')
os.environ['GRADIO_TEMP_DIR'] = str(TempDir)
ModelFileType = '.pth'

# ============================== #
#        Enhanced Logging        #
# ============================== #

logging.basicConfig(level=logging.INFO, format='%(message)s', datefmt='[%X]', handlers=[RichHandler(console=Console(), rich_tracebacks=True)])
Logger = logging.getLogger('Video2x')
logging.getLogger('httpx').setLevel(logging.WARNING)

# ============================== #
#      Device Configuration      #
# ============================== #

@spaces.GPU
def GetDeviceName():
    Device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    Logger.info(f'⚙️ Using device: {Device}')
    return Device

Device = GetDeviceName()

# ============================== #
#       Optimized Functions      #
# ============================== #

def FormatTimeEstimate(Seconds):
    Hours = int(Seconds // 3600)
    Minutes = int((Seconds % 3600) // 60)
    Seconds = int(Seconds % 60)
    if Hours > 0:
        return f'{Hours}h {Minutes}m {Seconds}s'
    elif Minutes > 0:
        return f'{Minutes}m {Seconds}s'
    else:
        return f'{Seconds}s'

def ListModels():
    Models = sorted([File.name for File in ModelDir.glob('*' + ModelFileType) if File.is_file()])
    Logger.info(f'📚 Found {len(Models)} Models In Directory')
    return Models

def LoadModel(ModelName):
    if Device.type == 'cuda':
        torch.cuda.empty_cache()
    Logger.info(f'🔄 Loading model: {ModelName} onto {Device}')
    Model = ModelLoader().load_from_file(ModelDir / (ModelName + ModelFileType)).to(Device).eval()
    Logger.info('✅ Model Loaded Successfully')
    return Model

@spaces.GPU
def ProcessSingleFrame(OriginalImage, Model, TileGridSize):
    if TileGridSize > 1:
        Logger.info(f'🧩 Processing With Tile Grid {TileGridSize}x{TileGridSize}')
        Width, Height = OriginalImage.size
        TileWidth, TileHeight = Width // TileGridSize, Height // TileGridSize
        UpscaledTilesGrid = []
        for Row in range(TileGridSize):
            CurrentRowTiles = []
            for Col in range(TileGridSize):
                Tile = OriginalImage.crop((Col * TileWidth, Row * TileHeight, (Col + 1) * TileWidth, (Row + 1) * TileHeight))
                TileTensor = torch.from_numpy(np.array(Tile)).permute(2, 0, 1).unsqueeze(0).float().to(Device) / 255.0
                with torch.no_grad():
                    UpscaledTileTensor = Model(TileTensor)
                UpscaledTileNumpy = UpscaledTileTensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
                CurrentRowTiles.append(Image.fromarray(np.uint8(UpscaledTileNumpy.clip(0.0, 1.0) * 255.0), mode='RGB'))
                del TileTensor, UpscaledTileTensor, UpscaledTileNumpy
            UpscaledTilesGrid.append(CurrentRowTiles)
        FirstTileWidth, FirstTileHeight = UpscaledTilesGrid[0][0].size
        UpscaledImage = Image.new('RGB', (FirstTileWidth * TileGridSize, FirstTileHeight * TileGridSize))
        for Row in range(TileGridSize):
            for Col in range(TileGridSize):
                UpscaledImage.paste(UpscaledTilesGrid[Row][Col], (Col * FirstTileWidth, Row * FirstTileHeight))
    else:
        TorchImage = torch.from_numpy(np.array(OriginalImage)).permute(2, 0, 1).unsqueeze(0).float().to(Device) / 255.0
        with torch.no_grad():
            ResultTensor = Model(TorchImage)
        ResultNumpy = ResultTensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
        UpscaledImage = Image.fromarray(np.uint8(ResultNumpy.clip(0.0, 1.0) * 255.0), mode='RGB')
        del TorchImage, ResultTensor, ResultNumpy
    return UpscaledImage

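# The two helpers below are an illustrative sketch only: they collect the same
# PIL <-> tensor round trip that ProcessSingleFrame performs inline, so the
# conversion is easy to see in isolation. The names are ours and nothing in
# this app calls them.

def ImageToTensor(InputImage):
    # HWC uint8 RGB in [0, 255] -> 1xCxHxW float32 in [0, 1] on the target device
    return torch.from_numpy(np.array(InputImage)).permute(2, 0, 1).unsqueeze(0).float().to(Device) / 255.0

def TensorToImage(InputTensor):
    # 1xCxHxW float32 in [0, 1] -> HWC uint8 RGB PIL image
    Array = InputTensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
    return Image.fromarray(np.uint8(Array.clip(0.0, 1.0) * 255.0), mode='RGB')
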
@spaces.GPU
def Process(VideoInputPath, ModelName, FrameRateValue, TileGridSize, FileType, Progress=App.Progress()):
    # First yield should match the order of outputs in the click function
    yield None, App.update(interactive=False, value=None)
    if not VideoInputPath or not ModelName or not FileType:
        Logger.error('⛔ Missing Inputs!')
        return

    VideoPath = Path(VideoInputPath)
    OutputVideoPath = VideoPath.parent / f'{VideoPath.stem}_{Path(ModelName).stem}{"_Tiled" + str(TileGridSize) if TileGridSize > 1 else ""}{FileType}'

    # Load model
    Progress(0.0, '🔄 Loading Model')
    Model = LoadModel(ModelName)

    # Extract video info
    Logger.info(f'🎬 Extracting Video Information From {VideoPath.name}')
    VideoCapture = cv2.VideoCapture(str(VideoPath))
    FrameCount = int(VideoCapture.get(cv2.CAP_PROP_FRAME_COUNT))
    if not FrameRateValue:
        FrameRateValue = VideoCapture.get(cv2.CAP_PROP_FPS)
    Logger.info(f'🎞️ Processing {FrameCount} Frames At {FrameRateValue} FPS')

    # In-memory frame processing
    FrameBuffer = []
    AllFrames = []

    # Time tracking variables
    StartTime = time.time()
    FrameProcessingTime = None

    for FrameIndex in range(FrameCount):
        FrameStartTime = time.time()
        Success, Frame = VideoCapture.read()
        if not Success:
            Logger.warning(f'⚠️ Failed To Read Frame {FrameIndex}')
            continue

        # Convert from BGR to RGB
        OriginalImage = Image.fromarray(cv2.cvtColor(Frame, cv2.COLOR_BGR2RGB))
        UpscaledImage = ProcessSingleFrame(OriginalImage, Model, TileGridSize)

        # Store for preview
        ResizedOriginalImage = OriginalImage.resize(UpscaledImage.size, Image.Resampling.LANCZOS)
        AllFrames.append((ResizedOriginalImage, UpscaledImage.copy()))

        # Save to buffer for video output
        ImageBytes = io.BytesIO()
        UpscaledImage.save(ImageBytes, format='PNG')
        FrameBuffer.append(ImageBytes.getvalue())

        # Calculate time estimates
        CurrentFrameTime = time.time() - FrameStartTime
        if FrameIndex == 0:
            FrameProcessingTime = CurrentFrameTime
            Logger.info(f'⏱️ First Frame Took {FrameProcessingTime:.2f}s To Process')

        # Calculate remaining time based on average processing time so far
        ElapsedTime = time.time() - StartTime
        AverageTimePerFrame = ElapsedTime / (FrameIndex + 1)
        RemainingFrames = FrameCount - (FrameIndex + 1)
        EstimatedRemainingTime = RemainingFrames * AverageTimePerFrame

        # Format time estimates for display
        RemainingTimeFormatted = FormatTimeEstimate(EstimatedRemainingTime)

        Progress((FrameIndex + 1) / FrameCount, f'🔄 Frame {FrameIndex + 1}/{FrameCount} | ETA: {RemainingTimeFormatted}')

        del OriginalImage, UpscaledImage, ImageBytes
        gc.collect()

    VideoCapture.release()

    # Write frames to temporary files for ffmpeg
    Logger.info('💾 Preparing Frames For Video Encoding')
    os.makedirs(TempDir, exist_ok=True)
    for Index, FrameData in enumerate(FrameBuffer):
        with open(f'{TempDir}/Frame_{Index:06d}.png', 'wb') as f:
            f.write(FrameData)

    # Create video
    Progress(1.0, '🎥 Encoding Video')
    Logger.info('🎥 Encoding Final Video')
    FfmpegCmd = f'ffmpeg -y -framerate {FrameRateValue} -i "{TempDir}/Frame_%06d.png" -c:v libx264 -pix_fmt yuv420p "{OutputVideoPath}" -hide_banner -loglevel error'
    subprocess.run(FfmpegCmd, shell=True, check=True)

    # Clean up
    for File in Path(TempDir).glob('Frame_*.png'):
        File.unlink()

    Logger.info(f'🎉 Video Saved To: {OutputVideoPath}')

    # Update UI - return values directly in the order specified in the click function
    FirstFrame = AllFrames[0] if AllFrames else None
    DownloadValue = App.update(interactive=True, value=str(OutputVideoPath))
    yield FirstFrame, DownloadValue

    # Release resources
    del Model, FrameBuffer, AllFrames
    Progress(1.0, '🧹 Cleaning Up Resources')
    gc.collect()
    if Device.type == 'cuda':
        torch.cuda.empty_cache()
        Logger.info('🧹 CUDA Memory Cleaned Up')
    Logger.info('🧹 Model Unloaded')
    Progress(1.0, '📦 Done!')

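# Note on the wiring below: Process is a generator, so its first yield clears
# the preview and disables the download button, and its final yield delivers a
# (before, after) image pair for the ImageSlider plus the finished file path
# for the DownloadButton, matching the order of outputs in SubmitButton.click.
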
# ============================== #
#         Streamlined UI         #
# ============================== #

with App.Blocks(title='Video Upscaler', theme=Theme, delete_cache=(60, 600)) as Interface:
    App.Markdown('# 🎞️ Video Upscaler')
    App.Markdown('''
    Space created by [Hyphonical](https://huggingface.co/Hyphonical). This Space uses several models from [styler00dollar/VSGAN-tensorrt-docker](https://github.com/styler00dollar/VSGAN-tensorrt-docker/releases/tag/models).
    You can always request more models by opening a [new discussion](https://huggingface.co/spaces/Hyphonical/Video2x/discussions/new). The main program uses spandrel to load the models and ffmpeg to encode the video.
    You may run out of time on ZeroGPU; clone the Space or run it locally for better performance.
    ''')
    with App.Row():
        with App.Column(scale=1):
            with App.Group():
                InputVideo = App.Video(label='Input Video', sources=['upload'], height=300)
                ModelList = ListModels()
                ModelNames = [Path(Model).stem for Model in ModelList]
                InputModel = App.Dropdown(choices=ModelNames, label='Select Model', value=ModelNames[0] if ModelNames else None)
                with App.Row():
                    InputFrameRate = App.Slider(label='Frame Rate', minimum=1, maximum=60, value=23.976, step=0.001)
                    InputTileGridSize = App.Slider(label='Tile Grid Size', minimum=1, maximum=6, value=1, step=1, show_reset_button=False)
                InputFileType = App.Dropdown(choices=['.mp4', '.mkv'], label='Output File Type', value='.mkv', interactive=True)
            SubmitButton = App.Button('🚀 Upscale Video')
        with App.Column(scale=1, show_progress=True):
            OutputSlider = App.ImageSlider(label='Output Preview', value=None, height=300)
            DownloadOutput = App.DownloadButton(label='💾 Download Video', interactive=False)

    with App.Accordion(label='📜 Instructions', open=False):
        App.Markdown('''
        ### How To Use The Video Upscaler

        1. **Upload A Video:** Begin by uploading your video file in the 'Input Video' section.
        2. **Select A Model:** Choose an upscaling model from the 'Select Model' dropdown menu.
        3. **Adjust Settings (Optional):** Change the 'Frame Rate' slider if you want a different output frame rate, and adjust the 'Tile Grid Size' to manage memory usage. Larger models may need a higher grid size, but processing will be slower.
        4. **Start Processing:** Click the '🚀 Upscale Video' button to begin upscaling.
        5. **Download The Result:** Once processing is complete, download the upscaled video with the '💾 Download Video' button.

        > Tip: If you get a CUDA out-of-memory error, try increasing the Tile Grid Size. This splits each frame into smaller tiles for processing, which reduces peak memory usage.
        ''')

    SubmitButton.click(fn=Process, inputs=[InputVideo, InputModel, InputFrameRate, InputTileGridSize, InputFileType], outputs=[OutputSlider, DownloadOutput])

if __name__ == '__main__':
    os.makedirs(ModelDir, exist_ok=True)
    os.makedirs(TempDir, exist_ok=True)
    Logger.info('🚀 Starting Video Upscaler')
    Interface.launch(pwa=True)