| """ |
| Utility Functions for Multilingual Audio Intelligence System |
| |
| This module provides common helper functions, data validation utilities, |
| performance monitoring, and error handling functionality used across |
| all components of the audio intelligence system. |
| |
| Key Features: |
| - File I/O utilities for audio and text files |
| - Data validation and type checking |
| - Performance monitoring and timing utilities |
| - Error handling and logging helpers |
| - Audio format detection and validation |
| - Memory management utilities |
| - Progress tracking for long-running operations |
| |
| Dependencies: pathlib, typing, functools, time |
| """ |
|
|
| import os |
| import sys |
| import time |
| import logging |
| import functools |
| from pathlib import Path |
| from typing import Union, Optional, Dict, List, Any, Callable, Tuple |
| import json |
| import hashlib |
| import tempfile |
| import psutil |
| from dataclasses import dataclass |
| from contextlib import contextmanager |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class PerformanceMetrics:
    """Timing, memory and outcome record for a single monitored operation.

    Only ``operation_name`` and ``start_time`` are required at construction;
    every other field keeps its default until it is assigned, typically via
    :meth:`finalize`.
    """

    operation_name: str
    start_time: float
    end_time: Optional[float] = None
    duration: Optional[float] = None
    memory_before: Optional[float] = None
    memory_after: Optional[float] = None
    memory_peak: Optional[float] = None
    success: bool = True
    error_message: Optional[str] = None

    def finalize(self, end_time: float, memory_after: float,
                 success: bool = True, error_message: Optional[str] = None):
        """Record the end of the operation.

        Args:
            end_time: Timestamp at which the operation ended
            memory_after: Memory reading taken after the operation
            success: Whether the operation completed without error
            error_message: Description of the failure, if any
        """
        self.end_time, self.memory_after = end_time, memory_after
        self.success, self.error_message = success, error_message
        # Duration is always derived from the stored start/end pair.
        self.duration = self.end_time - self.start_time

    def to_dict(self) -> Dict[str, Any]:
        """Return the reportable fields as a plain dictionary.

        ``start_time``, ``end_time`` are not included; the memory fields are
        exported under ``memory_*_mb`` keys.
        """
        exported: Dict[str, Any] = {
            'operation_name': self.operation_name,
            'duration': self.duration,
        }
        for stage in ('before', 'after', 'peak'):
            exported[f'memory_{stage}_mb'] = getattr(self, f'memory_{stage}')
        exported['success'] = self.success
        exported['error_message'] = self.error_message
        return exported
