# services/keycloak/portal-admin-client-secret-ensure-job.yaml apiVersion: batch/v1 kind: Job metadata: name: keycloak-portal-admin-secret-ensure-1 namespace: sso spec: backoffLimit: 0 template: metadata: annotations: vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/agent-pre-populate-only: "true" vault.hashicorp.com/role: "sso" vault.hashicorp.com/agent-inject-secret-keycloak-env.sh: "kv/data/atlas/shared/keycloak-admin" vault.hashicorp.com/agent-inject-template-keycloak-env.sh: | {{ with secret "kv/data/atlas/shared/keycloak-admin" }} export KEYCLOAK_ADMIN_USER="{{ .Data.data.username }}" export KEYCLOAK_ADMIN_PASSWORD="{{ .Data.data.password }}" {{ end }} {{ with secret "kv/data/atlas/portal/bstein-dev-home-keycloak-admin" }} export PORTAL_ADMIN_CLIENT_SECRET="{{ .Data.data.client_secret }}" {{ end }} spec: restartPolicy: Never serviceAccountName: sso-vault containers: - name: configure image: python:3.11-alpine env: - name: KEYCLOAK_SERVER value: http://keycloak.sso.svc.cluster.local - name: KEYCLOAK_REALM value: atlas - name: PORTAL_ADMIN_CLIENT_ID value: bstein-dev-home-admin command: ["/bin/sh", "-c"] args: - | set -eu . /vault/secrets/keycloak-env.sh python - <<'PY' import json import os import urllib.parse import urllib.error import urllib.request base_url = os.environ["KEYCLOAK_SERVER"].rstrip("/") realm = os.environ["KEYCLOAK_REALM"] admin_user = os.environ["KEYCLOAK_ADMIN_USER"] admin_password = os.environ["KEYCLOAK_ADMIN_PASSWORD"] client_id = os.environ["PORTAL_ADMIN_CLIENT_ID"] client_secret = os.environ["PORTAL_ADMIN_CLIENT_SECRET"] def http_json(method: str, url: str, token: str, payload=None): data = None headers = {"Authorization": f"Bearer {token}"} if payload is not None: data = json.dumps(payload).encode() headers["Content-Type"] = "application/json" req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=30) as resp: body = resp.read() if not body: return resp.status, None return resp.status, json.loads(body.decode()) except urllib.error.HTTPError as exc: raw = exc.read() if not raw: return exc.code, None try: return exc.code, json.loads(raw.decode()) except Exception: return exc.code, {"raw": raw.decode(errors="replace")} def get_admin_token() -> str: token_data = urllib.parse.urlencode( { "grant_type": "password", "client_id": "admin-cli", "username": admin_user, "password": admin_password, } ).encode() req = urllib.request.Request( f"{base_url}/realms/master/protocol/openid-connect/token", data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=15) as resp: body = json.loads(resp.read().decode()) except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") raise SystemExit(f"Token request failed: status={exc.code} body={raw}") return body["access_token"] token = get_admin_token() status, clients = http_json( "GET", f"{base_url}/admin/realms/{realm}/clients?clientId={urllib.parse.quote(client_id)}", token, ) if status != 200 or not isinstance(clients, list) or not clients: raise SystemExit(f"Unable to find client {client_id!r} (status={status})") client_uuid = None for item in clients: if isinstance(item, dict) and item.get("clientId") == client_id: client_uuid = item.get("id") break if not client_uuid: raise SystemExit(f"Client {client_id!r} has no id") status, client_rep = http_json( "GET", f"{base_url}/admin/realms/{realm}/clients/{client_uuid}", token, ) if status != 200 or not isinstance(client_rep, dict): raise SystemExit(f"Unable to fetch client representation (status={status})") if client_rep.get("secret") != client_secret: client_rep["secret"] = client_secret status, resp = http_json( "PUT", f"{base_url}/admin/realms/{realm}/clients/{client_uuid}", token, client_rep, ) if status not in (200, 204): raise SystemExit(f"Client update failed (status={status}) resp={resp}") print(f"OK: ensured secret for {client_id}") PY