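"""Build a per-character voice dataset: download YouTube clips listed in a CSV,
optionally isolate vocals with Demucs, then transcribe the audio with Whisper
and split it into one WAV file per spoken segment."""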
import csv
import json
import subprocess
from pathlib import Path

import librosa
import numpy as np
import torch
import whisper
from scipy.io import wavfile

from demucs.apply import apply_model
from demucs.pretrained import DEFAULT_MODEL, get_model
|
def download_youtube_clip(video_identifier, start_time, end_time, output_filename,
                          num_attempts=5, url_base="https://www.youtube.com/watch?v="):
    """Download the audio of one YouTube clip as WAV, retrying on failure."""
    status = False

    output_path = Path(output_filename)
    if output_path.exists():
        return True, "Already Downloaded"

    command = (
        f'yt-dlp --quiet --no-warnings -x --audio-format wav -f bestaudio '
        f'-o "{output_filename}" --download-sections "*{start_time}-{end_time}" '
        f'"{url_base}{video_identifier}"'
    )

    attempts = 0
    while True:
        try:
            subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            attempts += 1
            if attempts == num_attempts:
                return status, err.output
        else:
            # The `else` belongs to the try: only a clean run exits the retry loop.
            break

    status = output_path.exists()
    return status, "Downloaded"
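# A minimal usage sketch (placeholder video id and clip bounds; requires yt-dlp
# and ffmpeg on PATH):
#
#   ok, msg = download_youtube_clip("dQw4w9WgXcQ", 10, 20, "downloads/demo/demo_0000.wav")
#   print(ok, msg)  # -> (True, "Downloaded") on success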
|
def split_long_audio(model, filepaths, character_name, save_dir="data_dir", out_sr=44100):
    """Transcribe each file with Whisper and cut it into one WAV per segment."""
    if isinstance(filepaths, str):
        filepaths = [filepaths]

    save_path = Path(save_dir) / character_name
    save_path.mkdir(exist_ok=True, parents=True)

    for file_idx, filepath in enumerate(filepaths):
        print(f"Transcribing file {file_idx}: '{filepath}' to segments...")
        result = model.transcribe(filepath, word_timestamps=True, task="transcribe",
                                  beam_size=5, best_of=5)
        segments = result['segments']

        wav, sr = librosa.load(filepath, sr=None, mono=True)
        # trim() also reports which samples it kept; track the leading offset so
        # Whisper's timestamps (relative to the untrimmed file) stay aligned.
        wav, (trim_start, _) = librosa.effects.trim(wav, top_db=20)
        trim_offset = trim_start / sr
        peak = np.abs(wav).max()
        if peak > 1.0:
            wav = 0.98 * wav / peak
        wav2 = librosa.resample(wav, orig_sr=sr, target_sr=out_sr)
        wav2 /= max(np.abs(wav2).max(), 1e-8)  # peak-normalize; guard against silence

        for i, seg in enumerate(segments):
            start_time = max(seg['start'] - trim_offset, 0)
            end_time = max(seg['end'] - trim_offset, 0)
            wav_seg = wav2[int(start_time * out_sr):int(end_time * out_sr)]
            out_fpath = save_path / f"{character_name}_{file_idx}_{i}.wav"
            wavfile.write(out_fpath, rate=out_sr,
                          data=(wav_seg * np.iinfo(np.int16).max).astype(np.int16))
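# A minimal usage sketch (illustrative paths; assumes a downloaded clip exists):
#
#   wm = whisper.load_model("medium")
#   split_long_audio(wm, "downloads/somebody/somebody_0000.wav", "somebody",
#                    save_dir="dataset_raw")
#
# Segments land in dataset_raw/somebody/ as somebody_0_0.wav, somebody_0_1.wav, ...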
|
def extract_vocal_demucs(model, filename, out_filename, sr=44100, device=None,
                         shifts=1, split=True, overlap=0.25, jobs=0):
    """Isolate the vocal stem of a recording with a pretrained Demucs model."""
    wav, sr = librosa.load(filename, mono=False, sr=sr)
    if wav.ndim == 1:
        # Demucs expects (channels, time) stereo input; duplicate mono audio.
        wav = np.stack([wav, wav])
    wav = torch.tensor(wav)
    # Standardize against a mono reference and undo it after separation,
    # mirroring the demucs CLI.
    ref = wav.mean(0)
    wav = (wav - ref.mean()) / ref.std()
    sources = apply_model(
        model,
        wav[None],
        device=device,
        shifts=shifts,
        split=split,
        overlap=overlap,
        progress=True,
        num_workers=jobs,
    )[0]
    sources = sources * ref.std() + ref.mean()

    # "vocals" is the last source in the model's output ordering.
    wav = sources[-1]
    wav = wav / max(1.01 * wav.abs().max(), 1)  # headroom to avoid clipping on write
    wavfile.write(out_filename, rate=sr, data=wav.cpu().numpy().T)
    return out_filename
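# A minimal usage sketch (illustrative path; writing back to the input file
# mirrors how main() uses it below):
#
#   demucs_model = get_model(DEFAULT_MODEL)
#   extract_vocal_demucs(demucs_model,
#                        "downloads/somebody/somebody_0000.wav",
#                        "downloads/somebody/somebody_0000.wav")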
|
def main(
    clips_csv_filepath="data.csv",
    character="somebody",
    do_extract_vocals=False,
    whisper_size="medium",
    dl_dir="downloads",
    data_dir="dataset_raw",
    **kwargs,  # tolerate extra keys in the config file
):
    """Download every clip listed in the CSV (columns: ytid, start, end),
    optionally reduce each clip to its vocal stem, then transcribe and split."""
    dl_path = Path(dl_dir) / character
    dl_path.mkdir(exist_ok=True, parents=True)
    demucs_model = get_model(DEFAULT_MODEL) if do_extract_vocals else None

    with Path(clips_csv_filepath).open() as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            outfile_path = dl_path / f"{character}_{i:04d}.wav"
            ok, _ = download_youtube_clip(row['ytid'], row['start'], row['end'], outfile_path)
            if do_extract_vocals and ok:
                # Overwrite the downloaded clip with its isolated vocals.
                extract_vocal_demucs(demucs_model, outfile_path, outfile_path)

    filenames = sorted(str(x) for x in dl_path.glob("*.wav"))
    whisper_model = whisper.load_model(whisper_size)
    split_long_audio(whisper_model, filenames, character, data_dir)
|
if __name__ == '__main__':
    # dataset_config.json supplies keyword arguments for main().
    cfg = json.loads(Path('dataset_config.json').read_text())
    main(**cfg)
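# A sample dataset_config.json (keys mirror main()'s parameters; values are
# placeholders):
#
#   {
#     "clips_csv_filepath": "data.csv",
#     "character": "somebody",
#     "do_extract_vocals": true,
#     "whisper_size": "medium"
#   }
#
# and the CSV it points at needs ytid/start/end columns, e.g.:
#
#   ytid,start,end
#   dQw4w9WgXcQ,10,20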