"""Extract eye/mouth face cues from videos using pre-computed landmarks.

The module exposes :class:`FaceExtractor`, a convenience wrapper
:func:`extract_face_frames`, and a small command-line driver (:func:`main`)
that processes one batch of a pickled list of video files.
"""

import argparse
import gzip
import json
import os
import pickle
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import cv2
import decord
import numpy as np


class FaceExtractor:
    """
    A class for extracting face regions from videos based on pose and face landmarks.

    Creates face frames with only eyes and mouth visible on grey background.
    """

    def __init__(self,
                 output_size: Tuple[int, int] = (224, 224),
                 scale_factor: float = 1.2,
                 grey_background_color: int = 128):
        """
        Initialize the FaceExtractor.

        Args:
            output_size: Size of the output face frames (width, height)
            scale_factor: Scale factor for bounding box expansion. Stored on
                the instance; none of the methods in this class read it.
            grey_background_color: Color value for grey background (0-255)
        """
        self.output_size = output_size
        self.scale_factor = scale_factor
        self.grey_background_color = grey_background_color

        # Face landmark indices for eyes and mouth
        self.left_eye_indices = [69, 168, 156, 118, 54]
        self.right_eye_indices = [168, 299, 347, 336, 301]
        self.mouth_indices = [164, 212, 432, 18]

    def resize_frame(self, frame: np.ndarray,
                     frame_size: Tuple[int, int]) -> Optional[np.ndarray]:
        """
        Resize frame to specified size.

        Args:
            frame: Image to resize
            frame_size: Target size as (width, height)

        Returns:
            The resized image, or None if ``frame`` is None or empty
        """
        if frame is not None and frame.size > 0:
            return cv2.resize(frame, frame_size, interpolation=cv2.INTER_AREA)
        return None

    def calculate_bounding_box(self,
                               landmarks: List[List[float]],
                               indices: List[int],
                               image_shape: Tuple[int, int, int]) -> Tuple[int, int, int, int]:
        """
        Calculate bounding box for specific landmark indices.

        Landmark coordinates are multiplied by the image width/height, so they
        are expected to be normalized. The result is NOT clipped to the image.

        Args:
            landmarks: Landmarks whose first two entries are x and y
            indices: Indices into ``landmarks`` to enclose
            image_shape: Shape of the image as (height, width, ...)

        Returns:
            Pixel box as (left, top, right, bottom)
        """
        x_coordinates = [landmarks[i][0] for i in indices]
        y_coordinates = [landmarks[i][1] for i in indices]

        image_height, image_width = image_shape[0], image_shape[1]
        return (int(min(x_coordinates) * image_width),
                int(min(y_coordinates) * image_height),
                int(max(x_coordinates) * image_width),
                int(max(y_coordinates) * image_height))

    @staticmethod
    def _clip_box(box: Tuple[int, int, int, int],
                  width: int, height: int) -> Tuple[int, int, int, int]:
        """Clamp a (left, top, right, bottom) box to a width x height image."""
        left, top, right, bottom = box
        return (min(max(left, 0), width),
                min(max(top, 0), height),
                min(max(right, 0), width),
                min(max(bottom, 0), height))

    def crop_and_paste(self,
                       src: np.ndarray,
                       dst: np.ndarray,
                       src_box: Tuple[int, int, int, int],
                       dst_origin: Tuple[int, int]):
        """
        Crop region from source and paste to destination.

        The region is clipped to both images, so a box that sticks out of
        ``src`` or lands partly outside ``dst`` is pasted partially instead of
        raising a shape-mismatch error or wrapping around via negative indices.

        Args:
            src: Image to crop from
            dst: Image modified in place
            src_box: Source box as (x1, y1, x2, y2)
            dst_origin: Top-left paste position in ``dst`` as (x, y)
        """
        x1, y1, x2, y2 = src_box
        dx, dy = dst_origin
        src_height, src_width = src.shape[:2]
        dst_height, dst_width = dst.shape[:2]

        # Trim the part of the box left of / above the source image and move
        # the paste position by the same amount so content stays aligned.
        if x1 < 0:
            dx -= x1
            x1 = 0
        if y1 < 0:
            dy -= y1
            y1 = 0
        x2 = min(x2, src_width)
        y2 = min(y2, src_height)

        # Trim the part that would land left of / above the destination.
        if dx < 0:
            x1 -= dx
            dx = 0
        if dy < 0:
            y1 -= dy
            dy = 0

        width = min(x2 - x1, dst_width - dx)
        height = min(y2 - y1, dst_height - dy)
        if width <= 0 or height <= 0:
            return  # Nothing visible to paste

        dst[dy:dy + height, dx:dx + width] = src[y1:y1 + height, x1:x1 + width]

    def cues_on_grey_background(self, image: np.ndarray,
                                facial_landmarks: List[List[float]]) -> np.ndarray:
        """
        Create face frame with only eyes and mouth visible on grey background.

        Args:
            image: Input image as numpy array
            facial_landmarks: Face landmarks from MediaPipe

        Returns:
            Square face frame with eyes and mouth on grey background
        """
        image_shape = image.shape
        image_height, image_width = image_shape[0], image_shape[1]

        # Bounding boxes for facial features, clipped so that landmarks lying
        # outside the image cannot produce negative or oversized slices.
        left_eye_box, right_eye_box, mouth_box = (
            self._clip_box(
                self.calculate_bounding_box(facial_landmarks, indices, image_shape),
                image_width, image_height)
            for indices in (self.left_eye_indices,
                            self.right_eye_indices,
                            self.mouth_indices)
        )
        feature_boxes = (left_eye_box, right_eye_box, mouth_box)

        # Overall bounding box with padding, kept inside the image
        padding = 10
        min_x = max(0, min(box[0] for box in feature_boxes) - padding)
        min_y = max(0, min(box[1] for box in feature_boxes) - padding)
        max_x = min(image_width, max(box[2] for box in feature_boxes) + padding)
        max_y = min(image_height, max(box[3] for box in feature_boxes) + padding)

        # Make the canvas square; at least 1 px so it is never empty
        width = max_x - min_x
        height = max_y - min_y
        side_length = max(width, height, 1)

        # Shift the origin of the shorter axis so the features are centred
        # (as far as the image border allows) inside the square canvas.
        if width < side_length:
            min_x = max(0, min_x - (side_length - width) // 2)
        if height < side_length:
            min_y = max(0, min_y - (side_length - height) // 2)

        # Create grey background image
        grey_background = np.full((side_length, side_length, 3),
                                  self.grey_background_color, dtype=np.uint8)

        # Crop and paste facial features onto grey background
        for box in feature_boxes:
            self.crop_and_paste(image, grey_background, box,
                                (box[0] - min_x, box[1] - min_y))

        return grey_background

    def select_face(self,
                    pose_landmarks: List[List[float]],
                    face_landmarks: List[List[List[float]]]) -> List[List[float]]:
        """
        Select the face that is closest to the pose nose landmark.

        The first pose landmark is compared with the first landmark of every
        face. Only the coordinates both landmarks share are compared, so a
        pose landmark carrying an extra trailing value does not break the
        distance computation.

        Args:
            pose_landmarks: Pose landmarks from MediaPipe
            face_landmarks: List of face landmarks from MediaPipe

        Returns:
            Selected face landmarks

        Raises:
            ValueError: If ``face_landmarks`` is empty
        """
        if len(face_landmarks) == 0:
            raise ValueError("face_landmarks must contain at least one face")

        nose_from_pose = np.asarray(pose_landmarks[0], dtype=float)  # Nose from pose

        distances = []
        for face in face_landmarks:
            nose_from_face = np.asarray(face[0], dtype=float)
            shared = min(nose_from_pose.shape[0], nose_from_face.shape[0])
            distances.append(
                np.linalg.norm(nose_from_pose[:shared] - nose_from_face[:shared]))

        return face_landmarks[int(np.argmin(distances))]

    def _open_video(self, video_input) -> Tuple[Any, Optional[str]]:
        """Return (video, default_name); open a reader when given a path."""
        if isinstance(video_input, (str, os.PathLike)):
            video_path = Path(video_input)
            if not video_path.exists():
                raise FileNotFoundError(f"Video file not found: {video_input}")
            return decord.VideoReader(str(video_path)), video_path.stem
        # Anything else is used as-is; it must support len() and indexing.
        return video_input, None

    def _blank_frame(self) -> np.ndarray:
        """Return a black frame matching ``output_size`` (width, height)."""
        width, height = self.output_size
        # Image arrays are (rows, cols, channels), i.e. (height, width, 3).
        return np.zeros((height, width, 3), dtype=np.uint8)

    def extract_face_frames(self, video_input,
                            landmarks_data: Dict[int, Any]) -> List[np.ndarray]:
        """
        Extract face frames from video based on landmarks.

        For frames without usable landmarks the previous landmarks / previous
        face frame are reused; a black frame is emitted when nothing earlier
        is available.

        Args:
            video_input: Either a path to video file (str or path-like) or a
                decord.VideoReader-like object supporting len() and indexing
            landmarks_data: Dictionary containing pose and face landmarks for each frame

        Returns:
            List of face frames as numpy arrays

        Raises:
            FileNotFoundError: If ``video_input`` is a path that does not exist
        """
        video, _ = self._open_video(video_input)

        face_frames = []
        prev_face_frame = None
        prev_landmarks = None

        for i in range(len(video)):
            frame = video[i]
            if hasattr(video, 'seek'):
                # NOTE(review): kept from the original; presumably resets the
                # decord reader after random access - confirm it is needed.
                video.seek(0)

            # decord returns its own NDArray type; OpenCV needs numpy arrays.
            if hasattr(frame, 'asnumpy'):
                frame = frame.asnumpy()
            elif not isinstance(frame, np.ndarray):
                frame = np.asarray(frame)

            # Get landmarks for this frame, falling back to the last known ones
            frame_landmarks = landmarks_data.get(i, None)
            if frame_landmarks is None:
                if prev_landmarks is None:
                    # Use blank frame if no landmarks available
                    face_frames.append(self._blank_frame())
                    continue
                frame_landmarks = prev_landmarks
            else:
                prev_landmarks = frame_landmarks

            # Missing or empty pose landmarks: repeat the previous face frame
            pose_landmarks = frame_landmarks.get('pose_landmarks')
            if not pose_landmarks:
                face_frames.append(prev_face_frame if prev_face_frame is not None
                                   else self._blank_frame())
                continue

            # Process face if (non-empty) face landmarks exist
            face_frame = None
            face_landmarks = frame_landmarks.get('face_landmarks')
            if face_landmarks:
                # Select the face closest to the pose
                selected_face = self.select_face(pose_landmarks[0], face_landmarks)

                # Swaps the first and third channel. NOTE(review): decord is
                # documented to return RGB, so the result is presumably BGR,
                # which is what cv2.VideoWriter expects - confirm.
                frame_swapped = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Create face frame with cues on grey background
                face_frame = self.cues_on_grey_background(frame_swapped, selected_face)
                face_frame = self.resize_frame(face_frame, self.output_size)

            if face_frame is not None:
                prev_face_frame = face_frame
            elif prev_face_frame is not None:
                face_frame = prev_face_frame
            else:
                # Use blank frame if no face could be produced yet
                face_frame = self._blank_frame()
            face_frames.append(face_frame)

        return face_frames

    def extract_and_save_face_video(self,
                                    video_input,
                                    landmarks_data: Dict[int, Any],
                                    output_dir: str,
                                    video_name: Optional[str] = None) -> str:
        """
        Extract face frames and save as video file.

        Frames are extracted before the output file is touched, and a
        partially written file is removed on failure, so a failed run never
        leaves an output that looks finished.

        Args:
            video_input: Either a path to video file (str or path-like) or a
                decord.VideoReader object
            landmarks_data: Dictionary containing pose and face landmarks for each frame
            output_dir: Directory to save the face video
            video_name: Name for output video (auto-generated if not provided)

        Returns:
            Path to the saved face video

        Raises:
            FileNotFoundError: If ``video_input`` is a path that does not exist
            RuntimeError: If the output video writer cannot be opened
        """
        # Handle video input and get FPS
        video, default_name = self._open_video(video_input)
        if video_name is None:
            video_name = default_name if default_name is not None else "video"

        fps = video.get_avg_fps() if hasattr(video, 'get_avg_fps') else 30.0

        # Extract face frames first so an extraction error leaves no output
        face_frames = self.extract_face_frames(video, landmarks_data)

        # Create output directory
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Define output path
        face_video_path = output_path / f"{video_name}_face.mp4"

        # Remove existing file
        if face_video_path.exists():
            face_video_path.unlink()

        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(str(face_video_path), fourcc, fps,
                                 tuple(self.output_size))

        completed = False
        try:
            if not writer.isOpened():
                raise RuntimeError(
                    f"Could not open video writer for {face_video_path}")

            # Write frames to video file
            for frame in face_frames:
                writer.write(frame)
            completed = True
        finally:
            # Clean up
            writer.release()
            del writer
            if not completed and face_video_path.exists():
                face_video_path.unlink()

        return str(face_video_path)


# Convenience function for backward compatibility
def extract_face_frames(video_input,
                        landmarks_data: Dict[int, Any],
                        output_size: Tuple[int, int] = (224, 224)) -> List[np.ndarray]:
    """
    Convenience function to extract face frames from video.

    Args:
        video_input: Either a path to video file (str) or a decord.VideoReader object
        landmarks_data: Dictionary containing pose and face landmarks for each frame
        output_size: Size of the output face frames (width, height)

    Returns:
        List of face frames as numpy arrays
    """
    extractor = FaceExtractor(output_size=output_size)
    return extractor.extract_face_frames(video_input, landmarks_data)


def video_holistic(video_file: str, face_path: str,
                   problem_file_path: str, pose_path: str):
    """
    Original function for backward compatibility with command-line usage.

    Loads ``<pose_path>/<video stem>_pose.json`` and writes
    ``<face_path>/<video stem>_face.mp4``. Any error is printed and the video
    file is appended to the problem file instead of being raised.

    Args:
        video_file: Path to the input video
        face_path: Directory for the output face video
        problem_file_path: File that collects videos which failed to process
        pose_path: Directory containing the landmark JSON files

    Returns:
        True if the face video was written, False if processing failed
    """
    try:
        video = decord.VideoReader(video_file)
        video_name = Path(video_file).stem
        landmark_json_path = Path(pose_path) / f"{video_name}_pose.json"

        # Load landmarks
        with open(landmark_json_path, 'r') as rd:
            landmarks_data = json.load(rd)

        # Convert string keys to integers (JSON object keys are strings)
        landmarks_data = {int(k): v for k, v in landmarks_data.items()}

        # Extract face video
        extractor = FaceExtractor()
        extractor.extract_and_save_face_video(video, landmarks_data,
                                              face_path, video_name)
    except Exception as e:
        # Batch boundary: record the failure and let the caller continue.
        print(f"Error processing {video_file}: {e}")
        with open(problem_file_path, "a") as p:
            p.write(video_file + "\n")
        return False
    return True


# Utility functions for batch processing
def load_file(filename: str):
    """
    Load a pickled and gzipped file.

    SECURITY: unpickling can execute arbitrary code; only use this on files
    from a trusted source.
    """
    with gzip.open(filename, "rb") as f:
        return pickle.load(f)


def is_string_in_file(file_path: str, target_string: str) -> bool:
    """
    Check if a string exists in a file.

    Performs a substring match against every line. Returns False (after
    printing the error) when the file cannot be read.
    """
    try:
        with Path(file_path).open("r") as f:
            return any(target_string in line for line in f)
    except Exception as e:
        # Deliberately best-effort: an unreadable file counts as "not found".
        print(f"Error: {e}")
        return False


def main():
    """Main function for command-line usage."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--index', type=int, required=True,
                        help='index of the sub_list to work with')
    parser.add_argument('--batch_size', type=int, required=True,
                        help='batch size')
    parser.add_argument('--time_limit', type=int, required=True,
                        help='time limit')
    parser.add_argument('--files_list', type=str, required=True,
                        help='files list')
    parser.add_argument('--problem_file_path', type=str, required=True,
                        help='problem file path')
    parser.add_argument('--pose_path', type=str, required=True,
                        help='pose path')
    parser.add_argument('--face_path', type=str, required=True,
                        help='face path')

    args = parser.parse_args()
    if args.batch_size <= 0:
        # range() below would fail on a zero step and yield nothing useful
        # for a negative one.
        parser.error("--batch_size must be a positive integer")

    start_time = time.time()

    # Load files list
    fixed_list = load_file(args.files_list)

    # Create problem file if it doesn't exist
    if not os.path.exists(args.problem_file_path):
        with open(args.problem_file_path, "w") as f:
            f.write("")

    # Process videos in batches
    video_batches = [fixed_list[i:i + args.batch_size]
                     for i in range(0, len(fixed_list), args.batch_size)]

    try:
        batch = video_batches[args.index]
    except IndexError:
        print(f"Batch index {args.index} is out of range "
              f"({len(video_batches)} batches available). Nothing to do.")
        return

    for video_file in batch:
        # time.time() is in seconds, so the limit is compared in seconds
        if time.time() - start_time > args.time_limit:
            print("Time limit reached. Stopping execution.")
            break

        video_name = Path(video_file).stem
        clip_face_path = Path(args.face_path) / f"{video_name}_face.mp4"

        if clip_face_path.exists():
            print(f"Skipping {video_file} - output already exists")
            continue
        if is_string_in_file(args.problem_file_path, video_file):
            print(f"Skipping {video_file} - found in problem file")
            continue

        try:
            print(f"Processing {video_file}")
            succeeded = video_holistic(video_file, args.face_path,
                                       args.problem_file_path, args.pose_path)
        except Exception as e:
            print(f"Error processing {video_file}: {e}")
            with open(args.problem_file_path, "a") as p:
                p.write(video_file + "\n")
        else:
            # video_holistic reports and records its own failures
            if succeeded:
                print(f"Successfully processed {video_file}")


if __name__ == "__main__":
    main()