ariadne/ariadne/k8s/client.py

79 lines
2.6 KiB
Python

from __future__ import annotations
import base64
from pathlib import Path
from typing import Any
import httpx
from ..settings import settings
_K8S_BASE_URL = "https://kubernetes.default.svc"
_SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount")
def _read_service_account() -> tuple[str, str]:
token_path = _SA_PATH / "token"
ca_path = _SA_PATH / "ca.crt"
if not token_path.exists() or not ca_path.exists():
raise RuntimeError("kubernetes service account token missing")
token = token_path.read_text().strip()
if not token:
raise RuntimeError("kubernetes service account token empty")
return token, str(ca_path)
def _k8s_request(method: str, path: str, payload: dict[str, Any] | None = None) -> Any:
token, ca_path = _read_service_account()
url = f"{_K8S_BASE_URL}{path}"
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(verify=ca_path, timeout=settings.k8s_api_timeout_sec, headers=headers) as client:
resp = client.request(method, url, json=payload)
resp.raise_for_status()
return resp.json()
def get_json(path: str) -> dict[str, Any]:
"""Fetch a Kubernetes API path and return its JSON object payload."""
payload = _k8s_request("GET", path)
if not isinstance(payload, dict):
raise RuntimeError("unexpected kubernetes response")
return payload
def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
"""Post a JSON payload to the Kubernetes API and return the response."""
data = _k8s_request("POST", path, payload)
if not isinstance(data, dict):
raise RuntimeError("unexpected kubernetes response")
return data
def delete_json(path: str) -> dict[str, Any]:
"""Delete a Kubernetes API resource and return the response payload."""
data = _k8s_request("DELETE", path)
if not isinstance(data, dict):
raise RuntimeError("unexpected kubernetes response")
return data
def get_secret_value(namespace: str, name: str, key: str) -> str:
"""Read and decode one string value from a Kubernetes Secret."""
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
raw = blob.get(key)
if not isinstance(raw, str) or not raw:
raise RuntimeError("secret key missing")
try:
decoded = base64.b64decode(raw).decode("utf-8").strip()
except Exception as exc:
raise RuntimeError("failed to decode secret") from exc
if not decoded:
raise RuntimeError("secret value empty")
return decoded