import html
import os
import subprocess

import gradio as gr
import numpy as np
import torch
import torch.nn.functional as F
import librosa
import av
from transformers import VivitImageProcessor, VivitForVideoClassification
from transformers import AutoConfig, Wav2Vec2ForSequenceClassification, AutoFeatureExtractor
from moviepy.editor import VideoFileClip
def get_emotion_from_filename(filename):
    """Translate the emotion code embedded in a dash-separated file name.

    The third dash-separated field of ``filename`` is parsed as an integer
    and looked up in a fixed code-to-emotion table.

    Args:
        filename: Dash-separated name whose third field is a numeric code
            (presumably RAVDESS naming -- confirm against the dataset).

    Returns:
        The emotion name, or ``None`` when the code is not in the table.

    Raises:
        IndexError: If the name has fewer than three dash-separated fields.
        ValueError: If the third field is not an integer.
    """
    code_to_emotion = {
        1: 'neutral',
        3: 'happy',
        4: 'sad',
        5: 'angry',
        6: 'fearful',
        7: 'disgust',
    }
    fields = filename.split('-')
    return code_to_emotion.get(int(fields[2]))
def separate_video_audio(file_path):
    """Split a media file into a silent H.264 video and a 16 kHz PCM WAV.

    Both outputs are written into ``./temp/`` (created on demand) by two
    ``ffmpeg`` invocations and are named after the input file's stem.

    Args:
        file_path: Path of the input media file.

    Returns:
        Tuple ``(video_path, audio_path)`` of the two files that were written.

    Raises:
        subprocess.CalledProcessError: If either ffmpeg run exits non-zero.
    """
    output_dir = './temp/'
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    # Build the names from the stem. The previous str.replace('.mp4', ...)
    # left any other extension (.webm, .MP4, ...) untouched, so both outputs
    # ended up with the same path.
    stem = os.path.splitext(os.path.basename(file_path))[0]
    video_path = os.path.join(output_dir, f'{stem}_video.mp4')
    audio_path = os.path.join(output_dir, f'{stem}_audio.wav')

    # -y overwrites leftovers from an earlier run instead of prompting.
    base_cmd = ['ffmpeg', '-y', '-loglevel', 'quiet', '-i', file_path]

    # Video only (-an drops audio), re-encoded with libx264.
    video_cmd = base_cmd + ['-an', '-c:v', 'libx264', '-preset', 'ultrafast', video_path]
    subprocess.run(video_cmd, check=True)

    # Audio only (-vn drops video), 16-bit PCM resampled to 16 kHz.
    audio_cmd = base_cmd + ['-vn', '-acodec', 'pcm_s16le', '-ar', '16000', audio_path]
    subprocess.run(audio_cmd, check=True)

    return video_path, audio_path
def delete_files_in_directory(directory):
    """Remove every regular file located directly inside ``directory``.

    Entries that are not files (for example sub-directories) are skipped.
    A failure on one entry is printed and does not stop the loop.

    Args:
        directory: Path of the directory to empty.
    """
    for entry_name in os.listdir(directory):
        target = os.path.join(directory, entry_name)
        try:
            if not os.path.isfile(target):
                continue
            os.remove(target)
        except Exception as err:
            print(f"Failed to delete {target}. Reason: {err}")
def get_total_frames(container):
    """Return the frame count reported by the container's first video stream.

    Args:
        container: Object exposing ``streams.video`` (an opened PyAV
            container in this file).

    Returns:
        The ``frames`` attribute of ``container.streams.video[0]``.
    """
    return container.streams.video[0].frames
def process_video(file_path):
    """Open a video file and decode a sampled clip of its frames.

    Samples 32 frame indices (sample rate 2) with ``sample_frame_indices``
    and decodes them with ``read_video_pyav``.

    Args:
        file_path: Path of the video file to open with PyAV.

    Returns:
        The array returned by ``read_video_pyav`` for the sampled indices.

    Raises:
        ValueError: If the stream reports fewer than 64 frames.
    """
    container = av.open(file_path)
    try:
        total_frames = get_total_frames(container)
        if total_frames < 64:
            raise ValueError("Video must have at least 64 frames.")

        indices = sample_frame_indices(clip_len=32, frame_sample_rate=2, seg_len=total_frames)
        return read_video_pyav(container=container, indices=indices)
    finally:
        # Close on every path: previously an exception raised while sampling
        # or decoding left the container (and its file handle) open.
        container.close()
