| import shutil | |
| import subprocess | |
| from pathlib import Path | |
| from typing import Literal | |
| import numpy as np | |
| VideoCodec = Literal["h264", "vp9", "gif"] | |
| def _check_ffmpeg_installed() -> None: | |
| """Raise an error if ffmpeg is not available on the system PATH.""" | |
| if shutil.which("ffmpeg") is None: | |
| raise RuntimeError( | |
| "ffmpeg is required to write video but was not found on your system. " | |
| "Please install ffmpeg and ensure it is available on your PATH." | |
| ) | |
| def _check_array_format(video: np.ndarray) -> None: | |
| """Raise an error if the array is not in the expected format.""" | |
| if not (video.ndim == 4 and video.shape[-1] == 3): | |
| raise ValueError( | |
| f"Expected RGB input shaped (F, H, W, 3), got {video.shape}. " | |
| f"Input has {video.ndim} dimensions, expected 4." | |
| ) | |
| if video.dtype != np.uint8: | |
| raise TypeError( | |
| f"Expected dtype=uint8, got {video.dtype}. " | |
| "Please convert your video data to uint8 format." | |
| ) | |
| def _check_path(file_path: str | Path) -> None: | |
| """Raise an error if the parent directory does not exist.""" | |
| file_path = Path(file_path) | |
| if not file_path.parent.exists(): | |
| try: | |
| file_path.parent.mkdir(parents=True, exist_ok=True) | |
| except OSError as e: | |
| raise ValueError( | |
| f"Failed to create parent directory {file_path.parent}: {e}" | |
| ) | |
| def write_video( | |
| file_path: str | Path, video: np.ndarray, fps: float, codec: VideoCodec | |
| ) -> None: | |
| """RGB uint8 only, shape (F, H, W, 3).""" | |
| _check_ffmpeg_installed() | |
| _check_path(file_path) | |
| if codec not in {"h264", "vp9", "gif"}: | |
| raise ValueError("Unsupported codec. Use h264, vp9, or gif.") | |
| arr = np.asarray(video) | |
| _check_array_format(arr) | |
| frames = np.ascontiguousarray(arr) | |
| _, height, width, _ = frames.shape | |
| out_path = str(file_path) | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-f", | |
| "rawvideo", | |
| "-s", | |
| f"{width}x{height}", | |
| "-pix_fmt", | |
| "rgb24", | |
| "-r", | |
| str(fps), | |
| "-i", | |
| "-", | |
| "-an", | |
| ] | |
| if codec == "gif": | |
| video_filter = "split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse" | |
| cmd += [ | |
| "-vf", | |
| video_filter, | |
| "-loop", | |
| "0", | |
| ] | |
| elif codec == "h264": | |
| cmd += [ | |
| "-vcodec", | |
| "libx264", | |
| "-pix_fmt", | |
| "yuv420p", | |
| "-movflags", | |
| "+faststart", | |
| ] | |
| elif codec == "vp9": | |
| bpp = 0.08 | |
| bps = int(width * height * fps * bpp) | |
| if bps >= 1_000_000: | |
| bitrate = f"{round(bps / 1_000_000)}M" | |
| elif bps >= 1_000: | |
| bitrate = f"{round(bps / 1_000)}k" | |
| else: | |
| bitrate = str(max(bps, 1)) | |
| cmd += [ | |
| "-vcodec", | |
| "libvpx-vp9", | |
| "-b:v", | |
| bitrate, | |
| "-pix_fmt", | |
| "yuv420p", | |
| ] | |
| cmd += [out_path] | |
| proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE) | |
| try: | |
| for frame in frames: | |
| proc.stdin.write(frame.tobytes()) | |
| finally: | |
| if proc.stdin: | |
| proc.stdin.close() | |
| stderr = ( | |
| proc.stderr.read().decode("utf-8", errors="ignore") if proc.stderr else "" | |
| ) | |
| ret = proc.wait() | |
| if ret != 0: | |
| raise RuntimeError(f"ffmpeg failed with code {ret}\n{stderr}") | |