titan-iac/services/monitoring/scripts/nvidia_process_exporter.py

249 lines
9.2 KiB
Python

#!/usr/bin/env python3
import json
import os
import re
import ssl
import subprocess
import time
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pynvml import (
NVMLError,
NVMLError_NotFound,
NVMLError_NotSupported,
nvmlDeviceGetComputeRunningProcesses_v3,
nvmlDeviceGetCount,
nvmlDeviceGetGraphicsRunningProcesses_v3,
nvmlDeviceGetHandleByIndex,
nvmlDeviceGetName,
nvmlDeviceGetProcessUtilization,
nvmlDeviceGetUUID,
nvmlDeviceGetUtilizationRates,
nvmlInit,
)
# Name of the node this exporter runs on; emitted as the ``node`` label and
# used to select this node's pods from the Kubernetes API.
NODE_NAME = os.environ.get("NODE_NAME", "")
# TCP port the metrics HTTP server listens on.
PORT = int(os.environ.get("NVIDIA_PROCESS_EXPORTER_PORT", "9401"))
# Root of the proc tree that is tried first when reading per-process files;
# the local /proc is used as a fallback by the readers below.
PROC_ROOT = os.environ.get("HOST_PROC", "/host/proc")
# Look-back window, in milliseconds, for NVML per-process utilization samples.
SAMPLE_WINDOW_MS = int(os.environ.get("NVML_PROCESS_SAMPLE_WINDOW_MS", "30000"))
# Seconds the pod list fetched from the API is reused before being refreshed.
POD_CACHE_TTL = int(os.environ.get("POD_CACHE_TTL_SECONDS", "30"))
# Seconds a rendered metrics body is reused before metrics are collected again.
METRIC_CACHE_TTL = int(os.environ.get("METRIC_CACHE_TTL_SECONDS", "5"))
# Service-account bearer token and CA bundle used for the Kubernetes API call.
TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
# Finds "pod" followed by 32-36 hex digits, dashes or underscores in cgroup
# text; the captured group is treated as the pod UID.
POD_UID_RE = re.compile(r"pod([0-9a-fA-F_-]{32,36})")
# Matches any character outside [a-zA-Z0-9_:].
# NOTE(review): not referenced by the code visible in this section.
SAFE_LABEL_RE = re.compile(r"[^a-zA-Z0-9_:]")
# Shared caches: "loaded_at" is the time.time() of the last refresh; 0.0 means
# nothing has been loaded yet.
pod_cache = {"loaded_at": 0.0, "pods": {}}
metric_cache = {"loaded_at": 0.0, "body": ""}
def label_value(value):
    """Return ``value`` as text escaped for use inside a quoted label value.

    The value is converted with ``str()``; backslashes, newlines and double
    quotes are replaced by their backslash-escaped forms and every other
    character is passed through unchanged.
    """
    # One pass over the text: each special character maps to its escape.
    escapes = str.maketrans({"\\": "\\\\", "\n": "\\n", '"': '\\"'})
    return str(value).translate(escapes)
def metric_line(name, labels, value):
    """Render one sample line of the form ``name{k1="v1",k2="v2"} value``.

    Labels are written in sorted key order and every label value is escaped
    with ``label_value``.
    """
    pairs = []
    for key, raw in sorted(labels.items()):
        escaped = label_value(raw)
        pairs.append(f'{key}="{escaped}"')
    joined = ",".join(pairs)
    return f"{name}{{{joined}}} {value}"
def uid_key(value):
    """Normalise a UID string to its lowercase hexadecimal digits only.

    The input is lowercased and every character other than ``0-9`` / ``a-f``
    (dashes, underscores, anything else) is dropped.
    """
    hex_digits = frozenset("0123456789abcdef")
    return "".join(char for char in value.lower() if char in hex_digits)
def process_name(pid):
    """Return the command name of ``pid`` as read from its ``comm`` file.

    The proc tree under ``PROC_ROOT`` is tried first, then the local
    ``/proc``. The first non-empty name found is returned.

    Args:
        pid: Process ID to look up.

    Returns:
        The stripped contents of the ``comm`` file, or ``"unknown"`` when
        neither file can be read or both are empty.
    """
    for path in (f"{PROC_ROOT}/{pid}/comm", f"/proc/{pid}/comm"):
        try:
            # A process can set its name to arbitrary bytes. Undecodable bytes
            # are replaced instead of raising UnicodeDecodeError, which is not
            # an OSError and would otherwise escape this best-effort lookup.
            with open(path, encoding="utf-8", errors="replace") as handle:
                name = handle.read().strip()
        except OSError:
            # Missing or unreadable file (e.g. the process exited): try the
            # next candidate path.
            continue
        if name:
            return name
    return "unknown"
