# Spaces: Running on Zero
import argparse
import glob
import gzip
import json
import os
import pickle
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import cv2
import decord
import numpy as np
class PoseProcessor:
    """
    A class for processing pose landmarks and converting them to normalized numpy arrays.

    Every frame becomes one flat row ``[x0, y0, x1, y1, ...]`` holding the (x, y)
    coordinates of the landmarks listed in ``pose_indices``. Frames without a
    usable pose become a row filled with ``fill_missing_value``.
    """

    # Landmarks used when the caller passes no ``pose_indices``
    # (nose, shoulders, elbows, wrists).
    _DEFAULT_POSE_INDICES = (0, 11, 12, 13, 14, 15, 16)
    # Only this many leading landmarks are passed to the normalization step.
    _NORMALIZED_LANDMARK_COUNT = 25

    def __init__(self, pose_indices: Optional[List[int]] = None,
                 normalize_keypoints: bool = True, fill_missing_value: float = -9999.0):
        """
        Initialize the PoseProcessor.
        Args:
            pose_indices: List of pose landmark indices to extract.
                Default is [0,11,12,13,14,15,16] (nose, shoulders, elbows, wrists).
                ``None`` and an empty list both select the default.
            normalize_keypoints: Whether to normalize keypoints to signing space
            fill_missing_value: Value to use for missing keypoints
        """
        self.pose_indices = pose_indices if pose_indices else list(self._DEFAULT_POSE_INDICES)
        self.normalize_keypoints = normalize_keypoints
        self.fill_missing_value = fill_missing_value
        # Number of coordinates per keypoint (x, y)
        self.coords_per_keypoint = 2
        self.output_shape = (len(self.pose_indices), self.coords_per_keypoint)

    def _missing_frame(self) -> np.ndarray:
        """Return a flat row of ``fill_missing_value`` with the per-frame width."""
        return np.full(self.output_shape, self.fill_missing_value).flatten()

    def normalize_pose_keypoints(self, pose_landmarks: List[List[float]]) -> List[List[float]]:
        """
        Normalize pose keypoints to signing space.

        The signing space is a box 6 head units wide and 7 head units high,
        where one head unit is half the shoulder-to-shoulder distance. The box
        is centred horizontally on the nose and starts half a head unit above
        the left eye. Coordinates are mapped so the box spans [-0.5, 0.5] on
        both axes.
        Args:
            pose_landmarks: List of pose landmarks (per the original docs, from
                MediaPipe); only the first two values (x, y) of each landmark
                are used. Must contain at least indices 0, 2, 11 and 12.
        Returns:
            List of normalized [x, y] keypoints, one per input landmark. When
            the shoulders coincide (head unit is zero or NaN) no scale exists
            and every keypoint is returned as [nan, nan].
        """
        points = np.asarray([landmark[:2] for landmark in pose_landmarks], dtype=float)
        # Extract relevant landmarks for normalization
        left_shoulder = points[11]
        right_shoulder = points[12]
        left_eye = points[2]
        nose = points[0]
        # Calculate head unit in normalized space
        head_unit = np.linalg.norm(right_shoulder - left_shoulder) / 2
        if not head_unit > 0:
            # Degenerate skeleton: dividing by the box size would yield inf/NaN
            # garbage, so report the failure explicitly as NaN coordinates.
            return [[float("nan"), float("nan")] for _ in pose_landmarks]
        # Define signing space dimensions in normalized space
        signing_space_width = 6 * head_unit
        signing_space_height = 7 * head_unit
        # Top-left corner of the signing space bounding box
        signing_space_top = left_eye[1] - 0.5 * head_unit
        signing_space_left = nose[0] - signing_space_width / 2
        # Create transformation matrix: move the box corner to the origin,
        # scale the box to the unit square, then centre it on (0, 0).
        translation_matrix = np.array([[1, 0, -signing_space_left],
                                       [0, 1, -signing_space_top],
                                       [0, 0, 1]])
        scale_matrix = np.array([[1 / signing_space_width, 0, 0],
                                 [0, 1 / signing_space_height, 0],
                                 [0, 0, 1]])
        shift_matrix = np.array([[1, 0, -0.5],
                                 [0, 1, -0.5],
                                 [0, 0, 1]])
        transformation_matrix = shift_matrix @ scale_matrix @ translation_matrix
        # Apply the transformation to all keypoints at once (homogeneous coords)
        homogeneous = np.column_stack([points, np.ones(len(points))])
        normalized = homogeneous @ transformation_matrix.T
        return normalized[:, :2].tolist()

    def process_frame_landmarks(self, frame_landmarks: Optional[Dict[str, Any]]) -> np.ndarray:
        """
        Process landmarks for a single frame.
        Args:
            frame_landmarks: Dictionary containing pose landmarks for one frame
                under the key 'pose_landmarks'; only its first entry is used.
        Returns:
            Flat numpy array of length ``len(pose_indices) * 2``. A row of
            ``fill_missing_value`` is returned when the frame is ``None``, has
            no pose entry, or yields non-finite coordinates.
        """
        if frame_landmarks is None:
            return self._missing_frame()
        detections = frame_landmarks.get('pose_landmarks')
        # An empty detection list has no first pose to read, so it is handled
        # like a missing detection instead of raising IndexError.
        if detections is None or len(detections) == 0:
            return self._missing_frame()
        # Get pose landmarks
        pose_landmarks = detections[0]
        # Normalize keypoints if required
        if self.normalize_keypoints:
            normalized_landmarks = self.normalize_pose_keypoints(
                pose_landmarks[:self._NORMALIZED_LANDMARK_COUNT])
        else:
            normalized_landmarks = pose_landmarks
        # Extract only the specified indices, keeping just (x, y) so the row
        # width always matches ``output_shape`` even if landmarks carry extra
        # values.
        selected_landmarks = [normalized_landmarks[i][:self.coords_per_keypoint]
                              for i in self.pose_indices]
        # Convert to numpy array and flatten
        frame_keypoints = np.asarray(selected_landmarks, dtype=float).flatten()
        if not np.all(np.isfinite(frame_keypoints)):
            # Failed normalization (or NaN input) must not leak into features.
            return self._missing_frame()
        return frame_keypoints

    def process_landmarks_sequence(self, landmarks_data: Dict[int, Any]) -> np.ndarray:
        """
        Process landmarks for an entire sequence (video).
        Args:
            landmarks_data: Dictionary containing landmarks for each frame,
                keyed by integer frame index
        Returns:
            Numpy array of shape (num_frames, num_keypoints * 2), where
            num_frames is the largest frame index plus one. An empty array is
            returned for empty input.
        """
        if not landmarks_data:
            return np.array([])
        num_frames = max(landmarks_data) + 1
        video_pose_landmarks = []
        prev_pose = None
        for i in range(num_frames):
            frame_landmarks = landmarks_data.get(i)
            if frame_landmarks is None:
                # Frame absent from the mapping: repeat the last valid pose if
                # there is one, otherwise use missing values.
                frame_keypoints = prev_pose if prev_pose is not None else self._missing_frame()
            else:
                # Frame present: a frame without a usable pose yields the
                # missing-value row here and is NOT forward-filled.
                frame_keypoints = self.process_frame_landmarks(frame_landmarks)
                if not np.all(frame_keypoints == self.fill_missing_value):
                    prev_pose = frame_keypoints
            video_pose_landmarks.append(frame_keypoints)
        return np.array(video_pose_landmarks)

    def _apply_post_processing(self, pose_array: np.ndarray) -> np.ndarray:
        """
        Apply post-processing to the pose array.

        Currently a no-op: the wrist masking mentioned in the original comments
        (filling landmarks 15 and 16 with the missing value) is disabled, and
        this method is not called by ``process_landmarks_sequence``.
        Args:
            pose_array: Input pose array
        Returns:
            The input array, unchanged
        """
        return pose_array

    def process_landmarks_from_file(self, pose_file_path: str) -> np.ndarray:
        """
        Process landmarks from a JSON file.
        Args:
            pose_file_path: Path to the pose landmarks JSON file, an object
                whose keys are frame indices written as strings
        Returns:
            Numpy array of processed pose keypoints. Any failure is printed and
            reported as an empty array rather than raised.
        """
        try:
            # JSON is read as UTF-8 explicitly instead of the locale encoding.
            with open(pose_file_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)
            # Convert string keys to integers
            landmarks_data = {int(k): v for k, v in raw_data.items()}
            return self.process_landmarks_sequence(landmarks_data)
        except Exception as e:
            # Best-effort boundary: one bad file must not abort a batch run.
            print(f"Error processing {pose_file_path}: {e}")
            return np.array([])

    def process_and_save_landmarks(self, landmarks_data: Dict[int, Any],
                                   output_path: str, filename: str) -> str:
        """
        Process landmarks and save to file.
        Args:
            landmarks_data: Dictionary containing landmarks for each frame
            output_path: Directory to save the processed landmarks
            filename: Name for the output file (without extension)
        Returns:
            Path to the saved ``.npy`` file
        """
        processed_landmarks = self.process_landmarks_sequence(landmarks_data)
        # Create output directory if it doesn't exist
        output_dir = Path(output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        save_path = output_dir / f"{filename}.npy"
        np.save(save_path, processed_landmarks)
        return str(save_path)
# Convenience functions for backward compatibility
def process_pose_landmarks(landmarks_data: Dict[int, Any],
                           normalize: bool = True,
                           pose_indices: Optional[List[int]] = None) -> np.ndarray:
    """
    Convenience function to process pose landmarks.

    Builds a one-off PoseProcessor with the given options and runs it over the
    whole sequence.
    Args:
        landmarks_data: Dictionary containing landmarks for each frame
        normalize: Whether to normalize keypoints to signing space
        pose_indices: List of pose landmark indices to extract
    Returns:
        Numpy array of processed pose keypoints
    """
    processor = PoseProcessor(pose_indices=pose_indices, normalize_keypoints=normalize)
    return processor.process_landmarks_sequence(landmarks_data)
def keypoints_to_numpy(pose_file: str, pose_emb_path: str):
    """
    Original function for backward compatibility with command-line usage.

    Processes one pose JSON file with a default PoseProcessor and writes the
    result to ``<pose_emb_path>/<stem of pose_file>.npy``.
    Args:
        pose_file: Path to the pose landmarks JSON file
        pose_emb_path: Directory that receives the ``.npy`` file; created if
            it does not exist
    Returns:
        None. Nothing is written when processing yields an empty array, and
        any exception is printed instead of raised.
    """
    try:
        processed_landmarks = PoseProcessor().process_landmarks_from_file(pose_file)
        if processed_landmarks.size == 0:
            # Empty result: there is nothing worth saving.
            return
        # Save the processed landmarks
        save_path = Path(pose_emb_path) / f"{Path(pose_file).stem}.npy"
        save_path.parent.mkdir(parents=True, exist_ok=True)
        np.save(save_path, processed_landmarks)
    except Exception as e:
        # Best-effort boundary: report the failure and let the caller move on.
        print(f"Error processing {pose_file}: {e}")
# Utility functions for batch processing
def get_mp4_files(directory: str) -> List[str]:
    """
    Get all MP4 files in a directory.
    Args:
        directory: Directory to search (not recursive)
    Returns:
        Sorted list of absolute paths of the entries matching ``*.mp4``
    Raises:
        FileNotFoundError: If ``directory`` does not exist
    """
    if not os.path.exists(directory):
        raise FileNotFoundError(f'Directory not found: {directory}')
    # Escape the directory part so characters such as '[' or '*' in its name
    # are matched literally instead of being read as glob syntax.
    pattern = os.path.join(glob.escape(directory), '*.mp4')
    # Sorted so the order (and any batching built on it) is reproducible.
    return sorted(os.path.abspath(file) for file in glob.glob(pattern))
def load_file(filename: str):
    """
    Load a pickled and gzipped file.
    Args:
        filename: Path to a gzip-compressed pickle file
    Returns:
        The unpickled object
    """
    # SECURITY: unpickling can execute arbitrary code. Only load files that
    # come from a trusted source.
    with gzip.open(filename, "rb") as f:
        return pickle.load(f)
def is_string_in_file(file_path: str, target_string: str) -> bool:
    """
    Check if a string exists in a file.

    The file is scanned line by line, so a target that spans a line break is
    never found.
    Args:
        file_path: Path of the text file to scan
        target_string: Substring to look for
    Returns:
        True if any line contains ``target_string``. False if none does or if
        the file could not be read (the error is printed).
    """
    try:
        with Path(file_path).open("r") as f:
            # any() stops reading at the first matching line.
            return any(target_string in line for line in f)
    except Exception as e:
        print(f"Error: {e}")
        return False
def main():
    """
    Main function for command-line usage.

    Loads the list of pose files, splits it into batches of ``--batch_size``,
    and converts every file of batch ``--index`` to a ``.npy`` feature file in
    ``--pose_features_path``. Files whose output already exists are skipped,
    and no new file is started once ``--time_limit`` seconds have elapsed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--index', type=int, required=True,
                        help='index of the sub_list to work with')
    parser.add_argument('--files_list', type=str, required=True,
                        help='path to the gzipped pickle holding the list of pose files')
    parser.add_argument('--pose_features_path', type=str, required=True,
                        help='directory where the pose feature .npy files are written')
    parser.add_argument('--batch_size', type=int, required=True,
                        help='batch size')
    parser.add_argument('--time_limit', type=int, required=True,
                        help='time limit in seconds')
    args = parser.parse_args()
    if args.batch_size < 1:
        # A zero step would make range() raise; a negative one yields no batches.
        parser.error('--batch_size must be a positive integer')
    # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
    start_time = time.monotonic()
    # Load files list
    fixed_list = load_file(args.files_list)
    # Split the files into batches and pick the requested one
    video_batches = [fixed_list[i:i + args.batch_size]
                     for i in range(0, len(fixed_list), args.batch_size)]
    try:
        batch = video_batches[args.index]
    except IndexError:
        parser.error(f'--index {args.index} is out of range for {len(video_batches)} batch(es)')
    for pose_file in batch:
        output_path = Path(args.pose_features_path) / f"{Path(pose_file).stem}.npy"
        if output_path.exists():
            print(f"Skipping {pose_file} - output already exists")
            continue
        if time.monotonic() - start_time > args.time_limit:
            print("Time limit reached. Stopping execution.")
            break
        try:
            print(f"Processing {pose_file}")
            keypoints_to_numpy(pose_file, args.pose_features_path)
        except Exception as e:
            print(f"Error processing {pose_file}: {e}")
            continue
        # keypoints_to_numpy reports its own failures without raising, so the
        # output file is the only reliable sign of success.
        if output_path.exists():
            print(f"Successfully processed {pose_file}")
        else:
            print(f"No features were written for {pose_file}")
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()