def read_video_pyav(container, indices):
    """Decode the frames at ``indices`` and stack them into a single array.

    Decoding starts from the beginning of the stream and stops as soon as the
    frame counter passes ``indices[-1]``.

    Args:
        container: Opened PyAV container (needs ``seek`` and ``decode``).
        indices: Sequence of frame indices to keep; its first and last
            elements bound the range that is considered.

    Returns:
        ``np.ndarray`` stacking the kept frames, each resized to 224x224 and
        converted with ``to_ndarray(format="rgb24")`` -- presumably shaped
        (num_frames, 224, 224, 3); confirm against PyAV.

    Raises:
        ValueError: From ``np.stack`` when no decoded frame matched.
    """
    # Build the lookup once: ``i in ndarray`` is a linear scan that used to
    # run for every decoded frame.
    wanted = set(np.asarray(indices).ravel().tolist())
    start_index = indices[0]
    end_index = indices[-1]

    container.seek(0)
    frames = []
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in wanted:
            frames.append(frame.reformat(width=224, height=224))

    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    """Pick ``clip_len`` evenly spaced frame indices from a random window.

    A window of ``clip_len * frame_sample_rate`` frames is placed at a random
    position inside ``[0, seg_len)`` and ``clip_len`` indices are spread
    evenly across it.

    Args:
        clip_len: Number of indices to return.
        frame_sample_rate: Spacing factor; the window spans
            ``clip_len * frame_sample_rate`` frames.
        seg_len: Total number of frames available.

    Returns:
        ``np.ndarray`` of ``clip_len`` int64 indices, all below ``seg_len``.

    Raises:
        ValueError: If ``seg_len`` is smaller than the window length.
    """
    converted_len = int(clip_len * frame_sample_rate)
    if seg_len < converted_len:
        raise ValueError(
            f"Need at least {converted_len} frames to sample from, got {seg_len}."
        )

    # randint's upper bound is exclusive. Using seg_len + 1 makes
    # seg_len == converted_len valid (it used to raise "low >= high") and lets
    # the window end on the final frame. Indices are clipped to end_idx - 1
    # below, so they never reach seg_len.
    end_idx = np.random.randint(converted_len, seg_len + 1)
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    return np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
def video_label_to_emotion(label):
    """Convert a video-model label such as ``"LABEL_3"`` to an emotion name.

    The integer after the first underscore selects the emotion.

    Args:
        label: Label string containing ``_<index>``.

    Returns:
        The emotion name, or ``"Unknown Label"`` for an index outside 0-5.
    """
    emotions = ('neutral', 'happy', 'sad', 'angry', 'fearful', 'disgust')
    index = int(label.split('_')[1])
    if 0 <= index < len(emotions):
        return emotions[index]
    return "Unknown Label"
def predict_video(file_path, video_model, image_processor):
    """Classify the emotion shown in a video file.

    Frames come from ``process_video``, are preprocessed by
    ``image_processor`` and scored by ``video_model`` on the CPU.

    Args:
        file_path: Path of the video file.
        video_model: Model whose output has ``logits`` and whose
            ``config.id2label`` maps class indices to label strings.
        image_processor: Callable turning a list of frames into model inputs.

    Returns:
        Dict mapping emotion name to its softmax probability.
    """
    clip = process_video(file_path)
    cpu = torch.device("cpu")
    model_inputs = image_processor(list(clip), return_tensors="pt").to(cpu)

    with torch.no_grad():
        logits = video_model(**model_inputs).logits
        probs = F.softmax(logits, dim=-1).squeeze()

    id2label = video_model.config.id2label
    emotion_probabilities = {}
    for idx, prob in enumerate(probs):
        emotion_probabilities[video_label_to_emotion(id2label[idx])] = float(prob)
    return emotion_probabilities
def audio_label_to_emotion(label):
    """Convert an audio-model label such as ``"LABEL_3"`` to an emotion name.

    The integer after the first underscore selects the emotion; the ordering
    here differs from the one used for the video model.

    Args:
        label: Label string containing ``_<index>``.

    Returns:
        The emotion name, or ``"Unknown Label"`` for an index outside 0-5.
    """
    emotions = ('angry', 'disgust', 'fearful', 'happy', 'neutral', 'sad')
    index = int(label.split('_')[1])
    if 0 <= index < len(emotions):
        return emotions[index]
    return "Unknown Label"
