#!/usr/bin/env python3
"""
Preprocess Script for UASOL Dataset

This script processes sequences in the UASOL dataset by:
  - Parsing camera parameters from a 'log.txt' file.
  - Reading a 'complete.json' manifest that describes frames (RGB + depth).
  - Converting depth from millimeters to meters.
  - Rescaling images and depth maps to a fixed resolution (default 640x480).
  - Saving the camera intrinsics and pose in .npz files.

Usage:
    python preprocess_uasol.py \
        --input_dir /path/to/data_uasol \
        --output_dir /path/to/processed_uasol
"""
import os | |
import json | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
from tqdm import tqdm | |
from concurrent.futures import ProcessPoolExecutor, as_completed | |
import argparse | |
import src.dust3r.datasets.utils.cropping as cropping | |
def _parse_log_value(value):
    """Convert one raw log value to a float, a list of floats, or leave it as text."""
    if "," in value or "[" in value:
        # Looks like a list such as "[2208, 1242]" or "0.1, 0.2".
        try:
            return [float(v.strip()) for v in value.strip("[]").split(",")]
        except ValueError:
            # Free text that merely contains a comma/bracket (or an empty
            # list): keep the raw string instead of aborting the whole parse.
            return value
    try:
        return float(value)
    except ValueError:
        return value


def parse_log_file(log_file):
    """
    Parse a 'log.txt' file and return a dictionary of camera parameters.

    Lines are ignored until one starting with "LEFT CAMERA PARAMETERS" is
    found. Every subsequent line containing a colon is split at the first
    colon into a key and a value. Keys are stripped, lower-cased and have
    spaces replaced by underscores. Values lose surrounding whitespace and
    any trailing period, then are converted to a float or a list of floats
    when possible; otherwise they are kept as strings.

    Args:
        log_file (str): Path to the log.txt file containing camera parameters.

    Returns:
        dict: Mapping from normalized parameter name to a float, a list of
            floats, or the raw string value.

    Raises:
        OSError: If the log file cannot be opened.
    """
    camera_dict = {}
    start_parse = False
    with open(log_file, "r") as f:
        for line in f:
            line = line.strip()
            if line.startswith("LEFT CAMERA PARAMETERS"):
                start_parse = True
                continue
            if not (start_parse and ":" in line):
                continue
            # NOTE(review): parsing never stops once started, so a later
            # section that repeats a key overwrites the earlier value --
            # confirm this matches the layout of the log files.
            key, value = line.split(":", 1)
            key = key.strip().replace(" ", "_").lower()
            # Drop only the sentence-ending period; a leading '.' can be part
            # of a number such as ".5" and must be preserved.
            value = value.strip().rstrip(".")
            camera_dict[key] = _parse_log_value(value)
    return camera_dict
def process_data(task_args):
    """
    Process a single frame of the dataset.

    Steps:
      - Reads the RGB image and depth map from '<seq_dir>/Images'.
      - Divides depth and the pose translation by 1000 (mm to meters).
      - Rescales the image and depth via cropping.rescale_image_depthmap
        with output_resolution=(640, 480).
      - Saves the RGB image (.png), depth (.npy) and intrinsics/pose (.npz),
        all named after the left color frame.

    Args:
        task_args (tuple): A tuple containing:
            - data (dict): Frame info from 'complete.json'; the keys
              'color_frame_left', 'depth_frame' and 'm' are read.
            - seq_dir (str): Path to the sequence directory.
            - out_rgb_dir (str): Output directory for RGB images.
            - out_depth_dir (str): Output directory for depth maps.
            - out_cam_dir (str): Output directory for camera intrinsics/pose.
            - K (np.ndarray): 3x3 camera intrinsics matrix.
            - H (int): Original image height.
            - W (int): Original image width.

    Returns:
        str or None:
            An error message (str) if something goes wrong, otherwise None.
            Exceptions are never propagated to the caller; they are reported
            through the returned message.
    """
    data, seq_dir, out_rgb_dir, out_depth_dir, out_cam_dir, K, H, W = task_args

    # Bound before the try block so the except handler can always format a
    # message, even when the frame name itself cannot be read from `data`.
    img_p = "<unknown frame>"
    try:
        img_p = data["color_frame_left"]
        depth_p = data["depth_frame"]
        matrix = data["m"]

        # Input file paths
        img_path = os.path.join(seq_dir, "Images", img_p + ".png")
        depth_path = os.path.join(seq_dir, "Images", depth_p + ".png")
        if not (os.path.isfile(img_path) and os.path.isfile(depth_path)):
            return f"Missing files for {img_p}"

        # Read RGB; the context manager releases the file handle promptly.
        with Image.open(img_path) as src:
            img = src.convert("RGB")

        # Read depth at its native bit depth. cv2.imread signals failure by
        # returning None rather than raising, so check it explicitly.
        depth = cv2.imread(depth_path, cv2.IMREAD_ANYDEPTH)
        if depth is None:
            return f"Failed to read depth map for {img_p}"
        depth = depth.astype(np.float32)
        if depth.shape[0] != H or depth.shape[1] != W:
            return f"Depth shape mismatch for {img_p}"
        depth = depth / 1000.0  # mm to meters

        # Build the pose matrix and convert its translation (last column of
        # the first three rows) from mm to meters.
        pose = np.array(matrix, dtype=np.float32)
        pose[:3, 3] /= 1000.0

        # Rescale image and depth to the desired output size
        image, depthmap, camera_intrinsics = cropping.rescale_image_depthmap(
            img, depth, K, output_resolution=(640, 480)
        )

        # Save outputs
        out_img_path = os.path.join(out_rgb_dir, img_p + ".png")
        out_depth_path = os.path.join(out_depth_dir, img_p + ".npy")
        out_cam_path = os.path.join(out_cam_dir, img_p + ".npz")
        image.save(out_img_path)
        np.save(out_depth_path, depthmap)
        np.savez(out_cam_path, intrinsics=camera_intrinsics, pose=pose)
    except Exception as e:
        # Worker boundary: report the failure as a string so one bad frame
        # does not abort the whole pool.
        return f"Error processing {img_p}: {e}"
    return None
