# services/keycloak/portal-e2e-token-exchange-permissions-job.yaml apiVersion: batch/v1 kind: Job metadata: name: keycloak-portal-e2e-token-exchange-permissions-11 namespace: sso spec: backoffLimit: 6 template: metadata: annotations: vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/agent-pre-populate-only: "true" vault.hashicorp.com/role: "sso" vault.hashicorp.com/agent-inject-secret-keycloak-env.sh: "kv/data/atlas/shared/keycloak-admin" vault.hashicorp.com/agent-inject-template-keycloak-env.sh: | {{ with secret "kv/data/atlas/shared/keycloak-admin" }} export KEYCLOAK_ADMIN="{{ .Data.data.username }}" export KEYCLOAK_ADMIN_USER="{{ .Data.data.username }}" export KEYCLOAK_ADMIN_PASSWORD="{{ .Data.data.password }}" {{ end }} {{ with secret "kv/data/atlas/sso/keycloak-db" }} export KC_DB_URL_DATABASE="{{ .Data.data.POSTGRES_DATABASE }}" export KC_DB_USERNAME="{{ .Data.data.POSTGRES_USER }}" export KC_DB_PASSWORD="{{ .Data.data.POSTGRES_PASSWORD }}" {{ end }} {{ with secret "kv/data/atlas/shared/portal-e2e-client" }} export PORTAL_E2E_CLIENT_ID="{{ .Data.data.client_id }}" export PORTAL_E2E_CLIENT_SECRET="{{ .Data.data.client_secret }}" {{ end }} {{ with secret "kv/data/atlas/sso/openldap-admin" }} export LDAP_ADMIN_PASSWORD="{{ .Data.data.LDAP_ADMIN_PASSWORD }}" export LDAP_CONFIG_PASSWORD="{{ .Data.data.LDAP_CONFIG_PASSWORD }}" export LDAP_BIND_PASSWORD="${LDAP_ADMIN_PASSWORD}" {{ end }} {{ with secret "kv/data/atlas/shared/postmark-relay" }} export KEYCLOAK_SMTP_USER="{{ index .Data.data "apikey" }}" export KEYCLOAK_SMTP_PASSWORD="{{ index .Data.data "apikey" }}" {{ end }} spec: restartPolicy: Never affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-role.kubernetes.io/worker operator: Exists preferredDuringSchedulingIgnoredDuringExecution: - weight: 100 preference: matchExpressions: - key: kubernetes.io/arch operator: In values: ["arm64"] serviceAccountName: sso-vault containers: - name: configure image: python:3.11-alpine env: - name: KEYCLOAK_SERVER value: http://keycloak.sso.svc.cluster.local - name: KEYCLOAK_REALM value: atlas - name: PORTAL_E2E_CLIENT_ID value: test-portal-e2e - name: TARGET_CLIENT_ID value: bstein-dev-home command: ["/bin/sh", "-c"] args: - | set -euo pipefail . /vault/secrets/keycloak-env.sh python - <<'PY' import json import os import re import time import urllib.parse import urllib.error import urllib.request from typing import Any base_url = os.environ["KEYCLOAK_SERVER"].rstrip("/") realm = os.environ["KEYCLOAK_REALM"] admin_user = os.environ["KEYCLOAK_ADMIN_USER"] admin_password = os.environ["KEYCLOAK_ADMIN_PASSWORD"] e2e_client_id = os.environ["PORTAL_E2E_CLIENT_ID"] target_client_id = os.environ["TARGET_CLIENT_ID"] uuid_re = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.IGNORECASE) def is_uuid(value: str) -> bool: return bool(uuid_re.match(value)) def http_json(method: str, url: str, token: str, payload: Any | None = None): data = None headers = {"Authorization": f"Bearer {token}"} if payload is not None: data = json.dumps(payload).encode() headers["Content-Type"] = "application/json" req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=30) as resp: body = resp.read() if not body: return resp.status, None return resp.status, json.loads(body.decode()) except urllib.error.HTTPError as exc: raw = exc.read() if not raw: return exc.code, None try: return exc.code, json.loads(raw.decode()) except Exception: return exc.code, {"raw": raw.decode(errors="replace")} def get_admin_token() -> str: last_error: str | None = None token_data = urllib.parse.urlencode( { "grant_type": "password", "client_id": "admin-cli", "username": admin_user, "password": admin_password, } ).encode() req = urllib.request.Request( f"{base_url}/realms/master/protocol/openid-connect/token", data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) for attempt in range(1, 61): try: with urllib.request.urlopen(req, timeout=15) as resp: body = json.loads(resp.read().decode()) token = body.get("access_token") if isinstance(token, str) and token: return token last_error = "missing access_token" except urllib.error.HTTPError as exc: # Treat transient startup errors as retryable. if exc.code in (404, 429, 500, 502, 503, 504): last_error = f"http {exc.code}" else: raise SystemExit(f"Token request failed: status={exc.code}") except urllib.error.URLError as exc: last_error = str(exc.reason) time.sleep(2) raise SystemExit(f"Token request failed after retries: {last_error}") def find_client_uuid(token: str, client_id: str) -> str: status, clients = http_json( "GET", f"{base_url}/admin/realms/{realm}/clients?clientId={urllib.parse.quote(client_id)}", token, ) if status != 200 or not isinstance(clients, list) or not clients: raise SystemExit(f"Unable to find client {client_id!r} (status={status})") for item in clients: if isinstance(item, dict) and item.get("clientId") == client_id and item.get("id"): return item["id"] raise SystemExit(f"Client {client_id!r} has no id") token = get_admin_token() rm_uuid = find_client_uuid(token, "realm-management") e2e_uuid = find_client_uuid(token, e2e_client_id) target_uuid = find_client_uuid(token, target_client_id) def enable_and_get_permissions(url: str) -> dict[str, Any]: status, resp = http_json("PUT", url, token, {"enabled": True}) if status not in (200, 204): raise SystemExit(f"Failed enabling permissions at {url} (status={status}) resp={resp}") status, perms = http_json("GET", url, token) if status != 200 or not isinstance(perms, dict): raise SystemExit(f"Failed reading permissions at {url} (status={status}) resp={perms}") return perms users_perms = enable_and_get_permissions(f"{base_url}/admin/realms/{realm}/users-management-permissions") users_scope_perms = users_perms.get("scopePermissions") or {} if not isinstance(users_scope_perms, dict): raise SystemExit("Users management permissions missing scopePermissions") impersonate_perm_id = users_scope_perms.get("impersonate") or users_scope_perms.get("impersonation") if not impersonate_perm_id: keys = sorted(k for k in users_scope_perms.keys()) raise SystemExit(f"Users permissions missing impersonate scope (have: {keys})") target_perms = enable_and_get_permissions( f"{base_url}/admin/realms/{realm}/clients/{target_uuid}/management/permissions" ) target_scope_perms = target_perms.get("scopePermissions") or {} if not isinstance(target_scope_perms, dict): raise SystemExit("Target client permissions missing scopePermissions") token_exchange_perm_id = target_scope_perms.get("token-exchange") if not token_exchange_perm_id: keys = sorted(k for k in target_scope_perms.keys()) raise SystemExit(f"Target client permissions missing token-exchange scope (have: {keys})") policy_name = "test-portal-e2e-token-exchange" policy_base_url = f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy" def find_policy_by_name(name: str): urls = [ f"{policy_base_url}/search?name={urllib.parse.quote(name)}&fields=id,name,type,config", f"{policy_base_url}/search?name={urllib.parse.quote(name)}", policy_base_url, ] for url in urls: st, body = http_json("GET", url, token) if st != 200: continue items = None if isinstance(body, list): items = body elif isinstance(body, dict): for key in ("policies", "items", "data"): value = body.get(key) if isinstance(value, list): items = value break if not isinstance(items, list): continue for item in items: if isinstance(item, dict) and item.get("name") == name and item.get("id"): return item return None policy = find_policy_by_name(policy_name) if policy is None: create_rep: dict[str, Any] = { "name": policy_name, "type": "client", "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "config": {"clients": json.dumps([e2e_uuid])}, } status, created = http_json( "POST", policy_base_url, token, create_rep, ) if status == 201 and isinstance(created, dict) and created.get("id"): policy = created elif status == 409: policy = find_policy_by_name(policy_name) if policy is None: raise SystemExit(f"Policy {policy_name!r} exists but could not be retrieved") else: raise SystemExit(f"Failed creating policy {policy_name!r} (status={status}) resp={created}") policy_id = policy.get("id") if not isinstance(policy_id, str) or not policy_id: raise SystemExit(f"Policy {policy_name!r} missing id") def patch_permission(permission_id: str): candidates = [ f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission/scope/{permission_id}", f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission/resource/{permission_id}", f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission/{permission_id}", ] perm = None url_used = None for url in candidates: st, body = http_json("GET", url, token) if st == 200 and isinstance(body, dict): perm = body url_used = url break if perm is None or url_used is None: raise SystemExit(f"Unable to fetch permission {permission_id} via expected endpoints") policies_field = perm.get("policies") if isinstance(policies_field, list): policies_list = [p for p in policies_field if isinstance(p, str)] else: policies_list = [] use_ids = any(is_uuid(p) for p in policies_list) entry = policy_id if use_ids else policy_name if entry in policies_list: return policies_list.append(entry) perm["policies"] = policies_list st, body = http_json("PUT", url_used, token, perm) if st in (200, 201, 204): return # Retry once with the other identifier form. alt_entry = policy_name if entry == policy_id else policy_id if alt_entry not in policies_list: perm["policies"] = [p for p in policies_list if p != entry] + [alt_entry] st2, body2 = http_json("PUT", url_used, token, perm) if st2 in (200, 201, 204): return raise SystemExit(f"Failed updating permission {permission_id} (status={st2}) resp={body2}") raise SystemExit(f"Failed updating permission {permission_id} (status={st}) resp={body}") patch_permission(str(impersonate_perm_id)) patch_permission(str(token_exchange_perm_id)) print("OK: configured token exchange permissions for portal E2E client") PY volumeMounts: