#!/usr/bin/env python3
"""
GPU Temperature Monitor for WLED GPU Fan Controller Usermod

This script monitors GPU temperature and sends it to the WLED
GPU Fan Controller usermod via the JSON API.

Supports:
- NVIDIA GPUs (via nvidia-ml-py / pynvml)
- AMD/Intel/NVIDIA GPUs (via gpustat)

Usage:
    python gpu_temp_monitor.py --wled-ip 192.168.1.100
    python gpu_temp_monitor.py --wled-ip 192.168.1.100 --gpu-type nvidia --interval 2
    python gpu_temp_monitor.py --test
"""

import argparse
import sys
import time
import signal
import json
from typing import Optional, Callable

# `requests` is only needed to talk to WLED. Keep the import optional so that
# `--test` and `--help` still work on a machine where it is not installed;
# main() reports the problem (and exits) once the network is actually needed.
try:
    import requests
except ImportError:
    requests = None

_REQUESTS_MISSING_MESSAGE = (
    "Error: 'requests' module not found. Install with: pip install requests"
)

# Longest single sleep while waiting for the next update, so that a
# SIGINT/SIGTERM is honoured quickly even with a long --interval.
_SLEEP_STEP_SECONDS = 0.2


class GPUMonitor:
    """Base class for GPU monitoring"""

    def get_temperature(self, gpu_index: int = 0) -> Optional[float]:
        """Return the temperature of GPU ``gpu_index``, or None if unavailable."""
        raise NotImplementedError

    def get_name(self) -> str:
        """Return a human-readable name of the monitoring backend."""
        raise NotImplementedError

    def cleanup(self):
        """Release backend resources. The base implementation does nothing."""
        pass


class NVMLMonitor(GPUMonitor):
    """NVIDIA GPU monitor using NVML (nvidia-ml-py)"""

    def __init__(self):
        """Import pynvml and initialize NVML.

        Raises:
            ImportError: if nvidia-ml-py (pynvml) is not installed.
            RuntimeError: if importing or initializing NVML fails otherwise.
        """
        # Set before anything can fail so cleanup() is always safe to call.
        self._initialized = False
        try:
            import pynvml
            pynvml.nvmlInit()
        except ImportError as e:
            raise ImportError(
                "nvidia-ml-py not installed. Install with: pip install nvidia-ml-py"
            ) from e
        except Exception as e:
            raise RuntimeError(f"Failed to initialize NVML: {e}") from e
        self.pynvml = pynvml
        self._initialized = True

    def get_temperature(self, gpu_index: int = 0) -> Optional[float]:
        """Read the GPU temperature via NVML.

        Returns:
            The temperature as a float, or None (after printing the error)
            if NVML could not provide it.
        """
        try:
            handle = self.pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
            temp = self.pynvml.nvmlDeviceGetTemperature(
                handle, self.pynvml.NVML_TEMPERATURE_GPU
            )
            return float(temp)
        except Exception as e:
            print(f"Error reading NVIDIA GPU temperature: {e}")
            return None

    def get_name(self) -> str:
        """Return the backend name."""
        return "NVIDIA (NVML)"

    def cleanup(self):
        """Shut NVML down once; further calls are no-ops."""
        if not getattr(self, "_initialized", False):
            return
        # Clear the flag first so a second call never shuts NVML down twice.
        self._initialized = False
        try:
            self.pynvml.nvmlShutdown()
        except Exception:
            # Best effort: nothing useful can be done if shutdown fails.
            pass


class GPUStatMonitor(GPUMonitor):
    """Multi-vendor GPU monitor using gpustat"""

    def __init__(self):
        """Import gpustat.

        Raises:
            ImportError: if gpustat is not installed.
        """
        try:
            import gpustat
        except ImportError as e:
            raise ImportError(
                "gpustat not installed. Install with: pip install gpustat"
            ) from e
        self.gpustat = gpustat

    def get_temperature(self, gpu_index: int = 0) -> Optional[float]:
        """Read the GPU temperature via a fresh gpustat query.

        Returns:
            The temperature as a float, or None (after printing the reason)
            if the index is out of range, the GPU reports no temperature,
            or the query fails.
        """
        try:
            stats = self.gpustat.GPUStatCollection.new_query()
            gpu_count = len(stats.gpus)
            # Reject negative indices too: Python would otherwise silently
            # pick a GPU counted from the end of the list.
            if not 0 <= gpu_index < gpu_count:
                print(f"GPU index {gpu_index} not found. Available GPUs: {gpu_count}")
                return None
            temperature = stats.gpus[gpu_index].temperature
            if temperature is None:
                print(f"GPU {gpu_index} does not report a temperature via gpustat")
                return None
            return float(temperature)
        except Exception as e:
            print(f"Error reading GPU temperature via gpustat: {e}")
            return None

    def get_name(self) -> str:
        """Return the backend name."""
        return "gpustat (multi-vendor)"