def preprocess_and_predict_audio(file_path, model, processor):
    """Classify the emotion expressed in an audio file.

    The audio is loaded at 16 kHz with librosa, converted to model inputs by
    ``processor`` and scored by ``model`` on the CPU.

    Args:
        file_path: Path of the audio file.
        model: Model whose output has ``logits`` and whose
            ``config.id2label`` maps class indices to label strings.
        processor: Feature extractor called on the raw waveform.

    Returns:
        Dict mapping emotion name to its softmax probability.
    """
    cpu = torch.device("cpu")
    waveform, _ = librosa.load(file_path, sr=16000)

    # NOTE(review): max_length probably only takes effect together with
    # truncation or padding="max_length" -- confirm the intended behaviour.
    features = processor(waveform, sampling_rate=16000, return_tensors="pt", padding=True, max_length=75275)

    model = model.to(cpu)
    features = {name: tensor.to(cpu) for name, tensor in features.items()}

    with torch.no_grad():
        logits = model(**features).logits
        probabilities = F.softmax(logits, dim=-1)

    id2label = model.config.id2label
    emotion_probabilities = {}
    for idx, prob in enumerate(probabilities[0]):
        emotion_probabilities[audio_label_to_emotion(id2label[idx])] = float(prob)
    return emotion_probabilities
def averaging_method(video_prediction, audio_prediction):
    """Fuse two predictions by plain averaging and return the top label.

    A label missing from one prediction counts as probability 0 there.

    Args:
        video_prediction: Dict of label -> probability from the video model.
        audio_prediction: Dict of label -> probability from the audio model.

    Returns:
        The label with the highest averaged probability.
    """
    labels = set(video_prediction) | set(audio_prediction)
    mean_scores = {
        name: (video_prediction.get(name, 0) + audio_prediction.get(name, 0)) / 2
        for name in labels
    }
    return max(mean_scores, key=mean_scores.get)
def weighted_average_method(video_prediction, audio_prediction, video_weight, audio_weight=0.6):
    """Fuse two predictions with a weighted average and return the top label.

    A label missing from one prediction counts as probability 0 there.

    Args:
        video_prediction: Dict of label -> probability from the video model.
        audio_prediction: Dict of label -> probability from the audio model.
        video_weight: Weight applied to the video probabilities.
        audio_weight: Weight applied to the audio probabilities. Defaults to
            0.6, the value that used to be hard-coded.

    Returns:
        The label with the highest weighted-average probability.

    Raises:
        ValueError: If the two weights sum to zero.
    """
    total_weight = video_weight + audio_weight
    if total_weight == 0:
        raise ValueError("video_weight and audio_weight must not sum to zero.")

    combined_probabilities = {}
    for label in set(video_prediction) | set(audio_prediction):
        video_prob = video_prediction.get(label, 0)
        audio_prob = audio_prediction.get(label, 0)
        combined_probabilities[label] = (
            video_weight * video_prob + audio_weight * audio_prob
        ) / total_weight
    return max(combined_probabilities, key=combined_probabilities.get)
def confidence_level_method(video_prediction, audio_prediction, threshold=0.7):
    """Trust the video model when it is confident, otherwise average.

    Args:
        video_prediction: Dict of label -> probability from the video model.
        audio_prediction: Dict of label -> probability from the audio model.
        threshold: Minimum top video probability needed to return the video
            label directly.

    Returns:
        The top video label when its probability is at least ``threshold``;
        otherwise the label with the highest averaged probability.
    """
    top_video_label, top_video_score = max(
        video_prediction.items(), key=lambda item: item[1]
    )
    if top_video_score >= threshold:
        return top_video_label

    # Video was not confident enough: fall back to the mean of both models.
    mean_scores = {
        name: (video_prediction.get(name, 0) + audio_prediction.get(name, 0)) / 2
        for name in set(video_prediction) | set(audio_prediction)
    }
    return max(mean_scores, key=mean_scores.get)
