Spaces:
Running
on
L4
Running
on
L4
import numpy as np | |
import torch | |
from PIL import Image | |
import argparse | |
import os | |
import json | |
from typing import List, Optional, Tuple | |
import scipy.spatial.transform as spt | |
from omegaconf import OmegaConf | |
from modeling.pipeline import VMemPipeline | |
from utils import load_img_and_K, transform_img_and_K, get_default_intrinsics | |
class Navigator: | |
""" | |
Navigator class for moving through a 3D scene with a virtual camera. | |
Provides methods to move forward and turn left/right, generating new camera poses | |
and rendering frames using VMemPipeline. | |
""" | |
def __init__(self, pipeline: VMemPipeline, step_size: float = 0.1, num_interpolation_frames: int = 4): | |
""" | |
Initialize the Navigator. | |
Args: | |
pipeline: The VMemPipeline used for rendering frames | |
step_size: The distance to move forward with each step | |
num_interpolation_frames: Number of frames to generate for each movement | |
""" | |
self.pipeline = pipeline | |
self.step_size = step_size | |
self.current_pose = None | |
self.current_K = None | |
self.frames = [] | |
self.num_interpolation_frames = num_interpolation_frames | |
self.pose_history = [] # Store history of camera poses | |
def initialize(self, image, initial_pose, initial_K): | |
""" | |
Initialize the navigator with an image and camera parameters. | |
Uses the pipeline's initialize method to set up the state. | |
Args: | |
image: Initial image tensor | |
initial_pose: Initial camera pose (4x4 camera-to-world matrix) | |
initial_K: Initial camera intrinsics matrix | |
Returns: | |
The initial frame as PIL Image | |
""" | |
self.current_pose = initial_pose | |
self.current_K = initial_K | |
# Use the pipeline's initialize method | |
initial_frame = self.pipeline.initialize(image, initial_pose, initial_K) | |
self.frames = [initial_frame] | |
# Save the initial pose | |
self.pose_history.append({ | |
"file_path": f"images/frame_001.png", | |
"transform_matrix": initial_pose.tolist() if isinstance(initial_pose, np.ndarray) else initial_pose | |
}) | |
return initial_frame | |
def initialize_pipeline(self, image_tensor, initial_pose, initial_K): | |
""" | |
Initialize the pipeline with the first image and camera pose. | |
Deprecated: Use initialize() instead. | |
Args: | |
image_tensor: Initial image tensor [1, C, H, W] | |
initial_pose: Initial camera pose (4x4 camera-to-world matrix) | |
initial_K: Initial camera intrinsics matrix | |
Returns: | |
The generated frame as PIL Image | |
""" | |
return self.initialize(image_tensor, initial_pose, initial_K) | |
def initialize_with_image(self, image, initial_pose, initial_K): | |
""" | |
Initialize the navigator with an image and camera parameters. | |
Deprecated: Use initialize() instead. | |
Args: | |
image: Initial image tensor | |
initial_pose: Initial camera pose (4x4 camera-to-world matrix) | |
initial_K: Initial camera intrinsics matrix | |
Returns: | |
The generated frame as PIL Image | |
""" | |
return self.initialize(image, initial_pose, initial_K) | |
def _interpolate_poses(self, start_pose, end_pose, num_frames): | |
""" | |
Interpolate between two camera poses. | |
Args: | |
start_pose: Starting camera pose (4x4 matrix) | |
end_pose: Ending camera pose (4x4 matrix) | |
num_frames: Number of interpolation frames to generate (including end pose) | |
Returns: | |
List of interpolated camera poses | |
""" | |
# Extract rotation matrices | |
start_R = start_pose[:3, :3] | |
end_R = end_pose[:3, :3] | |
# Extract translation vectors | |
start_t = start_pose[:3, 3] | |
end_t = end_pose[:3, 3] | |
# Convert rotation matrices to quaternions for smooth interpolation | |
start_quat = spt.Rotation.from_matrix(start_R).as_quat() | |
end_quat = spt.Rotation.from_matrix(end_R).as_quat() | |
# Generate interpolated poses | |
interpolated_poses = [] | |
for i in range(num_frames): | |
# Interpolation factor (0 to 1) | |
t = (i + 1) / num_frames | |
# Interpolate translation | |
interp_t = (1 - t) * start_t + t * end_t | |
# Interpolate rotation (SLERP) | |
interp_quat = spt.Slerp( | |
np.array([0, 1]), | |
spt.Rotation.from_quat([start_quat, end_quat]) | |
)(t).as_matrix() | |
# Create interpolated pose matrix | |
interp_pose = np.eye(4) | |
interp_pose[:3, :3] = interp_quat | |
interp_pose[:3, 3] = interp_t | |
interpolated_poses.append(interp_pose) | |
return interpolated_poses | |
def move_backward(self, num_steps: int = 1) -> List[Image.Image]: | |
""" | |
Move the camera backward along its viewing direction with smooth interpolation. | |
Args: | |
num_steps: Number of steps to move forward | |
Returns: | |
List of generated frames as PIL Images | |
""" | |
if self.current_pose is None: | |
print("Navigator not initialized. Call initialize first.") | |
return None | |
# Get the current forward direction from the camera pose | |
forward_dir = self.current_pose[:3, 2] | |
# Create the target pose | |
target_pose = self.current_pose.copy() | |
target_pose[:3, 3] += forward_dir * self.step_size * num_steps | |
# Interpolate between current pose and target pose | |
interpolated_poses = self._interpolate_poses( | |
self.current_pose, | |
target_pose, | |
self.num_interpolation_frames | |
) | |
# Create list of intrinsics (same for all frames) | |
interpolated_Ks = [self.current_K] * len(interpolated_poses) | |
# Generate frames for interpolated poses | |
new_frames = self.pipeline.generate_trajectory_frames(interpolated_poses, | |
interpolated_Ks, | |
use_non_maximum_suppression=False) | |
# Update the current pose to the final pose | |
self.current_pose = interpolated_poses[-1] | |
self.frames.extend(new_frames) | |
# Save the final pose | |
self.pose_history.append({ | |
"file_path": f"images/frame_{len(self.pose_history) + 1:03d}.png", | |
"transform_matrix": self.current_pose.tolist() if isinstance(self.current_pose, np.ndarray) else self.current_pose | |
}) | |
return new_frames | |
def move_forward(self, num_steps: int = 1) -> List[Image.Image]: | |
""" | |
Move the camera forward along its viewing direction with smooth interpolation. | |
Args: | |
num_steps: Number of steps to move forward | |
Returns: | |
List of generated frames as PIL Images | |
""" | |
if self.current_pose is None: | |
print("Navigator not initialized. Call initialize first.") | |
return None | |
# Get the current forward direction from the camera pose | |
forward_dir = self.current_pose[:3, 2] | |
# Create the target pose | |
target_pose = self.current_pose.copy() | |
target_pose[:3, 3] -= forward_dir * self.step_size * num_steps | |
# Interpolate between current pose and target pose | |
interpolated_poses = self._interpolate_poses( | |
self.current_pose, | |
target_pose, | |
self.num_interpolation_frames | |
) | |
# Create list of intrinsics (same for all frames) | |
interpolated_Ks = [self.current_K] * len(interpolated_poses) | |
# Generate frames for interpolated poses | |
new_frames = self.pipeline.generate_trajectory_frames(interpolated_poses, | |
interpolated_Ks, | |
use_non_maximum_suppression=False) | |
# Update the current pose to the final pose | |
self.current_pose = interpolated_poses[-1] | |
self.frames.extend(new_frames) | |
# Save the final pose | |
self.pose_history.append({ | |
"file_path": f"images/frame_{len(self.pose_history) + 1:03d}.png", | |
"transform_matrix": self.current_pose.tolist() if isinstance(self.current_pose, np.ndarray) else self.current_pose | |
}) | |
return new_frames | |
def turn_left(self, degrees: float = 3) -> List[Image.Image]: | |
""" | |
Rotate the camera left around the up vector with smooth interpolation. | |
Args: | |
degrees: Rotation angle in degrees | |
Returns: | |
List of generated frames as PIL Images | |
""" | |
return self._turn(degrees) | |
def turn_right(self, degrees: float = 3) -> List[Image.Image]: | |
""" | |
Rotate the camera right around the up vector with smooth interpolation. | |
Args: | |
degrees: Rotation angle in degrees | |
Returns: | |
List of generated frames as PIL Images | |
""" | |
return self._turn(-degrees) | |
def _turn(self, degrees: float) -> List[Image.Image]: | |
""" | |
Helper method to turn the camera by the specified angle with smooth interpolation. | |
Positive angles turn left, negative angles turn right. | |
Args: | |
degrees: Rotation angle in degrees | |
Returns: | |
List of generated frames as PIL Images | |
""" | |
if self.current_pose is None: | |
print("Navigator not initialized. Call initialize first.") | |
return None | |
# Convert degrees to radians | |
angle_rad = np.radians(degrees) | |
# Create rotation matrix around the up axis (assuming Y is up) | |
rotation = np.array([ | |
[np.cos(angle_rad), 0, np.sin(angle_rad), 0], | |
[0, 1, 0, 0], | |
[-np.sin(angle_rad), 0, np.cos(angle_rad), 0], | |
[0, 0, 0, 1] | |
]) | |
# Apply rotation to the current pose | |
position = self.current_pose[:3, 3].copy() | |
rotation_matrix = self.current_pose[:3, :3].copy() | |
# Create the target pose | |
target_pose = np.eye(4) | |
target_pose[:3, :3] = rotation[:3, :3] @ rotation_matrix | |
target_pose[:3, 3] = position | |
# Interpolate between current pose and target pose | |
interpolated_poses = self._interpolate_poses( | |
self.current_pose, | |
target_pose, | |
self.num_interpolation_frames | |
) | |
# Create list of intrinsics (same for all frames) | |
interpolated_Ks = [self.current_K] * len(interpolated_poses) | |
# Generate frames for interpolated poses | |
new_frames = self.pipeline.generate_trajectory_frames(interpolated_poses, interpolated_Ks) | |
# Update the current pose to the final pose | |
self.current_pose = interpolated_poses[-1] | |
self.frames.extend(new_frames) | |
# Save the final pose | |
self.pose_history.append({ | |
"file_path": f"images/frame_{len(self.pose_history) + 1:03d}.png", | |
"transform_matrix": self.current_pose.tolist() if isinstance(self.current_pose, np.ndarray) else self.current_pose | |
}) | |
return new_frames | |
def navigate(self, commands: List[str]) -> List[List[Image.Image]]: | |
""" | |
Execute a series of navigation commands and return the generated frames. | |
Args: | |
commands: List of commands ('w', 'a', 'd', 'q') | |
Returns: | |
List of lists of generated frames, one list per command | |
""" | |
if self.current_pose is None: | |
print("Navigator not initialized. Call initialize first.") | |
return [] | |
all_generated_frames = [] | |
for idx, cmd in enumerate(commands): | |
if cmd == 'w': | |
frames = self.move_forward() | |
if frames: | |
all_generated_frames.extend(frames) | |
elif cmd == 's': | |
# self.pipeline.temporal_only = True | |
frames = self.move_backward() | |
if frames: | |
all_generated_frames.extend(frames) | |
elif cmd == 'a': | |
frames = self.turn_left(4) | |
if frames: | |
all_generated_frames.extend(frames) | |
elif cmd == 'd': | |
frames = self.turn_right(4) | |
if frames: | |
all_generated_frames.extend(frames) | |
return all_generated_frames | |
def undo(self) -> bool: | |
""" | |
Undo the last navigation step by removing the most recent frames and poses. | |
Uses the pipeline's undo_latest_move method to handle the frame removal. | |
Returns: | |
bool: True if undo was successful, False otherwise | |
""" | |
# Check if we have enough poses to undo | |
if len(self.pose_history) <= 1: | |
print("Cannot undo: at initial position") | |
return False | |
# Use pipeline's undo function to remove the last batch of frames | |
success = self.pipeline.undo_latest_move() | |
if success: | |
# Remove the last pose from history | |
self.pose_history.pop() | |
# Set current pose to the previous pose | |
prev_pose_data = self.pose_history[-1] | |
self.current_pose = np.array(prev_pose_data["transform_matrix"]) | |
# Remove frames from the frames list | |
frames_to_remove = min(self.pipeline.config.model.target_num_frames, len(self.frames) - 1) | |
for _ in range(frames_to_remove): | |
if len(self.frames) > 1: # Keep at least the initial frame | |
self.frames.pop() | |
print(f"Successfully undid last movement. Now at position {len(self.pose_history)}") | |
return True | |
return False | |
def save_camera_poses(self, output_path): | |
""" | |
Save the camera pose history to a JSON file in the format | |
required for NeRF training. | |
Args: | |
output_path: Path to save the JSON file | |
""" | |
# Create the output directory if it doesn't exist | |
os.makedirs(os.path.dirname(output_path), exist_ok=True) | |
# Format the data as required | |
transforms_data = { | |
"frames": self.pose_history | |
} | |
# Save to JSON file | |
with open(output_path, 'w') as f: | |
json.dump(transforms_data, f, indent=4) | |
print(f"Camera poses saved to {output_path}") | |
def main(): | |
parser = argparse.ArgumentParser(description="Interactive navigation in VMem") | |
parser.add_argument("--config", type=str, default="configs/inference/inference.yaml", help="Path to config file") | |
parser.add_argument("--step_size", type=float, default=0.1, help="Forward step size") | |
parser.add_argument("--interpolation_frames", type=int, default=4, help="Number of frames for each movement") | |
parser.add_argument("--commands", type=str, default="a,a,a,a,a,d,d,d,d,d,d,w,w,w,w,a,a,a,a,d,d,d,d,s,s,s,s", help="Comma-separated commands to execute (w,a,s,d,c,q) where c is circulate") | |
# parser.add_argument("--commands", type=str, default="d,d,d,d,w,w,w,d,d,d,d,d,a,a,a,a,a,s,s", help="Comma-separated commands to execute (w,a,s,d,c,q) where c is circulate") | |
parser.add_argument("--output_dir", type=str, default="./visualization/navigation_frames", help="Directory to save output frames") | |
parser.add_argument("--save_poses", type=str, default="./visualization/transforms.json", help="Path to save camera poses in NeRF format") | |
args = parser.parse_args() | |
# Load configuration | |
config = OmegaConf.load(args.config) | |
# Initialize the pipeline | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
pipeline = VMemPipeline(config, device=device) | |
# Create the navigator | |
navigator = Navigator(pipeline, step_size=args.step_size, num_interpolation_frames=args.interpolation_frames) | |
# Load episode data | |
frame_path = "test_samples/oxford.jpeg" | |
image, _ = load_img_and_K(frame_path, None, K=None, device=device) | |
image, _ = transform_img_and_K(image, (config.model.height, config.model.width), mode="crop", K=None) | |
ori_K = np.array(get_default_intrinsics()[0]) | |
initial_pose = np.eye(4) | |
# Initialize the navigator with the first frame using pipeline's initialize method | |
initial_frame = navigator.initialize(image, initial_pose, ori_K) | |
# Create output directory if needed | |
if args.output_dir: | |
os.makedirs(args.output_dir, exist_ok=True) | |
initial_frame.save(os.path.join(args.output_dir, "initial.png")) | |
# If commands are provided, execute them in sequence | |
commands = args.commands.split(',') | |
all_frames_lists = navigator.navigate(commands) | |
# Save camera poses | |
if args.save_poses: | |
navigator.save_camera_poses(args.save_poses) | |
if __name__ == "__main__": | |
main() | |