def process_cgroup(pid):
    """Return the raw text of the ``cgroup`` file for ``pid``.

    The file under ``PROC_ROOT`` is tried first, then the one under
    ``/proc``. The content of the first file that can be opened and read is
    returned as-is; an empty string is returned when neither can.
    """
    candidates = [f"{PROC_ROOT}/{pid}/cgroup", f"/proc/{pid}/cgroup"]
    for candidate in candidates:
        try:
            with open(candidate, encoding="utf-8") as stream:
                contents = stream.read()
        except OSError:
            # Unreadable here; fall through to the next candidate.
            continue
        return contents
    return ""
def load_pods():
    """Return the pods scheduled on this node, keyed by normalised pod UID.

    The result is cached in ``pod_cache`` and reused for ``POD_CACHE_TTL``
    seconds. On a refresh the Kubernetes API is queried for pods whose
    ``spec.nodeName`` equals ``NODE_NAME``, authenticating with the
    service-account token at ``TOKEN_PATH`` and verifying the server against
    ``CA_PATH``.

    Returns:
        A dict mapping ``uid_key(pod uid)`` to
        ``{"namespace": ..., "pod": ...}``. An empty dict is returned (and
        nothing is cached) when ``KUBERNETES_SERVICE_HOST`` or ``NODE_NAME``
        is not set.

    Raises:
        OSError: The token file cannot be read, or the request fails
            (``urllib.error.URLError`` is an ``OSError`` subclass).
        ValueError: The response body is not valid JSON.
    """
    now = time.time()
    if now - pod_cache["loaded_at"] < POD_CACHE_TTL:
        return pod_cache["pods"]

    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = os.environ.get("KUBERNETES_SERVICE_PORT", "443")
    if not host or not NODE_NAME:
        return {}

    # On IPv6 clusters the service host is a bare IPv6 literal. It must be
    # wrapped in brackets inside a URL, otherwise its colons are
    # indistinguishable from the port separator.
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"

    # The token is re-read on every refresh so a rotated token is picked up.
    with open(TOKEN_PATH, encoding="utf-8") as handle:
        token = handle.read().strip()

    selector = urllib.parse.quote(f"spec.nodeName={NODE_NAME}", safe="")
    url = f"https://{host}:{port}/api/v1/pods?fieldSelector={selector}"
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    context = ssl.create_default_context(cafile=CA_PATH)
    with urllib.request.urlopen(request, context=context, timeout=10) as response:
        payload = json.load(response)

    pods = {}
    for item in payload.get("items", []):
        metadata = item.get("metadata", {})
        uid = metadata.get("uid", "")
        if not uid:
            continue
        pods[uid_key(uid)] = {
            "namespace": metadata.get("namespace", "unknown"),
            "pod": metadata.get("name", "unknown"),
        }

    # Only a successful fetch updates the cache; a failure above propagates
    # and leaves the previous entry and timestamp untouched.
    pod_cache["loaded_at"] = now
    pod_cache["pods"] = pods
    return pods
def pod_for_pid(pid, pods):
    """Resolve the namespace/pod that owns ``pid`` from its cgroup text.

    Returns ``{"namespace": "host", "pod": "host"}`` when the cgroup text
    contains no pod UID, the matching entry of ``pods`` when the UID is
    known, and ``{"namespace": "unknown", "pod": "unknown"}`` otherwise.
    """
    found = POD_UID_RE.search(process_cgroup(pid))
    if found is None:
        # No pod UID in the cgroup text: attribute the process to the host.
        return {"namespace": "host", "pod": "host"}
    key = uid_key(found.group(1))
    if key in pods:
        return pods[key]
    return {"namespace": "unknown", "pod": "unknown"}
