Spaces:
Runtime error
Runtime error
| import moviepy.editor as mp | |
| from pyannote.audio import Pipeline | |
| import torch | |
| import torchaudio | |
| from pyannote.audio import Pipeline | |
| from pyannote.core import Segment | |
| from pyannote.audio import Model | |
| import os | |
| import numpy as np | |
def extract_audio_from_video(video_path):
    """Extract the audio track of a video into a ``.wav`` file next to it.

    Args:
        video_path: Path to the input video file.

    Returns:
        Path of the written WAV file: ``video_path`` with its extension
        replaced by ``.wav``.

    Raises:
        ValueError: If the video contains no audio track.
    """
    # splitext only inspects the final path component, so a dot in a parent
    # directory name (e.g. "clips.v2/take") is not mistaken for an extension.
    audio_path = os.path.splitext(video_path)[0] + '.wav'

    video = mp.VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        video.audio.write_audiofile(audio_path)
    finally:
        # Always release the resources held by the clip, even on failure.
        video.close()
    return audio_path
def diarize_speakers(audio_path):
    """Run speaker diarization and pick the speaker with the most speech.

    The "most frequent" speaker is the label whose turns add up to the
    longest total duration, not the one with the highest number of turns.

    Args:
        audio_path: Path to the audio file to diarize.

    Returns:
        A ``(diarization, most_frequent_speaker)`` tuple: the object returned
        by the pipeline and the label of the dominant speaker.

    Raises:
        ValueError: If the ``py_annote_hf_token`` environment variable is not
            set, or if the diarization contains no speaker turns.
        RuntimeError: If the pretrained pipeline could not be loaded.
    """
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError("py_annote_hf_token environment variable is not set. Please check your Hugging Face Space's Variables and secrets section.")

    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
    # from_pretrained may hand back None instead of raising (e.g. when access
    # to the gated model was not granted); fail here with a readable message
    # rather than with "'NoneType' object is not callable" below.
    if pipeline is None:
        raise RuntimeError(
            "Could not load pyannote/speaker-diarization-3.1. Check that the "
            "token is valid and that the model's user conditions were accepted."
        )
    diarization = pipeline(audio_path)

    # Accumulate total speaking time (in seconds) per speaker label.
    speaking_time = {}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        speaking_time[speaker] = speaking_time.get(speaker, 0) + (turn.end - turn.start)

    if not speaking_time:
        raise ValueError(f"No speakers were detected in audio: {audio_path}")

    most_frequent_speaker = max(speaking_time, key=speaking_time.get)
    return diarization, most_frequent_speaker
def get_speaker_embeddings(audio_path, diarization, most_frequent_speaker, model_name="pyannote/embedding"):
    """Compute voice embeddings for every 0.5 s chunk of the main speaker.

    Each diarization turn belonging to ``most_frequent_speaker`` is cut into
    consecutive fixed-length chunks; the last (or only) chunk of a turn is
    zero-padded up to the chunk length before being fed to the model.

    Args:
        audio_path: Path to the audio file that was diarized.
        diarization: Diarization result exposing
            ``itertracks(yield_label=True)``.
        most_frequent_speaker: Label of the speaker to embed; turns of other
            speakers are skipped.
        model_name: Name or path of the pretrained embedding model.

    Returns:
        A ``(embeddings, duration)`` tuple. ``embeddings`` is a list of dicts
        with keys ``"time"`` (chunk start in seconds), ``"duration"``,
        ``"embedding"`` (NumPy array) and ``"speaker"``. If the last chunk
        ends before the end of the audio, a zero-valued ``"silence"`` entry
        located at ``duration`` is appended. ``duration`` is the audio length
        in seconds.

    Raises:
        RuntimeError: If the embedding model could not be loaded.
    """
    model = Model.from_pretrained(model_name, use_auth_token=os.environ.get("py_annote_hf_token"))
    # from_pretrained may return None instead of raising on auth failures.
    if model is None:
        raise RuntimeError(
            f"Could not load embedding model '{model_name}'. Check that the "
            "token is valid and that the model's user conditions were accepted."
        )
    # Make sure dropout / batch-norm layers run in inference mode
    # (a no-op when the model is already in eval mode).
    model.eval()

    waveform, sample_rate = torchaudio.load(audio_path)
    duration = waveform.shape[1] / sample_rate

    # Down-mix any multi-channel audio (not just stereo) to mono so that
    # every chunk has exactly one channel.
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    # Fixed chunk size fed to the model (in seconds / in samples).
    min_segment_duration = 0.5
    min_segment_length = int(min_segment_duration * sample_rate)

    embeddings = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if speaker != most_frequent_speaker:
            continue

        start_frame = int(turn.start * sample_rate)
        end_frame = int(turn.end * sample_rate)
        segment = waveform[:, start_frame:end_frame]

        # An empty segment yields an empty range, so it is skipped naturally.
        for offset in range(0, segment.shape[1], min_segment_length):
            chunk = segment[:, offset:offset + min_segment_length]

            # Zero-pad the trailing (or only) chunk up to the fixed length.
            missing = min_segment_length - chunk.shape[1]
            if missing > 0:
                chunk = torch.nn.functional.pad(chunk, (0, missing))

            # NOTE(review): the chunk is passed as (channel, samples) without
            # an explicit batch dimension, as before — confirm the model
            # accepts this layout.
            chunk = chunk.to(model.device)
            with torch.no_grad():
                embedding = model(chunk)

            embeddings.append({
                "time": turn.start + offset / sample_rate,
                "duration": min_segment_duration,
                "embedding": embedding.cpu().numpy(),
                "speaker": speaker,
            })

    # Ensure embeddings cover the entire duration: add a zero "silence"
    # marker at the very end when the last chunk stops early.
    if embeddings and embeddings[-1]['time'] + embeddings[-1]['duration'] < duration:
        embeddings.append({
            "time": duration,
            "duration": 0,
            "embedding": np.zeros_like(embeddings[0]['embedding']),
            "speaker": "silence",
        })

    return embeddings, duration
def align_voice_embeddings(voice_embeddings, frame_count, fps, audio_duration):
    """Assign one voice embedding to every video frame.

    Each frame gets the flattened embedding of the latest entry whose
    ``"time"`` is not after the frame's timestamp (``frame / fps``). Frames
    that precede the first entry get the first entry's embedding.

    Args:
        voice_embeddings: List of dicts with a ``"time"`` (seconds) and an
            ``"embedding"`` (array with a ``flatten()`` method). Assumed to
            be sorted by ascending ``"time"`` — the single forward scan below
            relies on it.
        frame_count: Number of video frames to produce embeddings for.
        fps: Video frame rate in frames per second.
        audio_duration: Unused; kept for interface compatibility.

    Returns:
        A list with ``frame_count`` flattened embeddings (empty when
        ``frame_count`` is not positive).

    Raises:
        ValueError: If frames are requested but ``voice_embeddings`` is empty
            or ``fps`` is not positive.
    """
    if frame_count <= 0:
        return []
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")
    if not voice_embeddings:
        raise ValueError("voice_embeddings is empty; nothing to align to the video frames")

    last_index = len(voice_embeddings) - 1
    aligned_embeddings = []
    current = 0
    for frame in range(frame_count):
        frame_time = frame / fps
        # Advance to the last embedding that starts at or before this frame.
        while current < last_index and voice_embeddings[current + 1]["time"] <= frame_time:
            current += 1
        aligned_embeddings.append(voice_embeddings[current]["embedding"].flatten())
    return aligned_embeddings