Add GPU temperature monitor Python script for WLED integration
This commit is contained in:
parent
286c2345f5
commit
5f3bd9ae8e
310
usermods/GPU_Fan_Controller/gpu_temp_monitor.py
Normal file
310
usermods/GPU_Fan_Controller/gpu_temp_monitor.py
Normal file
@ -0,0 +1,310 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
GPU Temperature Monitor for WLED GPU Fan Controller Usermod
|
||||
|
||||
This script monitors GPU temperature and sends it to the WLED
|
||||
GPU Fan Controller usermod via the JSON API.
|
||||
|
||||
Supports:
|
||||
- NVIDIA GPUs (via nvidia-ml-py / pynvml)
|
||||
- AMD/Intel/NVIDIA GPUs (via gpustat)
|
||||
|
||||
Usage:
|
||||
python gpu_temp_monitor.py --wled-ip 192.168.1.100
|
||||
python gpu_temp_monitor.py --wled-ip 192.168.1.100 --gpu-type nvidia --interval 2
|
||||
python gpu_temp_monitor.py --test
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
import signal
|
||||
import json
|
||||
from typing import Optional, Callable
|
||||
|
||||
# Try to import requests
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
print("Error: 'requests' module not found. Install with: pip install requests")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
class GPUMonitor:
    """Common interface implemented by every GPU temperature backend."""

    def get_temperature(self, gpu_index: int = 0) -> Optional[float]:
        """Return the temperature of GPU ``gpu_index``.

        Subclasses must override this; the base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()

    def get_name(self) -> str:
        """Return a human-readable backend name.

        Subclasses must override this; the base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()

    def cleanup(self):
        """Release backend resources; the base class holds none, so do nothing."""
        return None
|
||||
|
||||
|
||||
class NVMLMonitor(GPUMonitor):
    """NVIDIA GPU monitor using NVML (nvidia-ml-py)."""

    def __init__(self):
        """Import ``pynvml`` and initialise NVML.

        Raises:
            ImportError: if ``pynvml`` cannot be imported.
            RuntimeError: if NVML initialisation fails for any other reason.
        """
        # Set the flag up front so cleanup() is safe on a half-built instance.
        self._initialized = False
        try:
            import pynvml
            pynvml.nvmlInit()
        except ImportError as e:
            raise ImportError(
                "nvidia-ml-py not installed. Install with: pip install nvidia-ml-py"
            ) from e
        except Exception as e:
            raise RuntimeError(f"Failed to initialize NVML: {e}") from e
        self.pynvml = pynvml
        self._initialized = True

    def get_temperature(self, gpu_index: int = 0) -> Optional[float]:
        """Read the temperature of GPU ``gpu_index`` through NVML.

        Returns:
            The reading converted to ``float``, or ``None`` if any NVML call
            fails (the error is printed rather than raised).
        """
        try:
            handle = self.pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
            temp = self.pynvml.nvmlDeviceGetTemperature(
                handle, self.pynvml.NVML_TEMPERATURE_GPU
            )
            return float(temp)
        except Exception as e:
            print(f"Error reading NVIDIA GPU temperature: {e}")
            return None

    def get_name(self) -> str:
        """Return the display name of this backend."""
        return "NVIDIA (NVML)"

    def cleanup(self):
        """Shut NVML down once; later calls are no-ops.

        Shutdown is best-effort: a failure is reported but never raised, and
        KeyboardInterrupt/SystemExit are no longer swallowed.
        """
        if not getattr(self, "_initialized", False):
            return
        # Clear the flag first so nvmlShutdown() is attempted at most once.
        self._initialized = False
        try:
            self.pynvml.nvmlShutdown()
        except Exception as e:
            print(f"Warning: NVML shutdown failed: {e}")
