resnet101-voxceleb1 / conv_asr.py
yangwang825's picture
Upload ResNetForSequenceClassification
3957fd5 verified
from typing import Optional, Union
import torch
import torch.nn as nn
import torch.nn.functional as F
from .module import NeuralModule
from .tdnn_attention import (
StatsPoolLayer,
AttentivePoolLayer,
ChannelDependentAttentiveStatisticsPoolLayer,
TdnnModule,
TdnnSeModule,
TdnnSeRes2NetModule,
init_weights
)
def conv3x3(in_planes, out_planes, stride=1, padding=1):
"""2D convolution with kernel_size = 3"""
return nn.Conv2d(
in_planes,
out_planes,
kernel_size=3,
stride=stride,
padding=padding,
bias=False,
)
def conv1x1(in_planes, out_planes, stride=1):
"""2D convolution with kernel_size = 1"""
return nn.Conv2d(
in_planes, out_planes, kernel_size=1, stride=stride, bias=False
)
class BasicBlock(nn.Module):
def __init__(
self,
in_channels,
out_channels,
stride=1,
downsample=None,
activation=nn.ReLU,
):
super(BasicBlock, self).__init__()
self.activation = activation()
self.bn1 = nn.BatchNorm2d(in_channels)
self.conv1 = conv3x3(in_channels, out_channels, stride)
self.bn2 = nn.BatchNorm2d(out_channels)
self.conv2 = conv3x3(out_channels, out_channels)
self.bn3 = nn.BatchNorm2d(out_channels)
self.conv3 = conv1x1(out_channels, out_channels)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
out = self.bn1(x)
out = self.activation(out)
out = self.conv1(out)
out = self.bn2(out)
out = self.activation(out)
out = self.conv2(out)
out = self.bn3(out)
out = self.activation(out)
out = self.conv3(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
return out
class SEBlock(nn.Module):
def __init__(self, channels, reduction=1, activation=nn.ReLU):
super(SEBlock, self).__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.fc = nn.Sequential(
nn.Linear(channels, channels // reduction),
activation(),
nn.Linear(channels // reduction, channels),
nn.Sigmoid(),
)
def forward(self, x):
"""Intermediate step. Processes the input tensor x
and returns an output tensor.
"""
b, c, _, _ = x.size()
y = self.avg_pool(x).view(b, c)
y = self.fc(y).view(b, c, 1, 1)
return x * y
class SEBasicBlock(nn.Module):
def __init__(
self,
in_channels,
out_channels,
stride=1,
downsample=None,
activation=nn.ReLU,
reduction=1,
):
super(SEBasicBlock, self).__init__()
self.activation = activation()
self.bn1 = nn.BatchNorm2d(in_channels)
self.conv1 = conv3x3(in_channels, out_channels, stride)
self.bn2 = nn.BatchNorm2d(out_channels)
self.conv2 = conv3x3(out_channels, out_channels)
self.bn3 = nn.BatchNorm2d(out_channels)
self.conv3 = conv1x1(out_channels, out_channels)
self.downsample = downsample
self.stride = stride
self.se = SEBlock(out_channels, reduction)
def forward(self, x):
residual = x
out = self.bn1(x)
out = self.activation(out)
out = self.conv1(out)
out = self.bn2(out)
out = self.activation(out)
out = self.conv2(out)
out = self.bn3(out)
out = self.activation(out)
out = self.conv3(out)
out = self.se(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
return out
class SEBottleneck(nn.Module):
def __init__(
self,
in_channels,
out_channels,
stride=1,
downsample=None,
activation=nn.ReLU,
reduction=16, # Reduction ratio for SE block
):
super(SEBottleneck, self).__init__()
self.activation = activation()
# 1x1 convolution to reduce channels
self.conv1 = conv1x1(in_channels, out_channels // 4, stride)
self.bn1 = nn.BatchNorm2d(out_channels // 4)
# 3x3 convolution
self.conv2 = conv3x3(out_channels // 4, out_channels // 4)
self.bn2 = nn.BatchNorm2d(out_channels // 4)
# 1x1 convolution to restore channels
self.conv3 = conv1x1(out_channels // 4, out_channels)
self.bn3 = nn.BatchNorm2d(out_channels)
# Squeeze-and-Excitation block
self.se = SEBlock(out_channels, reduction)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
# First 1x1 convolution
out = self.conv1(x)
out = self.bn1(out)
out = self.activation(out)
# 3x3 convolution
out = self.conv2(out)
out = self.bn2(out)
out = self.activation(out)
# Second 1x1 convolution
out = self.conv3(out)
out = self.bn3(out)
# Apply SE block
out = self.se(out)
# Downsample residual if needed
if self.downsample is not None:
residual = self.downsample(x)
# Add residual
out += residual
out = self.activation(out)
return out
class Bottleneck(nn.Module):
def __init__(
self,
in_channels,
out_channels,
stride=1,
downsample=None,
activation=nn.ReLU,
):
super(Bottleneck, self).__init__()
self.activation = activation()
# 1x1 convolution to reduce channels
self.conv1 = conv1x1(in_channels, out_channels // 4, stride)
self.bn1 = nn.BatchNorm2d(out_channels // 4)
# 3x3 convolution
self.conv2 = conv3x3(out_channels // 4, out_channels // 4)
self.bn2 = nn.BatchNorm2d(out_channels // 4)
# 1x1 convolution to restore channels
self.conv3 = conv1x1(out_channels // 4, out_channels)
self.bn3 = nn.BatchNorm2d(out_channels)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
# First 1x1 convolution
out = self.conv1(x)
out = self.bn1(out)
out = self.activation(out)
# 3x3 convolution
out = self.conv2(out)
out = self.bn2(out)
out = self.activation(out)
# Second 1x1 convolution
out = self.conv3(out)
out = self.bn3(out)
# Downsample residual if needed
if self.downsample is not None:
residual = self.downsample(x)
# Add residual
out += residual
out = self.activation(out)
return out
class ResNetEncoder(NeuralModule):
def __init__(
self,
feat_in: int,
filters: list = [16, 32, 64, 128],
block_sizes: list = [3, 4, 6, 3],
strides: list = [1, 2, 2, 1],
block_type: str = 'basic', # basic, bottleneck
reduction: int = 8, # reduction for SE layer
init_mode: str = 'xavier_uniform',
):
super().__init__()
if block_type == 'basic':
self.block_class = BasicBlock
self.se_block_class = SEBasicBlock
elif block_type == 'bottleneck':
self.block_class = Bottleneck
self.se_block_class = SEBottleneck
self.pre_conv = nn.Sequential(
nn.Conv2d(
in_channels=1,
out_channels=filters[0],
kernel_size=3,
stride=1,
padding=1,
bias=False
),
nn.BatchNorm2d(filters[0]),
nn.ReLU(inplace=True)
)
self.layer1 = self._make_layer_se(
filters[0], filters[0], block_sizes[0], stride=strides[0], reduction=reduction
)
self.layer2 = self._make_layer_se(
filters[0], filters[1], block_sizes[1], stride=strides[1], reduction=reduction
)
self.layer3 = self._make_layer(
filters[1], filters[2], block_sizes[2], stride=strides[2]
)
self.layer4 = self._make_layer(
filters[2], filters[3], block_sizes[3], stride=strides[3]
)
self.apply(lambda x: init_weights(x, mode=init_mode))
def _make_layer_se(self, in_channels, out_channels, block_num, stride=1, reduction=1):
"""Construct the squeeze-and-excitation block layer.
Arguments
---------
in_channels : int
Number of input channels.
out_channels : int
The number of output channels.
block_num: int
Number of ResNet blocks for the network.
stride : int
Factor that reduce the spatial dimensionality. Default is 1
Returns
-------
se_block : nn.Sequential
Squeeze-and-excitation block
"""
downsample = None
if stride != 1 or in_channels != out_channels:
downsample = nn.Sequential(
nn.Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=stride,
bias=False,
),
nn.BatchNorm2d(out_channels),
)
layers = []
layers.append(
self.se_block_class(in_channels, out_channels, stride, downsample, reduction=reduction)
)
for i in range(1, block_num):
layers.append(self.se_block_class(out_channels, out_channels, reduction=reduction))
return nn.Sequential(*layers)
def _make_layer(self, in_channels, out_channels, block_num, stride=1):
"""
Construct the ResNet block layer.
Arguments
---------
in_channels : int
Number of input channels.
out_channels : int
The number of output channels.
block_num: int
Number of ResNet blocks for the network.
stride : int
Factor that reduce the spatial dimensionality. Default is 1
Returns
-------
block : nn.Sequential
ResNet block
"""
downsample = None
if stride != 1 or in_channels != out_channels:
downsample = nn.Sequential(
nn.Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=stride,
bias=False,
),
nn.BatchNorm2d(out_channels),
)
layers = []
layers.append(self.block_class(in_channels, out_channels, stride, downsample))
for i in range(1, block_num):
layers.append(self.block_class(out_channels, out_channels))
return nn.Sequential(*layers)
def forward(self, audio_signal: torch.Tensor, length: torch.Tensor = None):
x = audio_signal
x = x.unsqueeze(dim=1) # (B, 1, C, T)
x = self.pre_conv(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = x.flatten(1, 2)
return x, length
class SpeakerDecoder(NeuralModule):
"""
Speaker Decoder creates the final neural layers that maps from the outputs
of Jasper Encoder to the embedding layer followed by speaker based softmax loss.
Args:
feat_in (int): Number of channels being input to this module
num_classes (int): Number of unique speakers in dataset
emb_sizes (list) : shapes of intermediate embedding layers (we consider speaker embbeddings
from 1st of this layers). Defaults to [1024,1024]
pool_mode (str) : Pooling strategy type. options are 'xvector','tap', 'attention'
Defaults to 'xvector (mean and variance)'
tap (temporal average pooling: just mean)
attention (attention based pooling)
init_mode (str): Describes how neural network parameters are
initialized. Options are ['xavier_uniform', 'xavier_normal',
'kaiming_uniform','kaiming_normal'].
Defaults to "xavier_uniform".
"""
def __init__(
self,
feat_in: int,
num_classes: int,
emb_sizes: Optional[Union[int, list]] = 256,
pool_mode: str = 'xvector',
angular: bool = False,
attention_channels: int = 128,
init_mode: str = "xavier_uniform",
):
super().__init__()
self.angular = angular
self.emb_id = 2
bias = False if self.angular else True
emb_sizes = [emb_sizes] if type(emb_sizes) is int else emb_sizes
self._num_classes = num_classes
self.pool_mode = pool_mode.lower()
if self.pool_mode == 'xvector' or self.pool_mode == 'tap':
self._pooling = StatsPoolLayer(feat_in=feat_in, pool_mode=self.pool_mode)
affine_type = 'linear'
elif self.pool_mode == 'attention':
self._pooling = AttentivePoolLayer(inp_filters=feat_in, attention_channels=attention_channels)
affine_type = 'conv'
elif self.pool_mode == 'ecapa2':
self._pooling = ChannelDependentAttentiveStatisticsPoolLayer(
inp_filters=feat_in, attention_channels=attention_channels
)
affine_type = 'conv'
shapes = [self._pooling.feat_in]
for size in emb_sizes:
shapes.append(int(size))
emb_layers = []
for shape_in, shape_out in zip(shapes[:-1], shapes[1:]):
layer = self.affine_layer(shape_in, shape_out, learn_mean=False, affine_type=affine_type)
emb_layers.append(layer)
self.emb_layers = nn.ModuleList(emb_layers)
self.final = nn.Linear(shapes[-1], self._num_classes, bias=bias)
self.apply(lambda x: init_weights(x, mode=init_mode))
def affine_layer(
self,
inp_shape,
out_shape,
learn_mean=True,
affine_type='conv',
):
if affine_type == 'conv':
layer = nn.Sequential(
nn.BatchNorm1d(inp_shape, affine=True, track_running_stats=True),
nn.Conv1d(inp_shape, out_shape, kernel_size=1),
)
else:
layer = nn.Sequential(
nn.Linear(inp_shape, out_shape),
nn.BatchNorm1d(out_shape, affine=learn_mean, track_running_stats=True),
nn.ReLU(),
)
return layer
def forward(self, encoder_output, length=None):
pool = self._pooling(encoder_output, length)
embs = []
for layer in self.emb_layers:
pool, emb = layer(pool), layer[: self.emb_id](pool)
embs.append(emb)
pool = pool.squeeze(-1)
if self.angular:
for W in self.final.parameters():
W = F.normalize(W, p=2, dim=1)
pool = F.normalize(pool, p=2, dim=1)
out = self.final(pool)
return out, embs[-1].squeeze(-1)