| """ |
| Audio Preprocessing Module for Multilingual Audio Intelligence System |
| |
| This module handles the standardization of diverse audio inputs into a consistent |
| format suitable for downstream ML models. It supports various audio formats |
| (wav, mp3, ogg, flac), sample rates (8k-48k), bit depths (4-32 bits), and |
| handles SNR variations as specified in PS-6 requirements. |
| |
| Key Features: |
| - Format conversion and standardization |
| - Intelligent resampling to 16kHz |
| - Stereo to mono conversion |
| - Volume normalization for SNR robustness |
| - Memory-efficient processing |
| - Robust error handling |
| |
| Dependencies: pydub, librosa, numpy |
| System Dependencies: ffmpeg (for format conversion) |
| """ |
|
|
import logging
import os
import tempfile
import time
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union

import librosa
import numpy as np
from pydub import AudioSegment
from pydub.utils import which
|
|
| |
# Module-level logging: INFO by default; per-module logger keyed by __name__.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Suppress UserWarnings raised from within librosa to keep logs readable.
warnings.filterwarnings("ignore", category=UserWarning, module="librosa")
|
|
|
|
class AudioProcessor:
    """
    Enhanced Audio Processor with smart file management and hybrid translation support.

    Combines the original preprocessing pipeline with newer capabilities:
    - Original: resampling to a target rate (default 16 kHz), mono conversion,
      RMS normalization
    - NEW: smart file analysis and chunking strategies for large files
    - NEW: configuration hooks for the 3-tier hybrid translation system
    - NEW: memory-aware processing limits and running statistics
    """

    @dataclass
    class AudioInfo:
        """Metadata record returned by analyze_audio_file()."""

        file_path: str
        duration_seconds: float
        size_mb: float
        sample_rate: int
        channels: int
        format: str  # lowercase file suffix, e.g. '.wav'

        @property
        def duration_minutes(self) -> float:
            """Duration expressed in minutes."""
            return self.duration_seconds / 60.0

        @property
        def is_large_file(self) -> bool:
            """True when the file exceeds 30 minutes or 100 MB."""
            return self.duration_minutes > 30 or self.size_mb > 100

    def __init__(self, target_sample_rate: int = 16000, model_size: str = "small",
                 enable_translation: bool = True, max_file_duration_minutes: int = 60,
                 max_file_size_mb: int = 200):
        """
        Initialize Enhanced AudioProcessor with both original and new capabilities.

        Args:
            target_sample_rate (int): Target sample rate in Hz (default: 16kHz)
            model_size (str): Whisper model size for transcription
            enable_translation (bool): Enable translation capabilities
            max_file_duration_minutes (int): Maximum file duration for processing
            max_file_size_mb (int): Maximum file size for processing
        """
        self.target_sample_rate = target_sample_rate
        self.supported_formats = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac']

        # Enhanced-feature configuration (transcription/translation tiers).
        self.model_size = model_size
        self.enable_translation = enable_translation
        self.max_file_duration = max_file_duration_minutes
        self.max_file_size = max_file_size_mb

        # Lazily-created model handle and running statistics.
        self.whisper_model = None
        self.processing_stats = {
            'files_processed': 0,
            'total_processing_time': 0.0,
            'chunks_processed': 0,
            'languages_detected': set()
        }

        # pydub relies on ffmpeg for most compressed-format conversions.
        if not which("ffmpeg"):
            logger.warning("ffmpeg not found. Some format conversions may fail.")

        logger.info("Enhanced AudioProcessor initialized")
        logger.info(f"   Model: {model_size}, Translation: {enable_translation}")
        logger.info(f"   Limits: {max_file_duration_minutes}min, {max_file_size_mb}MB")

    def process_audio(self, audio_input: Union[str, bytes, np.ndarray],
                     input_sample_rate: Optional[int] = None) -> Tuple[np.ndarray, int]:
        """
        Main processing function that standardizes any audio input.

        Args:
            audio_input: Can be file path (str), audio bytes, or numpy array
            input_sample_rate: Required if audio_input is numpy array

        Returns:
            Tuple[np.ndarray, int]: (processed_audio_array, sample_rate)

        Raises:
            ValueError: If input format is unsupported or invalid
            FileNotFoundError: If audio file doesn't exist
            Exception: For processing errors
        """
        try:
            # Dispatch on input type: path, raw bytes, or in-memory samples.
            if isinstance(audio_input, str):
                audio_array, original_sr = self._load_from_file(audio_input)
            elif isinstance(audio_input, bytes):
                audio_array, original_sr = self._load_from_bytes(audio_input)
            elif isinstance(audio_input, np.ndarray):
                # Raw arrays carry no rate metadata, so the caller must supply it.
                if input_sample_rate is None:
                    raise ValueError("input_sample_rate must be provided for numpy array input")
                audio_array = audio_input.astype(np.float32)
                original_sr = input_sample_rate
            else:
                raise ValueError(f"Unsupported input type: {type(audio_input)}")

            logger.info(f"Loaded audio: {audio_array.shape}, {original_sr}Hz")

            # Mono conversion, resampling, normalization, filtering.
            processed_audio = self._preprocess_pipeline(audio_array, original_sr)

            logger.info(f"Processed audio: {processed_audio.shape}, {self.target_sample_rate}Hz")

            return processed_audio, self.target_sample_rate

        except Exception as e:
            # Boundary: log with context, then re-raise for the caller.
            logger.error(f"Audio processing failed: {str(e)}")
            raise

    def _load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio from file path, preferring librosa with a pydub fallback."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported format {file_ext}. Supported: {self.supported_formats}")

        try:
            # sr=None preserves the native rate; mono=False keeps channel layout.
            audio_array, sample_rate = librosa.load(file_path, sr=None, mono=False)
            return audio_array, sample_rate
        except Exception as e:
            # librosa can fail on some containers/codecs; pydub+ffmpeg is broader.
            logger.warning(f"librosa failed, trying pydub: {e}")
            return self._load_with_pydub(file_path)

    def _load_from_bytes(self, audio_bytes: bytes) -> Tuple[np.ndarray, int]:
        """Load audio from bytes (e.g., uploaded file) via a temporary file."""
        # pydub/ffmpeg need a real file path, so spill the bytes to disk.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.audio') as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_path = tmp_file.name

        try:
            audio_array, sample_rate = self._load_with_pydub(tmp_path)
            return audio_array, sample_rate
        finally:
            # Best-effort cleanup of the temporary file.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    def _load_with_pydub(self, file_path: str) -> Tuple[np.ndarray, int]:
        """
        Load audio using pydub (ffmpeg-backed) with format auto-detection.

        Returns:
            Tuple[np.ndarray, int]: float32 samples scaled to [-1, 1] according
            to the source bit depth — shaped (frames, channels) for
            multi-channel audio — and the source frame rate.

        Raises:
            RuntimeError: If pydub/ffmpeg cannot decode the file.
        """
        try:
            audio_segment = AudioSegment.from_file(file_path)

            samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)

            # pydub interleaves channels; split into (frames, channels) for any
            # channel count (the original code only handled exactly stereo).
            if audio_segment.channels > 1:
                samples = samples.reshape((-1, audio_segment.channels))

            # Scale by the true full-scale value for the source bit depth.
            # (A fixed 2**15 divisor was only correct for 16-bit audio.)
            full_scale = float(2 ** (8 * audio_segment.sample_width - 1))
            samples = samples / full_scale

            return samples, audio_segment.frame_rate

        except Exception as e:
            raise RuntimeError(f"Failed to load audio with pydub: {str(e)}") from e

    def _preprocess_pipeline(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray:
        """
        Apply the complete preprocessing pipeline.

        Pipeline steps:
        1. Convert multi-channel audio to mono
        2. Resample to target sample rate
        3. Normalize amplitude
        4. Apply basic preprocessing filters
        """
        audio_array = np.asarray(audio_array)

        if audio_array.ndim > 1:
            # Downmix by averaging across the channel axis. librosa yields
            # (channels, frames) while pydub yields (frames, channels); the
            # channel axis is the smaller dimension. The original code handled
            # only 2-channel layouts and would have interleaved garbage via
            # flatten() for anything else.
            channel_axis = 0 if audio_array.shape[0] <= audio_array.shape[1] else 1
            audio_array = np.mean(audio_array, axis=channel_axis)

        audio_array = audio_array.flatten()

        logger.debug(f"After mono conversion: {audio_array.shape}")

        # Resample only when the source rate differs from the target.
        if original_sr != self.target_sample_rate:
            audio_array = librosa.resample(
                audio_array,
                orig_sr=original_sr,
                target_sr=self.target_sample_rate,
                res_type='kaiser_best'
            )
            logger.debug(f"Resampled from {original_sr}Hz to {self.target_sample_rate}Hz")

        audio_array = self._normalize_audio(audio_array)

        audio_array = self._apply_preprocessing_filters(audio_array)

        return audio_array.astype(np.float32)

    def _normalize_audio(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Normalize audio amplitude to handle varying SNR conditions.

        Uses RMS-based normalization for better handling of varying
        signal-to-noise ratios (-5dB to 20dB as per PS-6 requirements).
        """
        rms = np.sqrt(np.mean(audio_array**2))

        if rms > 0:
            # Scale toward a fixed target RMS, then clip to the valid range
            # to avoid overshoot on peaky signals.
            target_rms = 0.1
            normalization_factor = target_rms / rms

            normalized = audio_array * normalization_factor
            normalized = np.clip(normalized, -1.0, 1.0)

            logger.debug(f"RMS normalization: {rms:.4f} -> {target_rms:.4f}")
            return normalized

        # Silent input: nothing to normalize.
        return audio_array

    def _apply_preprocessing_filters(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Apply basic preprocessing filters for improved robustness.

        Includes:
        - DC offset removal
        - Light high-pass filtering (removes very low frequencies)
        """
        # Remove DC offset so the signal is centered at zero.
        audio_array = audio_array - np.mean(audio_array)

        # High-pass filtering is optional: it needs scipy, which may be absent.
        try:
            from scipy.signal import butter, filtfilt

            # First-order Butterworth high-pass with an 80 Hz cutoff,
            # expressed as a fraction of the Nyquist frequency.
            nyquist = self.target_sample_rate / 2
            cutoff = 80 / nyquist

            if cutoff < 1.0:
                b, a = butter(N=1, Wn=cutoff, btype='high')
                audio_array = filtfilt(b, a, audio_array)
                logger.debug("Applied high-pass filter (80Hz cutoff)")

        except ImportError:
            logger.debug("scipy not available, skipping high-pass filter")
        except Exception as e:
            # Filtering is a nice-to-have; never fail the pipeline over it.
            logger.debug(f"High-pass filter failed: {e}")

        return audio_array

    def get_audio_info(self, audio_input: Union[str, bytes]) -> dict:
        """
        Get detailed information about audio file without full processing.

        Returns:
            dict: Audio metadata including duration, sample rate, channels,
            etc. — empty dict on failure (callers use .get with defaults).
        """
        try:
            if isinstance(audio_input, str):
                if not os.path.exists(audio_input):
                    raise FileNotFoundError(f"Audio file not found: {audio_input}")
                audio_segment = AudioSegment.from_file(audio_input)
            else:
                # Bytes input: spill to a temp file for pydub/ffmpeg.
                with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                    tmp_file.write(audio_input)
                    tmp_path = tmp_file.name

                try:
                    audio_segment = AudioSegment.from_file(tmp_path)
                finally:
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        pass

            return {
                'duration_seconds': len(audio_segment) / 1000.0,  # pydub length is in ms
                'sample_rate': audio_segment.frame_rate,
                'channels': audio_segment.channels,
                'sample_width': audio_segment.sample_width,
                'frame_count': audio_segment.frame_count(),
                'max_possible_amplitude': audio_segment.max_possible_amplitude
            }

        except Exception as e:
            logger.error(f"Failed to get audio info: {e}")
            return {}

    def analyze_audio_file(self, file_path: str) -> "AudioProcessor.AudioInfo":
        """
        NEW: Analyze audio file and return comprehensive information.
        This supports our smart file management for large files.

        Returns:
            AudioProcessor.AudioInfo: metadata record (duration, size,
            sample rate, channels, format).

        Raises:
            Exception: Propagates any failure after logging it.
        """
        try:
            info = self.get_audio_info(file_path)
            file_size = os.path.getsize(file_path) / (1024 * 1024)  # bytes -> MB

            return AudioProcessor.AudioInfo(
                file_path=file_path,
                duration_seconds=info.get('duration_seconds', 0),
                size_mb=file_size,
                sample_rate=info.get('sample_rate', 0),
                channels=info.get('channels', 0),
                format=Path(file_path).suffix.lower()
            )

        except Exception as e:
            logger.error(f"Failed to analyze audio file: {e}")
            raise

    def get_processing_recommendation(self, audio_info) -> Dict[str, Any]:
        """
        NEW: Get smart processing recommendation based on file characteristics.
        Helps handle large files efficiently for competition requirements.

        Args:
            audio_info: Object exposing duration_minutes and size_mb
                (e.g. AudioProcessor.AudioInfo).

        Returns:
            Dict[str, Any]: strategy name, reason, chunk_size fraction, and
            an optional user-facing warning.
        """
        if audio_info.duration_minutes > 60 or audio_info.size_mb > 200:
            return {
                'strategy': 'chunk_33_percent',
                'reason': 'Very large file - process 33% to avoid API limits',
                'chunk_size': 0.33,
                'warning': 'File is very large. Processing only 33% to prevent timeouts.'
            }
        elif audio_info.duration_minutes > 30 or audio_info.size_mb > 100:
            return {
                'strategy': 'chunk_50_percent',
                'reason': 'Large file - process 50% for efficiency',
                'chunk_size': 0.50,
                'warning': 'File is large. Processing 50% for optimal performance.'
            }
        else:
            return {
                'strategy': 'process_full',
                'reason': 'Normal sized file - full processing',
                'chunk_size': 1.0,
                'warning': None
            }

    def process_audio_file(self, file_path: str, enable_translation: bool = True) -> Dict[str, Any]:
        """
        NEW: Enhanced audio file processing with smart management.
        This integrates all our new features while maintaining compatibility.

        Returns:
            Dict[str, Any]: On success, the processed audio plus metadata and
            the applied recommendation; on failure, an error record. Never
            raises — failures are reported in the returned dict.
        """
        start_time = time.time()

        try:
            logger.info(f"Processing audio file: {Path(file_path).name}")

            # Analyze first so very large files can be truncated below.
            audio_info = self.analyze_audio_file(file_path)
            recommendation = self.get_processing_recommendation(audio_info)

            logger.info("File analysis:")
            logger.info(f"   Duration: {audio_info.duration_minutes:.1f} minutes")
            logger.info(f"   Size: {audio_info.size_mb:.1f} MB")
            logger.info(f"   Strategy: {recommendation['strategy']}")

            processed_audio, sample_rate = self.process_audio(file_path)

            # Apply the chunking strategy: keep only the leading fraction.
            if recommendation['chunk_size'] < 1.0:
                chunk_size = int(len(processed_audio) * recommendation['chunk_size'])
                processed_audio = processed_audio[:chunk_size]
                logger.info(f"Applied {recommendation['strategy']}: using {recommendation['chunk_size']*100}% of audio")

            self.processing_stats['files_processed'] += 1
            self.processing_stats['total_processing_time'] += time.time() - start_time

            return {
                'processed_audio': processed_audio,
                'sample_rate': sample_rate,
                'audio_info': audio_info,
                'recommendation': recommendation,
                'processing_time': time.time() - start_time,
                'status': 'success'
            }

        except Exception as e:
            logger.error(f"Audio processing failed: {e}")
            return {
                'error': str(e),
                'processing_time': time.time() - start_time,
                'status': 'error'
            }

    def get_processing_stats(self) -> Dict[str, Any]:
        """
        NEW: Get comprehensive processing statistics for monitoring.
        """
        return {
            'files_processed': self.processing_stats['files_processed'],
            'total_processing_time': self.processing_stats['total_processing_time'],
            # max(1, ...) guards the division when nothing has been processed.
            'average_processing_time': (
                self.processing_stats['total_processing_time'] / max(1, self.processing_stats['files_processed'])
            ),
            'chunks_processed': self.processing_stats['chunks_processed'],
            'languages_detected': list(self.processing_stats['languages_detected']),
            'supported_formats': self.supported_formats,
            'model_size': self.model_size,
            'translation_enabled': self.enable_translation
        }

    def clear_cache(self):
        """
        NEW: Clear caches and reset statistics.
        """
        self.processing_stats = {
            'files_processed': 0,
            'total_processing_time': 0.0,
            'chunks_processed': 0,
            'languages_detected': set()
        }
        logger.info("AudioProcessor cache cleared")
|
|
|
|
| |
def validate_audio_file(file_path: str) -> bool:
    """
    Quick validation of audio file without full loading.

    Args:
        file_path (str): Path to audio file

    Returns:
        bool: True if file appears to be valid audio
    """
    try:
        metadata = AudioProcessor().get_audio_info(file_path)
        has_duration = metadata.get('duration_seconds', 0) > 0
    except Exception:
        return False
    return has_duration
|
|
|
|
def estimate_processing_time(file_path: str, speed_factor: float = 0.2,
                             minimum_seconds: float = 1.0) -> float:
    """
    Estimate processing time based on audio duration.

    Args:
        file_path (str): Path to audio file
        speed_factor (float): Estimated processing seconds per second of audio
            (default 0.2, i.e. roughly 5x faster than real time).
        minimum_seconds (float): Lower bound on the returned estimate.

    Returns:
        float: Estimated processing time in seconds; a flat 10.0 when the
        file cannot be inspected.
    """
    try:
        processor = AudioProcessor()
        info = processor.get_audio_info(file_path)
        duration = info.get('duration_seconds', 0)

        # Linear model: cost scales with audio length, floored so that very
        # short clips still report a non-zero estimate.
        return max(duration * speed_factor, minimum_seconds)
    except Exception:
        # Inspection failed (missing file, unreadable format, ...): fall back
        # to a conservative flat estimate.
        return 10.0
|
|
|
|
if __name__ == "__main__":
    # Smoke-test the processor against any sample files present locally.
    processor = AudioProcessor()

    for candidate in ("sample.wav", "sample.mp3", "test_audio.flac"):
        if not os.path.exists(candidate):
            continue
        try:
            print(f"\nTesting {candidate}:")

            # Metadata probe (no full decode).
            print(f"Info: {processor.get_audio_info(candidate)}")

            # Full preprocessing pipeline.
            audio, sr = processor.process_audio(candidate)
            print(f"Processed: shape={audio.shape}, sr={sr}")

            # Lightweight validity check.
            print(f"Valid: {validate_audio_file(candidate)}")

        except Exception as e:
            print(f"Error processing {candidate}: {e}")