pdf_explainer / api /audio_concatenator.py
spagestic's picture
api updated
d1c4aa1
raw
history blame
7.9 kB
"""Audio processing utilities for the TTS API."""
import re
from typing import List
class AudioConcatenator:
"""Server-side audio concatenation with GPU acceleration."""
def __init__(self, silence_duration: float = 0.5, fade_duration: float = 0.1):
"""
Initialize the audio concatenator.
Args:
silence_duration: Duration of silence between chunks (seconds)
fade_duration: Duration of fade in/out effects (seconds)
"""
self.silence_duration = silence_duration
self.fade_duration = fade_duration
def concatenate_audio_chunks(self, audio_chunks: List, sample_rate: int):
"""
Concatenate multiple audio chunks into a single audio file.
Args:
audio_chunks: List of audio arrays
sample_rate: Sample rate for the audio
Returns:
Concatenated audio array
"""
if not audio_chunks:
raise ValueError("No audio chunks to concatenate")
if len(audio_chunks) == 1:
# Handle single chunk case
audio = audio_chunks[0]
if isinstance(audio, tuple):
return audio[0] # Extract audio data from tuple
return audio
import numpy as np
import torch
# Normalize and prepare audio data
normalized_chunks = []
for i, audio_data in enumerate(audio_chunks):
print(f"Processing chunk {i}: type={type(audio_data)}")
# Handle tuple format (common from TTS models)
if isinstance(audio_data, tuple):
audio_data = audio_data[0] # Extract audio array from tuple
print(f" Extracted from tuple: type={type(audio_data)}")
# Convert torch tensor to numpy if needed
if hasattr(audio_data, 'cpu'): # It's a torch tensor
audio_data = audio_data.cpu().numpy()
print(f" Converted from torch: shape={audio_data.shape}")
# Convert to numpy array if needed
if not isinstance(audio_data, np.ndarray):
audio_data = np.array(audio_data)
print(f" Final shape before processing: {audio_data.shape}")
# Handle different audio shapes
if audio_data.ndim == 1:
# Already 1D, perfect
normalized_audio = audio_data
elif audio_data.ndim == 2:
# Handle 2D audio - could be (channels, samples) or (samples, channels)
if audio_data.shape[0] < audio_data.shape[1]:
# Likely (channels, samples) - take first channel
normalized_audio = audio_data[0, :]
print(f" Used first channel from (C, L) format: {normalized_audio.shape}")
else:
# Likely (samples, channels) - take first channel
normalized_audio = audio_data[:, 0]
print(f" Used first channel from (L, C) format: {normalized_audio.shape}")
else:
# Flatten higher dimensional arrays
normalized_audio = audio_data.flatten()
print(f" Flattened {audio_data.ndim}D array: {normalized_audio.shape}")
# Ensure we have valid audio data
if len(normalized_audio) == 0:
print(f" Warning: Empty audio chunk {i}")
continue
print(f" Chunk {i} final length: {len(normalized_audio)} samples ({len(normalized_audio)/sample_rate:.2f}s)")
# Normalize audio levels
normalized_audio = self._normalize_audio(normalized_audio)
# Apply fade effects
normalized_audio = self._apply_fade_effects(normalized_audio, sample_rate)
normalized_chunks.append(normalized_audio)
if not normalized_chunks:
raise ValueError("No valid audio chunks after processing")
print(f"Successfully processed {len(normalized_chunks)} chunks")
# Create silence segments
silence_samples = int(self.silence_duration * sample_rate)
silence = np.zeros(silence_samples, dtype=np.float32)
print(f"Adding {silence_samples} silence samples ({self.silence_duration}s) between chunks")
# Concatenate all chunks with silence in between
concatenated_segments = []
total_audio_length = 0
for i, chunk in enumerate(normalized_chunks):
concatenated_segments.append(chunk)
total_audio_length += len(chunk)
print(f"Added chunk {i}: {len(chunk)} samples")
# Add silence between chunks (but not after the last chunk)
if i < len(normalized_chunks) - 1:
concatenated_segments.append(silence)
total_audio_length += len(silence)
print(f"Added silence: {len(silence)} samples")
# Combine all segments
final_audio = np.concatenate(concatenated_segments)
print(f"Final concatenated audio: {len(final_audio)} samples ({len(final_audio)/sample_rate:.2f}s)")
# Final normalization and cleanup
final_audio = self._normalize_audio(final_audio)
final_audio = self._remove_clicks_and_pops(final_audio)
return final_audio
def _normalize_audio(self, audio_data):
"""Normalize audio to prevent clipping."""
import numpy as np
# Convert to numpy array if it's not already
if not isinstance(audio_data, np.ndarray):
audio_data = np.array(audio_data)
# Ensure it's a 1D array
if audio_data.ndim > 1:
audio_data = audio_data.flatten()
# Find the maximum absolute value
max_val = np.max(np.abs(audio_data))
if max_val == 0:
return audio_data
# Normalize to 95% of maximum to leave some headroom
normalized = audio_data * (0.95 / max_val)
return normalized.astype(np.float32)
def _apply_fade_effects(self, audio_data, sample_rate: int):
"""Apply fade in and fade out effects to reduce pops and clicks."""
import numpy as np
fade_samples = int(self.fade_duration * sample_rate)
if len(audio_data) < 2 * fade_samples:
# If audio is too short for fade effects, return as-is
return audio_data
audio_with_fades = audio_data.copy()
# Apply fade in
fade_in = np.linspace(0, 1, fade_samples)
audio_with_fades[:fade_samples] *= fade_in
# Apply fade out
fade_out = np.linspace(1, 0, fade_samples)
audio_with_fades[-fade_samples:] *= fade_out
return audio_with_fades
def _remove_clicks_and_pops(self, audio_data):
"""Apply basic filtering to remove clicks and pops."""
try:
# Simple high-pass filter to remove DC offset and low-frequency artifacts
from scipy import signal
import numpy as np
# Design a high-pass filter (removes frequencies below 80 Hz)
# This helps remove some pops and clicks while preserving speech
nyquist = 22050 / 2 # Assuming common sample rate
low = 80 / nyquist
b, a = signal.butter(4, low, btype='high')
filtered_audio = signal.filtfilt(b, a, audio_data)
return filtered_audio.astype(np.float32)
except ImportError:
# If scipy is not available, return original audio
return audio_data