import functools
import os
import shutil
import time
from collections import deque

import cv2
import numpy as np
from ultralytics import YOLO  # Assuming YOLOv8 library


def timer_decorator(func):
    """
    Wrap a function so that every call prints how long it took.

    Args:
        func (callable): Function to time.

    Returns:
        callable: Wrapper that forwards all arguments to ``func``, prints the
        elapsed seconds and returns ``func``'s result unchanged. Nothing is
        printed when ``func`` raises.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"{func.__name__} took {execution_time:.2f} seconds to execute")
        return result
    return wrapper


@timer_decorator  # no longer used in the new approach
def extract_key_frames(input_folder, key_frames_folder, original_fps, model_path='yolov8n.pt'):
    """
    Detects frames containing a football and saves them as key frames.

    A frame is a key frame when YOLO reports at least one "sports ball" with
    confidence >= 0.7. When a key frame follows a run of non-key frames, the
    most recent non-key frames (at most ``original_fps`` of them) are saved as
    key frames too.

    All readable ``.jpg``/``.png`` frames are loaded into memory before
    inference starts, so memory use grows with the number of frames.

    Args:
        input_folder (str): Path to the folder containing input frames.
        key_frames_folder (str): Path to save frames containing a football.
        original_fps (int | float): Original frames per second. Rounded to an
            integer and used as the size of the look-back window.
        model_path (str): Path to the YOLOv8 model file (default is yolov8n.pt).
    """
    counter = 0
    print("Extracting key frames with reduced file I/O...")

    # Ensure the output directory exists
    os.makedirs(key_frames_folder, exist_ok=True)

    # Load YOLO model once
    model = YOLO(model_path)

    # deque(maxlen=...) only accepts integers, while video fps is frequently
    # fractional (e.g. 29.97), so normalise it once here.
    lookback = int(round(original_fps))

    # Most recent non-key frames, kept for reclassification
    previous_nonkey_frames = deque(maxlen=lookback)
    processed_key_frames = set()
    last_frame_was_key = False

    # Load frames into memory first, in file-name order
    frames = {}
    for frame_name in sorted(os.listdir(input_folder)):
        if frame_name.lower().endswith(('.jpg', '.png')):
            frames[frame_name] = cv2.imread(os.path.join(input_folder, frame_name))

    for frame_name, frame in frames.items():
        if frame is None:
            continue  # cv2.imread returns None for unreadable files

        counter += 1
        if counter % 1000 == 0:
            print(f"Processed {counter} frames.")

        # Run YOLO inference
        results = model.predict(frame, conf=0.7, verbose=False)

        # Check if a football (sports ball) is detected
        ball_detected = any(
            model.names[int(box.cls)] == "sports ball" for box in results[0].boxes
        )

        if ball_detected:
            # TODO(TTP): crop the frame
            if not last_frame_was_key:
                # Promote the buffered non-key frames that led up to this one
                for nonkey_frame_name, nonkey_frame in previous_nonkey_frames:
                    if nonkey_frame_name not in processed_key_frames:
                        cv2.imwrite(os.path.join(key_frames_folder, nonkey_frame_name), nonkey_frame)
                        processed_key_frames.add(nonkey_frame_name)
                previous_nonkey_frames.clear()  # Reset after reclassification

            # Save the current frame as a key frame if not already processed
            if frame_name not in processed_key_frames:
                cv2.imwrite(os.path.join(key_frames_folder, frame_name), frame)
                processed_key_frames.add(frame_name)
            last_frame_was_key = True
        else:
            previous_nonkey_frames.append((frame_name, frame))
            last_frame_was_key = False

    print("Key frame extraction complete (Optimized for File I/O).")


@timer_decorator
def crop_preserve_key_objects(input_folder, output_folder, model_path='yolov8n.pt', target_resolution=(360, 640)):
    """
    Crops every frame around the detected ball or people and resizes it.

    For each frame the crop is chosen in this order: reuse the previous crop
    when it still applies, otherwise the ball box, otherwise the box around
    the largest group of people, otherwise a centre crop. The crop is then
    resized to ``target_resolution`` and written to ``output_folder`` under
    the same file name.

    All ``.jpg``/``.png`` frames are loaded into memory before processing, so
    memory use grows with the number of frames.

    Args:
        input_folder (str): Path to the folder containing key frames.
        output_folder (str): Path to save the processed frames.
        model_path (str): Path to the YOLOv8 model file.
        target_resolution (tuple): Desired resolution (width, height), e.g., (360, 640).
    """
    print("Preprocessing frames to fit the target aspect ratio (Optimized for RAM)...")
    model = YOLO(model_path)
    target_aspect_ratio = target_resolution[0] / target_resolution[1]
    os.makedirs(output_folder, exist_ok=True)

    # Sort frames by file name
    frame_files = sorted(
        f for f in os.listdir(input_folder) if f.lower().endswith(('.jpg', '.png'))
    )

    # Load all frames into memory
    frames = {
        frame_name: cv2.imread(os.path.join(input_folder, frame_name))
        for frame_name in frame_files
    }

    last_cropping_area = None  # Crop used for the previous frame
    last_objects_detected = None  # "ball", "people" or None: what produced that crop
    ball_counter = 0  # Number of frames with at least one sports ball
    counter = 0

    for frame_name, frame in frames.items():
        if frame is None:
            print(f"Error reading frame: {frame_name}")
            continue

        counter += 1
        if counter % 1000 == 0:
            print(f"Processed {counter} frames...")

        original_height, original_width = frame.shape[:2]
        # Minimum crop extents derived from the frame size and target ratio
        new_width = int(original_height * target_aspect_ratio)
        new_height = int(original_width / target_aspect_ratio)

        # YOLO inference
        results = model.predict(frame, conf=0.7, verbose=False)

        # Collect detections; when several balls are found the last one wins
        ball_detected = False
        people_boxes = []
        ball_box = None
        for result in results[0].boxes:
            class_name = model.names[int(result.cls)]
            x_min, y_min, x_max, y_max = result.xyxy[0].cpu().numpy()
            if class_name == "sports ball":
                ball_detected = True
                ball_box = (x_min, y_min, x_max, y_max)
            elif class_name == "person":
                people_boxes.append((x_min, y_min, x_max, y_max))

        # Count frames, not boxes: the summary below reports frames with a ball
        if ball_detected:
            ball_counter += 1

        # Determine whether to reuse the last cropping area
        # NOTE(review): a ball detected while the previous crop came from
        # people still reuses the people crop when people are present --
        # confirm this is the intended priority.
        reuse_last_area = False
        if last_cropping_area:
            if ball_detected and last_objects_detected == "ball":
                # Reuse only while the ball stays inside the previous crop
                x_min, y_min, x_max, y_max = last_cropping_area
                if ball_box and (
                    ball_box[0] >= x_min and ball_box[1] >= y_min
                    and ball_box[2] <= x_max and ball_box[3] <= y_max
                ):
                    reuse_last_area = True
            elif people_boxes and last_objects_detected == "people":
                reuse_last_area = True

        if reuse_last_area:
            x_min, y_min, x_max, y_max = last_cropping_area
        else:
            # Calculate a new cropping area
            if ball_detected:
                x_min, y_min, x_max, y_max = ball_box
                last_objects_detected = "ball"
            elif people_boxes:
                x_min, y_min, x_max, y_max = calculate_largest_group_box(
                    people_boxes, original_width, original_height
                )
                last_objects_detected = "people"
            else:
                # Default to center cropping
                x_center, y_center = original_width // 2, original_height // 2
                x_min = max(0, x_center - new_width // 2)
                y_min = max(0, y_center - new_height // 2)
                x_max = min(original_width, x_min + new_width)
                y_max = min(original_height, y_min + new_height)
                # This crop came from neither a ball nor people; clear the
                # marker so a later "people" frame does not reuse it.
                last_objects_detected = None

            # Grow crops that are smaller than the minimum extents by moving
            # the top-left corner (clamped to the frame).
            # NOTE(review): width and height are adjusted independently, so
            # the crop is not guaranteed to have the target aspect ratio and
            # the resize below may stretch the image.
            if (x_max - x_min) < new_width:
                x_min = max(0, x_max - new_width)
            if (y_max - y_min) < new_height:
                y_min = max(0, y_max - new_height)

            last_cropping_area = (x_min, y_min, x_max, y_max)

        # Crop and resize the frame
        frame_cropped = frame[int(y_min):int(y_max), int(x_min):int(x_max)]
        if frame_cropped.size == 0:
            # cv2.resize raises on an empty image; fall back to the full
            # frame instead of aborting the whole batch.
            frame_cropped = frame
        frame_resized = cv2.resize(frame_cropped, target_resolution, interpolation=cv2.INTER_CUBIC)

        # Save processed frame
        output_path = os.path.join(output_folder, frame_name)
        cv2.imwrite(output_path, frame_resized)

    print("Completed preprocessing (Optimized for RAM).")
    print(f"Total frames processed: {len(frame_files)}")
    print(f"Total frames detected with a sports ball: {ball_counter}")


def calculate_largest_group_box(people_boxes, original_width, original_height):
    """
    Calculate the bounding box for the largest group of people.

    People are grouped by flood-fill over their box centres: two people are
    linked when their centres are closer than 20% of the larger frame
    dimension, and a group is everything reachable through such links. The
    grouping therefore does not depend on the order of ``people_boxes``.

    Args:
        people_boxes (list of tuples): List of bounding boxes for detected people.
            Each box is (x_min, y_min, x_max, y_max).
        original_width (int): Width of the original frame.
        original_height (int): Height of the original frame.

    Returns:
        tuple: Bounding box (x_min, y_min, x_max, y_max) enclosing every box
        of the group with the most people, or None when ``people_boxes`` is
        empty. On a tie the group found first wins.
    """
    if not people_boxes:
        return None  # Return None if no people boxes are provided

    # Get the center points of all bounding boxes
    centers = np.array(
        [(int((x1 + x2) / 2), int((y1 + y2) / 2)) for x1, y1, x2, y2 in people_boxes]
    )

    # Calculate pairwise distances between all centers
    distances = np.linalg.norm(centers[:, None, :] - centers[None, :, :], axis=2)

    # Distance threshold for linking two people. Adjust this value if needed
    threshold = max(original_width, original_height) * 0.2  # TTP adjusted to 0.2 to allow bigger distance

    # Breadth-first flood fill: expand from every member of the cluster, not
    # only from the seed, so chains of neighbours end up in the same group.
    clusters = []
    visited = set()
    for seed in range(len(centers)):
        if seed in visited:
            continue
        visited.add(seed)
        cluster = [seed]
        queue = deque([seed])
        while queue:
            current = queue.popleft()
            for neighbour in np.flatnonzero(distances[current] < threshold):
                neighbour = int(neighbour)
                if neighbour not in visited:
                    visited.add(neighbour)
                    cluster.append(neighbour)
                    queue.append(neighbour)
        clusters.append(cluster)

    # Find the largest cluster based on the number of people
    largest_cluster = max(clusters, key=len)

    # Calculate the bounding box for the largest cluster
    x_min = min(people_boxes[i][0] for i in largest_cluster)
    y_min = min(people_boxes[i][1] for i in largest_cluster)
    x_max = max(people_boxes[i][2] for i in largest_cluster)
    y_max = max(people_boxes[i][3] for i in largest_cluster)

    # NOTE: optional context padding (5% of the frame size on each side) is
    # currently disabled; the box is returned unpadded.
    return x_min, y_min, x_max, y_max