def dynamic_weighting_method(video_prediction, audio_prediction):
    """Fuse two predictions with per-label weights based on relative confidence.

    For each label, each model's confidence is its probability divided by
    the sum of that model's probabilities; the two confidences are then
    normalised into weights for that label.

    Args:
        video_prediction: Dict of label -> probability from the video model.
        audio_prediction: Dict of label -> probability from the audio model.

    Returns:
        The label with the highest combined score.
    """
    # The totals do not change per label, so compute them once.
    video_total = sum(video_prediction.values())
    audio_total = sum(audio_prediction.values())

    combined_probabilities = {}
    for label in set(video_prediction) | set(audio_prediction):
        video_prob = video_prediction.get(label, 0)
        audio_prob = audio_prediction.get(label, 0)
        video_confidence = video_prob / video_total if video_total else 0.0
        audio_confidence = audio_prob / audio_total if audio_total else 0.0

        total_confidence = video_confidence + audio_confidence
        if total_confidence == 0:
            # Neither model gives this label any mass (e.g. softmax underflow
            # to 0.0); the division below used to raise ZeroDivisionError.
            combined_probabilities[label] = 0.0
            continue

        video_weight = video_confidence / total_confidence
        audio_weight = audio_confidence / total_confidence
        combined_probabilities[label] = video_weight * video_prob + audio_weight * audio_prob

    return max(combined_probabilities, key=combined_probabilities.get)
def rule_based_method(video_prediction, audio_prediction, threshold=0.5):
    """Choose a label with simple agreement and confidence rules.

    Each model's confidence is its top probability divided by the sum of its
    probabilities. The rules, in order:

    1. Both models agree and both confidences exceed ``threshold``: that label.
    2. One model is strictly more confident: its top label.
    3. Otherwise: the label with the highest averaged probability.

    Args:
        video_prediction: Dict of label -> probability from the video model.
        audio_prediction: Dict of label -> probability from the audio model.
        threshold: Confidence both models must exceed for rule 1.

    Returns:
        The selected label.
    """
    top_video = max(video_prediction, key=video_prediction.get)
    top_audio = max(audio_prediction, key=audio_prediction.get)
    video_share = video_prediction[top_video] / sum(video_prediction.values())
    audio_share = audio_prediction[top_audio] / sum(audio_prediction.values())

    confident_agreement = (
        top_video == top_audio and video_share > threshold and audio_share > threshold
    )
    if confident_agreement or video_share > audio_share:
        return top_video
    if audio_share > video_share:
        return top_audio

    # Confidences are tied: decide by the mean of both models.
    mean_scores = {
        name: (video_prediction.get(name, 0) + audio_prediction.get(name, 0)) / 2
        for name in set(video_prediction) | set(audio_prediction)
    }
    return max(mean_scores, key=mean_scores.get)