def running_process_memory(handle):
    """Summarise GPU memory held by processes running on one device.

    Both the compute and the graphics process lists are queried. A PID that
    appears in both has its memory summed and both type names recorded. A
    list that NVML reports as not found or not supported is skipped.

    Returns:
        A dict mapping PID to ``{"memory": <bytes>, "types": <set of str>}``.
    """
    usage = {}
    listers = {
        "compute": nvmlDeviceGetComputeRunningProcesses_v3,
        "graphics": nvmlDeviceGetGraphicsRunningProcesses_v3,
    }
    for kind, list_running in listers.items():
        try:
            for running in list_running(handle):
                pid = int(running.pid)
                if pid not in usage:
                    usage[pid] = {"memory": 0, "types": set()}
                record = usage[pid]
                # usedGpuMemory may be falsy (e.g. None); count that as zero.
                record["memory"] += int(running.usedGpuMemory or 0)
                record["types"].add(kind)
        except (NVMLError_NotFound, NVMLError_NotSupported):
            pass
    return usage
def process_utilization_samples(handle):
    """Return the most recent NVML utilization sample per process.

    Args:
        handle: NVML device handle to query.

    Returns:
        A ``(by_pid, supported)`` tuple. ``by_pid`` maps PID to a dict with
        ``timestamp``, ``sm``, ``memory``, ``enc`` and ``dec`` taken from the
        newest sample for that PID inside the look-back window. ``supported``
        is ``0`` when NVML reports the query as not supported and ``1``
        otherwise (including when no samples were found).
    """
    # NVML expresses both the cutoff argument and the sample timestamps in
    # microseconds since the epoch, while the window is configured in
    # milliseconds. A millisecond cutoff would be ~1000x too small and would
    # never filter anything, so stale samples would be reported as current.
    since = int(time.time() * 1_000_000) - SAMPLE_WINDOW_MS * 1000
    try:
        samples = nvmlDeviceGetProcessUtilization(handle, since)
    except NVMLError_NotFound:
        # No samples newer than the cutoff: supported, just nothing to report.
        return {}, 1
    except NVMLError_NotSupported:
        return {}, 0

    by_pid = {}
    for sample in samples:
        pid = int(sample.pid)
        current = by_pid.get(pid)
        # Keep only the newest sample per PID; on a tie the later entry wins.
        if current is None or sample.timeStamp >= current["timestamp"]:
            by_pid[pid] = {
                "timestamp": int(sample.timeStamp),
                "sm": int(sample.smUtil),
                "memory": int(sample.memUtil),
                "enc": int(sample.encUtil),
                "dec": int(sample.decUtil),
            }
    return by_pid, 1
