# Source path: viralplay/handlers/frame_handler_yolo.py
# Page header captured with the file: author avatar "phitran";
# commit "Update handlers/frame_handler_yolo.py" (ac64bee, verified).
import os
import cv2
import shutil
from collections import deque
from ultralytics import YOLO # Assuming YOLOv8 library
import numpy as np
import functools
import time
def timer_decorator(func):
    """Wrap ``func`` so that each completed call prints its execution time.

    Args:
        func (callable): The function to time.

    Returns:
        callable: A wrapper that forwards all positional and keyword
        arguments to ``func``, prints
        ``"<name> took <seconds> seconds to execute"`` and returns the value
        produced by ``func``. Nothing is printed when ``func`` raises; the
        exception propagates unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high resolution. time.time() follows
        # the wall clock, which can jump (even backwards) when the system
        # clock is adjusted, producing wrong or negative durations.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"{func.__name__} took {execution_time:.2f} seconds to execute")
        return result
    return wrapper
# No longer used in the new approach.
@timer_decorator
def extract_key_frames(input_folder, key_frames_folder, original_fps, model_path='yolov8n.pt'):
    """
    Copy frames that show a sports ball, plus the frames leading up to them,
    into the key-frames folder.

    Frames are visited in sorted file-name order. A frame in which the model
    reports a "sports ball" is written to ``key_frames_folder``. Frames
    without a ball are held in a bounded buffer (at most ``original_fps``
    frames); when a ball is detected, every buffered frame is written as a
    key frame too and the buffer is emptied.

    Frames are read one at a time, so memory use is bounded by the buffer
    size rather than by the number of frames in the folder. Each file is
    still read from disk exactly once.

    Args:
        input_folder (str): Path to the folder containing input frames
            (``.jpg`` / ``.png``, case-insensitive).
        key_frames_folder (str): Path to save frames containing a football.
            Created if it does not exist.
        original_fps (int | float): Original frames per second. Rounded to an
            integer and used as the size of the look-back buffer.
        model_path (str): Path to the YOLOv8 model file (default is yolov8n.pt).
    """
    print("Extracting key frames with reduced file I/O...")

    # Ensure the output directory exists
    os.makedirs(key_frames_folder, exist_ok=True)

    # Load YOLO model once
    model = YOLO(model_path)

    # deque(maxlen=...) only accepts integers, while a frame rate may be
    # passed as a float (e.g. 29.97), so coerce it before use.
    lookback = max(0, int(round(original_fps)))
    recent_nonkey_frames = deque(maxlen=lookback)

    frame_names = sorted(
        name for name in os.listdir(input_folder)
        if name.lower().endswith(('.jpg', '.png'))
    )

    counter = 0
    for frame_name in frame_names:
        frame = cv2.imread(os.path.join(input_folder, frame_name))
        if frame is None:
            continue  # Skip invalid frames

        counter += 1
        if counter % 1000 == 0:
            print(f"Processed {counter} frames.")

        # Run YOLO inference
        results = model.predict(frame, conf=0.1, verbose=False)

        # Check if a football (sports ball) is detected
        ball_detected = any(
            model.names[int(box.cls)] == "sports ball" for box in results[0].boxes
        )

        if not ball_detected:
            # The deque silently drops its oldest entry once it is full.
            recent_nonkey_frames.append((frame_name, frame))
            continue

        # Reclassify the buffered non-key frames as key frames. The buffer is
        # always empty when the previous frame was a key frame, so no extra
        # "was the last frame a key frame" flag is needed. File names are
        # unique, so a frame can never be written twice.
        for nonkey_frame_name, nonkey_frame in recent_nonkey_frames:
            cv2.imwrite(os.path.join(key_frames_folder, nonkey_frame_name), nonkey_frame)
        recent_nonkey_frames.clear()

        # Save the current frame as a key frame
        cv2.imwrite(os.path.join(key_frames_folder, frame_name), frame)

    print("Key frame extraction complete (Optimized for File I/O).")
def _fit_crop_window(box, frame_width, frame_height, aspect_ratio):
    """Return an integer crop window with the given width/height ratio, centred on ``box`` and clamped to the frame."""
    # Largest window of the requested aspect ratio that fits inside the frame:
    # one of the two dimensions always ends up spanning the whole frame.
    crop_width = max(1, min(frame_width, int(frame_height * aspect_ratio)))
    crop_height = max(1, min(frame_height, int(frame_width / aspect_ratio)))

    center_x = (box[0] + box[2]) / 2
    center_y = (box[1] + box[3]) / 2

    # Centre on the object, then shift the window back inside the frame.
    left = min(max(int(center_x - crop_width / 2), 0), frame_width - crop_width)
    top = min(max(int(center_y - crop_height / 2), 0), frame_height - crop_height)
    return left, top, left + crop_width, top + crop_height


@timer_decorator
def crop_preserve_key_objects(input_folder, output_folder, model_path='yolov8n.pt', target_resolution=(360, 640)):
    """
    Crop every frame to the target aspect ratio around the detected ball or
    people, resize it to ``target_resolution`` and save it.

    For each readable frame (sorted by file name) the crop is chosen as follows:

    * If a "sports ball" is detected, the crop follows the ball. The previous
      crop is reused when it was also ball-based and still contains the ball.
    * Otherwise, if people are detected, the crop follows the largest group
      returned by ``calculate_largest_group_box``. The previous crop is
      reused when it was also people-based.
    * Otherwise the frame is centre-cropped.

    Every new crop is the largest window of the target aspect ratio that fits
    in the frame, centred on the chosen object and clamped to the frame
    borders, so the resize never distorts the picture and the crop is never
    empty. Frames are read one at a time to keep memory use independent of
    the number of frames.

    Args:
        input_folder (str): Path to the folder containing key frames
            (``.jpg`` / ``.png``, case-insensitive).
        output_folder (str): Path to save the processed frames. Created if it
            does not exist.
        model_path (str): Path to the YOLOv8 model file.
        target_resolution (tuple): Desired resolution (width, height), e.g., (360, 640).
    """
    print("Preprocessing frames to fit the target aspect ratio (Optimized for RAM)...")
    model = YOLO(model_path)
    target_aspect_ratio = target_resolution[0] / target_resolution[1]
    os.makedirs(output_folder, exist_ok=True)

    # Sort frames by file name
    frame_files = sorted(
        f for f in os.listdir(input_folder) if f.lower().endswith(('.jpg', '.png'))
    )

    last_cropping_area = None  # Crop used for the previous frame
    last_objects_detected = None  # "ball" or "people": what the last new crop followed
    ball_counter = 0  # Number of frames with at least one ball
    counter = 0  # Number of frames actually processed

    for frame_name in frame_files:
        frame = cv2.imread(os.path.join(input_folder, frame_name))
        if frame is None:
            print(f"Error reading frame: {frame_name}")
            continue

        counter += 1
        if counter % 1000 == 0:
            print(f"Processed {counter} frames...")

        original_height, original_width = frame.shape[:2]

        # YOLO inference
        results = model.predict(frame, conf=0.1, verbose=False)

        # Collect the detections of interest
        people_boxes = []
        ball_box = None
        ball_confidence = -1.0
        for detection in results[0].boxes:
            class_name = model.names[int(detection.cls)]
            if class_name not in ("sports ball", "person"):
                continue
            box = tuple(detection.xyxy[0].cpu().numpy())
            if class_name == "person":
                people_boxes.append(box)
                continue
            # Keep the most confident ball when several are reported, rather
            # than whichever one happens to be listed last.
            confidence = float(detection.conf)
            if confidence > ball_confidence:
                ball_confidence = confidence
                ball_box = box

        ball_detected = ball_box is not None
        if ball_detected:
            ball_counter += 1  # Count frames, not individual detections

        # Determine whether to reuse the last cropping area. The ball takes
        # precedence over people here exactly as it does when a new crop is
        # computed; otherwise a stale people-based crop could hide the ball.
        reuse_last_area = False
        if last_cropping_area is not None:
            if ball_detected:
                if last_objects_detected == "ball":
                    last_x_min, last_y_min, last_x_max, last_y_max = last_cropping_area
                    reuse_last_area = (
                        ball_box[0] >= last_x_min and ball_box[1] >= last_y_min
                        and ball_box[2] <= last_x_max and ball_box[3] <= last_y_max
                    )
            elif people_boxes and last_objects_detected == "people":
                reuse_last_area = True

        if reuse_last_area:
            cropping_area = last_cropping_area
        else:
            # Pick the region the new crop should follow
            if ball_detected:
                focus_box = ball_box
                last_objects_detected = "ball"
            elif people_boxes:
                focus_box = calculate_largest_group_box(people_boxes, original_width, original_height)
                last_objects_detected = "people"
            else:
                # Default to center cropping: focus on the whole frame
                focus_box = (0, 0, original_width, original_height)

            cropping_area = _fit_crop_window(
                focus_box, original_width, original_height, target_aspect_ratio
            )
            last_cropping_area = cropping_area

        # Crop and resize the frame
        x_min, y_min, x_max, y_max = cropping_area
        frame_cropped = frame[y_min:y_max, x_min:x_max]
        frame_resized = cv2.resize(frame_cropped, target_resolution, interpolation=cv2.INTER_CUBIC)

        # Save processed frame
        cv2.imwrite(os.path.join(output_folder, frame_name), frame_resized)

    print("Completed preprocessing (Optimized for RAM).")
    print(f"Total frames processed: {counter}")
    print(f"Total frames detected with a sports ball: {ball_counter}")
def calculate_largest_group_box(people_boxes, original_width, original_height):
    """
    Calculate the bounding box enclosing the largest group of people.

    Two people are linked when the distance between the centers of their
    boxes is below 20% of the larger frame dimension. Groups are the
    connected components of those links: a person belongs to a group when
    linked to any member of it, not only to the first one visited. The result
    therefore does not depend on the order of ``people_boxes``. No padding is
    added around the returned box.

    Args:
        people_boxes (list of tuples): List of bounding boxes for detected people.
            Each box is (x_min, y_min, x_max, y_max).
        original_width (int): Width of the original frame.
        original_height (int): Height of the original frame.

    Returns:
        tuple: Bounding box (x_min, y_min, x_max, y_max) around the group with
        the most people (the first such group on a tie), or None when
        ``people_boxes`` is empty.
    """
    if not people_boxes:
        return None  # Return None if no people boxes are provided

    # Get the center points of all bounding boxes
    centers = np.array([(int((x1 + x2) / 2), int((y1 + y2) / 2)) for x1, y1, x2, y2 in people_boxes])

    # Calculate pairwise distances between all centers
    distances = np.linalg.norm(centers[:, None, :] - centers[None, :, :], axis=2)

    # Define a distance threshold for clustering. Adjust this value if needed
    threshold = max(original_width, original_height) * 0.2  # TTP adjusted to 0.2 to allow bigger distance
    is_linked = distances < threshold

    # Flood fill: grow each cluster from its seed through every reachable link
    clusters = []
    visited = set()
    for seed in range(len(centers)):
        if seed in visited:
            continue
        visited.add(seed)
        cluster = [seed]
        pending = [seed]
        while pending:
            current = pending.pop()
            for neighbor in np.flatnonzero(is_linked[current]):
                neighbor = int(neighbor)
                if neighbor not in visited:
                    visited.add(neighbor)
                    cluster.append(neighbor)
                    pending.append(neighbor)
        clusters.append(cluster)

    # Find the largest cluster based on the number of people
    largest_cluster = max(clusters, key=len)

    # Calculate the bounding box for the largest cluster
    x_min = min(people_boxes[i][0] for i in largest_cluster)
    y_min = min(people_boxes[i][1] for i in largest_cluster)
    x_max = max(people_boxes[i][2] for i in largest_cluster)
    y_max = max(people_boxes[i][3] for i in largest_cluster)

    return x_min, y_min, x_max, y_max