import json
import logging
import os
from pathlib import Path
from typing import Any, Mapping, Optional, Union

import librosa
import numpy as np
import numpy.typing as npt
import torch
from coqpit import Coqpit
from torch import nn
from torch.nn import functional as F
from trainer.io import load_fsspec

from TTS.tts.layers.vits.networks import PosteriorEncoder
from TTS.tts.utils.speakers import SpeakerManager
from TTS.utils.audio.torch_transforms import wav_to_spec
from TTS.vc.configs.openvoice_config import OpenVoiceConfig
from TTS.vc.models.base_vc import BaseVC
from TTS.vc.models.freevc import Generator, ResidualCouplingBlock

logger = logging.getLogger(__name__)


class ReferenceEncoder(nn.Module):
    """NN module creating a fixed size prosody embedding from a spectrogram.

    inputs: mel spectrograms [batch_size, num_spec_frames, num_mel]
    outputs: [batch_size, embedding_dim]
    """

    def __init__(self, spec_channels: int, embedding_dim: int = 0, layernorm: bool = True) -> None:
        super().__init__()
        self.spec_channels = spec_channels
        ref_enc_filters = [32, 32, 64, 64, 128, 128]
        K = len(ref_enc_filters)
        filters = [1] + ref_enc_filters
        convs = [
            torch.nn.utils.parametrizations.weight_norm(
                nn.Conv2d(
                    in_channels=filters[i],
                    out_channels=filters[i + 1],
                    kernel_size=(3, 3),
                    stride=(2, 2),
                    padding=(1, 1),
                )
            )
            for i in range(K)
        ]
        self.convs = nn.ModuleList(convs)

        out_channels = self.calculate_channels(spec_channels, 3, 2, 1, K)
        self.gru = nn.GRU(
            input_size=ref_enc_filters[-1] * out_channels,
            hidden_size=256 // 2,
            batch_first=True,
        )
        self.proj = nn.Linear(128, embedding_dim)
        self.layernorm = nn.LayerNorm(self.spec_channels) if layernorm else None

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        N = inputs.size(0)

        out = inputs.view(N, 1, -1, self.spec_channels)  # [N, 1, Ty, n_freqs]
        if self.layernorm is not None:
            out = self.layernorm(out)

        for conv in self.convs:
            out = conv(out)
            out = F.relu(out)  # [N, 128, Ty//2^K, n_mels//2^K]

        out = out.transpose(1, 2)  # [N, Ty//2^K, 128, n_mels//2^K]
        T = out.size(1)
        N = out.size(0)
        out = out.contiguous().view(N, T, -1)  # [N, Ty//2^K, 128*n_mels//2^K]

        self.gru.flatten_parameters()
        _memory, out = self.gru(out)  # out --- [1, N, 128]

        return self.proj(out.squeeze(0))

    def calculate_channels(self, L: int, kernel_size: int, stride: int, pad: int, n_convs: int) -> int:
        # Output length of each strided conv: L' = (L - kernel_size + 2 * pad) // stride + 1
        for _ in range(n_convs):
            L = (L - kernel_size + 2 * pad) // stride + 1
        return L
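
# Usage sketch for ReferenceEncoder (a minimal example; the 513-bin linear
# spectrogram corresponds to fft_size=1024, and embedding_dim=256 matches a
# typical gin_channels value, both chosen here for illustration):
#
#   >>> enc = ReferenceEncoder(spec_channels=513, embedding_dim=256)
#   >>> spec = torch.randn(2, 100, 513)  # [batch_size, num_spec_frames, spec_channels]
#   >>> enc(spec).shape
#   torch.Size([2, 256])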


class OpenVoice(BaseVC):
    """
    OpenVoice voice conversion model (inference only).

    Source: https://github.com/myshell-ai/OpenVoice
    Paper: https://arxiv.org/abs/2312.01479

    Paper abstract:
        We introduce OpenVoice, a versatile voice cloning approach that requires
        only a short audio clip from the reference speaker to replicate their voice and
        generate speech in multiple languages. OpenVoice represents a significant
        advancement in addressing the following open challenges in the field: 1)
        Flexible Voice Style Control. OpenVoice enables granular control over voice
        styles, including emotion, accent, rhythm, pauses, and intonation, in addition
        to replicating the tone color of the reference speaker. The voice styles are not
        directly copied from and constrained by the style of the reference speaker.
        Previous approaches lacked the ability to flexibly manipulate voice styles after
        cloning. 2) Zero-Shot Cross-Lingual Voice Cloning. OpenVoice achieves zero-shot
        cross-lingual voice cloning for languages not included in the massive-speaker
        training set. Unlike previous approaches, which typically require an extensive
        massive-speaker multi-lingual (MSML) dataset for all languages, OpenVoice can
        clone voices into a new language without any massive-speaker training data for
        that language. OpenVoice is also computationally efficient, costing tens of
        times less than commercially available APIs that offer even inferior
        performance. To foster further research in the field, we have made the source
        code and trained model publicly accessible. We also provide qualitative results
        in our demo website. Prior to its public release, our internal version of
        OpenVoice was used tens of millions of times by users worldwide between May and
        October 2023, serving as the backend of MyShell.
    """

    def __init__(self, config: Coqpit, speaker_manager: Optional[SpeakerManager] = None) -> None:
        super().__init__(config, None, speaker_manager, None)

        self.init_multispeaker(config)

        self.zero_g = self.args.zero_g
        self.inter_channels = self.args.inter_channels
        self.hidden_channels = self.args.hidden_channels
        self.filter_channels = self.args.filter_channels
        self.n_heads = self.args.n_heads
        self.n_layers = self.args.n_layers
        self.kernel_size = self.args.kernel_size
        self.p_dropout = self.args.p_dropout
        self.resblock = self.args.resblock
        self.resblock_kernel_sizes = self.args.resblock_kernel_sizes
        self.resblock_dilation_sizes = self.args.resblock_dilation_sizes
        self.upsample_rates = self.args.upsample_rates
        self.upsample_initial_channel = self.args.upsample_initial_channel
        self.upsample_kernel_sizes = self.args.upsample_kernel_sizes
        self.n_layers_q = self.args.n_layers_q
        self.use_spectral_norm = self.args.use_spectral_norm
        self.gin_channels = self.args.gin_channels
        self.tau = self.args.tau

        self.spec_channels = config.audio.fft_size // 2 + 1

        self.dec = Generator(
            self.inter_channels,
            self.resblock,
            self.resblock_kernel_sizes,
            self.resblock_dilation_sizes,
            self.upsample_rates,
            self.upsample_initial_channel,
            self.upsample_kernel_sizes,
            gin_channels=self.gin_channels,
        )
        self.enc_q = PosteriorEncoder(
            self.spec_channels,
            self.inter_channels,
            self.hidden_channels,
            kernel_size=5,
            dilation_rate=1,
            num_layers=16,
            cond_channels=self.gin_channels,
        )
        self.flow = ResidualCouplingBlock(
            self.inter_channels,
            self.hidden_channels,
            kernel_size=5,
            dilation_rate=1,
            n_layers=4,
            gin_channels=self.gin_channels,
        )
        self.ref_enc = ReferenceEncoder(self.spec_channels, self.gin_channels)

    @property
    def device(self) -> torch.device:
        return next(self.parameters()).device

    @staticmethod
    def init_from_config(config: OpenVoiceConfig) -> "OpenVoice":
        return OpenVoice(config)
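
    # Construction sketch (a minimal example; assumes OpenVoiceConfig defaults
    # are usable as-is, with weights restored separately via `load_checkpoint`):
    #
    #   >>> config = OpenVoiceConfig()
    #   >>> model = OpenVoice.init_from_config(config)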

    def init_multispeaker(self, config: Coqpit, data: Optional[list[Any]] = None) -> None:
        """Initialize multi-speaker modules of a model. A model can be trained either with a speaker embedding layer
        or with external `d_vectors` computed from a speaker encoder model.

        You must provide a `speaker_manager` at initialization to set up the multi-speaker modules.

        Args:
            config (Coqpit): Model configuration.
            data (list, optional): Dataset items to infer number of speakers. Defaults to None.
        """
        self.num_spks = config.num_speakers
        if self.speaker_manager:
            self.num_spks = self.speaker_manager.num_speakers

    def load_checkpoint(
        self,
        config: OpenVoiceConfig,
        checkpoint_path: Union[str, os.PathLike[Any]],
        eval: bool = False,
        strict: bool = True,
        cache: bool = False,
    ) -> None:
        """Map from OpenVoice's config structure."""
        config_path = Path(checkpoint_path).parent / "config.json"
        with open(config_path, encoding="utf-8") as f:
            config_org = json.load(f)
        self.config.audio.input_sample_rate = config_org["data"]["sampling_rate"]
        self.config.audio.output_sample_rate = config_org["data"]["sampling_rate"]
        self.config.audio.fft_size = config_org["data"]["filter_length"]
        self.config.audio.hop_length = config_org["data"]["hop_length"]
        self.config.audio.win_length = config_org["data"]["win_length"]
        state = load_fsspec(str(checkpoint_path), map_location=torch.device("cpu"), cache=cache)
        self.load_state_dict(state["model"], strict=strict)
        if eval:
            self.eval()
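
    # Checkpoint-loading sketch (the path below is hypothetical; a `config.json`
    # in OpenVoice's original format must sit next to the checkpoint file):
    #
    #   >>> model.load_checkpoint(config, "checkpoints/openvoice/model.pth", eval=True)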

    # OpenVoice is inference-only here, so the training hooks are stubs.
    def forward(self) -> None: ...
    def train_step(self) -> None: ...
    def eval_step(self) -> None: ...

    @staticmethod
    def _set_x_lengths(x: torch.Tensor, aux_input: Mapping[str, Optional[torch.Tensor]]) -> torch.Tensor:
        if "x_lengths" in aux_input and aux_input["x_lengths"] is not None:
            return aux_input["x_lengths"]
        return torch.tensor(x.shape[1:2]).to(x.device)

    def inference(
        self,
        x: torch.Tensor,
        aux_input: Mapping[str, Optional[torch.Tensor]] = {"x_lengths": None, "g_src": None, "g_tgt": None},
    ) -> dict[str, torch.Tensor]:
        """
        Inference pass of the model.

        Args:
            x (torch.Tensor): Input spectrogram. Shape: (batch_size, spec_channels, spec_seq_len).
            aux_input (Mapping): Auxiliary inputs with the keys:
                x_lengths (torch.Tensor, optional): Lengths of the input. Shape: (batch_size,).
                g_src (torch.Tensor): Source speaker embedding. Shape: (batch_size, gin_channels, 1).
                g_tgt (torch.Tensor): Target speaker embedding. Shape: (batch_size, gin_channels, 1).

        Returns:
            model_outputs: Output audio. Shape: (batch_size, 1, num_samples).
            y_mask: Spectrogram mask. Shape: (batch_size, 1, spec_seq_len).
            (z, z_p, z_hat): A tuple of latent variables.
        """
        x_lengths = self._set_x_lengths(x, aux_input)
        if "g_src" in aux_input and aux_input["g_src"] is not None:
            g_src = aux_input["g_src"]
        else:
            raise ValueError("aux_input must define g_src")
        if "g_tgt" in aux_input and aux_input["g_tgt"] is not None:
            g_tgt = aux_input["g_tgt"]
        else:
            raise ValueError("aux_input must define g_tgt")
        # Encode the source spectrogram into latents conditioned on the source speaker,
        # map them through the flow to the speaker-independent prior space, then invert
        # the flow conditioned on the target speaker and decode to a waveform.
        z, _m_q, _logs_q, y_mask = self.enc_q(
            x, x_lengths, g=g_src if not self.zero_g else torch.zeros_like(g_src), tau=self.tau
        )
        z_p = self.flow(z, y_mask, g=g_src)
        z_hat = self.flow(z_p, y_mask, g=g_tgt, reverse=True)
        o_hat = self.dec(z_hat * y_mask, g=g_tgt if not self.zero_g else torch.zeros_like(g_tgt))
        return {
            "model_outputs": o_hat,
            "y_mask": y_mask,
            "z": z,
            "z_p": z_p,
            "z_hat": z_hat,
        }
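
    # Inference sketch with random tensors (shapes only; assumes fft_size=1024,
    # i.e. 513 spectrogram bins, and gin_channels=256 — assumptions about the
    # config for illustration, not guaranteed defaults):
    #
    #   >>> x = torch.randn(1, 513, 100)  # [batch, spec_channels, frames]
    #   >>> g = torch.randn(1, 256, 1)    # [batch, gin_channels, 1]
    #   >>> out = model.inference(x, {"x_lengths": torch.tensor([100]), "g_src": g, "g_tgt": g})
    #   >>> out["model_outputs"].shape    # [batch, 1, num_samples]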

    def load_audio(self, wav: Union[str, npt.NDArray[np.float32], torch.Tensor, list[float]]) -> torch.Tensor:
        """Read and format the input audio."""
        if isinstance(wav, str):
            out = torch.from_numpy(librosa.load(wav, sr=self.config.audio.input_sample_rate)[0])
        elif isinstance(wav, np.ndarray):
            out = torch.from_numpy(wav)
        elif isinstance(wav, list):
            out = torch.from_numpy(np.array(wav))
        else:
            out = wav
        return out.to(self.device).float()

    def extract_se(self, audio: Union[str, torch.Tensor]) -> tuple[torch.Tensor, torch.Tensor]:
        """Extract the speaker embedding and linear spectrogram of a reference utterance."""
        y = self.load_audio(audio)
        y = y.to(self.device)
        y = y.unsqueeze(0)
        spec = wav_to_spec(
            y,
            n_fft=self.config.audio.fft_size,
            hop_length=self.config.audio.hop_length,
            win_length=self.config.audio.win_length,
            center=False,
        ).to(self.device)
        with torch.no_grad():
            g = self.ref_enc(spec.transpose(1, 2)).unsqueeze(-1)
        return g, spec
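
    # Speaker-embedding sketch (the wav path is hypothetical): `g` is the
    # conditioning embedding, `spec` the linear spectrogram of the same clip.
    #
    #   >>> g_tgt, tgt_spec = model.extract_se("reference_speaker.wav")
    #   >>> g_tgt.shape  # [1, gin_channels, 1]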

    def voice_conversion(self, src: Union[str, torch.Tensor], tgt: Union[str, torch.Tensor]) -> npt.NDArray[np.float32]:
        """
        Voice conversion pass of the model.

        Args:
            src (str or torch.Tensor): Source utterance (path or waveform).
            tgt (str or torch.Tensor): Target reference utterance (path or waveform).

        Returns:
            Converted audio as a float32 numpy array.
        """
        src_se, src_spec = self.extract_se(src)
        tgt_se, _ = self.extract_se(tgt)

        aux_input = {"g_src": src_se, "g_tgt": tgt_se}
        audio = self.inference(src_spec, aux_input)
        return audio["model_outputs"][0, 0].data.cpu().float().numpy()
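
    # End-to-end conversion sketch (paths are hypothetical; the model must have
    # been restored with `load_checkpoint` first):
    #
    #   >>> wav = model.voice_conversion("source.wav", "target_speaker.wav")
    #   >>> sr = model.config.audio.output_sample_rate  # sample rate of `wav`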