import csv
import subprocess
from pathlib import Path

import librosa
import numpy as np
import torch
import whisper
from demucs.apply import apply_model
from demucs.pretrained import DEFAULT_MODEL, get_model
from scipy.io import wavfile


def download_youtube_clip(
    video_identifier,
    start_time,
    end_time,
    output_filename,
    num_attempts=5,
    url_base="https://www.youtube.com/watch?v=",
):
    """Download the audio of a YouTube clip as WAV via yt-dlp, retrying up to num_attempts times."""
    status = False

    output_path = Path(output_filename)
    if output_path.exists():
        return True, "Already Downloaded"

    command = f"""
        yt-dlp --quiet --no-warnings -x --audio-format wav -f bestaudio -o "{output_filename}" --download-sections "*{start_time}-{end_time}" "{url_base}{video_identifier}"
    """.strip()

    attempts = 0
    while True:
        try:
            output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            attempts += 1
            if attempts == num_attempts:
                return status, err.output
        else:
            break

    status = output_path.exists()
    return status, "Downloaded"


def split_long_audio(model, filepaths, character_name, save_dir="data_dir", out_sr=44100):
    """Use Whisper segment timestamps to chop long recordings into per-utterance WAV files."""
    if isinstance(filepaths, str):
        filepaths = [filepaths]

    for file_idx, filepath in enumerate(filepaths):
        save_path = Path(save_dir) / character_name
        save_path.mkdir(exist_ok=True, parents=True)

        print(f"Transcribing file {file_idx}: '{filepath}' to segments...")
        result = model.transcribe(filepath, word_timestamps=True, task="transcribe", beam_size=5, best_of=5)
        segments = result["segments"]

        # Load, trim leading/trailing silence, and guard against clipping
        wav, sr = librosa.load(filepath, sr=None, offset=0, duration=None, mono=True)
        wav, _ = librosa.effects.trim(wav, top_db=20)
        peak = np.abs(wav).max()
        if peak > 1.0:
            wav = 0.98 * wav / peak

        # Resample to the target rate and peak-normalize
        wav2 = librosa.resample(wav, orig_sr=sr, target_sr=out_sr)
        wav2 /= max(wav2.max(), -wav2.min())

        # Write each Whisper segment out as its own 16-bit WAV
        for i, seg in enumerate(segments):
            start_time = seg["start"]
            end_time = seg["end"]
            wav_seg = wav2[int(start_time * out_sr):int(end_time * out_sr)]
            wav_seg_name = f"{character_name}_{file_idx}_{i}.wav"
            out_fpath = save_path / wav_seg_name
            wavfile.write(out_fpath, rate=out_sr, data=(wav_seg * np.iinfo(np.int16).max).astype(np.int16))


def extract_vocal_demucs(model, filename, out_filename, sr=44100, device=None, shifts=1, split=True, overlap=0.25, jobs=0):
    """Isolate the vocal stem from a recording with Demucs and write it to out_filename."""
    wav, sr = librosa.load(filename, mono=False, sr=sr)
    wav = torch.tensor(wav)

    # Standardize by the mono reference signal, as Demucs expects
    ref = wav.mean(0)
    wav = (wav - ref.mean()) / ref.std()

    sources = apply_model(
        model, wav[None], device=device, shifts=shifts, split=split, overlap=overlap, progress=True, num_workers=jobs
    )[0]
    sources = sources * ref.std() + ref.mean()

    # Vocals are the last stem Demucs returns (drums, bass, other, vocals)
    wav = sources[-1]
    wav = wav / max(1.01 * wav.abs().max(), 1)
    wavfile.write(out_filename, rate=sr, data=wav.numpy().T)
    return out_filename


def main(
    clips_csv_filepath="theovon.csv",
    character="theovon",
    do_extract_vocals=False,
    whisper_size="medium",
    # Where raw yt clips will be downloaded to
    dl_dir="raw_data",
    # Where actual data will be organized
    data_dir="prepared_data",
):
    dl_path = Path(dl_dir) / character
    dl_path.mkdir(exist_ok=True, parents=True)
    if do_extract_vocals:
        demucs_model = get_model(DEFAULT_MODEL)

    # Download each clip listed in the CSV, optionally isolating vocals in place
    with Path(clips_csv_filepath).open() as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            outfile_path = dl_path / f"{character}_{i:04d}.wav"
            download_youtube_clip(row["ytid"], row["start"], row["end"], outfile_path)
            if do_extract_vocals:
                extract_vocal_demucs(demucs_model, outfile_path, outfile_path)

    # Segment every downloaded clip into short utterances with Whisper
    filenames = sorted([str(x) for x in dl_path.glob("*.wav")])
    whisper_model = whisper.load_model(whisper_size)
    split_long_audio(whisper_model, filenames, character, data_dir)


if __name__ == "__main__":
    import json

    cfg = json.loads(Path("dataset_config.json").read_text())
    main(**cfg)
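

# ---------------------------------------------------------------------------
# Input sketch (illustrative, not part of the script). The CSV handed to
# main() must provide the columns the DictReader loop reads above -- ytid,
# start, end -- one clip per row, with start/end in a form yt-dlp's
# --download-sections flag accepts (e.g. seconds). "VIDEO_ID" is a
# placeholder, not a real YouTube ID:
#
#   ytid,start,end
#   VIDEO_ID,60,120
#
# dataset_config.json supplies keyword arguments matching main()'s signature:
#
#   {
#     "clips_csv_filepath": "theovon.csv",
#     "character": "theovon",
#     "do_extract_vocals": true,
#     "whisper_size": "medium",
#     "dl_dir": "raw_data",
#     "data_dir": "prepared_data"
#   }
# ---------------------------------------------------------------------------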