#!/usr/bin/env python3
"""Prometheus exporter for per-process NVIDIA GPU usage.

On each scrape (cached for ``METRIC_CACHE_TTL`` seconds) the exporter queries
NVML for device utilization, running processes and per-process utilization
samples, maps every PID to a Kubernetes pod through its cgroup file, and
renders the result in the Prometheus text exposition format.
"""
import json
import os
import re
import ssl
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

from pynvml import (
    NVMLError,
    NVMLError_NotFound,
    NVMLError_NotSupported,
    nvmlDeviceGetComputeRunningProcesses_v3,
    nvmlDeviceGetCount,
    nvmlDeviceGetGraphicsRunningProcesses_v3,
    nvmlDeviceGetHandleByIndex,
    nvmlDeviceGetName,
    nvmlDeviceGetProcessUtilization,
    nvmlDeviceGetUUID,
    nvmlDeviceGetUtilizationRates,
    nvmlInit,
)

# Runtime configuration, all read once from the environment at import time.
NODE_NAME = os.environ.get("NODE_NAME", "")
PORT = int(os.environ.get("NVIDIA_PROCESS_EXPORTER_PORT", "9401"))
PROC_ROOT = os.environ.get("HOST_PROC", "/host/proc")
SAMPLE_WINDOW_MS = int(os.environ.get("NVML_PROCESS_SAMPLE_WINDOW_MS", "30000"))
POD_CACHE_TTL = int(os.environ.get("POD_CACHE_TTL_SECONDS", "30"))
METRIC_CACHE_TTL = int(os.environ.get("METRIC_CACHE_TTL_SECONDS", "5"))

# In-cluster service-account credentials used to talk to the Kubernetes API.
TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

# Extracts the pod UID (with "-" or "_" separators) from a cgroup path.
POD_UID_RE = re.compile(r"pod([0-9a-fA-F_-]{32,36})")
SAFE_LABEL_RE = re.compile(r"[^a-zA-Z0-9_:]")

# Metric families in output order, mapped to their HELP text. Every family is
# exposed as a gauge.
METRIC_HELP = {
    "nvidia_gpu_device_utilization_percent": "Current NVML device GPU utilization.",
    "nvidia_process_gpu_sm_util_percent": "Recent per-process SM utilization from NVML.",
    "nvidia_process_gpu_memory_used_bytes": "GPU memory held by a process.",
    "nvidia_namespace_gpu_sm_util_percent": (
        "GPU SM utilization attributed to namespace, with host/unattributed residual included."
    ),
    "nvidia_gpu_process_utilization_supported": (
        "Whether NVML process utilization samples are available for the device."
    ),
}

# Simple time-stamped caches shared by all request threads.
pod_cache = {"loaded_at": 0.0, "pods": {}}
metric_cache = {"loaded_at": 0.0, "body": ""}


def label_value(value):
    """Return ``value`` as a string escaped for a double-quoted label value.

    Backslashes are escaped first so the escapes added for newlines and double
    quotes are not themselves re-escaped.
    """
    return str(value).replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')


def metric_line(name, labels, value):
    """Render one sample line ``name{k="v",...} value`` with labels sorted by key."""
    label_text = ",".join(
        f'{key}="{label_value(val)}"' for key, val in sorted(labels.items())
    )
    return f"{name}{{{label_text}}} {value}"


def uid_key(value):
    """Normalize a pod UID to lower-case hex digits only.

    This makes the dashed UID from the API and the UID captured from a cgroup
    path (which may use ``_`` or ``-``) compare equal.
    """
    return re.sub(r"[^0-9a-f]", "", value.lower())


def process_name(pid):
    """Return the command name of ``pid`` from its ``comm`` file.

    The host proc mount (``PROC_ROOT``) is tried first, then ``/proc``.
    Returns ``"unknown"`` when neither file is readable or both are empty.
    """
    for path in (f"{PROC_ROOT}/{pid}/comm", f"/proc/{pid}/comm"):
        try:
            with open(path, encoding="utf-8") as handle:
                name = handle.read().strip()
            if name:
                return name
        except OSError:
            # Best effort: the process may have exited or the mount may be absent.
            pass
    return "unknown"


def process_cgroup(pid):
    """Return the contents of the first readable ``cgroup`` file for ``pid``.

    The host proc mount (``PROC_ROOT``) is tried first, then ``/proc``.
    Returns an empty string when neither file can be read.
    """
    for path in (f"{PROC_ROOT}/{pid}/cgroup", f"/proc/{pid}/cgroup"):
        try:
            with open(path, encoding="utf-8") as handle:
                return handle.read()
        except OSError:
            # Best effort: fall through to the next candidate path.
            pass
    return ""


