Spaces:
Running
Running
| # Copyright (c) 2023 Amphion. | |
| # | |
| # This source code is licensed under the MIT license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| import os | |
| import json | |
| import pickle | |
| import glob | |
| from collections import defaultdict | |
| from tqdm import tqdm | |
| # Train: male 20 hours, female 10 hours | |
| TRAIN_MALE_MAX_SECONDS = 20 * 3600 | |
| TRAIN_FEMALE_MAX_SECONDS = 10 * 3600 | |
| TEST_MAX_NUM_EVERY_PERSON = 5 | |
| def select_sample_idxs(): | |
| chosen_speakers = get_chosen_speakers() | |
| with open(os.path.join(vctk_dir, "train.json"), "r") as f: | |
| raw_train = json.load(f) | |
| with open(os.path.join(vctk_dir, "test.json"), "r") as f: | |
| raw_test = json.load(f) | |
| train_idxs, test_idxs = [], [] | |
| # =========== Test =========== | |
| test_nums = defaultdict(int) | |
| for utt in tqdm(raw_train): | |
| idx = utt["index"] | |
| singer = utt["Singer"] | |
| if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON: | |
| test_nums[singer] += 1 | |
| test_idxs.append("train_{}".format(idx)) | |
| for utt in tqdm(raw_test): | |
| idx = utt["index"] | |
| singer = utt["Singer"] | |
| if singer in chosen_speakers and test_nums[singer] < TEST_MAX_NUM_EVERY_PERSON: | |
| test_nums[singer] += 1 | |
| test_idxs.append("test_{}".format(idx)) | |
| # =========== Train =========== | |
| for utt in tqdm(raw_train): | |
| idx = utt["index"] | |
| singer = utt["Singer"] | |
| if singer in chosen_speakers and "train_{}".format(idx) not in test_idxs: | |
| train_idxs.append("train_{}".format(idx)) | |
| for utt in tqdm(raw_test): | |
| idx = utt["index"] | |
| singer = utt["Singer"] | |
| if singer in chosen_speakers and "test_{}".format(idx) not in test_idxs: | |
| train_idxs.append("test_{}".format(idx)) | |
| train_idxs.sort() | |
| test_idxs.sort() | |
| return train_idxs, test_idxs, raw_train, raw_test | |
| def statistics_of_speakers(): | |
| speaker2time = defaultdict(float) | |
| sex2time = defaultdict(float) | |
| with open(os.path.join(vctk_dir, "train.json"), "r") as f: | |
| train = json.load(f) | |
| with open(os.path.join(vctk_dir, "test.json"), "r") as f: | |
| test = json.load(f) | |
| for utt in train + test: | |
| # minutes | |
| speaker2time[utt["Singer"]] += utt["Duration"] | |
| # hours | |
| sex2time[utt["Singer"].split("_")[0]] += utt["Duration"] | |
| print( | |
| "Female: {:.2f} hours, Male: {:.2f} hours.\n".format( | |
| sex2time["female"] / 3600, sex2time["male"] / 3600 | |
| ) | |
| ) | |
| speaker2time = sorted(speaker2time.items(), key=lambda x: x[-1], reverse=True) | |
| for singer, seconds in speaker2time: | |
| print("{}\t{:.2f} mins".format(singer, seconds / 60)) | |
| return speaker2time | |
| def get_chosen_speakers(): | |
| speaker2time = statistics_of_speakers() | |
| chosen_time = defaultdict(float) | |
| chosen_speaker = defaultdict(list) | |
| train_constrait = { | |
| "male": TRAIN_MALE_MAX_SECONDS, | |
| "female": TRAIN_FEMALE_MAX_SECONDS, | |
| } | |
| for speaker, seconds in speaker2time: | |
| sex = speaker.split("_")[0] | |
| if chosen_time[sex] < train_constrait[sex]: | |
| chosen_time[sex] += seconds | |
| chosen_speaker[sex].append(speaker) | |
| speaker2time = dict(speaker2time) | |
| chosen_speaker = chosen_speaker["male"] + chosen_speaker["female"] | |
| print("\n#Chosen speakers = {}".format(len(chosen_speaker))) | |
| for spk in chosen_speaker: | |
| print("{}\t{:.2f} mins".format(spk, speaker2time[spk] / 60)) | |
| return chosen_speaker | |
| if __name__ == "__main__": | |
| root_path = "" | |
| vctk_dir = os.path.join(root_path, "vctk") | |
| fewspeaker_dir = os.path.join(root_path, "vctkfewspeaker") | |
| os.makedirs(fewspeaker_dir, exist_ok=True) | |
| train_idxs, test_idxs, raw_train, raw_test = select_sample_idxs() | |
| print("#Train = {}, #Test = {}".format(len(train_idxs), len(test_idxs))) | |
| # There are no data leakage | |
| assert len(set(train_idxs).intersection(set(test_idxs))) == 0 | |
| for idx in train_idxs + test_idxs: | |
| # No test data of raw vctk | |
| assert "test_" not in idx | |
| for split, chosen_idxs in zip(["train", "test"], [train_idxs, test_idxs]): | |
| print("{}: #chosen idx = {}\n".format(split, len(chosen_idxs))) | |
| # Select features | |
| feat_files = glob.glob("**/train.pkl", root_dir=vctk_dir, recursive=True) | |
| for file in tqdm(feat_files): | |
| raw_file = os.path.join(vctk_dir, file) | |
| new_file = os.path.join( | |
| fewspeaker_dir, file.replace("train.pkl", "{}.pkl".format(split)) | |
| ) | |
| new_dir = "/".join(new_file.split("/")[:-1]) | |
| os.makedirs(new_dir, exist_ok=True) | |
| if "mel_min" in file or "mel_max" in file: | |
| os.system("cp {} {}".format(raw_file, new_file)) | |
| continue | |
| with open(raw_file, "rb") as f: | |
| raw_feats = pickle.load(f) | |
| print("file: {}, #raw_feats = {}".format(file, len(raw_feats))) | |
| new_feats = [] | |
| for idx in chosen_idxs: | |
| chosen_split_is_train, raw_idx = idx.split("_") | |
| assert chosen_split_is_train == "train" | |
| new_feats.append(raw_feats[int(raw_idx)]) | |
| with open(new_file, "wb") as f: | |
| pickle.dump(new_feats, f) | |
| print("New file: {}, #new_feats = {}".format(new_file, len(new_feats))) | |
| # Utterance re-index | |
| news_utts = [raw_train[int(idx.split("_")[-1])] for idx in chosen_idxs] | |
| for i, utt in enumerate(news_utts): | |
| utt["Dataset"] = "vctkfewsinger" | |
| utt["index"] = i | |
| with open(os.path.join(fewspeaker_dir, "{}.json".format(split)), "w") as f: | |
| json.dump(news_utts, f, indent=4) | |