"""Minimal client for the in-cluster Kubernetes API.

Requests are sent to ``https://kubernetes.default.svc`` and authenticated
with the service-account token found under
``/var/run/secrets/kubernetes.io/serviceaccount``.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import httpx

from . import settings

_K8S_BASE_URL = "https://kubernetes.default.svc"
_SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount")


def _read_service_account() -> tuple[str, str]:
    """Return the service-account bearer token and the CA bundle path.

    The token file is re-read on every call, so a token that has been
    replaced on disk is picked up by the next request.

    Returns:
        A ``(token, ca_path)`` pair: the stripped token text and the path of
        ``ca.crt`` as a string.

    Raises:
        RuntimeError: If the token or CA file is missing, or the token file
            contains only whitespace.
    """
    token_path = _SA_PATH / "token"
    ca_path = _SA_PATH / "ca.crt"

    # The CA bundle is only handed to httpx as a path, so an explicit
    # existence check is the only place its absence can be reported here.
    if not ca_path.exists():
        raise RuntimeError("kubernetes service account token missing")

    # Read directly instead of checking exists() first: the file could
    # disappear between the check and the read.
    try:
        token = token_path.read_text(encoding="utf-8").strip()
    except FileNotFoundError as exc:
        raise RuntimeError("kubernetes service account token missing") from exc

    if not token:
        raise RuntimeError("kubernetes service account token empty")
    return token, str(ca_path)


def _build_url(path: str) -> str:
    """Join *path* onto the API base URL, ensuring it starts with ``/``.

    Without the leading slash the path would be fused into the hostname
    (``...default.svcapi/v1``) instead of forming the request path.
    """
    if not path.startswith("/"):
        path = f"/{path}"
    return f"{_K8S_BASE_URL}{path}"


def _request_json(
    method: str,
    path: str,
    payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Send one authenticated request and return the decoded JSON object.

    Args:
        method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        path: API path appended to the base URL.
        payload: Optional JSON body; ``None`` sends no body.

    Raises:
        RuntimeError: If the service-account credentials are unavailable or
            the response body is not a JSON object.
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    token, ca_path = _read_service_account()
    with httpx.Client(
        verify=ca_path,
        timeout=settings.K8S_API_TIMEOUT_SEC,
        headers={"Authorization": f"Bearer {token}"},
    ) as client:
        resp = client.request(method, _build_url(path), json=payload)
        resp.raise_for_status()
        data = resp.json()

    if not isinstance(data, dict):
        raise RuntimeError("unexpected kubernetes response")
    return data


def get_json(path: str) -> dict[str, Any]:
    """GET *path* from the Kubernetes API and return the JSON object.

    Args:
        path: API path appended to the base URL (e.g. ``/api/v1/pods``).

    Raises:
        RuntimeError: If the service-account credentials are unavailable or
            the response body is not a JSON object.
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    return _request_json("GET", path)


def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
    """POST *payload* as JSON to *path* and return the JSON object.

    Args:
        path: API path appended to the base URL.
        payload: Dictionary serialised as the JSON request body.

    Raises:
        RuntimeError: If the service-account credentials are unavailable or
            the response body is not a JSON object.
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    return _request_json("POST", path, payload)