def load_pods():
    """Return ``{normalized_uid: {"namespace": ..., "pod": ...}}`` for this node.

    The result is cached in ``pod_cache`` for ``POD_CACHE_TTL`` seconds. When
    ``KUBERNETES_SERVICE_HOST`` or ``NODE_NAME`` is unset, an empty dict is
    returned without touching the cache.

    Raises:
        OSError: the service-account token cannot be read, or the request to
            the API server fails (``urllib.error.URLError`` is a subclass).
        ValueError: the API response is not valid JSON.
    """
    now = time.time()
    if now - pod_cache["loaded_at"] < POD_CACHE_TTL:
        return pod_cache["pods"]

    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = os.environ.get("KUBERNETES_SERVICE_PORT", "443")
    if not host or not NODE_NAME:
        return {}

    # An IPv6 literal must be bracketed before a port can be appended.
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"

    with open(TOKEN_PATH, encoding="utf-8") as handle:
        token = handle.read().strip()

    selector = urllib.parse.quote(f"spec.nodeName={NODE_NAME}", safe="")
    url = f"https://{host}:{port}/api/v1/pods?fieldSelector={selector}"
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    context = ssl.create_default_context(cafile=CA_PATH)
    with urllib.request.urlopen(request, context=context, timeout=10) as response:
        payload = json.load(response)

    pods = {}
    for item in payload.get("items", []):
        metadata = item.get("metadata", {})
        uid = metadata.get("uid", "")
        if not uid:
            continue
        pods[uid_key(uid)] = {
            "namespace": metadata.get("namespace", "unknown"),
            "pod": metadata.get("name", "unknown"),
        }

    pod_cache["loaded_at"] = now
    pod_cache["pods"] = pods
    return pods


def pod_for_pid(pid, pods):
    """Map ``pid`` to its pod via the pod UID embedded in its cgroup path.

    Returns ``{"namespace": "host", "pod": "host"}`` when the cgroup carries no
    pod UID, and ``{"namespace": "unknown", "pod": "unknown"}`` when the UID is
    not present in ``pods``.
    """
    cgroup = process_cgroup(pid)
    match = POD_UID_RE.search(cgroup)
    if not match:
        return {"namespace": "host", "pod": "host"}
    return pods.get(uid_key(match.group(1)), {"namespace": "unknown", "pod": "unknown"})


def running_process_memory(handle):
    """Return ``{pid: {"memory": bytes, "types": set}}`` for processes on a device.

    Compute and graphics process lists are merged: memory is summed per PID
    and ``types`` records which list(s) the PID appeared in. A list that NVML
    reports as not found / not supported is skipped.
    """
    processes = {}
    getters = (
        ("compute", nvmlDeviceGetComputeRunningProcesses_v3),
        ("graphics", nvmlDeviceGetGraphicsRunningProcesses_v3),
    )
    for proc_type, getter in getters:
        try:
            for proc in getter(handle):
                entry = processes.setdefault(int(proc.pid), {"memory": 0, "types": set()})
                # usedGpuMemory may be None; count it as 0.
                entry["memory"] += int(proc.usedGpuMemory or 0)
                entry["types"].add(proc_type)
        except (NVMLError_NotFound, NVMLError_NotSupported):
            continue
    return processes


def process_utilization_samples(handle):
    """Return ``(by_pid, supported)`` with the newest utilization sample per PID.

    ``by_pid`` maps a PID to ``{"timestamp", "sm", "memory", "enc", "dec"}``.
    ``supported`` is 1 when the query works (including when NVML has no
    samples in the window) and 0 when NVML reports it as not supported.
    """
    try:
        # NVML sample timestamps are in microseconds, so the cutoff must be too;
        # a millisecond value would sit far in the past and filter nothing.
        since = int(time.time() * 1_000_000) - SAMPLE_WINDOW_MS * 1000
        samples = nvmlDeviceGetProcessUtilization(handle, since)
    except NVMLError_NotFound:
        return {}, 1
    except NVMLError_NotSupported:
        return {}, 0

    by_pid = {}
    for sample in samples:
        pid = int(sample.pid)
        current = by_pid.get(pid)
        # Keep only the most recent sample for each PID.
        if current is None or sample.timeStamp >= current["timestamp"]:
            by_pid[pid] = {
                "timestamp": int(sample.timeStamp),
                "sm": int(sample.smUtil),
                "memory": int(sample.memUtil),
                "enc": int(sample.encUtil),
                "dec": int(sample.decUtil),
            }
    return by_pid, 1