|
||||
|
||||
|
||||
class GPUStatMonitor(GPUMonitor):
    """Multi-vendor GPU monitor using gpustat."""

    def __init__(self):
        """Import ``gpustat`` and keep a reference to the module.

        Raises:
            ImportError: if ``gpustat`` cannot be imported.
        """
        try:
            import gpustat
        except ImportError as e:
            raise ImportError(
                "gpustat not installed. Install with: pip install gpustat"
            ) from e
        self.gpustat = gpustat

    def get_temperature(self, gpu_index: int = 0) -> Optional[float]:
        """Query gpustat and return the temperature of GPU ``gpu_index``.

        Returns:
            The reading as ``float``, or ``None`` when the index is out of
            range, the GPU reports no temperature, or the query fails. A
            message is printed in each of those cases.
        """
        try:
            stats = self.gpustat.GPUStatCollection.new_query()
            gpu_count = len(stats.gpus)
            # Reject negative indices explicitly; list indexing would
            # otherwise wrap around and silently read a different GPU.
            if not 0 <= gpu_index < gpu_count:
                print(f"GPU index {gpu_index} not found. Available GPUs: {gpu_count}")
                return None
            temperature = stats.gpus[gpu_index].temperature
            if temperature is None:
                # Avoid float(None), which would surface as a confusing TypeError.
                print(f"GPU {gpu_index} does not report a temperature via gpustat")
                return None
            return float(temperature)
        except Exception as e:
            print(f"Error reading GPU temperature via gpustat: {e}")
            return None

    def get_name(self) -> str:
        """Return the display name of this backend."""
        return "gpustat (multi-vendor)"
|
||||
|
||||
|
||||
def create_monitor(gpu_type: str = "auto") -> GPUMonitor:
    """Create the GPU monitor for ``gpu_type``.

    Args:
        gpu_type: ``"nvidia"`` for NVML, ``"gpustat"`` for gpustat, or
            ``"auto"`` to try NVML first and then gpustat, keeping the first
            backend that can actually read GPU 0.

    Returns:
        A ready-to-use monitor instance.

    Raises:
        ValueError: if ``gpu_type`` is not one of the values above.
        RuntimeError: in auto mode when no backend yields a temperature.
        ImportError, RuntimeError: propagated from the backend constructor
            when a specific backend is requested.
    """
    if gpu_type == "nvidia":
        return NVMLMonitor()
    if gpu_type == "gpustat":
        return GPUStatMonitor()
    if gpu_type != "auto":
        raise ValueError(f"Unknown GPU type: {gpu_type}")

    # Try NVML first, then gpustat.
    for backend in (NVMLMonitor, GPUStatMonitor):
        try:
            monitor = backend()
        except Exception:
            # Backend missing or failed to initialise; try the next one.
            continue

        try:
            usable = monitor.get_temperature(0) is not None
        except Exception:
            usable = False
        if usable:
            return monitor

        # The backend initialised but cannot read a temperature: release it
        # so it does not stay initialised (e.g. NVML) for the process lifetime.
        monitor.cleanup()

    raise RuntimeError("No GPU monitoring method available. Install nvidia-ml-py or gpustat.")
|
||||
|
||||
|
||||
def send_temperature_to_wled(wled_ip: str, temperature: float, timeout: float = 5.0) -> bool:
    """POST one temperature reading to the WLED JSON state endpoint.

    The reading is sent as ``{"GPU-Fan": {"temperature": <value>}}`` to
    ``http://<wled_ip>/json/state``.

    Returns:
        ``True`` only when the device answers with HTTP 200. ``False`` for any
        other status, or when ``requests`` raises (the error is printed).
    """
    endpoint = f"http://{wled_ip}/json/state"
    body = {"GPU-Fan": {"temperature": temperature}}

    try:
        reply = requests.post(endpoint, json=body, timeout=timeout)
    except requests.exceptions.RequestException as err:
        print(f"Error sending temperature to WLED: {err}")
        return False

    return reply.status_code == 200
|
||||
|
||||
|
||||
def get_wled_fan_status(wled_ip: str, timeout: float = 5.0) -> Optional[dict]:
    """Fetch ``http://<wled_ip>/json/info`` and return its ``"u"`` entry.

    Returns:
        The value stored under ``"u"`` in the info document (an empty dict if
        the key is absent), or ``None`` when the request fails, the status is
        not 200, or the body is not a JSON object.
    """
    url = f"http://{wled_ip}/json/info"

    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers invalid JSON on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Error getting WLED status: {e}")
        return None

    if not isinstance(data, dict):
        # A well-formed but unexpected body (e.g. a JSON list) has no "u" entry.
        return None
    return data.get("u", {})