def create_monitor(gpu_type: str = "auto", gpu_index: int = 0) -> GPUMonitor:
    """Create appropriate GPU monitor based on type or auto-detection

    Args:
        gpu_type: "nvidia", "gpustat" or "auto". With "auto", NVML is tried
            first, then gpustat; a backend is accepted only if it can read
            a temperature from ``gpu_index``.
        gpu_index: GPU used to probe backends during auto-detection.

    Returns:
        A ready-to-use monitor.

    Raises:
        ImportError, RuntimeError: if an explicitly requested backend cannot
            be created.
        RuntimeError: if auto-detection finds no working backend.
        ValueError: if ``gpu_type`` is not one of the known values.
    """
    if gpu_type == "nvidia":
        return NVMLMonitor()
    if gpu_type == "gpustat":
        return GPUStatMonitor()
    if gpu_type != "auto":
        raise ValueError(f"Unknown GPU type: {gpu_type}")

    # Try NVML first, then gpustat
    for backend in (NVMLMonitor, GPUStatMonitor):
        try:
            monitor = backend()
        except Exception:
            # Backend not installed or not usable on this machine.
            continue
        if monitor.get_temperature(gpu_index) is not None:
            return monitor
        # Initialized but unusable: release it before trying the next one.
        monitor.cleanup()

    raise RuntimeError(
        "No GPU monitoring method available. Install nvidia-ml-py or gpustat."
    )


def send_temperature_to_wled(wled_ip: str, temperature: float, timeout: float = 5.0) -> bool:
    """Send temperature to WLED via JSON API

    Args:
        wled_ip: Address of the WLED device.
        temperature: Temperature value to report.
        timeout: HTTP timeout in seconds.

    Returns:
        True if WLED answered with HTTP 200, False otherwise (the error is
        printed).
    """
    if requests is None:
        print(_REQUESTS_MISSING_MESSAGE)
        return False

    url = f"http://{wled_ip}/json/state"
    payload = {
        "GPU-Fan": {
            "temperature": temperature
        }
    }

    try:
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error sending temperature to WLED: {e}")
        return False
    return response.status_code == 200


def get_wled_fan_status(wled_ip: str, timeout: float = 5.0) -> Optional[dict]:
    """Get current fan status from WLED

    Args:
        wled_ip: Address of the WLED device.
        timeout: HTTP timeout in seconds.

    Returns:
        The "u" object of the /json/info response (empty dict if absent),
        or None if the request failed or the response was not usable.
    """
    if requests is None:
        print(_REQUESTS_MISSING_MESSAGE)
        return None

    url = f"http://{wled_ip}/json/info"

    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions where
        # the decode error is not a RequestException.
        print(f"Error getting WLED status: {e}")
        return None

    if not isinstance(data, dict):
        return None
    return data.get("u", {})


def _report_backend(label: str, factory: Callable[[], GPUMonitor]) -> None:
    """Create one backend, print whether it can read GPU 0, then clean it up."""
    monitor = None
    try:
        monitor = factory()
        temp = monitor.get_temperature(0)
        if temp is not None:
            print(f" ✓ {label} working - Temperature: {temp}°C")
        else:
            print(f" ✗ {label} initialized but couldn't read temperature")
    except ImportError as e:
        print(f" ✗ Not available: {e}")
    except Exception as e:
        print(f" ✗ Error: {e}")
    finally:
        if monitor is not None:
            monitor.cleanup()