|
|
|
|
class ProgressTracker:
    """Simple console progress bar for long-running operations.

    The bar is redrawn in place (carriage return, no newline) on each update
    and a newline is emitted once the displayed progress reaches ``total``.
    """

    def __init__(self, total: int, description: str = "Processing"):
        """
        Args:
            total: Number of work units that make up 100%
            description: Label printed in front of the bar
        """
        self.total = total
        self.current = 0
        self.description = description
        self.start_time = time.time()

    def update(self, increment: int = 1):
        """Advance progress by ``increment`` and redraw the bar.

        An update that crosses ``total`` (e.g. total=10 advanced in steps of
        3) is still drawn, clamped to 100%, so the bar always completes and
        its closing newline is printed. Updates issued after completion are
        counted in ``current`` but not drawn again.
        """
        was_complete = self.current >= self.total
        self.current += increment
        if not was_complete or self.current <= self.total:
            self._display_progress()

    def _display_progress(self):
        """Draw the progress bar on the console."""
        if self.total == 0:
            # Nothing meaningful to draw, and avoids division by zero.
            return

        # Never draw more than 100%, even if ``current`` overshot ``total``.
        shown = min(self.current, self.total)
        percent = (shown / self.total) * 100
        elapsed = time.time() - self.start_time

        if shown > 0:
            # Linear extrapolation from the average time per completed unit.
            eta = (elapsed / shown) * (self.total - shown)
            eta_str = f" ETA: {format_duration(eta)}"
        else:
            eta_str = ""

        bar_length = 30
        filled_length = int(bar_length * shown // self.total)
        # U+2588 FULL BLOCK; the previous 'β' was a mis-decoded copy of it.
        bar = '\u2588' * filled_length + '-' * (bar_length - filled_length)

        print(f'\r{self.description}: |{bar}| {percent:.1f}% '
              f'({shown}/{self.total}){eta_str}', end='', flush=True)

        if shown >= self.total:
            # Terminate the in-place line once the bar is full.
            print()

    def finish(self):
        """Mark progress as complete and draw the full bar."""
        self.current = self.total
        self._display_progress()
|
|
|
|
| |
def ensure_directory(path: Union[str, Path]) -> Path:
    """
    Create a directory (and any missing parents) unless it already exists.

    Args:
        path: Directory path to create

    Returns:
        Path: The directory as a Path object
    """
    directory = Path(path)
    # exist_ok=True makes the call a no-op for an existing directory.
    directory.mkdir(parents=True, exist_ok=True)
    return directory
|
|
|
|
def get_file_info(file_path: Union[str, Path]) -> Dict[str, Any]:
    """
    Collect basic metadata about a filesystem path.

    Args:
        file_path: Path to inspect

    Returns:
        Dict: ``{'exists': False}`` when the path is missing; otherwise the
        size (bytes and MB), modification time, file/directory flags,
        lower-cased extension, name and parent directory.
    """
    target = Path(file_path)
    if not target.exists():
        return {'exists': False}

    size = target.stat().st_size if False else None  # placeholder, replaced below
    stat_result = target.stat()
    size = stat_result.st_size

    info: Dict[str, Any] = {'exists': True}
    info['size_bytes'] = size
    info['size_mb'] = size / (1024 * 1024)
    info['modified_time'] = stat_result.st_mtime
    info['is_file'] = target.is_file()
    info['is_directory'] = target.is_dir()
    info['extension'] = target.suffix.lower()
    info['name'] = target.name
    info['parent'] = str(target.parent)
    return info
|
|
|
|
def get_file_hash(file_path: Union[str, Path], algorithm: str = 'md5') -> str:
    """
    Compute the hex digest of a file's contents.

    The file is read in 4096-byte blocks so it is never loaded into memory
    as a whole.

    Args:
        file_path: Path to file
        algorithm: Any algorithm name accepted by ``hashlib.new``
            (e.g. 'md5', 'sha1', 'sha256')

    Returns:
        str: Hex digest of file hash
    """
    source = Path(file_path)
    digest = hashlib.new(algorithm)

    with open(source, 'rb') as stream:
        while True:
            block = stream.read(4096)
            if block == b"":
                # An empty read marks end of file.
                break
            digest.update(block)

    return digest.hexdigest()
|
|
|
|
def safe_filename(filename: str) -> str:
    """
    Sanitize a filename by replacing characters that are problematic on
    common filesystems.

    Each of ``< > : " / \\ | ? *`` becomes an underscore, runs of underscores
    are collapsed to one, and leading/trailing underscores and dots are
    removed. An empty result falls back to ``'unnamed_file'``.

    Args:
        filename: Original filename

    Returns:
        str: Safe filename
    """
    forbidden = '<>:"/\\|?*'
    substituted = filename.translate(str.maketrans(forbidden, '_' * len(forbidden)))

    # Dropping the empty pieces between underscores collapses every run to a
    # single '_'; edge underscores lost here would be stripped below anyway.
    collapsed = '_'.join(piece for piece in substituted.split('_') if piece)

    trimmed = collapsed.strip('_.')
    return trimmed if trimmed else 'unnamed_file'
|
|
|
|
| |
def detect_audio_format(file_path: Union[str, Path]) -> Optional[str]:
    """
    Detect audio format from the file extension, falling back to the header.

    A recognized extension wins and the file content is not inspected in that
    case. Otherwise the first 12 bytes are matched against the WAV, MP3, FLAC
    and Ogg signatures.

    Args:
        file_path: Path to audio file

    Returns:
        Optional[str]: Detected format ('wav', 'mp3', 'ogg', 'flac', 'm4a',
        'aac', 'wma'), or None if the file is missing, unreadable or
        unrecognized
    """
    file_path = Path(file_path)

    if not file_path.exists():
        return None

    extension_map = {
        '.wav': 'wav',
        '.mp3': 'mp3',
        '.ogg': 'ogg',
        '.flac': 'flac',
        '.m4a': 'm4a',
        '.aac': 'aac',
        '.wma': 'wma'
    }

    by_extension = extension_map.get(file_path.suffix.lower())
    if by_extension is not None:
        return by_extension

    try:
        with open(file_path, 'rb') as f:
            header = f.read(12)
    except OSError as e:
        # Best effort: an unreadable file is reported as "unknown format".
        logger.warning(f"Failed to read file header: {e}")
        return None

    # RIFF container whose form type is WAVE.
    if header[:4] == b'RIFF' and header[8:12] == b'WAVE':
        return 'wav'

    # MP3 with a leading ID3v2 tag.
    if header[:3] == b'ID3':
        return 'mp3'

    # Bare MPEG audio frame: 11 sync bits set (0xFF + top 3 bits of byte 1)
    # and layer bits == 01 (Layer III). This accepts every MPEG version and
    # CRC setting (FF FB, FF FA, FF F3, FF F2, FF E3, ...) while excluding
    # AAC ADTS headers, whose layer bits are 00.
    if len(header) >= 2 and header[0] == 0xFF and (header[1] & 0xE6) == 0xE2:
        return 'mp3'

    if header[:4] == b'fLaC':
        return 'flac'

    if header[:4] == b'OggS':
        return 'ogg'

    return None
|
|
|
|
def validate_audio_file(file_path: Union[str, Path]) -> Dict[str, Any]:
    """
    Check that a path points to a non-empty file in a recognized audio format.

    Args:
        file_path: Path to audio file

    Returns:
        Dict: ``valid`` (bool), ``format`` (detected format or None),
        ``error`` (reason for rejection or None) and ``file_info``
        (the result of ``get_file_info`` for the path)
    """
    audio_path = Path(file_path)

    outcome: Dict[str, Any] = dict(
        valid=False,
        format=None,
        error=None,
        file_info=get_file_info(audio_path),
    )

    # The checks run from cheapest to most specific; the first failure
    # determines the error message.
    if not audio_path.exists():
        outcome['error'] = 'File does not exist'
    elif not audio_path.is_file():
        outcome['error'] = 'Path is not a file'
    elif outcome['file_info']['size_bytes'] == 0:
        outcome['error'] = 'File is empty'
    else:
        audio_format = detect_audio_format(audio_path)
        if audio_format:
            outcome['format'] = audio_format
            outcome['valid'] = True
        else:
            outcome['error'] = 'Unsupported or unrecognized audio format'

    return outcome
|
|
|
|
| |
def validate_time_range(start_time: float, end_time: float,
                        max_duration: Optional[float] = None) -> bool:
    """
    Validate time range parameters.

    A range is valid when ``0 <= start_time < end_time`` and, if a limit is
    given, ``end_time - start_time`` does not exceed ``max_duration``.
    NaN bounds are rejected because they fail the ordering check.

    Args:
        start_time: Start time in seconds
        end_time: End time in seconds
        max_duration: Optional maximum allowed duration; ``None`` means no
            limit (a limit of 0 is honoured and rejects every range)

    Returns:
        bool: True if valid time range
    """
    # Chained comparison: any NaN operand makes it False, so NaN is rejected.
    if not (0 <= start_time < end_time):
        return False

    # Compare against None explicitly so that a limit of 0 is not mistaken
    # for "no limit".
    if max_duration is not None and (end_time - start_time) > max_duration:
        return False

    return True
|
|
|
|
def validate_language_code(lang_code: str) -> bool:
    """
    Validate ISO language code format.

    Only the shape is checked (two or three ASCII letters, case-insensitive,
    surrounding whitespace ignored); the code is not looked up in any
    registry.

    Args:
        lang_code: Language code to validate

    Returns:
        bool: True if valid format
    """
    if not isinstance(lang_code, str):
        return False

    code = lang_code.strip().lower()

    # str.isalpha() would also accept non-ASCII letters such as CJK
    # characters, which can never be ISO 639 codes, so test a-z explicitly.
    return len(code) in (2, 3) and all('a' <= ch <= 'z' for ch in code)
|
|
|
|
def validate_confidence_score(score: float) -> bool:
    """
    Check that a confidence score is a number within [0, 1].

    Args:
        score: Confidence score to validate

    Returns:
        bool: True if ``score`` is an int or float between 0.0 and 1.0
        inclusive
    """
    if not isinstance(score, (int, float)):
        return False
    return 0.0 <= score <= 1.0
|
|
|
|
| |
@contextmanager
def performance_monitor(operation_name: str,
                        log_results: bool = True) -> PerformanceMetrics:
    """
    Context manager that times a block of code and samples process memory
    (resident set size, in MB) before and after it.

    The yielded metrics object is finalized when the block exits, whether
    normally or through an ``Exception``; exceptions are re-raised after
    being recorded.

    Args:
        operation_name: Name of the operation being monitored
        log_results: Whether to log results automatically

    Yields:
        PerformanceMetrics: Metrics object for the operation

    Example:
        >>> with performance_monitor("audio_processing") as metrics:
        >>>     # Your code here
        >>>     pass
        >>> print(f"Operation took {metrics.duration:.2f} seconds")
    """
    proc = psutil.Process()

    def rss_mb() -> float:
        # Resident set size of the current process, converted to MB.
        return proc.memory_info().rss / (1024 * 1024)

    # Sample memory first, then take the start timestamp.
    baseline_mb = rss_mb()
    metrics = PerformanceMetrics(
        operation_name=operation_name,
        start_time=time.time(),
        memory_before=baseline_mb
    )

    try:
        yield metrics

        final_mb = rss_mb()
        metrics.finalize(
            end_time=time.time(),
            memory_after=final_mb,
            success=True
        )

    except Exception as exc:
        final_mb = rss_mb()
        metrics.finalize(
            end_time=time.time(),
            memory_after=final_mb,
            success=False,
            error_message=str(exc)
        )

        if log_results:
            logger.error(f"Operation '{operation_name}' failed: {exc}")

        raise

    finally:
        # duration stays None if the block exited without being finalized
        # (e.g. via a non-Exception BaseException); nothing is logged then.
        was_finalized = metrics.duration is not None
        if log_results and was_finalized:
            if metrics.success:
                logger.info(f"Operation '{operation_name}' completed in "
                            f"{metrics.duration:.2f}s")

            if metrics.memory_before and metrics.memory_after:
                delta_mb = metrics.memory_after - metrics.memory_before
                # Only changes larger than 10 MB are reported.
                if abs(delta_mb) > 10:
                    logger.debug(f"Memory change: {delta_mb:+.1f} MB")
|
|
|
|
def timing_decorator(func: Callable) -> Callable:
    """
    Decorator that logs how long each call to ``func`` takes.

    Successful calls are logged at DEBUG level; calls that raise an
    ``Exception`` are logged at ERROR level and the exception is re-raised.

    Args:
        func: Function to measure

    Returns:
        Callable: Wrapped function that logs execution time
    """
    @functools.wraps(func)
    def timed(*args, **kwargs):
        began = time.time()

        def elapsed() -> float:
            return time.time() - began

        try:
            outcome = func(*args, **kwargs)
            took = elapsed()
            logger.debug(f"{func.__name__} executed in {took:.3f}s")
            return outcome
        except Exception as exc:
            took = elapsed()
            logger.error(f"{func.__name__} failed after {took:.3f}s: {exc}")
            raise

    return timed
|
|
|
|
| |
def get_memory_usage() -> Dict[str, float]:
    """
    Report memory usage of the current process and of the system.

    Returns:
        Dict: ``rss_mb`` and ``vms_mb`` (process memory in MB), ``percent``
        (value of ``psutil.Process.memory_percent()``), and
        ``system_total_mb`` / ``system_available_mb`` (system memory in MB)
    """
    bytes_per_mb = 1024 * 1024
    proc = psutil.Process()
    proc_memory = proc.memory_info()

    usage: Dict[str, float] = {}
    usage['rss_mb'] = proc_memory.rss / bytes_per_mb
    usage['vms_mb'] = proc_memory.vms / bytes_per_mb
    usage['percent'] = proc.memory_percent()
    usage['system_total_mb'] = psutil.virtual_memory().total / bytes_per_mb
    usage['system_available_mb'] = psutil.virtual_memory().available / bytes_per_mb
    return usage
|
|
|
|
def check_memory_available(required_mb: float,
                           safety_margin: float = 1.2) -> bool:
    """
    Tell whether the system has enough available memory for a request.

    Args:
        required_mb: Required memory in MB
        safety_margin: Multiplier applied to ``required_mb`` before the
            comparison

    Returns:
        bool: True if available system memory is at least
        ``required_mb * safety_margin``
    """
    available_mb = get_memory_usage()['system_available_mb']
    needed_mb = required_mb * safety_margin
    return available_mb >= needed_mb
|
|
|
|
| |
def format_duration(seconds: float) -> str:
    """
    Format duration in human-readable format.

    Values below one second are shown in whole milliseconds; larger values
    are shown with one decimal of seconds and, as needed, minutes and hours.
    The value is rounded to the displayed precision *before* it is split
    into units, so results such as "60.0s" or "1m 60.0s" cannot occur.

    Args:
        seconds: Duration in seconds

    Returns:
        str: Formatted duration string, e.g. "250ms", "12.3s", "1m 30.0s",
        "1h 1m 1.5s"
    """
    if seconds < 1:
        millis = seconds * 1000
        # 999.5 and above would print as "1000ms"; let those fall through to
        # the seconds branch instead.
        if millis < 999.5:
            return f"{millis:.0f}ms"

    # Round once to the displayed precision so that the unit selection and
    # the unit split both operate on the value that will be printed.
    rounded = round(seconds, 1)

    if rounded < 60:
        return f"{rounded:.1f}s"

    if rounded < 3600:
        minutes, secs = divmod(rounded, 60)
        return f"{int(minutes)}m {secs:.1f}s"

    hours, remainder = divmod(rounded, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{int(hours)}h {int(minutes)}m {secs:.1f}s"
|
|
|
|
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count using the largest fitting unit (B up to PB).

    Units are 1024-based and the value is shown with one decimal place.

    Args:
        size_bytes: Size in bytes

    Returns:
        str: Formatted size string, e.g. "1.5 MB"
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    magnitude = size_bytes
    index = 0

    # Step up one unit at a time until the value drops below 1024 or the
    # largest unit (PB) is reached.
    while index < len(units) - 1 and not (magnitude < 1024.0):
        magnitude /= 1024.0
        index += 1

    return f"{magnitude:.1f} {units[index]}"
|
|
|
|
def truncate_text(text: str, max_length: int = 100,
                  suffix: str = "...") -> str:
    """
    Truncate text to specified length with suffix.

    The result never exceeds ``max_length`` characters. When ``max_length``
    is too small to hold the suffix at all, the text is cut hard without a
    suffix.

    Args:
        text: Text to truncate
        max_length: Maximum length of the returned string, suffix included
        suffix: Suffix to add when truncated

    Returns:
        str: ``text`` unchanged if it already fits, otherwise the truncated
        text
    """
    if len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice bound would count from the end of the string and
        # yield a result longer than max_length, so cut hard instead.
        return text[:max(max_length, 0)]

    return text[:keep] + suffix
|
|
|
|
| |
def safe_execute(func: Callable, *args, default=None,
                 log_errors: bool = True, **kwargs):
    """
    Safely execute a function with error handling.

    Any ``Exception`` raised by ``func`` is swallowed (optionally logged) and
    ``default`` is returned in its place.

    Args:
        func: Function to execute
        *args: Function arguments
        default: Default value to return on error
        log_errors: Whether to log errors
        **kwargs: Function keyword arguments

    Returns:
        Function result or default value on error
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        if log_errors:
            # Not every callable has __name__ (functools.partial, instances
            # defining __call__); fall back to repr so that logging cannot
            # itself raise from inside the handler.
            func_name = getattr(func, '__name__', None) or repr(func)
            logger.error(f"Error in {func_name}: {e}")
        return default
|
|
|
|
def retry_on_failure(max_attempts: int = 3, delay: float = 1.0,
                     exceptions: Tuple = (Exception,)):
    """
    Decorator to retry function on failure.

    The wrapped function is called up to ``max_attempts`` times in total.
    After the final failed attempt the original exception is re-raised.

    Args:
        max_attempts: Maximum number of attempts (first call included);
            must be at least 1
        delay: Delay between attempts in seconds
        exceptions: Tuple of exceptions to catch; any other exception
            propagates immediately without a retry

    Returns:
        Decorator function

    Raises:
        ValueError: If ``max_attempts`` is less than 1
    """
    if max_attempts < 1:
        # With zero attempts the function would never run and there would be
        # no exception to re-raise, so reject the configuration up front.
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    def decorator(func: Callable) -> Callable:
        func_name = getattr(func, '__name__', None) or repr(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(f"All {max_attempts} attempts failed for "
                                     f"{func_name}")
                        # Bare raise keeps the original traceback intact.
                        raise

                    logger.warning(f"Attempt {attempt} failed for "
                                   f"{func_name}: {e}. Retrying in {delay}s...")
                    time.sleep(delay)

        return wrapper
    return decorator
|
|
|
|
| |
def load_config(config_path: Union[str, Path],
                default_config: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Build a configuration from defaults overlaid with a JSON file.

    A missing file or a file that cannot be loaded is logged and the defaults
    are returned; no exception is raised in either case.

    Args:
        config_path: Path to configuration file
        default_config: Default configuration values (copied, never mutated)

    Returns:
        Dict: Configuration dictionary
    """
    config_path = Path(config_path)

    if default_config:
        settings = default_config.copy()
    else:
        settings = {}

    if not config_path.exists():
        logger.warning(f"Configuration file {config_path} not found, using defaults")
        return settings

    try:
        with open(config_path, 'r', encoding='utf-8') as handle:
            overrides = json.load(handle)
        # Values from the file take precedence over the defaults.
        settings.update(overrides)
        logger.info(f"Loaded configuration from {config_path}")
    except Exception as exc:
        logger.error(f"Failed to load configuration from {config_path}: {exc}")

    return settings
|
|
|
|
def save_config(config: Dict[str, Any],
                config_path: Union[str, Path]) -> bool:
    """
    Write a configuration dictionary to a JSON file.

    The parent directory is created if needed. Failures are logged and
    reported through the return value rather than raised.

    Args:
        config: Configuration dictionary
        config_path: Path to save configuration

    Returns:
        bool: True if saved successfully, False otherwise
    """
    destination = Path(config_path)
    saved = False

    try:
        ensure_directory(destination.parent)

        with open(destination, 'w', encoding='utf-8') as handle:
            # ensure_ascii=False keeps non-ASCII text readable in the file.
            json.dump(config, handle, indent=2, ensure_ascii=False)

        logger.info(f"Configuration saved to {destination}")
        saved = True

    except Exception as exc:
        logger.error(f"Failed to save configuration to {destination}: {exc}")

    return saved
|
|
|
|
| |
def get_system_info() -> Dict[str, Any]:
    """
    Get comprehensive system information.

    PyTorch is optional: when it cannot be imported, the GPU is reported as
    "Not available" and ``torch_version`` as "Not installed" while the
    remaining fields are still filled in.

    Returns:
        Dict: Platform, Python version, physical/logical CPU counts, total
        memory in GB, GPU summary and torch version; or ``{'error': message}``
        if gathering the information fails
    """
    try:
        import platform

        # Import torch separately so that its absence does not discard the
        # platform/CPU/memory information gathered below.
        try:
            import torch
        except ImportError:
            torch = None

        gpu_info = "Not available"
        if torch is not None and torch.cuda.is_available():
            gpu_count = torch.cuda.device_count()
            gpu_name = torch.cuda.get_device_name(0) if gpu_count > 0 else "Unknown"
            gpu_memory = torch.cuda.get_device_properties(0).total_memory // (1024**3) if gpu_count > 0 else 0
            # Name and memory describe device 0 only.
            gpu_info = f"{gpu_count}x {gpu_name} ({gpu_memory}GB)"

        return {
            'platform': platform.platform(),
            'python_version': platform.python_version(),
            'cpu_count': psutil.cpu_count(logical=False),
            'cpu_count_logical': psutil.cpu_count(logical=True),
            'memory_total_gb': psutil.virtual_memory().total // (1024**3),
            'gpu_info': gpu_info,
            'torch_version': torch.__version__ if torch is not None else 'Not installed'
        }

    except Exception as e:
        logger.error(f"Failed to get system info: {e}")
        return {'error': str(e)}
|
|
|
|
| |
class TempFileManager:
    """Creates temporary files and deletes them when the context exits.

    Every file made through :meth:`create_temp_file` is remembered in
    ``temp_files`` and removed by :meth:`cleanup`, which runs automatically
    at the end of a ``with`` block.
    """

    def __init__(self, prefix: str = "audio_intel_", suffix: str = ".tmp"):
        """
        Args:
            prefix: Filename prefix for created files
            suffix: Default filename suffix for created files
        """
        self.prefix = prefix
        self.suffix = suffix
        self.temp_files = []

    def create_temp_file(self, suffix: Optional[str] = None) -> str:
        """Create an empty temporary file and return its path.

        Args:
            suffix: Suffix for this file; a falsy value selects the
                manager's default suffix
        """
        chosen_suffix = suffix or self.suffix
        # delete=False: the file must outlive this handle; cleanup() removes it.
        with tempfile.NamedTemporaryFile(prefix=self.prefix,
                                         suffix=chosen_suffix,
                                         delete=False) as handle:
            created_path = handle.name

        self.temp_files.append(created_path)
        return created_path

    def cleanup(self):
        """Delete every tracked temporary file that still exists."""
        for tracked in self.temp_files:
            try:
                if os.path.exists(tracked):
                    os.unlink(tracked)
            except Exception as exc:
                # Best effort: a file that cannot be removed is only reported.
                logger.warning(f"Failed to delete temp file {tracked}: {exc}")

        self.temp_files.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup()
|
|
|
|
| |
if __name__ == "__main__":
    import argparse

    def main():
        """Command line interface for testing utilities.

        ``--test`` selects which group of utilities to exercise and
        ``--verbose`` enables DEBUG-level log output.
        """
        parser = argparse.ArgumentParser(description="Audio Intelligence Utilities")
        parser.add_argument("--test", choices=["performance", "memory", "file", "all"],
                            default="all", help="Which utilities to test")
        parser.add_argument("--verbose", "-v", action="store_true",
                            help="Enable verbose output")

        args = parser.parse_args()

        if args.verbose:
            # Setting the level alone is not enough: without a configured
            # handler, records below WARNING are never emitted. basicConfig
            # installs a stderr handler (it is a no-op if one already exists).
            logging.basicConfig(level=logging.DEBUG)
            logging.getLogger().setLevel(logging.DEBUG)

        if args.test in ["performance", "all"]:
            print("=== Testing Performance Monitoring ===")

            with performance_monitor("test_operation") as metrics:
                # Dummy workload so there is something to measure.
                time.sleep(0.1)
                _ = [i**2 for i in range(1000)]

            print(f"Operation metrics: {metrics.to_dict()}")
            print()

        if args.test in ["memory", "all"]:
            print("=== Testing Memory Utilities ===")

            memory_info = get_memory_usage()
            print(f"Current memory usage: {memory_info}")

            available = check_memory_available(100)
            print(f"100MB memory available: {available}")
            print()

        if args.test in ["file", "all"]:
            print("=== Testing File Utilities ===")

            # The temp file is removed automatically when the block exits.
            with TempFileManager() as temp_manager:
                temp_file = temp_manager.create_temp_file(suffix=".txt")

                with open(temp_file, 'w') as f:
                    f.write("Test data for utilities")

                file_info = get_file_info(temp_file)
                print(f"File info: {file_info}")

                file_hash = get_file_hash(temp_file)
                print(f"File hash (MD5): {file_hash}")

                safe_name = safe_filename("Test <File> Name?.txt")
                print(f"Safe filename: {safe_name}")

            print()

        if args.test == "all":
            print("=== System Information ===")
            system_info = get_system_info()
            for key, value in system_info.items():
                print(f"{key}: {value}")
            print()

            print("=== Format Utilities ===")
            print(f"Duration format: {format_duration(3661.5)}")
            print(f"File size format: {format_file_size(1536000)}")
            print(f"Text truncation: {truncate_text('This is a very long text that should be truncated', 20)}")

    main()