import typing

import julius
import numpy as np
import torch

from . import util
class DSPMixin:
    _original_batch_size = None
    _original_num_channels = None
    _padded_signal_length = None

    def _preprocess_signal_for_windowing(self, window_duration, hop_duration):
        self._original_batch_size = self.batch_size
        self._original_num_channels = self.num_channels

        window_length = int(window_duration * self.sample_rate)
        hop_length = int(hop_duration * self.sample_rate)

        if window_length % hop_length != 0:
            factor = window_length // hop_length
            window_length = factor * hop_length

        self.zero_pad(hop_length, hop_length)
        self._padded_signal_length = self.signal_length

        return window_length, hop_length
    def windows(
        self, window_duration: float, hop_duration: float, preprocess: bool = True
    ):
        """Generator that yields windows of a specified duration from the
        signal, with a specified hop between windows.

        Parameters
        ----------
        window_duration : float
            Duration of every window in seconds.
        hop_duration : float
            Hop between windows in seconds.
        preprocess : bool, optional
            Whether to preprocess the signal, so that the first sample is in
            the middle of the first window, by default True

        Yields
        ------
        AudioSignal
            Each window is returned as an AudioSignal.
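
        Examples
        --------
        A minimal sketch; ``AudioSignal(tensor, sample_rate)`` and ``clone()``
        are assumed from the broader API, and the durations are illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> for window in signal.clone().windows(0.5, 0.25):
        ...     pass  # process each 0.5-second window here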
| """ | |
| if preprocess: | |
| window_length, hop_length = self._preprocess_signal_for_windowing( | |
| window_duration, hop_duration | |
| ) | |
| self.audio_data = self.audio_data.reshape(-1, 1, self.signal_length) | |
| for b in range(self.batch_size): | |
| i = 0 | |
| start_idx = i * hop_length | |
| while True: | |
| start_idx = i * hop_length | |
| i += 1 | |
| end_idx = start_idx + window_length | |
| if end_idx > self.signal_length: | |
| break | |
| yield self[b, ..., start_idx:end_idx] | |
    def collect_windows(
        self, window_duration: float, hop_duration: float, preprocess: bool = True
    ):
        """Reshapes the signal into windows of a specified duration, with a
        specified hop between windows. Windows are placed along the batch
        dimension. Use with
        :py:func:`audiotools.core.dsp.DSPMixin.overlap_and_add` to reconstruct
        the original signal.

        Parameters
        ----------
        window_duration : float
            Duration of every window in seconds.
        hop_duration : float
            Hop between windows in seconds.
        preprocess : bool, optional
            Whether to preprocess the signal, so that the first sample is in
            the middle of the first window, by default True

        Returns
        -------
        AudioSignal
            AudioSignal unfolded with shape ``(nb * nch * num_windows, 1, window_length)``
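
        Examples
        --------
        A minimal sketch; the ``AudioSignal(tensor, sample_rate)`` constructor
        is assumed, and the durations are illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> windowed = signal.collect_windows(0.5, 0.25)
        >>> # windowed.batch_size is now nb * nch * num_windows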
| """ | |
| if preprocess: | |
| window_length, hop_length = self._preprocess_signal_for_windowing( | |
| window_duration, hop_duration | |
| ) | |
| # self.audio_data: (nb, nch, nt). | |
| unfolded = torch.nn.functional.unfold( | |
| self.audio_data.reshape(-1, 1, 1, self.signal_length), | |
| kernel_size=(1, window_length), | |
| stride=(1, hop_length), | |
| ) | |
| # unfolded: (nb * nch, window_length, num_windows). | |
| # -> (nb * nch * num_windows, 1, window_length) | |
| unfolded = unfolded.permute(0, 2, 1).reshape(-1, 1, window_length) | |
| self.audio_data = unfolded | |
| return self | |
    def overlap_and_add(self, hop_duration: float):
        """Takes a signal made up of windows (e.g. the output of
        :py:func:`audiotools.core.dsp.DSPMixin.collect_windows`) and
        overlap-adds them back into a signal of the original length.

        Parameters
        ----------
        hop_duration : float
            How much to shift for each window
            (overlap is window_duration - hop_duration) in seconds.

        Returns
        -------
        AudioSignal
            Overlap-and-added signal.
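
        Examples
        --------
        A round-trip sketch paired with ``collect_windows``; the
        ``AudioSignal(tensor, sample_rate)`` constructor is assumed, and
        ``hop_duration`` must match the one used for windowing:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> windowed = signal.collect_windows(0.5, 0.25)
        >>> reconstructed = windowed.overlap_and_add(0.25)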
| """ | |
| hop_length = int(hop_duration * self.sample_rate) | |
| window_length = self.signal_length | |
| nb, nch = self._original_batch_size, self._original_num_channels | |
| unfolded = self.audio_data.reshape(nb * nch, -1, window_length).permute(0, 2, 1) | |
| folded = torch.nn.functional.fold( | |
| unfolded, | |
| output_size=(1, self._padded_signal_length), | |
| kernel_size=(1, window_length), | |
| stride=(1, hop_length), | |
| ) | |
| norm = torch.ones_like(unfolded, device=unfolded.device) | |
| norm = torch.nn.functional.fold( | |
| norm, | |
| output_size=(1, self._padded_signal_length), | |
| kernel_size=(1, window_length), | |
| stride=(1, hop_length), | |
| ) | |
| folded = folded / norm | |
| folded = folded.reshape(nb, nch, -1) | |
| self.audio_data = folded | |
| self.trim(hop_length, hop_length) | |
| return self | |
    def low_pass(
        self, cutoffs: typing.Union[torch.Tensor, np.ndarray, float], zeros: int = 51
    ):
        """Low-passes the signal in-place. If ``cutoffs`` is an array or
        tensor, each item in the batch can have a different low-pass cutoff.
        If it is a float, all items are given the same low-pass filter.

        Parameters
        ----------
        cutoffs : typing.Union[torch.Tensor, np.ndarray, float]
            Cutoff in Hz of low-pass filter.
        zeros : int, optional
            Number of taps to use in low-pass filter, by default 51

        Returns
        -------
        AudioSignal
            Low-passed AudioSignal.
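
        Examples
        --------
        A minimal sketch; the ``AudioSignal(tensor, sample_rate)`` constructor
        is assumed, and the cutoffs are illustrative:

        >>> signal = AudioSignal(torch.randn(2, 1, 44100), 44100)
        >>> signal.low_pass(4000)  # same 4 kHz cutoff for both items
        >>> signal.low_pass(torch.tensor([1000.0, 2000.0]))  # per-item cutoffs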
| """ | |
| cutoffs = util.ensure_tensor(cutoffs, 2, self.batch_size) | |
| cutoffs = cutoffs / self.sample_rate | |
| filtered = torch.empty_like(self.audio_data) | |
| for i, cutoff in enumerate(cutoffs): | |
| lp_filter = julius.LowPassFilter(cutoff.cpu(), zeros=zeros).to(self.device) | |
| filtered[i] = lp_filter(self.audio_data[i]) | |
| self.audio_data = filtered | |
| self.stft_data = None | |
| return self | |
    def high_pass(
        self, cutoffs: typing.Union[torch.Tensor, np.ndarray, float], zeros: int = 51
    ):
        """High-passes the signal in-place. If ``cutoffs`` is an array or
        tensor, each item in the batch can have a different high-pass cutoff.
        If it is a float, all items are given the same high-pass filter.

        Parameters
        ----------
        cutoffs : typing.Union[torch.Tensor, np.ndarray, float]
            Cutoff in Hz of high-pass filter.
        zeros : int, optional
            Number of taps to use in high-pass filter, by default 51

        Returns
        -------
        AudioSignal
            High-passed AudioSignal.
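
        Examples
        --------
        Mirrors ``low_pass``; the ``AudioSignal(tensor, sample_rate)``
        constructor is assumed, and the cutoff is illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.high_pass(200)  # remove content below 200 Hz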
| """ | |
| cutoffs = util.ensure_tensor(cutoffs, 2, self.batch_size) | |
| cutoffs = cutoffs / self.sample_rate | |
| filtered = torch.empty_like(self.audio_data) | |
| for i, cutoff in enumerate(cutoffs): | |
| hp_filter = julius.HighPassFilter(cutoff.cpu(), zeros=zeros).to(self.device) | |
| filtered[i] = hp_filter(self.audio_data[i]) | |
| self.audio_data = filtered | |
| self.stft_data = None | |
| return self | |
    def mask_frequencies(
        self,
        fmin_hz: typing.Union[torch.Tensor, np.ndarray, float],
        fmax_hz: typing.Union[torch.Tensor, np.ndarray, float],
        val: float = 0.0,
    ):
        """Masks frequencies between ``fmin_hz`` and ``fmax_hz``, and fills them
        with the value specified by ``val``. Useful for implementing SpecAug.
        The min and max can be different for every item in the batch.

        Parameters
        ----------
        fmin_hz : typing.Union[torch.Tensor, np.ndarray, float]
            Lower end of band to mask out.
        fmax_hz : typing.Union[torch.Tensor, np.ndarray, float]
            Upper end of band to mask out.
        val : float, optional
            Value to fill in, by default 0.0

        Returns
        -------
        AudioSignal
            Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
            masked audio data.
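
        Examples
        --------
        A SpecAug-style sketch; the ``AudioSignal(tensor, sample_rate)``
        constructor is assumed, and the band edges are illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.mask_frequencies(fmin_hz=500, fmax_hz=1500)
        >>> masked_audio = signal.istft()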
| """ | |
| # SpecAug | |
| mag, phase = self.magnitude, self.phase | |
| fmin_hz = util.ensure_tensor(fmin_hz, ndim=mag.ndim) | |
| fmax_hz = util.ensure_tensor(fmax_hz, ndim=mag.ndim) | |
| assert torch.all(fmin_hz < fmax_hz) | |
| # build mask | |
| nbins = mag.shape[-2] | |
| bins_hz = torch.linspace(0, self.sample_rate / 2, nbins, device=self.device) | |
| bins_hz = bins_hz[None, None, :, None].repeat( | |
| self.batch_size, 1, 1, mag.shape[-1] | |
| ) | |
| mask = (fmin_hz <= bins_hz) & (bins_hz < fmax_hz) | |
| mask = mask.to(self.device) | |
| mag = mag.masked_fill(mask, val) | |
| phase = phase.masked_fill(mask, val) | |
| self.stft_data = mag * torch.exp(1j * phase) | |
| return self | |
    def mask_timesteps(
        self,
        tmin_s: typing.Union[torch.Tensor, np.ndarray, float],
        tmax_s: typing.Union[torch.Tensor, np.ndarray, float],
        val: float = 0.0,
    ):
        """Masks timesteps between ``tmin_s`` and ``tmax_s``, and fills them
        with the value specified by ``val``. Useful for implementing SpecAug.
        The min and max can be different for every item in the batch.

        Parameters
        ----------
        tmin_s : typing.Union[torch.Tensor, np.ndarray, float]
            Lower end of timesteps to mask out.
        tmax_s : typing.Union[torch.Tensor, np.ndarray, float]
            Upper end of timesteps to mask out.
        val : float, optional
            Value to fill in, by default 0.0

        Returns
        -------
        AudioSignal
            Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
            masked audio data.
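
        Examples
        --------
        A SpecAug-style sketch; the ``AudioSignal(tensor, sample_rate)``
        constructor is assumed, and the timestamps are illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.mask_timesteps(tmin_s=0.25, tmax_s=0.5)
        >>> masked_audio = signal.istft()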
| """ | |
| # SpecAug | |
| mag, phase = self.magnitude, self.phase | |
| tmin_s = util.ensure_tensor(tmin_s, ndim=mag.ndim) | |
| tmax_s = util.ensure_tensor(tmax_s, ndim=mag.ndim) | |
| assert torch.all(tmin_s < tmax_s) | |
| # build mask | |
| nt = mag.shape[-1] | |
| bins_t = torch.linspace(0, self.signal_duration, nt, device=self.device) | |
| bins_t = bins_t[None, None, None, :].repeat( | |
| self.batch_size, 1, mag.shape[-2], 1 | |
| ) | |
| mask = (tmin_s <= bins_t) & (bins_t < tmax_s) | |
| mag = mag.masked_fill(mask, val) | |
| phase = phase.masked_fill(mask, val) | |
| self.stft_data = mag * torch.exp(1j * phase) | |
| return self | |
    def mask_low_magnitudes(
        self, db_cutoff: typing.Union[torch.Tensor, np.ndarray, float], val: float = 0.0
    ):
        """Masks away magnitudes below a specified threshold, which
        can be different for every item in the batch.

        Parameters
        ----------
        db_cutoff : typing.Union[torch.Tensor, np.ndarray, float]
            Magnitudes below this decibel value are masked away.
        val : float, optional
            Value to fill in for masked portions, by default 0.0

        Returns
        -------
        AudioSignal
            Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
            masked audio data.
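
        Examples
        --------
        A minimal sketch; the ``AudioSignal(tensor, sample_rate)`` constructor
        is assumed, and the cutoff is illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.mask_low_magnitudes(-60)  # mask bins quieter than -60 dB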
| """ | |
| mag = self.magnitude | |
| log_mag = self.log_magnitude() | |
| db_cutoff = util.ensure_tensor(db_cutoff, ndim=mag.ndim) | |
| mask = log_mag < db_cutoff | |
| mag = mag.masked_fill(mask, val) | |
| self.magnitude = mag | |
| return self | |
    def shift_phase(self, shift: typing.Union[torch.Tensor, np.ndarray, float]):
        """Shifts the phase by a constant value.

        Parameters
        ----------
        shift : typing.Union[torch.Tensor, np.ndarray, float]
            What to shift the phase by.

        Returns
        -------
        AudioSignal
            Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
            phase-shifted audio data.
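
        Examples
        --------
        A minimal sketch; the ``AudioSignal(tensor, sample_rate)`` constructor
        is assumed, and the shift is illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.shift_phase(3.14159 / 2)  # shift every phase by ~pi / 2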
| """ | |
| shift = util.ensure_tensor(shift, ndim=self.phase.ndim) | |
| self.phase = self.phase + shift | |
| return self | |
    def corrupt_phase(self, scale: typing.Union[torch.Tensor, np.ndarray, float]):
        """Corrupts the phase by adding Gaussian noise scaled by ``scale``.

        Parameters
        ----------
        scale : typing.Union[torch.Tensor, np.ndarray, float]
            Standard deviation of noise to add to the phase.

        Returns
        -------
        AudioSignal
            Signal with ``stft_data`` manipulated. Apply ``.istft()`` to get the
            corrupted audio data.
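
        Examples
        --------
        A minimal sketch; the ``AudioSignal(tensor, sample_rate)`` constructor
        is assumed, and the scale is illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.corrupt_phase(0.5)  # add Gaussian noise with std 0.5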
| """ | |
| scale = util.ensure_tensor(scale, ndim=self.phase.ndim) | |
| self.phase = self.phase + scale * torch.randn_like(self.phase) | |
| return self | |
    def preemphasis(self, coef: float = 0.85):
        """Applies pre-emphasis to the audio signal.

        Parameters
        ----------
        coef : float, optional
            How much pre-emphasis to apply; lower values do less, and 0 does
            nothing. By default 0.85.

        Returns
        -------
        AudioSignal
            Pre-emphasized signal.
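
        Examples
        --------
        A minimal sketch; the ``AudioSignal(tensor, sample_rate)`` constructor
        is assumed, and the coefficient is illustrative:

        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal.preemphasis(0.97)  # stronger emphasis than the default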
| """ | |
| kernel = torch.tensor([1, -coef, 0]).view(1, 1, -1).to(self.device) | |
| x = self.audio_data.reshape(-1, 1, self.signal_length) | |
| x = torch.nn.functional.conv1d(x, kernel, padding=1) | |
| self.audio_data = x.reshape(*self.audio_data.shape) | |
| return self | |