def collect_metrics():
    """Collect NVML device and process metrics and render them as text.

    For every GPU this emits the device utilization, whether per-process
    utilization is supported, per-process SM utilization and memory, and SM
    utilization summed per namespace. Device utilization that is not
    attributed to any process is added to the ``host`` namespace when it
    exceeds 0.1.

    Samples are buffered per metric family and written out family by family,
    each preceded by its HELP and TYPE lines, because the text exposition
    format requires all lines of one metric to form a single group.

    Returns:
        The metrics body as a newline-terminated string.

    Raises:
        NVMLError: Propagated from NVML calls that are not handled by the
            helper functions.
        OSError: Propagated from ``load_pods``.
    """
    nvmlInit()
    pods = load_pods()

    # Family name -> (help text, buffered sample lines). Insertion order is
    # the order in which families are written to the output.
    families = {
        "nvidia_gpu_device_utilization_percent": (
            "Current NVML device GPU utilization.",
            [],
        ),
        "nvidia_process_gpu_sm_util_percent": (
            "Recent per-process SM utilization from NVML.",
            [],
        ),
        "nvidia_process_gpu_memory_used_bytes": (
            "GPU memory held by a process.",
            [],
        ),
        "nvidia_namespace_gpu_sm_util_percent": (
            "GPU SM utilization attributed to namespace, with host/unattributed residual included.",
            [],
        ),
        "nvidia_gpu_process_utilization_supported": (
            "Whether NVML process utilization samples are available for the device.",
            [],
        ),
    }

    def add_sample(name, labels, value):
        # Buffer one rendered sample under its metric family.
        families[name][1].append(metric_line(name, labels, value))

    for gpu_index in range(nvmlDeviceGetCount()):
        handle = nvmlDeviceGetHandleByIndex(gpu_index)
        uuid = nvmlDeviceGetUUID(handle)
        name = nvmlDeviceGetName(handle)
        device_util = float(nvmlDeviceGetUtilizationRates(handle).gpu)
        base = {"node": NODE_NAME, "gpu": gpu_index, "uuid": uuid, "model": name}
        add_sample("nvidia_gpu_device_utilization_percent", base, device_util)

        memory_by_pid = running_process_memory(handle)
        util_by_pid, supported = process_utilization_samples(handle)
        add_sample("nvidia_gpu_process_utilization_supported", base, supported)

        namespace_sm = {}
        # A PID may hold memory without a utilization sample, or vice versa;
        # report the union and default the missing side to zero.
        for pid in sorted(set(memory_by_pid) | set(util_by_pid)):
            proc_info = memory_by_pid.get(pid, {"memory": 0, "types": set()})
            util_info = util_by_pid.get(pid, {"sm": 0, "memory": 0, "enc": 0, "dec": 0})
            pod = pod_for_pid(pid, pods)
            proc_name = process_name(pid)
            proc_type = "+".join(sorted(proc_info["types"])) or "unknown"
            labels = {
                **base,
                "namespace": pod["namespace"],
                "pod": pod["pod"],
                "pid": pid,
                "process": proc_name,
                "type": proc_type,
            }
            sm_util = float(util_info["sm"])
            namespace_sm[pod["namespace"]] = namespace_sm.get(pod["namespace"], 0.0) + sm_util
            add_sample("nvidia_process_gpu_sm_util_percent", labels, sm_util)
            add_sample("nvidia_process_gpu_memory_used_bytes", labels, int(proc_info["memory"]))

        # Whatever the device reports beyond the per-process total is charged
        # to the "host" namespace, ignoring residuals of 0.1 or less.
        attributed = sum(namespace_sm.values())
        residual = max(device_util - attributed, 0.0)
        if residual > 0.1:
            namespace_sm["host"] = namespace_sm.get("host", 0.0) + residual
        for namespace, value in sorted(namespace_sm.items()):
            labels = {**base, "namespace": namespace, "pod": "__namespace_total__"}
            add_sample("nvidia_namespace_gpu_sm_util_percent", labels, round(value, 3))

    lines = []
    for family, (help_text, samples) in families.items():
        lines.append(f"# HELP {family} {help_text}")
        lines.append(f"# TYPE {family} gauge")
        lines.extend(samples)
    return "\n".join(lines) + "\n"
class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves the metrics body on ``/metrics`` and ``/``."""

    def do_GET(self):
        """Serve the cached metrics body, refreshing it when it has expired.

        Any other path gets a 404. When collection fails, the body is
        replaced by a single ``nvidia_process_exporter_up ... 0`` sample
        labelled with the exception type, and that body is cached like a
        successful one.
        """
        # Ignore a query string so that scrapers which append URL parameters
        # are still served.
        path = self.path.partition("?")[0]
        if path not in ("/metrics", "/"):
            self.send_response(404)
            self.end_headers()
            return

        now = time.time()
        if now - metric_cache["loaded_at"] >= METRIC_CACHE_TTL:
            try:
                metric_cache["body"] = collect_metrics()
            except (
                NVMLError,
                OSError,
                # Malformed JSON from the API or undecodable proc-file bytes
                # raise ValueError subclasses; report them as a failed scrape
                # instead of letting the request die without a response.
                ValueError,
                subprocess.SubprocessError,
                urllib.error.URLError,
            ) as exc:
                metric_cache["body"] = (
                    "# HELP nvidia_process_exporter_up Whether the NVIDIA process exporter scrape succeeded.\n"
                    "# TYPE nvidia_process_exporter_up gauge\n"
                    f'nvidia_process_exporter_up{{node="{label_value(NODE_NAME)}",error="{label_value(type(exc).__name__)}"}} 0\n'
                )
            # Stamp the cache on failure too, so a broken backend is not
            # re-queried on every request.
            metric_cache["loaded_at"] = now

        body = metric_cache["body"].encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging."""
        return
if __name__ == "__main__":
    # Listen on all interfaces and answer scrapes until the process is stopped.
    exporter = ThreadingHTTPServer(("0.0.0.0", PORT), MetricsHandler)
    exporter.serve_forever()