|
||||
|
||||
|
||||
def test_gpu_monitoring():
    """Probe each backend and auto-detection, printing a pass/fail line for each.

    Nothing is returned or raised: every failure is caught and reported on
    stdout so all three probes always run.
    """
    print("Testing GPU temperature monitoring...\n")

    # --- NVML backend ---
    print("Testing NVIDIA (NVML)...")
    try:
        nvml_backend = NVMLMonitor()
        reading = nvml_backend.get_temperature(0)
        if reading is None:
            print(" ✗ NVML initialized but couldn't read temperature")
        else:
            print(f" ✓ NVML working - Temperature: {reading}°C")
        nvml_backend.cleanup()
    except ImportError as err:
        print(f" ✗ Not available: {err}")
    except Exception as err:
        print(f" ✗ Error: {err}")

    print()

    # --- gpustat backend ---
    print("Testing gpustat...")
    try:
        gpustat_backend = GPUStatMonitor()
        reading = gpustat_backend.get_temperature(0)
        if reading is None:
            print(" ✗ gpustat initialized but couldn't read temperature")
        else:
            print(f" ✓ gpustat working - Temperature: {reading}°C")
    except ImportError as err:
        print(f" ✗ Not available: {err}")
    except Exception as err:
        print(f" ✗ Error: {err}")

    print()

    # --- auto-detection ---
    print("Testing auto-detection...")
    try:
        detected = create_monitor("auto")
        reading = detected.get_temperature(0)
        print(f" ✓ Auto-detection successful using {detected.get_name()}")
        print(f" Current temperature: {reading}°C")
        detected.cleanup()
    except Exception as err:
        print(f" ✗ Auto-detection failed: {err}")
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: read the GPU temperature and push it to WLED.

    Parses the command line, then either runs the self-test (``--test``) or
    loops: read the temperature, send it to the WLED device, sleep for
    ``--interval`` seconds. SIGINT/SIGTERM request a clean stop.

    Exits with status 2 on invalid arguments, and with status 1 when no
    monitor can be created or when too many consecutive errors occur.
    """
    parser = argparse.ArgumentParser(
        description="GPU Temperature Monitor for WLED GPU Fan Controller",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --wled-ip 192.168.1.100
%(prog)s --wled-ip 192.168.1.100 --gpu-type nvidia --interval 2
%(prog)s --test
"""
    )

    parser.add_argument("--wled-ip", help="IP address of WLED device")
    parser.add_argument("--gpu-type", choices=["auto", "nvidia", "gpustat"],
                        default="auto", help="GPU monitoring method (default: auto)")
    parser.add_argument("--gpu-index", type=int, default=0,
                        help="GPU index for multi-GPU systems (default: 0)")
    parser.add_argument("--interval", type=float, default=2.0,
                        help="Update interval in seconds (default: 2.0)")
    parser.add_argument("--test", action="store_true",
                        help="Test GPU monitoring capabilities")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress normal output (only show errors)")

    args = parser.parse_args()

    if args.test:
        test_gpu_monitoring()
        return

    if not args.wled_ip:
        parser.error("--wled-ip is required (unless using --test)")

    # Reject values that would make the loop misbehave: 0 busy-loops and
    # floods the device, negative/NaN makes time.sleep() raise. The chained
    # comparison is False for NaN and for infinity.
    if not 0 < args.interval < float("inf"):
        parser.error("--interval must be a positive, finite number of seconds")
    if args.gpu_index < 0:
        parser.error("--gpu-index must be zero or greater")

    # Setup signal handlers for graceful shutdown
    running = True

    def signal_handler(sig, frame):
        nonlocal running
        print("\nShutting down...")
        running = False

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Create monitor
    try:
        monitor = create_monitor(args.gpu_type)
        if not args.quiet:
            print(f"GPU Monitor started using {monitor.get_name()}")
            print(f"Sending to WLED at {args.wled_ip}")
            print(f"Update interval: {args.interval}s")
            print("-" * 40)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

    # Main loop: error_count tracks *consecutive* failures and is reset by
    # every successful send.
    error_count = 0
    max_errors = 5
    exit_code = 0

    try:
        while running:
            temp = monitor.get_temperature(args.gpu_index)

            if temp is not None:
                success = send_temperature_to_wled(args.wled_ip, temp)

                if success:
                    error_count = 0
                    if not args.quiet:
                        print(f"Temperature: {temp:.1f}°C - Sent OK")
                else:
                    error_count += 1
                    print(f"Failed to send temperature (error {error_count}/{max_errors})")
            else:
                error_count += 1
                print(f"Failed to read GPU temperature (error {error_count}/{max_errors})")

            if error_count >= max_errors:
                print(f"Too many consecutive errors ({max_errors}). Exiting.")
                # Report the failure to the caller (shell, service manager).
                exit_code = 1
                break

            time.sleep(args.interval)

    finally:
        monitor.cleanup()
        if not args.quiet:
            print("GPU Monitor stopped")

    if exit_code:
        sys.exit(exit_code)
|
||||
|
||||
|
||||
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in New Issue
Block a user