#!/usr/bin/env python3
"""
Preprocess the Dynamic Replica dataset.
This script reads frame annotations (stored in compressed JSON files),
loads images, depth maps, optical flow, and camera parameters, and saves
processed images, depth maps, flow files, and camera metadata (intrinsics and poses)
to an output directory organized by split, sequence, and camera view.
Usage:
python preprocess_dynamic_replica.py --root_dir /path/to/data_dynamic_replica \
--out_dir /path/to/processed_dynamic_replica \
[--splits train valid test] \
[--num_processes 8]
"""
import argparse
import gzip
import json
import os
import os.path as osp
import re
import shutil
import time
from collections import defaultdict
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count
from typing import List, Optional
# OpenEXR support must be enabled before cv2 is first imported so that OpenCV
# picks the flag up when initializing its image codecs.
os.environ["OPENCV_IO_ENABLE_OPENEXR"] = "1"
import cv2
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import torch
from PIL import Image
from pytorch3d.implicitron.dataset.types import (
FrameAnnotation as ImplicitronFrameAnnotation,
load_dataclass,
)
from tqdm import tqdm
import imageio
TAG_CHAR = np.array([202021.25], np.float32)  # Middlebury .flo magic number (readFlow checks the same value).
def readFlow(fn):
"""Read .flo file in Middlebury format."""
with open(fn, "rb") as f:
magic = np.fromfile(f, np.float32, count=1)
if 202021.25 != magic:
print("Magic number incorrect. Invalid .flo file")
return None
else:
w = np.fromfile(f, np.int32, count=1)
h = np.fromfile(f, np.int32, count=1)
data = np.fromfile(f, np.float32, count=2 * int(w) * int(h))
return np.resize(data, (int(h), int(w), 2))
def readPFM(file):
with open(file, "rb") as f:
header = f.readline().rstrip()
if header == b"PF":
color = True
elif header == b"Pf":
color = False
else:
raise Exception("Not a PFM file.")
dim_match = re.match(rb"^(\d+)\s(\d+)\s$", f.readline())
if dim_match:
width, height = map(int, dim_match.groups())
else:
raise Exception("Malformed PFM header.")
scale = float(f.readline().rstrip())
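        # The sign of the scale value encodes endianness: negative means little-endian data.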
endian = "<" if scale < 0 else ">"
if scale < 0:
scale = -scale
data = np.fromfile(f, endian + "f")
shape = (height, width, 3) if color else (height, width)
data = np.reshape(data, shape)
data = np.flipud(data)
return data
def read_gen(file_name, pil=False):
ext = osp.splitext(file_name)[-1].lower()
if ext in [".png", ".jpeg", ".ppm", ".jpg"]:
return Image.open(file_name)
elif ext in [".bin", ".raw"]:
return np.load(file_name)
elif ext == ".flo":
return readFlow(file_name).astype(np.float32)
elif ext == ".pfm":
flow = readPFM(file_name).astype(np.float32)
return flow if len(flow.shape) == 2 else flow[:, :, :-1]
return []
def _load_16big_png_depth(depth_png):
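    # The 16-bit PNG stores float16 depth values bit-cast to uint16; reinterpret
    # the raw buffer as float16, then promote to float32.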
with Image.open(depth_png) as depth_pil:
depth = (
np.frombuffer(np.array(depth_pil, dtype=np.uint16), dtype=np.float16)
.astype(np.float32)
.reshape((depth_pil.size[1], depth_pil.size[0]))
)
return depth
@dataclass
class DynamicReplicaFrameAnnotation(ImplicitronFrameAnnotation):
"""A dataclass used to load annotations from .json for Dynamic Replica."""
camera_name: Optional[str] = None
instance_id_map_path: Optional[str] = None
flow_forward: Optional[str] = None
flow_forward_mask: Optional[str] = None
flow_backward: Optional[str] = None
flow_backward_mask: Optional[str] = None
trajectories: Optional[str] = None
def _get_pytorch3d_camera(entry_viewpoint, image_size, scale: float):
"""
Convert the camera parameters stored in an annotation to PyTorch3D convention.
Returns:
R, tvec, focal, principal_point
"""
assert entry_viewpoint is not None
principal_point = torch.tensor(entry_viewpoint.principal_point, dtype=torch.float)
focal_length = torch.tensor(entry_viewpoint.focal_length, dtype=torch.float)
half_image_size_wh_orig = (
torch.tensor(list(reversed(image_size)), dtype=torch.float) / 2.0
)
fmt = entry_viewpoint.intrinsics_format
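    # "ndc_norm_image_bounds" normalizes each axis by its own half-dimension,
    # while "ndc_isotropic" uses the shorter half-dimension for both axes.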
if fmt.lower() == "ndc_norm_image_bounds":
rescale = half_image_size_wh_orig
elif fmt.lower() == "ndc_isotropic":
rescale = half_image_size_wh_orig.min()
else:
raise ValueError(f"Unknown intrinsics format: {fmt}")
principal_point_px = half_image_size_wh_orig - principal_point * rescale
focal_length_px = focal_length * rescale
# Prepare rotation and translation for PyTorch3D
R = torch.tensor(entry_viewpoint.R, dtype=torch.float)
T = torch.tensor(entry_viewpoint.T, dtype=torch.float)
R_pytorch3d = R.clone()
T_pytorch3d = T.clone()
T_pytorch3d[..., :2] *= -1
R_pytorch3d[..., :, :2] *= -1
tvec = T_pytorch3d
return R, tvec, focal_length_px, principal_point_px
# Default list of splits to process; the splits, root directory, and output
# directory are all supplied via command-line arguments.
SPLITS = ["train", "valid", "test"]
def process_split_data(args):
"""
Process all frames for a given split.
Reads the frame annotation file for the given split, groups frames per sequence
and camera, and for each frame loads the image, depth map, optical flows (if available),
computes the camera intrinsics and pose (using _get_pytorch3d_camera), and saves the data.
"""
split, root_dir, out_dir = args
split_dir = osp.join(root_dir, split)
# The frame annotations are stored in a compressed json file.
frame_annotations_file = osp.join(split_dir, f"frame_annotations_{split}.jgz")
with gzip.open(frame_annotations_file, "rt", encoding="utf8") as zipfile:
frame_annots_list = load_dataclass(zipfile, List[DynamicReplicaFrameAnnotation])
# Group frames by sequence and camera.
seq_annot = defaultdict(lambda: defaultdict(list))
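    # seq_annot maps sequence_name -> camera_name -> list of frame annotations.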
for frame_annot in frame_annots_list:
seq_annot[frame_annot.sequence_name][frame_annot.camera_name].append(
frame_annot
)
# Process each sequence.
for seq_name in tqdm(seq_annot.keys(), desc=f"Processing split '{split}'"):
        # For each camera view ('left' and 'right'), create the output directories.
for cam in ["left", "right"]:
out_img_dir = osp.join(out_dir, split, seq_name, cam, "rgb")
out_depth_dir = osp.join(out_dir, split, seq_name, cam, "depth")
out_fflow_dir = osp.join(out_dir, split, seq_name, cam, "flow_forward")
out_bflow_dir = osp.join(out_dir, split, seq_name, cam, "flow_backward")
out_cam_dir = osp.join(out_dir, split, seq_name, cam, "cam")
os.makedirs(out_img_dir, exist_ok=True)
os.makedirs(out_depth_dir, exist_ok=True)
os.makedirs(out_fflow_dir, exist_ok=True)
os.makedirs(out_bflow_dir, exist_ok=True)
os.makedirs(out_cam_dir, exist_ok=True)
for framedata in tqdm(
seq_annot[seq_name][cam], desc=f"Seq {seq_name} [{cam}]", leave=False
):
timestamp = framedata.frame_timestamp
im_path = osp.join(split_dir, framedata.image.path)
depth_path = osp.join(split_dir, framedata.depth.path)
if framedata.flow_forward["path"]:
flow_forward_path = osp.join(
split_dir, framedata.flow_forward["path"]
)
flow_forward_mask_path = osp.join(
split_dir, framedata.flow_forward_mask["path"]
)
if framedata.flow_backward["path"]:
flow_backward_path = osp.join(
split_dir, framedata.flow_backward["path"]
)
flow_backward_mask_path = osp.join(
split_dir, framedata.flow_backward_mask["path"]
)
# Ensure required files exist.
assert os.path.isfile(im_path), im_path
assert os.path.isfile(depth_path), depth_path
if framedata.flow_forward["path"]:
assert os.path.isfile(flow_forward_path), flow_forward_path
assert os.path.isfile(
flow_forward_mask_path
), flow_forward_mask_path
if framedata.flow_backward["path"]:
assert os.path.isfile(flow_backward_path), flow_backward_path
assert os.path.isfile(
flow_backward_mask_path
), flow_backward_mask_path
viewpoint = framedata.viewpoint
# Load depth map.
depth = _load_16big_png_depth(depth_path)
# Process optical flow if available.
if framedata.flow_forward["path"]:
flow_forward = cv2.imread(flow_forward_path, cv2.IMREAD_UNCHANGED)
flow_forward_mask = cv2.imread(
flow_forward_mask_path, cv2.IMREAD_UNCHANGED
)
np.savez(
osp.join(out_fflow_dir, f"{timestamp}.npz"),
flow=flow_forward,
mask=flow_forward_mask,
)
if framedata.flow_backward["path"]:
flow_backward = cv2.imread(flow_backward_path, cv2.IMREAD_UNCHANGED)
flow_backward_mask = cv2.imread(
flow_backward_mask_path, cv2.IMREAD_UNCHANGED
)
np.savez(
osp.join(out_bflow_dir, f"{timestamp}.npz"),
flow=flow_backward,
mask=flow_backward_mask,
)
# Get camera parameters.
R, t, focal, pp = _get_pytorch3d_camera(
viewpoint, framedata.image.size, scale=1.0
)
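                # Assemble the 3x3 pinhole intrinsic matrix (focal lengths and
                # principal point in pixel units).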
intrinsics = np.eye(3)
intrinsics[0, 0] = focal[0].item()
intrinsics[1, 1] = focal[1].item()
intrinsics[0, 2] = pp[0].item()
intrinsics[1, 2] = pp[1].item()
pose = np.eye(4)
                # Invert the extrinsics [R | t] to obtain a camera-to-world pose.
pose[:3, :3] = R.numpy().T
pose[:3, 3] = -R.numpy().T @ t.numpy()
# Define output file paths.
out_img_path = osp.join(out_img_dir, f"{timestamp}.png")
out_depth_path = osp.join(out_depth_dir, f"{timestamp}.npy")
out_cam_path = osp.join(out_cam_dir, f"{timestamp}.npz")
# Copy RGB image.
shutil.copy(im_path, out_img_path)
# Save depth.
np.save(out_depth_path, depth)
                # Save camera metadata (3x3 intrinsics and 4x4 camera-to-world pose).
np.savez(out_cam_path, intrinsics=intrinsics, pose=pose)
# (Optionally, you could return some summary information.)
return None
def main():
parser = argparse.ArgumentParser(
description="Preprocess Dynamic Replica dataset: convert raw annotations, images, "
"depth, and flow files to a processed format."
)
parser.add_argument(
"--root_dir",
type=str,
required=True,
help="Root directory of the Dynamic Replica data.",
)
parser.add_argument(
"--out_dir",
type=str,
required=True,
help="Output directory for processed data.",
)
parser.add_argument(
"--splits",
type=str,
nargs="+",
default=SPLITS,
help="List of splits to process (default: train valid test).",
)
parser.add_argument(
"--num_processes",
type=int,
default=cpu_count(),
help="Number of processes to use (default: number of CPU cores).",
)
args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
tasks = [(split, args.root_dir, args.out_dir) for split in args.splits]
print("Processing splits:", args.splits)
with Pool(processes=args.num_processes) as pool:
list(
tqdm(
pool.imap(process_split_data, tasks),
total=len(tasks),
desc="Overall Progress",
)
)
if __name__ == "__main__":
main()