def main():
    """
    Command-line entry point: preprocess every sequence under --input_dir.

    A sequence is any direct subdirectory of the input root that contains an
    'Images' folder. For each sequence the camera parameters are read from
    'log.txt', the frame list from the "Data" entry of 'complete.json', and
    every frame is handed to process_data in a process pool. Results are
    written to '<output_dir>/<seq>/{rgb,depth,cam}'.

    Sequences with a missing manifest, a missing log file, or incomplete
    camera parameters are skipped with a warning. Per-frame error messages
    returned by process_data are printed.
    """
    parser = argparse.ArgumentParser(description="Preprocess UASOL dataset.")
    parser.add_argument(
        "--input_dir", required=True, help="Path to the root UASOL directory."
    )
    parser.add_argument(
        "--output_dir",
        required=True,
        help="Path to the directory where processed data will be stored.",
    )
    args = parser.parse_args()

    root = os.path.abspath(args.input_dir)
    out_dir = os.path.abspath(args.output_dir)
    os.makedirs(out_dir, exist_ok=True)

    # Find all sequences that have an 'Images' folder; sorted so runs are
    # reproducible regardless of filesystem listing order.
    seqs = sorted(
        d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d, "Images"))
    )

    for seq in seqs:
        seq_dir = os.path.join(root, seq)
        log_file = os.path.join(seq_dir, "log.txt")
        manifest_file = os.path.join(seq_dir, "complete.json")

        # Validate inputs before creating any output directories, so skipped
        # sequences do not leave empty folders behind.
        if not os.path.isfile(manifest_file):
            print(
                f"Warning: No manifest file found at {manifest_file}. Skipping {seq}."
            )
            continue
        if not os.path.isfile(log_file):
            print(f"Warning: No log file found at {log_file}. Skipping {seq}.")
            continue

        # Parse camera parameters from log.txt
        camera_dict = parse_log_file(log_file)
        try:
            cx = camera_dict["optical_center_along_x_axis,_defined_in_pixels"]
            cy = camera_dict["optical_center_along_y_axis,_defined_in_pixels"]
            # "alog" is the spelling these keys are looked up under; do not
            # "correct" it.
            fx = camera_dict["focal_length_in_pixels_alog_x_axis"]
            fy = camera_dict["focal_length_in_pixels_alog_y_axis"]
            W, H = map(int, camera_dict["resolution"])
        except (KeyError, TypeError, ValueError) as e:
            print(
                f"Warning: Incomplete camera parameters in {log_file} ({e!r}). "
                f"Skipping {seq}."
            )
            continue

        # Construct intrinsic matrix
        K = np.eye(3, dtype=np.float32)
        K[0, 0] = fx
        K[1, 1] = fy
        K[0, 2] = cx
        K[1, 2] = cy

        # Read the JSON manifest
        with open(manifest_file, "r") as f:
            metadata = json.load(f)["Data"]

        # Create output subdirectories
        out_rgb_dir = os.path.join(out_dir, seq, "rgb")
        out_depth_dir = os.path.join(out_dir, seq, "depth")
        out_cam_dir = os.path.join(out_dir, seq, "cam")
        os.makedirs(out_rgb_dir, exist_ok=True)
        os.makedirs(out_depth_dir, exist_ok=True)
        os.makedirs(out_cam_dir, exist_ok=True)

        # Build tasks for parallel processing
        tasks = [
            (data, seq_dir, out_rgb_dir, out_depth_dir, out_cam_dir, K, H, W)
            for data in metadata
        ]

        # Process frames in parallel
        with ProcessPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = [executor.submit(process_data, t) for t in tasks]
            for future in tqdm(
                as_completed(futures), total=len(futures), desc=f"Processing {seq}"
            ):
                error = future.result()
                if error:
                    print(error)


if __name__ == "__main__":
    main()