from dataclasses import dataclass
from typing import Optional, Tuple, Union

import torch
import torch.nn as nn
from transformers import PreTrainedModel
from transformers.utils import ModelOutput

from .configuration_ecapa_tdnn import EcapaTdnnConfig
from .audio_processing import AudioToMelSpectrogramPreprocessor, SpectrogramAugmentation
from .conv_asr import EcapaTdnnEncoder, SpeakerDecoder
from .angular_loss import AdditiveAngularMarginSoftmaxLoss, AdditiveMarginSoftmaxLoss


@dataclass
class EcapaTdnnBaseModelOutput(ModelOutput):
    """Output of the base model: mel features, frame-level encoder outputs and their lengths."""

    encoder_outputs: Optional[torch.FloatTensor] = None
    extract_features: Optional[torch.FloatTensor] = None
    output_lengths: Optional[torch.FloatTensor] = None


@dataclass
class EcapaTdnnSequenceClassifierOutput(ModelOutput):
    """Output of the classification head: loss, logits and utterance-level speaker embeddings."""

    loss: Optional[torch.FloatTensor] = None
    logits: Optional[torch.FloatTensor] = None
    embeddings: Optional[torch.FloatTensor] = None


class EcapaTdnnPreTrainedModel(PreTrainedModel):
    """Base class that handles weight initialization and loading of pretrained ECAPA-TDNN checkpoints."""

    config_class = EcapaTdnnConfig
    base_model_prefix = "ecapa_tdnn"
    main_input_name = "input_values"

    def _init_weights(self, module):
        """Initialize the weights."""
        if isinstance(module, (nn.Linear, nn.Conv1d, nn.Conv2d)):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)
        elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
            nn.init.constant_(module.weight, 1)
            nn.init.constant_(module.bias, 0)

    @property
    def num_weights(self):
        """Total number of trainable parameters of the module."""
        return self._num_weights()

    @torch.jit.ignore
    def _num_weights(self):
        num: int = 0
        for p in self.parameters():
            if p.requires_grad:
                num += p.numel()
        return num


class EcapaTdnnModel(EcapaTdnnPreTrainedModel):
    def __init__(self, config: EcapaTdnnConfig):
        super().__init__(config)
        self.config = config
        self.preprocessor = AudioToMelSpectrogramPreprocessor(**config.mel_spectrogram_config)
        self.spec_augment = SpectrogramAugmentation(**config.spectrogram_augmentation_config)
        self.encoder = EcapaTdnnEncoder(**config.encoder_config)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_values: Optional[torch.Tensor],
        attention_mask: Optional[torch.Tensor] = None,
    ) -> Union[Tuple, EcapaTdnnBaseModelOutput]:
        # Without an explicit attention mask, treat every waveform sample as valid.
        if attention_mask is None:
            attention_mask = torch.ones_like(input_values)
        lengths = attention_mask.sum(dim=1).long()

        # Waveform -> log-mel spectrogram features, with per-example frame counts.
        extract_features, output_lengths = self.preprocessor(input_values, lengths)

        # SpecAugment is only applied during training.
        if self.training:
            extract_features = self.spec_augment(extract_features, output_lengths)

        # Frame-level ECAPA-TDNN encoder features.
        encoder_outputs, output_lengths = self.encoder(extract_features, output_lengths)

        return EcapaTdnnBaseModelOutput(
            encoder_outputs=encoder_outputs,
            extract_features=extract_features,
            output_lengths=output_lengths,
        )
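
# Shape sketch for EcapaTdnnModel.forward (illustrative; the exact sizes depend on the
# mel-spectrogram and encoder settings in the config, which are assumptions here, not
# values taken from this file):
#   input_values:     (batch, num_samples)           raw waveform
#   extract_features: (batch, n_mels, num_frames)    log-mel features from the preprocessor
#   encoder_outputs:  (batch, channels, num_frames)  frame-level ECAPA-TDNN features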


class EcapaTdnnForSequenceClassification(EcapaTdnnPreTrainedModel):
    def __init__(self, config: EcapaTdnnConfig):
        super().__init__(config)
        self.ecapa_tdnn = EcapaTdnnModel(config)
        self.classifier = SpeakerDecoder(**config.decoder_config)

        if config.objective == 'additive_angular_margin':
            self.loss_fct = AdditiveAngularMarginSoftmaxLoss(**config.objective_config)
        elif config.objective == 'additive_margin':
            self.loss_fct = AdditiveMarginSoftmaxLoss(**config.objective_config)
        elif config.objective == 'cross_entropy':
            self.loss_fct = nn.CrossEntropyLoss(**config.objective_config)
        else:
            raise ValueError(f"Unsupported objective: {config.objective}")

        self.init_weights()

    def freeze_base_model(self):
        """Disable gradients for the base model so that only the classifier head is updated."""
        for param in self.ecapa_tdnn.parameters():
            param.requires_grad = False

    def forward(
        self,
        input_values: Optional[torch.Tensor],
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
    ) -> Union[Tuple, EcapaTdnnSequenceClassifierOutput]:
        ecapa_tdnn_outputs = self.ecapa_tdnn(
            input_values,
            attention_mask,
        )

        # Pool frame-level encoder features into utterance-level logits and speaker embeddings.
        logits, output_embeddings = self.classifier(
            ecapa_tdnn_outputs.encoder_outputs,
            ecapa_tdnn_outputs.output_lengths,
        )
        logits = logits.view(-1, self.config.num_labels)

        loss = None
        if labels is not None:
            loss = self.loss_fct(logits, labels.view(-1))

        return EcapaTdnnSequenceClassifierOutput(
            loss=loss,
            logits=logits,
            embeddings=output_embeddings,
        )
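

# Minimal usage sketch. It assumes that EcapaTdnnConfig() provides workable defaults for
# mel_spectrogram_config, encoder_config, decoder_config, objective and objective_config,
# and that inputs are 16 kHz waveforms; both are illustrative assumptions, not facts taken
# from this file.
if __name__ == "__main__":
    config = EcapaTdnnConfig()
    model = EcapaTdnnForSequenceClassification(config).eval()

    # Two one-second utterances of random noise at the assumed 16 kHz sampling rate.
    waveform = torch.randn(2, 16000)
    attention_mask = torch.ones_like(waveform)

    with torch.no_grad():
        outputs = model(waveform, attention_mask=attention_mask)

    print(outputs.logits.shape)      # (2, config.num_labels)
    print(outputs.embeddings.shape)  # utterance-level speaker embeddings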