| | """ |
| | GPU Manager for SPARKNET |
| | Handles GPU allocation, monitoring, and resource management |
| | """ |
| |
|

import os
import torch
from typing import Any, Optional, List, Dict
from contextlib import contextmanager
import pynvml
from loguru import logger


class GPUManager:
    """Manages GPU resources for model deployment and monitoring."""

    def __init__(self, primary_gpu: int = 0, fallback_gpus: Optional[List[int]] = None):
        """
        Initialize GPU Manager.

        Args:
            primary_gpu: Primary GPU device ID (default: 0)
            fallback_gpus: List of fallback GPU IDs (default: [1, 2, 3])
        """
        self.primary_gpu = primary_gpu
        self.fallback_gpus = fallback_gpus or [1, 2, 3]
        self.initialized = False

        # Initialize NVML, which provides the memory, utilization, and
        # temperature telemetry used by get_gpu_info().
        try:
            pynvml.nvmlInit()
            self.initialized = True
            logger.info("GPU Manager initialized with NVML")
        except Exception as e:
            logger.warning(f"Failed to initialize NVML: {e}")

        # Enumerate the CUDA devices visible to PyTorch.
        self.available_gpus = self._detect_gpus()
        logger.info(f"Detected {len(self.available_gpus)} GPUs: {self.available_gpus}")

    def _detect_gpus(self) -> List[int]:
        """Detect available CUDA GPUs."""
        if not torch.cuda.is_available():
            logger.warning("CUDA not available!")
            return []

        gpu_count = torch.cuda.device_count()
        return list(range(gpu_count))

    def get_gpu_info(self, gpu_id: int) -> Dict[str, Any]:
        """
        Get detailed information about a GPU.

        Args:
            gpu_id: GPU device ID

        Returns:
            Dictionary with GPU information
        """
        if not self.initialized:
            return {"error": "NVML not initialized"}

        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
            temperature = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
            name = pynvml.nvmlDeviceGetName(handle)
            # Older pynvml releases return the device name as bytes.
            if isinstance(name, bytes):
                name = name.decode()

            return {
                "gpu_id": gpu_id,
                "name": name,
                "memory_total": mem_info.total,
                "memory_used": mem_info.used,
                "memory_free": mem_info.free,
                "memory_percent": (mem_info.used / mem_info.total) * 100,
                "gpu_utilization": utilization.gpu,
                "memory_utilization": utilization.memory,
                "temperature": temperature,
            }
        except Exception as e:
            logger.error(f"Error getting GPU {gpu_id} info: {e}")
            return {"error": str(e)}

    def get_all_gpu_info(self) -> List[Dict[str, Any]]:
        """Get information for all available GPUs."""
        return [self.get_gpu_info(gpu_id) for gpu_id in self.available_gpus]

    def get_free_memory(self, gpu_id: int) -> int:
        """
        Get free memory on a GPU in bytes.

        Args:
            gpu_id: GPU device ID

        Returns:
            Free memory in bytes
        """
        info = self.get_gpu_info(gpu_id)
        return info.get("memory_free", 0)

    def select_best_gpu(self, min_memory_gb: float = 8.0) -> Optional[int]:
        """
        Select the best available GPU based on free memory.

        Args:
            min_memory_gb: Minimum required free memory in GB

        Returns:
            GPU ID or None if no suitable GPU found
        """
        min_memory_bytes = min_memory_gb * 1024 ** 3

        # Prefer the primary GPU if it has enough free memory.
        if self.primary_gpu in self.available_gpus:
            free_mem = self.get_free_memory(self.primary_gpu)
            if free_mem >= min_memory_bytes:
                logger.info(f"Selected primary GPU {self.primary_gpu} ({free_mem / 1024**3:.2f} GB free)")
                return self.primary_gpu

        # Otherwise try the configured fallback GPUs in order.
        for gpu_id in self.fallback_gpus:
            if gpu_id in self.available_gpus:
                free_mem = self.get_free_memory(gpu_id)
                if free_mem >= min_memory_bytes:
                    logger.info(f"Selected fallback GPU {gpu_id} ({free_mem / 1024**3:.2f} GB free)")
                    return gpu_id

        logger.warning(f"No GPU found with {min_memory_gb} GB free memory")
        return None

    def set_device(self, gpu_id: int):
        """
        Set the CUDA device.

        Args:
            gpu_id: GPU device ID
        """
        if gpu_id not in self.available_gpus:
            raise ValueError(f"GPU {gpu_id} not available")

        # Switch the device for the current process first (this initializes CUDA
        # with all GPUs visible), then export the choice via CUDA_VISIBLE_DEVICES
        # so that subprocesses spawned afterwards inherit it.
        torch.cuda.set_device(gpu_id)
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
        logger.info(f"Set CUDA device to GPU {gpu_id}")

    @contextmanager
    def gpu_context(self, gpu_id: Optional[int] = None, min_memory_gb: float = 8.0):
        """
        Context manager for GPU allocation.

        Args:
            gpu_id: Specific GPU ID or None for auto-selection
            min_memory_gb: Minimum required memory in GB

        Yields:
            GPU device ID
        """
        # Auto-select a GPU when none is requested explicitly.
        if gpu_id is None:
            gpu_id = self.select_best_gpu(min_memory_gb)
            if gpu_id is None:
                raise RuntimeError("No suitable GPU available")

        # Remember the original CUDA_VISIBLE_DEVICES so it can be restored on exit.
        original_device = os.environ.get("CUDA_VISIBLE_DEVICES", "")

        try:
            self.set_device(gpu_id)
            yield gpu_id
        finally:
            # Restore the original environment setting.
            if original_device:
                os.environ["CUDA_VISIBLE_DEVICES"] = original_device
            # Release cached memory so the GPU is left clean for the next user.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                logger.debug("Cleared CUDA cache")

    def clear_cache(self, gpu_id: Optional[int] = None):
        """
        Clear CUDA cache for a specific GPU or all GPUs.

        Args:
            gpu_id: GPU device ID or None for all GPUs
        """
        if gpu_id is not None:
            with torch.cuda.device(gpu_id):
                torch.cuda.empty_cache()
            logger.info(f"Cleared cache for GPU {gpu_id}")
        else:
            torch.cuda.empty_cache()
            logger.info("Cleared cache for all GPUs")

    def monitor(self) -> str:
        """
        Get a formatted monitoring string for all GPUs.

        Returns:
            Formatted string with GPU status
        """
        info_list = self.get_all_gpu_info()

        lines = ["GPU Status:"]
        for info in info_list:
            if "error" in info:
                lines.append(f" GPU {info.get('gpu_id', '?')}: Error - {info['error']}")
            else:
                lines.append(
                    f" GPU {info['gpu_id']}: {info['name']} | "
                    f"Memory: {info['memory_used'] / 1024**3:.2f}/{info['memory_total'] / 1024**3:.2f} GB "
                    f"({info['memory_percent']:.1f}%) | "
                    f"Utilization: {info['gpu_utilization']}% | "
                    f"Temp: {info['temperature']}°C"
                )

        return "\n".join(lines)

    def __del__(self):
        """Cleanup NVML on deletion."""
        if self.initialized:
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass


# Global GPU manager singleton, created lazily by get_gpu_manager().
_gpu_manager: Optional[GPUManager] = None


def get_gpu_manager() -> GPUManager:
    """Get or create the global GPU manager instance."""
    global _gpu_manager
    if _gpu_manager is None:
        _gpu_manager = GPUManager()
    return _gpu_manager
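

# Example usage: a minimal sketch, assuming the machine has at least one
# CUDA-capable GPU and a working NVIDIA driver for NVML. The memory threshold
# and tensor size below are illustrative values, not SPARKNET defaults.
if __name__ == "__main__":
    manager = get_gpu_manager()
    print(manager.monitor())

    # Reserve a suitable GPU for the duration of the block; the CUDA cache is
    # cleared automatically when the context exits.
    with manager.gpu_context(min_memory_gb=4.0) as gpu_id:
        device = torch.device(f"cuda:{gpu_id}")
        x = torch.randn(1024, 1024, device=device)
        logger.info(f"Allocated test tensor of shape {tuple(x.shape)} on {device}")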