def test_gpu_monitoring():
    """Test GPU temperature monitoring

    Prints, for NVML, gpustat and auto-detection in turn, whether the
    backend is available and which temperature it reads for GPU 0.
    """
    print("Testing GPU temperature monitoring...\n")

    # Test NVML
    print("Testing NVIDIA (NVML)...")
    _report_backend("NVML", NVMLMonitor)

    print()

    # Test gpustat
    print("Testing gpustat...")
    _report_backend("gpustat", GPUStatMonitor)

    print()

    # Test auto-detection
    print("Testing auto-detection...")
    try:
        monitor = create_monitor("auto")
    except Exception as e:
        print(f" ✗ Auto-detection failed: {e}")
        return
    try:
        temp = monitor.get_temperature(0)
        print(f" ✓ Auto-detection successful using {monitor.get_name()}")
        print(f" Current temperature: {temp}°C")
    except Exception as e:
        print(f" ✗ Auto-detection failed: {e}")
    finally:
        monitor.cleanup()


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the monitor script."""
    parser = argparse.ArgumentParser(
        description="GPU Temperature Monitor for WLED GPU Fan Controller",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --wled-ip 192.168.1.100
  %(prog)s --wled-ip 192.168.1.100 --gpu-type nvidia --interval 2
  %(prog)s --test
        """
    )

    parser.add_argument("--wled-ip", help="IP address of WLED device")
    parser.add_argument("--gpu-type", choices=["auto", "nvidia", "gpustat"],
                        default="auto", help="GPU monitoring method (default: auto)")
    parser.add_argument("--gpu-index", type=int, default=0,
                        help="GPU index for multi-GPU systems (default: 0)")
    parser.add_argument("--interval", type=float, default=2.0,
                        help="Update interval in seconds (default: 2.0)")
    parser.add_argument("--test", action="store_true",
                        help="Test GPU monitoring capabilities")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress normal output (only show errors)")
    return parser


def _sleep_while(keep_waiting: Callable[[], bool], seconds: float) -> None:
    """Sleep up to ``seconds``, returning early once ``keep_waiting()`` is false.

    time.sleep() resumes after a signal handler returns, so one long sleep
    would delay shutdown by up to a full interval; short slices avoid that.
    """
    deadline = time.monotonic() + seconds
    while keep_waiting():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return
        time.sleep(min(_SLEEP_STEP_SECONDS, remaining))


def main():
    """Parse arguments and run the monitor loop.

    Reads the GPU temperature every ``--interval`` seconds and posts it to
    WLED until interrupted (SIGINT/SIGTERM) or until five consecutive
    read/send errors occur. Exits with status 1 on startup failure or when
    giving up after too many errors.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if args.test:
        test_gpu_monitoring()
        return

    if not args.wled_ip:
        parser.error("--wled-ip is required (unless using --test)")
    # `not x > 0` (rather than `x <= 0`) also rejects NaN.
    if not args.interval > 0:
        parser.error("--interval must be greater than 0")
    if args.gpu_index < 0:
        parser.error("--gpu-index must be 0 or greater")

    if requests is None:
        print(_REQUESTS_MISSING_MESSAGE)
        sys.exit(1)

    # Setup signal handlers for graceful shutdown
    running = True

    def signal_handler(sig, frame):
        nonlocal running
        print("\nShutting down...")
        running = False

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Create monitor
    try:
        monitor = create_monitor(args.gpu_type, args.gpu_index)
        if not args.quiet:
            print(f"GPU Monitor started using {monitor.get_name()}")
            print(f"Sending to WLED at {args.wled_ip}")
            print(f"Update interval: {args.interval}s")
            print("-" * 40)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

    # Main loop
    error_count = 0
    max_errors = 5
    gave_up = False

    try:
        while running:
            temp = monitor.get_temperature(args.gpu_index)

            if temp is None:
                error_count += 1
                print(f"Failed to read GPU temperature (error {error_count}/{max_errors})")
            elif send_temperature_to_wled(args.wled_ip, temp):
                error_count = 0
                if not args.quiet:
                    print(f"Temperature: {temp:.1f}°C - Sent OK")
            else:
                error_count += 1
                print(f"Failed to send temperature (error {error_count}/{max_errors})")

            if error_count >= max_errors:
                print(f"Too many consecutive errors ({max_errors}). Exiting.")
                gave_up = True
                break

            _sleep_while(lambda: running, args.interval)

    finally:
        monitor.cleanup()
        if not args.quiet:
            print("GPU Monitor stopped")

    if gave_up:
        # Let supervisors (systemd, scripts) see that the monitor failed.
        sys.exit(1)


if __name__ == "__main__":
    main()