"""
Audio Preprocessing Module for Multilingual Audio Intelligence System

This module handles the standardization of diverse audio inputs into a consistent
format suitable for downstream ML models. It supports common audio formats
(wav, mp3, ogg, flac, m4a, aac), sample rates from 8 kHz to 48 kHz, bit depths
from 4 to 32 bits, and the varying SNR conditions specified in the PS-6 requirements.

Key Features:
- Format conversion and standardization
- Intelligent resampling to 16 kHz
- Stereo-to-mono conversion
- Volume normalization for SNR robustness
- Memory-efficient processing
- Robust error handling

Dependencies: pydub, librosa, numpy (optional: scipy, for high-pass filtering)
System Dependencies: ffmpeg (for format conversion)
"""

import os
import logging
import tempfile
import time
import warnings
from pathlib import Path
from typing import Tuple, Optional, Union, Dict, Any

import numpy as np
import librosa
from pydub import AudioSegment
from pydub.utils import which

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

warnings.filterwarnings("ignore", category=UserWarning, module="librosa")


class AudioProcessor:
    """
    Enhanced Audio Processor with Smart File Management and Hybrid Translation Support

    This class combines the original working functionality with new enhancements:
    - Original: 16 kHz sample rate, mono conversion, normalization
    - NEW: Smart file analysis, chunking strategies, Indian language support
    - NEW: Integration with the 3-tier hybrid translation system
    - NEW: Memory-efficient processing for large files
    """

    def __init__(self, target_sample_rate: int = 16000, model_size: str = "small",
                 enable_translation: bool = True, max_file_duration_minutes: int = 60,
                 max_file_size_mb: int = 200):
        """
        Initialize the enhanced AudioProcessor with both original and new capabilities.

        Args:
            target_sample_rate (int): Target sample rate in Hz (default: 16000).
            model_size (str): Whisper model size used for transcription.
            enable_translation (bool): Enable translation capabilities.
            max_file_duration_minutes (int): Maximum file duration to process, in minutes.
            max_file_size_mb (int): Maximum file size to process, in MB.
        """
        self.target_sample_rate = target_sample_rate
        self.supported_formats = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac']

        self.model_size = model_size
        self.enable_translation = enable_translation
        self.max_file_duration = max_file_duration_minutes  # minutes
        self.max_file_size = max_file_size_mb  # megabytes

        self.whisper_model = None  # not loaded at construction time
        self.processing_stats = {
            'files_processed': 0,
            'total_processing_time': 0.0,
            'chunks_processed': 0,
            'languages_detected': set()
        }

        # pydub relies on ffmpeg for non-WAV formats; warn early if it is missing.
        if not which("ffmpeg"):
            logger.warning("ffmpeg not found. Some format conversions may fail.")

        logger.info("✅ Enhanced AudioProcessor initialized")
        logger.info(f"   Model: {model_size}, Translation: {enable_translation}")
        logger.info(f"   Limits: {max_file_duration_minutes} min, {max_file_size_mb} MB")

    def process_audio(self, audio_input: Union[str, bytes, np.ndarray],
                      input_sample_rate: Optional[int] = None) -> Tuple[np.ndarray, int]:
        """
        Main processing function that standardizes any audio input.

        Args:
            audio_input: File path (str), raw audio bytes, or a numpy array of samples.
            input_sample_rate: Required when audio_input is a numpy array.

        Returns:
            Tuple[np.ndarray, int]: (processed_audio_array, sample_rate)

        Raises:
            ValueError: If the input format is unsupported or invalid.
            FileNotFoundError: If the audio file does not exist.
            Exception: For other processing errors.
        """
        try:
            # Load the audio according to the input type.
            if isinstance(audio_input, str):
                # File path on disk
                audio_array, original_sr = self._load_from_file(audio_input)
            elif isinstance(audio_input, bytes):
                # Raw bytes, e.g. an uploaded file
                audio_array, original_sr = self._load_from_bytes(audio_input)
            elif isinstance(audio_input, np.ndarray):
                # Pre-loaded samples; the caller must supply the sample rate
                if input_sample_rate is None:
                    raise ValueError("input_sample_rate must be provided for numpy array input")
                audio_array = audio_input.astype(np.float32)
                original_sr = input_sample_rate
            else:
                raise ValueError(f"Unsupported input type: {type(audio_input)}")

            logger.info(f"Loaded audio: {audio_array.shape}, {original_sr}Hz")

            # Standardize: mono, target sample rate, normalized amplitude, basic filtering.
            processed_audio = self._preprocess_pipeline(audio_array, original_sr)

            logger.info(f"Processed audio: {processed_audio.shape}, {self.target_sample_rate}Hz")

            return processed_audio, self.target_sample_rate

        except Exception as e:
            logger.error(f"Audio processing failed: {str(e)}")
            raise

    def _load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio from a file path."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported format {file_ext}. Supported: {self.supported_formats}")

        try:
            # Prefer librosa: sr=None preserves the native sample rate, mono=False keeps channels.
            audio_array, sample_rate = librosa.load(file_path, sr=None, mono=False)
            return audio_array, sample_rate
        except Exception as e:
            # Fall back to pydub/ffmpeg for files librosa cannot decode.
            logger.warning(f"librosa failed, trying pydub: {e}")
            return self._load_with_pydub(file_path)

    def _load_from_bytes(self, audio_bytes: bytes) -> Tuple[np.ndarray, int]:
        """Load audio from bytes (e.g., an uploaded file)."""
        # Write to a temporary file so pydub/ffmpeg can probe the container format.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.audio') as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_path = tmp_file.name

        try:
            audio_array, sample_rate = self._load_with_pydub(tmp_path)
            return audio_array, sample_rate
        finally:
            # Always remove the temporary file, even if loading failed.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    def _load_with_pydub(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio using pydub with automatic format detection."""
        try:
            audio_segment = AudioSegment.from_file(file_path)

            # Raw integer samples at the segment's native bit depth.
            samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)

            # Interleaved stereo -> (n_samples, 2)
            if audio_segment.channels == 2:
                samples = samples.reshape((-1, 2))

            # Scale to [-1.0, 1.0] based on the sample width (2**15 for 16-bit, 2**31 for 32-bit).
            max_amplitude = float(1 << (8 * audio_segment.sample_width - 1))
            samples = samples / max_amplitude

            return samples, audio_segment.frame_rate

        except Exception as e:
            raise RuntimeError(f"Failed to load audio with pydub: {e}") from e

    def _preprocess_pipeline(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray:
        """
        Apply the complete preprocessing pipeline.

        Pipeline steps:
        1. Convert stereo to mono
        2. Resample to the target sample rate
        3. Normalize amplitude
        4. Apply basic preprocessing filters (DC removal, optional high-pass)
        """
        # Step 1: stereo -> mono by averaging the two channels.
        if len(audio_array.shape) > 1 and audio_array.shape[0] == 2:
            # librosa layout: (channels, samples)
            audio_array = np.mean(audio_array, axis=0)
        elif len(audio_array.shape) > 1 and audio_array.shape[1] == 2:
            # pydub layout: (samples, channels)
            audio_array = np.mean(audio_array, axis=1)

        # Ensure a flat 1-D array for the remaining steps.
        audio_array = audio_array.flatten()

        logger.debug(f"After mono conversion: {audio_array.shape}")

        # Step 2: resample to the target rate if needed.
        if original_sr != self.target_sample_rate:
            audio_array = librosa.resample(
                audio_array,
                orig_sr=original_sr,
                target_sr=self.target_sample_rate,
                res_type='kaiser_best'
            )
            logger.debug(f"Resampled from {original_sr}Hz to {self.target_sample_rate}Hz")

        # Step 3: amplitude normalization for SNR robustness.
        audio_array = self._normalize_audio(audio_array)

        # Step 4: DC removal and optional high-pass filtering.
        audio_array = self._apply_preprocessing_filters(audio_array)

        return audio_array.astype(np.float32)

    def _normalize_audio(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Normalize audio amplitude to handle varying SNR conditions.

        Uses RMS-based normalization for better handling of varying
        signal-to-noise ratios (-5 dB to 20 dB as per the PS-6 requirements).
        """
        rms = np.sqrt(np.mean(audio_array**2))

        if rms > 0:
            # Scale toward a target RMS of 0.1 (roughly -20 dBFS).
            target_rms = 0.1
            normalization_factor = target_rms / rms

            normalized = audio_array * normalization_factor
            # Guard against clipping after amplification.
            normalized = np.clip(normalized, -1.0, 1.0)

            logger.debug(f"RMS normalization: {rms:.4f} -> {target_rms:.4f}")
            return normalized

        # Silent input: nothing to normalize.
        return audio_array

    def _apply_preprocessing_filters(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Apply basic preprocessing filters for improved robustness.

        Includes:
        - DC offset removal
        - Light high-pass filtering (removes very low frequencies)
        """
        # Remove DC offset by subtracting the mean.
        audio_array = audio_array - np.mean(audio_array)

        # Optional high-pass filter; applied only if scipy is available.
        try:
            from scipy.signal import butter, filtfilt

            # First-order Butterworth high-pass with an 80 Hz cutoff, normalized to Nyquist.
            nyquist = self.target_sample_rate / 2
            cutoff = 80 / nyquist

            if cutoff < 1.0:
                b, a = butter(N=1, Wn=cutoff, btype='high')
                audio_array = filtfilt(b, a, audio_array)
                logger.debug("Applied high-pass filter (80Hz cutoff)")

        except ImportError:
            logger.debug("scipy not available, skipping high-pass filter")
        except Exception as e:
            logger.debug(f"High-pass filter failed: {e}")

        return audio_array

    def get_audio_info(self, audio_input: Union[str, bytes]) -> dict:
        """
        Get detailed information about an audio file without full processing.

        Returns:
            dict: Audio metadata including duration, sample rate, channels, etc.
                  Returns an empty dict if the metadata cannot be read.
        """
        try:
            if isinstance(audio_input, str):
                # File path input
                if not os.path.exists(audio_input):
                    raise FileNotFoundError(f"Audio file not found: {audio_input}")
                audio_segment = AudioSegment.from_file(audio_input)
            else:
                # Bytes input: write to a temporary file so pydub can probe it.
                with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                    tmp_file.write(audio_input)
                    tmp_path = tmp_file.name

                try:
                    audio_segment = AudioSegment.from_file(tmp_path)
                finally:
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        pass

            return {
                'duration_seconds': len(audio_segment) / 1000.0,
                'sample_rate': audio_segment.frame_rate,
                'channels': audio_segment.channels,
                'sample_width': audio_segment.sample_width,
                'frame_count': audio_segment.frame_count(),
                'max_possible_amplitude': audio_segment.max_possible_amplitude
            }

        except Exception as e:
            logger.error(f"Failed to get audio info: {e}")
            return {}

    def analyze_audio_file(self, file_path: str) -> 'AudioInfo':
        """
        NEW: Analyze an audio file and return comprehensive information.
        This supports smart file management for large files.
        """
        try:
            from dataclasses import dataclass

            # Lightweight container for the analysis result.
            @dataclass
            class AudioInfo:
                file_path: str
                duration_seconds: float
                size_mb: float
                sample_rate: int
                channels: int
                format: str

                @property
                def duration_minutes(self) -> float:
                    return self.duration_seconds / 60.0

                @property
                def is_large_file(self) -> bool:
                    return self.duration_minutes > 30 or self.size_mb > 100

            info = self.get_audio_info(file_path)
            file_size = os.path.getsize(file_path) / (1024 * 1024)  # bytes -> MB

            return AudioInfo(
                file_path=file_path,
                duration_seconds=info.get('duration_seconds', 0),
                size_mb=file_size,
                sample_rate=info.get('sample_rate', 0),
                channels=info.get('channels', 0),
                format=Path(file_path).suffix.lower()
            )

        except Exception as e:
            logger.error(f"Failed to analyze audio file: {e}")
            raise

    def get_processing_recommendation(self, audio_info) -> Dict[str, Any]:
        """
        NEW: Get a smart processing recommendation based on file characteristics.
        Helps handle large files efficiently for the competition requirements.
        """
        if audio_info.duration_minutes > 60 or audio_info.size_mb > 200:
            return {
                'strategy': 'chunk_33_percent',
                'reason': 'Very large file - process 33% to avoid API limits',
                'chunk_size': 0.33,
                'warning': 'File is very large. Processing only 33% to prevent timeouts.'
            }
        elif audio_info.duration_minutes > 30 or audio_info.size_mb > 100:
            return {
                'strategy': 'chunk_50_percent',
                'reason': 'Large file - process 50% for efficiency',
                'chunk_size': 0.50,
                'warning': 'File is large. Processing 50% for optimal performance.'
            }
        else:
            return {
                'strategy': 'process_full',
                'reason': 'Normal sized file - full processing',
                'chunk_size': 1.0,
                'warning': None
            }

    def process_audio_file(self, file_path: str, enable_translation: bool = True) -> Dict[str, Any]:
        """
        NEW: Enhanced audio file processing with smart management.
        This integrates the new features while maintaining compatibility.
        """
        start_time = time.time()

        try:
            logger.info(f"🎵 Processing audio file: {Path(file_path).name}")

            # Analyze the file and pick a processing strategy.
            audio_info = self.analyze_audio_file(file_path)
            recommendation = self.get_processing_recommendation(audio_info)

            logger.info("📊 File Analysis:")
            logger.info(f"   Duration: {audio_info.duration_minutes:.1f} minutes")
            logger.info(f"   Size: {audio_info.size_mb:.1f} MB")
            logger.info(f"   Strategy: {recommendation['strategy']}")

            # Standard preprocessing (mono, 16 kHz, normalized).
            processed_audio, sample_rate = self.process_audio(file_path)

            # For large files, keep only the leading portion recommended above.
            if recommendation['chunk_size'] < 1.0:
                chunk_size = int(len(processed_audio) * recommendation['chunk_size'])
                processed_audio = processed_audio[:chunk_size]
                logger.info(f"📏 Applied {recommendation['strategy']}: "
                            f"using {recommendation['chunk_size'] * 100:.0f}% of audio")

            # Update running statistics.
            self.processing_stats['files_processed'] += 1
            self.processing_stats['total_processing_time'] += time.time() - start_time

            return {
                'processed_audio': processed_audio,
                'sample_rate': sample_rate,
                'audio_info': audio_info,
                'recommendation': recommendation,
                'processing_time': time.time() - start_time,
                'status': 'success'
            }

        except Exception as e:
            logger.error(f"❌ Audio processing failed: {e}")
            return {
                'error': str(e),
                'processing_time': time.time() - start_time,
                'status': 'error'
            }

    def get_processing_stats(self) -> Dict[str, Any]:
        """
        NEW: Get comprehensive processing statistics for monitoring.
        """
        return {
            'files_processed': self.processing_stats['files_processed'],
            'total_processing_time': self.processing_stats['total_processing_time'],
            'average_processing_time': (
                self.processing_stats['total_processing_time'] / max(1, self.processing_stats['files_processed'])
            ),
            'chunks_processed': self.processing_stats['chunks_processed'],
            'languages_detected': list(self.processing_stats['languages_detected']),
            'supported_formats': self.supported_formats,
            'model_size': self.model_size,
            'translation_enabled': self.enable_translation
        }

    def clear_cache(self):
        """
        NEW: Clear caches and reset statistics.
        """
        self.processing_stats = {
            'files_processed': 0,
            'total_processing_time': 0.0,
            'chunks_processed': 0,
            'languages_detected': set()
        }
        logger.info("🧹 AudioProcessor cache cleared")


def validate_audio_file(file_path: str) -> bool:
    """
    Quick validation of an audio file without full loading.

    Args:
        file_path (str): Path to the audio file.

    Returns:
        bool: True if the file appears to be valid audio.
    """
    try:
        processor = AudioProcessor()
        info = processor.get_audio_info(file_path)
        return info.get('duration_seconds', 0) > 0
    except Exception:
        return False


def estimate_processing_time(file_path: str) -> float:
    """
    Estimate processing time based on audio duration.

    Args:
        file_path (str): Path to the audio file.

    Returns:
        float: Estimated processing time in seconds.
    """
    try:
        processor = AudioProcessor()
        info = processor.get_audio_info(file_path)
        duration = info.get('duration_seconds', 0)

        # Rough heuristic: preprocessing takes about 0.2x the audio duration,
        # with a one-second floor for very short clips.
        estimated_time = duration * 0.2
        return max(estimated_time, 1.0)
    except Exception:
        return 10.0


if __name__ == "__main__":
    # Basic smoke test: run the processor against a few local sample files, if present.
    processor = AudioProcessor()

    test_files = ["sample.wav", "sample.mp3", "test_audio.flac"]

    for test_file in test_files:
        if os.path.exists(test_file):
            try:
                print(f"\nTesting {test_file}:")

                # Metadata only
                info = processor.get_audio_info(test_file)
                print(f"Info: {info}")

                # Full preprocessing pipeline
                audio, sr = processor.process_audio(test_file)
                print(f"Processed: shape={audio.shape}, sr={sr}")

                # Quick validation helper
                is_valid = validate_audio_file(test_file)
                print(f"Valid: {is_valid}")

            except Exception as e:
                print(f"Error processing {test_file}: {e}")