# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#
# Created on 2018/12
# Author: Kaituo XU
# Modified on 2019/11 by Alexandre Defossez, added support for multiple output channels
# Here is the original license:
# The MIT License (MIT)
#
# Copyright (c) 2018 Kaituo XU
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

from .utils import capture_init

EPS = 1e-8

def overlap_and_add(signal, frame_step):
    """Reconstruct a signal from overlapping frames by summing them at hops of
    `frame_step` samples.

    Args:
        signal: [..., frames, frame_length]; all dimensions but the last two may vary
        frame_step: hop size (in samples) between consecutive frames
    Returns:
        [..., output_size], where output_size = (frames - 1) * frame_step + frame_length
    """
    outer_dimensions = signal.size()[:-2]
    frames, frame_length = signal.size()[-2:]

    subframe_length = math.gcd(frame_length, frame_step)  # gcd = Greatest Common Divisor
    subframe_step = frame_step // subframe_length
    subframes_per_frame = frame_length // subframe_length
    output_size = frame_step * (frames - 1) + frame_length
    output_subframes = output_size // subframe_length

    subframe_signal = signal.view(*outer_dimensions, -1, subframe_length)

    frame = torch.arange(0, output_subframes,
                         device=signal.device).unfold(0, subframes_per_frame, subframe_step)
    frame = frame.long()  # the signal may live on GPU or CPU
    frame = frame.contiguous().view(-1)

    result = signal.new_zeros(*outer_dimensions, output_subframes, subframe_length)
    result.index_add_(-2, frame, subframe_signal)
    result = result.view(*outer_dimensions, -1)
    return result
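
# Illustrative sanity check, added here as a sketch (the helper name
# `_demo_overlap_and_add` is ours, not part of the original module): with
# all-ones frames of length 20 and a hop of 10, interior samples are covered
# by exactly two frames and so sum to 2, while the first and last 10 samples
# are covered once and stay at 1.
def _demo_overlap_and_add():
    frames = torch.ones(4, 3, 20)  # [batch, n_frames, frame_length]
    out = overlap_and_add(frames, frame_step=10)
    # output length = frame_step * (n_frames - 1) + frame_length = 10 * 2 + 20 = 40
    assert out.shape == (4, 40)
    assert (out[0, :10] == 1).all() and (out[0, 10:30] == 2).all()
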
class ConvTasNet(nn.Module):
    @capture_init
    def __init__(self,
                 sources,
                 N=256,
                 L=20,
                 B=256,
                 H=512,
                 P=3,
                 X=8,
                 R=4,
                 audio_channels=2,
                 norm_type="gLN",
                 causal=False,
                 mask_nonlinear='relu',
                 samplerate=44100,
                 segment_length=44100 * 2 * 4):
| """ | |
| Args: | |
| sources: list of sources | |
| N: Number of filters in autoencoder | |
| L: Length of the filters (in samples) | |
| B: Number of channels in bottleneck 1 × 1-conv block | |
| H: Number of channels in convolutional blocks | |
| P: Kernel size in convolutional blocks | |
| X: Number of convolutional blocks in each repeat | |
| R: Number of repeats | |
| norm_type: BN, gLN, cLN | |
| causal: causal or non-causal | |
| mask_nonlinear: use which non-linear function to generate mask | |
| """ | |
        super(ConvTasNet, self).__init__()
        # Hyper-parameter
        self.sources = sources
        self.C = len(sources)
        self.N, self.L, self.B, self.H, self.P, self.X, self.R = N, L, B, H, P, X, R
        self.norm_type = norm_type
        self.causal = causal
        self.mask_nonlinear = mask_nonlinear
        self.audio_channels = audio_channels
        self.samplerate = samplerate
        self.segment_length = segment_length
        # Components
        self.encoder = Encoder(L, N, audio_channels)
        self.separator = TemporalConvNet(
            N, B, H, P, X, R, self.C, norm_type, causal, mask_nonlinear)
        self.decoder = Decoder(N, L, audio_channels)
        # init
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_normal_(p)

    def valid_length(self, length):
        return length

    def forward(self, mixture):
        """
        Args:
            mixture: [M, audio_channels, T], M is batch size, T is #samples
        Returns:
            est_source: [M, C, audio_channels, T]
        """
        mixture_w = self.encoder(mixture)
        est_mask = self.separator(mixture_w)
        est_source = self.decoder(mixture_w, est_mask)

        # T changed after conv1d in encoder, fix it here
        T_origin = mixture.size(-1)
        T_conv = est_source.size(-1)
        est_source = F.pad(est_source, (0, T_origin - T_conv))
        return est_source
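
# Illustrative end-to-end usage sketch (ours; the tiny hyper-parameters are
# chosen only to keep the check fast, not values from the original training
# setup): two stereo sources separated from a stereo mixture.
def _demo_conv_tasnet():
    model = ConvTasNet(sources=["drums", "vocals"], N=16, L=20, B=8, H=16,
                       X=2, R=1, audio_channels=2)
    mix = torch.randn(1, 2, 8000)  # [batch, audio_channels, time]
    with torch.no_grad():
        out = model(mix)
    assert out.shape == (1, 2, 2, 8000)  # [batch, sources, audio_channels, time]
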
class Encoder(nn.Module):
    """Estimation of the nonnegative mixture weight by a 1-D conv layer.
    """
    def __init__(self, L, N, audio_channels):
        super(Encoder, self).__init__()
        # Hyper-parameter
        self.L, self.N = L, N
        # Components
        # 50% overlap
        self.conv1d_U = nn.Conv1d(audio_channels, N, kernel_size=L, stride=L // 2, bias=False)

    def forward(self, mixture):
        """
        Args:
            mixture: [M, audio_channels, T], M is batch size, T is #samples
        Returns:
            mixture_w: [M, N, K], where K = (T - L) // (L // 2) + 1
                (= 2T/L - 1 when L divides T)
        """
        mixture_w = F.relu(self.conv1d_U(mixture))  # [M, N, K]
        return mixture_w
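
# Illustrative shape check (ours): with no padding, the strided conv yields
# K = (T - L) // (L // 2) + 1 latent frames.
def _demo_encoder():
    enc = Encoder(L=20, N=64, audio_channels=2)
    mix = torch.randn(3, 2, 1000)  # [batch, audio_channels, time]
    assert enc(mix).shape == (3, 64, 99)  # (1000 - 20) // 10 + 1 = 99
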
class Decoder(nn.Module):
    def __init__(self, N, L, audio_channels):
        super(Decoder, self).__init__()
        # Hyper-parameter
        self.N, self.L = N, L
        self.audio_channels = audio_channels
        # Components
        self.basis_signals = nn.Linear(N, audio_channels * L, bias=False)

    def forward(self, mixture_w, est_mask):
        """
        Args:
            mixture_w: [M, N, K]
            est_mask: [M, C, N, K]
        Returns:
            est_source: [M, C, audio_channels, T]
        """
        # D = W * M
        source_w = torch.unsqueeze(mixture_w, 1) * est_mask  # [M, C, N, K]
        source_w = torch.transpose(source_w, 2, 3)  # [M, C, K, N]
        # S = DV
        est_source = self.basis_signals(source_w)  # [M, C, K, audio_channels * L]
        m, c, k, _ = est_source.size()
        est_source = est_source.view(m, c, k, self.audio_channels, -1).transpose(2, 3).contiguous()
        est_source = overlap_and_add(est_source, self.L // 2)  # M x C x audio_channels x T
        return est_source
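
# Illustrative shape check (ours): masks for C = 4 sources turn one latent
# mixture into four stereo waveforms of length (K - 1) * L // 2 + L.
def _demo_decoder():
    dec = Decoder(N=64, L=20, audio_channels=2)
    mixture_w = torch.randn(3, 64, 99)
    est_mask = torch.rand(3, 4, 64, 99)
    assert dec(mixture_w, est_mask).shape == (3, 4, 2, 1000)  # 98 * 10 + 20 = 1000
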
class TemporalConvNet(nn.Module):
    def __init__(self, N, B, H, P, X, R, C, norm_type="gLN", causal=False, mask_nonlinear='relu'):
        """
        Args:
            N: Number of filters in autoencoder
            B: Number of channels in bottleneck 1 × 1-conv block
            H: Number of channels in convolutional blocks
            P: Kernel size in convolutional blocks
            X: Number of convolutional blocks in each repeat
            R: Number of repeats
            C: Number of sources (speakers in the original Conv-TasNet)
            norm_type: BN, gLN, cLN
            causal: causal or non-causal
            mask_nonlinear: non-linear function used to generate the mask
        """
        super(TemporalConvNet, self).__init__()
        # Hyper-parameter
        self.C = C
        self.mask_nonlinear = mask_nonlinear
        # Components
        # [M, N, K] -> [M, N, K]
        layer_norm = ChannelwiseLayerNorm(N)
        # [M, N, K] -> [M, B, K]
        bottleneck_conv1x1 = nn.Conv1d(N, B, 1, bias=False)
        # [M, B, K] -> [M, B, K]
        repeats = []
        for r in range(R):
            blocks = []
            for x in range(X):
                dilation = 2**x
                padding = (P - 1) * dilation if causal else (P - 1) * dilation // 2
                blocks += [
                    TemporalBlock(B,
                                  H,
                                  P,
                                  stride=1,
                                  padding=padding,
                                  dilation=dilation,
                                  norm_type=norm_type,
                                  causal=causal)
                ]
            repeats += [nn.Sequential(*blocks)]
        temporal_conv_net = nn.Sequential(*repeats)
        # [M, B, K] -> [M, C*N, K]
        mask_conv1x1 = nn.Conv1d(B, C * N, 1, bias=False)
        # Put together
        self.network = nn.Sequential(layer_norm, bottleneck_conv1x1, temporal_conv_net,
                                     mask_conv1x1)

    def forward(self, mixture_w):
        """
        Keep this API the same as TasNet.
        Args:
            mixture_w: [M, N, K], M is batch size
        Returns:
            est_mask: [M, C, N, K]
        """
        M, N, K = mixture_w.size()
        score = self.network(mixture_w)  # [M, N, K] -> [M, C*N, K]
        score = score.view(M, self.C, N, K)  # [M, C*N, K] -> [M, C, N, K]
        if self.mask_nonlinear == 'softmax':
            est_mask = F.softmax(score, dim=1)
        elif self.mask_nonlinear == 'relu':
            est_mask = F.relu(score)
        else:
            raise ValueError("Unsupported mask non-linear function")
        return est_mask
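
# Illustrative shape check (ours): the separator maps the latent mixture to
# one mask per source without changing the number of frames.
def _demo_separator():
    sep = TemporalConvNet(N=16, B=8, H=16, P=3, X=2, R=1, C=2)
    w = torch.randn(3, 16, 99)
    assert sep(w).shape == (3, 2, 16, 99)
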
class TemporalBlock(nn.Module):
    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride,
                 padding,
                 dilation,
                 norm_type="gLN",
                 causal=False):
        super(TemporalBlock, self).__init__()
        # [M, B, K] -> [M, H, K]
        conv1x1 = nn.Conv1d(in_channels, out_channels, 1, bias=False)
        prelu = nn.PReLU()
        norm = chose_norm(norm_type, out_channels)
        # [M, H, K] -> [M, B, K]
        dsconv = DepthwiseSeparableConv(out_channels, in_channels, kernel_size, stride, padding,
                                        dilation, norm_type, causal)
        # Put together
        self.net = nn.Sequential(conv1x1, prelu, norm, dsconv)

    def forward(self, x):
        """
        Args:
            x: [M, B, K]
        Returns:
            [M, B, K]
        """
        residual = x
        out = self.net(x)
        # TODO: with P = 3 this works fine, but with P = 2 extra padding may be needed
        return out + residual  # skipping F.relu here appears to work better than applying it
        # return F.relu(out + residual)

class DepthwiseSeparableConv(nn.Module):
    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride,
                 padding,
                 dilation,
                 norm_type="gLN",
                 causal=False):
        super(DepthwiseSeparableConv, self).__init__()
        # Use the `groups` option to implement a depthwise convolution
        # [M, H, K] -> [M, H, K]
        depthwise_conv = nn.Conv1d(in_channels,
                                   in_channels,
                                   kernel_size,
                                   stride=stride,
                                   padding=padding,
                                   dilation=dilation,
                                   groups=in_channels,
                                   bias=False)
        if causal:
            chomp = Chomp1d(padding)
        prelu = nn.PReLU()
        norm = chose_norm(norm_type, in_channels)
        # [M, H, K] -> [M, B, K]
        pointwise_conv = nn.Conv1d(in_channels, out_channels, 1, bias=False)
        # Put together
        if causal:
            self.net = nn.Sequential(depthwise_conv, chomp, prelu, norm, pointwise_conv)
        else:
            self.net = nn.Sequential(depthwise_conv, prelu, norm, pointwise_conv)

    def forward(self, x):
        """
        Args:
            x: [M, H, K]
        Returns:
            result: [M, B, K]
        """
        return self.net(x)
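
# Illustrative parameter count (ours): factoring a full conv into depthwise +
# pointwise replaces H * B * P conv weights with H * P + H * B (plus the small
# norm and PReLU parameters), which is where this block saves capacity.
def _demo_depthwise_separable():
    ds = DepthwiseSeparableConv(64, 32, kernel_size=3, stride=1, padding=1, dilation=1)
    full = nn.Conv1d(64, 32, kernel_size=3, padding=1, bias=False)
    n_ds = sum(p.numel() for p in ds.parameters())      # 64*3 + 64*32 + extras
    n_full = sum(p.numel() for p in full.parameters())  # 64*32*3 = 6144
    print(n_ds, n_full)
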
class Chomp1d(nn.Module):
    """To ensure the output length is the same as the input.
    """
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """
        Args:
            x: [M, H, Kpad]
        Returns:
            [M, H, K]
        """
        return x[:, :, :-self.chomp_size].contiguous()
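
# Illustrative check (ours): in the causal setting the conv pads both sides,
# so chomping the right tail restores the original length and keeps each
# output sample from depending on future inputs.
def _demo_chomp():
    x = torch.arange(10.).view(1, 1, 10)
    assert torch.equal(Chomp1d(3)(x), x[:, :, :7])
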
def chose_norm(norm_type, channel_size):
    """The input of normalization will be (M, C, K), where M is batch size,
    C is channel size and K is sequence length.
    """
    if norm_type == "gLN":
        return GlobalLayerNorm(channel_size)
    elif norm_type == "cLN":
        return ChannelwiseLayerNorm(channel_size)
    elif norm_type == "id":
        return nn.Identity()
    else:  # norm_type == "BN"
        # Given input (M, C, K), nn.BatchNorm1d(C) accumulates statistics
        # along both M and K, so this BN usage is correct.
        return nn.BatchNorm1d(channel_size)

# TODO: Use nn.LayerNorm to implement cLN to speed up
class ChannelwiseLayerNorm(nn.Module):
    """Channel-wise Layer Normalization (cLN)"""
    def __init__(self, channel_size):
        super(ChannelwiseLayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1))  # [1, N, 1]
        self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1))  # [1, N, 1]
        self.reset_parameters()

    def reset_parameters(self):
        self.gamma.data.fill_(1)
        self.beta.data.zero_()

    def forward(self, y):
        """
        Args:
            y: [M, N, K], M is batch size, N is channel size, K is length
        Returns:
            cLN_y: [M, N, K]
        """
        mean = torch.mean(y, dim=1, keepdim=True)  # [M, 1, K]
        var = torch.var(y, dim=1, keepdim=True, unbiased=False)  # [M, 1, K]
        cLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta
        return cLN_y

class GlobalLayerNorm(nn.Module):
    """Global Layer Normalization (gLN)"""
    def __init__(self, channel_size):
        super(GlobalLayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1))  # [1, N, 1]
        self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1))  # [1, N, 1]
        self.reset_parameters()

    def reset_parameters(self):
        self.gamma.data.fill_(1)
        self.beta.data.zero_()

    def forward(self, y):
        """
        Args:
            y: [M, N, K], M is batch size, N is channel size, K is length
        Returns:
            gLN_y: [M, N, K]
        """
        mean = y.mean(dim=(1, 2), keepdim=True)  # [M, 1, 1]
        var = torch.pow(y - mean, 2).mean(dim=(1, 2), keepdim=True)  # [M, 1, 1]
        gLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta
        return gLN_y
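
# Illustrative check (ours): with the initial gamma = 1, beta = 0, gLN gives
# each example roughly zero mean and unit variance over channels and time
# jointly, unlike cLN which normalizes each time step separately.
def _demo_gln():
    out = GlobalLayerNorm(8)(torch.randn(2, 8, 50))
    print(out[0].mean().item(), out[0].var(unbiased=False).item())  # ~0.0, ~1.0
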

if __name__ == "__main__":
    torch.manual_seed(123)
    M, N, L, T = 2, 3, 4, 12
    audio_channels = 1
    K = 2 * T // L - 1
    B, H, P, X, R, C, norm_type, causal = 2, 3, 3, 3, 2, 2, "gLN", False
    mixture = torch.randint(3, (M, audio_channels, T)).float()
    # test Encoder
    encoder = Encoder(L, N, audio_channels)
    encoder.conv1d_U.weight.data = torch.randint(2, encoder.conv1d_U.weight.size()).float()
    mixture_w = encoder(mixture)
    print('mixture', mixture)
    print('U', encoder.conv1d_U.weight)
    print('mixture_w', mixture_w)
    print('mixture_w size', mixture_w.size())
    # test TemporalConvNet
    separator = TemporalConvNet(N, B, H, P, X, R, C, norm_type=norm_type, causal=causal)
    est_mask = separator(mixture_w)
    print('est_mask', est_mask)
    # test Decoder
    decoder = Decoder(N, L, audio_channels)
    est_mask = torch.randint(2, (M, C, N, K)).float()
    est_source = decoder(mixture_w, est_mask)
    print('est_source', est_source)
    # test Conv-TasNet
    conv_tasnet = ConvTasNet(sources=list(range(C)), N=N, L=L, B=B, H=H, P=P, X=X, R=R,
                             audio_channels=audio_channels, norm_type=norm_type)
    est_source = conv_tasnet(mixture)
    print('est_source', est_source)
    print('est_source size', est_source.size())