# Fusion strategies selectable in the UI: display name -> function that
# combines the video and audio predictions into one label.
decision_frameworks = {
    "Averaging": averaging_method,
    "Weighted Average": weighted_average_method,
    "Confidence Level": confidence_level_method,
    "Dynamic Weighting": dynamic_weighting_method,
    "Rule-Based": rule_based_method,
}
def predict(video_file, video_model_name, audio_model_name, framework_name):
    """Predict the emotion in an uploaded clip from its video and audio tracks.

    The clip is split with ``separate_video_audio``, each track is scored by
    its model, and the two predictions are fused with the selected decision
    framework.

    Args:
        video_file: Path of the uploaded video.
        video_model_name: ``"60% Accuracy"`` or ``"80% Accuracy"``.
        audio_model_name: ``"60% Accuracy"``.
        framework_name: A key of ``decision_frameworks``.

    Returns:
        HTML string with the video, audio and consensus labels, or an error
        section when a ``ValueError`` is raised along the way.
    """
    # Per video model: checkpoint file and the video weight used by the
    # "Weighted Average" framework.
    video_models = {
        "60% Accuracy": ("video_model_60_acc.pth", 0.6),
        "80% Accuracy": ("video_model_80_acc.pth", 0.88),
    }
    audio_checkpoints = {"60% Accuracy": "audio_model_state_dict_6e.pth"}
    delete_directory_path = "./temp/"

    try:
        # Validate the selections first. An unknown video model used to leave
        # ``video_model`` unbound, and an unknown audio model silently ran
        # with an untrained classification head.
        if not video_file:
            raise ValueError("Please upload a video.")
        if video_model_name not in video_models:
            raise ValueError(f"Unknown video model: {video_model_name!r}")
        if audio_model_name not in audio_checkpoints:
            raise ValueError(f"Unknown audio model: {audio_model_name!r}")
        if framework_name not in decision_frameworks:
            raise ValueError(f"Unknown decision framework: {framework_name!r}")

        # NOTE(review): every model is reloaded on each request; consider
        # caching them if latency matters.
        video_checkpoint, video_weight = video_models[video_model_name]
        image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")
        # NOTE(review): torch.load unpickles a complete model object; only
        # load checkpoints from a trusted source.
        video_model = torch.load(video_checkpoint, map_location=torch.device('cpu'))
        # Inference mode, matching the audio model below (disables dropout).
        video_model.eval()

        model_id = "facebook/wav2vec2-large"
        config = AutoConfig.from_pretrained(model_id, num_labels=6)
        audio_processor = AutoFeatureExtractor.from_pretrained(model_id)
        audio_model = Wav2Vec2ForSequenceClassification.from_pretrained(model_id, config=config)
        audio_model.load_state_dict(
            torch.load(audio_checkpoints[audio_model_name], map_location=torch.device('cpu'))
        )
        audio_model.eval()

        try:
            video_path, audio_path = separate_video_audio(video_file)
            video_prediction = predict_video(video_path, video_model, image_processor)
            audio_prediction = preprocess_and_predict_audio(audio_path, audio_model, audio_processor)
        finally:
            # Remove the temporary tracks on failure as well as on success.
            if os.path.isdir(delete_directory_path):
                delete_files_in_directory(delete_directory_path)

        highest_video_emotion = max(video_prediction, key=video_prediction.get)
        highest_audio_emotion = max(audio_prediction, key=audio_prediction.get)

        framework_function = decision_frameworks[framework_name]
        if framework_function is weighted_average_method:
            consensus_label = framework_function(video_prediction, audio_prediction, video_weight)
        else:
            consensus_label = framework_function(video_prediction, audio_prediction)

        result = f"""
        <h2>Predictions</h2>
        <p><strong>Video Label:</strong> {highest_video_emotion}</p>
        <p><strong>Audio Label:</strong> {highest_audio_emotion}</p>
        <p><strong>Consensus Label:</strong> {consensus_label}</p>
        """

    except ValueError as e:
        # Escape the message: it can contain text derived from the uploaded
        # file (for example its name) and is rendered as HTML.
        result = f"""
        <h2>Error</h2>
        <p>{html.escape(str(e))}</p>
        """

    return result
# Input widgets, in the positional order of predict()'s parameters:
# video file, video model, audio model, decision framework.
inputs = [
    gr.Video(label="Upload Video"),
    gr.Dropdown(["60% Accuracy", "80% Accuracy"], label="Select Video Model"),
    gr.Dropdown(["60% Accuracy"], label="Select Audio Model"),
    gr.Dropdown(list(decision_frameworks.keys()), label="Select Decision Framework"),
]

# predict() returns one HTML string, shown in a single HTML pane.
outputs = [gr.HTML(label="Predictions")]
# Gradio app wiring predict() to the widgets above. Each example row supplies
# (video file, video model, audio model, decision framework).
iface = gr.Interface(
    fn=predict,
    inputs=inputs,
    outputs=outputs,
    examples=[
        ["./Angry.mp4", "60% Accuracy", "60% Accuracy", "Averaging"],
        ["./Disgust.mp4", "80% Accuracy", "60% Accuracy", "Weighted Average"],
        ["./Fearful.mp4", "60% Accuracy", "60% Accuracy", "Confidence Level"],
        ["./Happy.mp4", "80% Accuracy", "60% Accuracy", "Dynamic Weighting"],
        ["./Neutral.mp4", "80% Accuracy", "60% Accuracy", "Rule-Based"],
        ["./Sad.mp4", "60% Accuracy", "60% Accuracy", "Weighted Average"],
    ],
    title="Video and Audio Emotion Prediction",
    description="Upload a video to get emotion predictions from selected video and audio models. Example videos are from the RAVDESS dataset.",
)

# NOTE(review): this runs whenever the module is imported, not only when it
# is executed as a script -- consider an ``if __name__ == "__main__":`` guard.
iface.launch(debug=True, share=True)