def collect_metrics():
    """Query NVML and return the full metrics payload as exposition text.

    Samples are buffered per metric family and written out as
    HELP, TYPE, then all samples of that family, so each family forms one
    contiguous group as the text exposition format requires.

    Raises:
        NVMLError: an NVML call other than the tolerated ones fails.
        OSError: the pod lookup fails (see ``load_pods``).
    """
    nvmlInit()
    pods = load_pods()
    samples = {name: [] for name in METRIC_HELP}

    def emit(name, labels, value):
        samples[name].append(metric_line(name, labels, value))

    for gpu_index in range(nvmlDeviceGetCount()):
        handle = nvmlDeviceGetHandleByIndex(gpu_index)
        uuid = nvmlDeviceGetUUID(handle)
        name = nvmlDeviceGetName(handle)
        device_util = float(nvmlDeviceGetUtilizationRates(handle).gpu)
        base = {"node": NODE_NAME, "gpu": gpu_index, "uuid": uuid, "model": name}
        emit("nvidia_gpu_device_utilization_percent", base, device_util)

        memory_by_pid = running_process_memory(handle)
        util_by_pid, supported = process_utilization_samples(handle)
        emit("nvidia_gpu_process_utilization_supported", base, supported)

        namespace_sm = {}
        # A PID may appear in only one of the two sources; report it either way.
        for pid in sorted(set(memory_by_pid) | set(util_by_pid)):
            proc_info = memory_by_pid.get(pid, {"memory": 0, "types": set()})
            util_info = util_by_pid.get(pid, {"sm": 0, "memory": 0, "enc": 0, "dec": 0})
            pod = pod_for_pid(pid, pods)
            proc_name = process_name(pid)
            proc_type = "+".join(sorted(proc_info["types"])) or "unknown"
            labels = {
                **base,
                "namespace": pod["namespace"],
                "pod": pod["pod"],
                "pid": pid,
                "process": proc_name,
                "type": proc_type,
            }
            sm_util = float(util_info["sm"])
            namespace_sm[pod["namespace"]] = namespace_sm.get(pod["namespace"], 0.0) + sm_util
            emit("nvidia_process_gpu_sm_util_percent", labels, sm_util)
            emit("nvidia_process_gpu_memory_used_bytes", labels, int(proc_info["memory"]))

        # Device utilization not explained by any process is charged to "host".
        attributed = sum(namespace_sm.values())
        residual = max(device_util - attributed, 0.0)
        if residual > 0.1:
            namespace_sm["host"] = namespace_sm.get("host", 0.0) + residual

        for namespace, value in sorted(namespace_sm.items()):
            labels = {**base, "namespace": namespace, "pod": "__namespace_total__"}
            emit("nvidia_namespace_gpu_sm_util_percent", labels, round(value, 3))

    lines = []
    for family, help_text in METRIC_HELP.items():
        lines.append(f"# HELP {family} {help_text}")
        lines.append(f"# TYPE {family} gauge")
        lines.extend(samples[family])
    return "\n".join(lines) + "\n"


class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler serving the cached metrics payload on ``/metrics`` and ``/``."""

    def do_GET(self):
        """Serve the metrics body, refreshing it when the cache has expired.

        Any other path gets an empty 404. When collection fails with one of
        the handled exception types, the cached body is replaced by a single
        ``nvidia_process_exporter_up ... 0`` sample naming the exception
        class; the response status is still 200.
        """
        if self.path not in ("/metrics", "/"):
            self.send_response(404)
            self.end_headers()
            return

        now = time.time()
        if now - metric_cache["loaded_at"] >= METRIC_CACHE_TTL:
            try:
                metric_cache["body"] = collect_metrics()
            except (NVMLError, OSError, subprocess.SubprocessError, urllib.error.URLError) as exc:
                metric_cache["body"] = (
                    "# HELP nvidia_process_exporter_up Whether the NVIDIA process exporter scrape succeeded.\n"
                    "# TYPE nvidia_process_exporter_up gauge\n"
                    f'nvidia_process_exporter_up{{node="{label_value(NODE_NAME)}",error="{label_value(type(exc).__name__)}"}} 0\n'
                )
            # Failures are cached too, so a broken backend is not retried on
            # every request.
            metric_cache["loaded_at"] = now

        body = metric_cache["body"].encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Suppress the default per-request access log."""
        return


if __name__ == "__main__":
    ThreadingHTTPServer(("0.0.0.0", PORT), MetricsHandler).serve_forever()