titan-iac/services/keycloak/portal-e2e-client-job.yaml

261 lines
12 KiB
YAML

# services/keycloak/portal-e2e-client-job.yaml
#
# One-shot Job that runs an inline Python script against the Keycloak admin
# REST API. Credentials are rendered by the Vault Agent injector into
# /vault/secrets/keycloak-env.sh, which the container sources before starting
# the script.
apiVersion: batch/v1
kind: Job
metadata:
  name: keycloak-portal-e2e-client-6
  namespace: sso
spec:
  # No retries: a failed pod marks the Job as failed.
  backoffLimit: 0
  template:
    metadata:
      annotations:
        vault.hashicorp.com/agent-inject: "true"
        # Render secrets once in an init container; no long-running sidecar.
        vault.hashicorp.com/agent-pre-populate-only: "true"
        vault.hashicorp.com/role: "sso"
        vault.hashicorp.com/agent-inject-secret-keycloak-env.sh: "kv/data/atlas/shared/keycloak-admin"
        # NOTE(review): the script in this Job only reads KEYCLOAK_ADMIN_USER,
        # KEYCLOAK_ADMIN_PASSWORD, PORTAL_E2E_CLIENT_ID and
        # PORTAL_E2E_CLIENT_SECRET; the DB, LDAP and SMTP exports look unused
        # here - consider trimming them so this pod holds fewer secrets.
        # NOTE(review): values are placed inside double quotes and the file is
        # sourced by the shell, so a secret containing ", $, ` or \ would be
        # reinterpreted - confirm the stored values avoid those characters.
        vault.hashicorp.com/agent-inject-template-keycloak-env.sh: |
          {{ with secret "kv/data/atlas/shared/keycloak-admin" }}
          export KEYCLOAK_ADMIN="{{ .Data.data.username }}"
          export KEYCLOAK_ADMIN_USER="{{ .Data.data.username }}"
          export KEYCLOAK_ADMIN_PASSWORD="{{ .Data.data.password }}"
          {{ end }}
          {{ with secret "kv/data/atlas/sso/keycloak-db" }}
          export KC_DB_URL_DATABASE="{{ .Data.data.POSTGRES_DATABASE }}"
          export KC_DB_USERNAME="{{ .Data.data.POSTGRES_USER }}"
          export KC_DB_PASSWORD="{{ .Data.data.POSTGRES_PASSWORD }}"
          {{ end }}
          {{ with secret "kv/data/atlas/shared/portal-e2e-client" }}
          export PORTAL_E2E_CLIENT_ID="{{ .Data.data.client_id }}"
          export PORTAL_E2E_CLIENT_SECRET="{{ .Data.data.client_secret }}"
          {{ end }}
          {{ with secret "kv/data/atlas/sso/openldap-admin" }}
          export LDAP_ADMIN_PASSWORD="{{ .Data.data.LDAP_ADMIN_PASSWORD }}"
          export LDAP_CONFIG_PASSWORD="{{ .Data.data.LDAP_CONFIG_PASSWORD }}"
          export LDAP_BIND_PASSWORD="${LDAP_ADMIN_PASSWORD}"
          {{ end }}
          {{ with secret "kv/data/atlas/shared/postmark-relay" }}
          export KEYCLOAK_SMTP_USER="{{ index .Data.data "relay-username" }}"
          export KEYCLOAK_SMTP_PASSWORD="{{ index .Data.data "relay-password" }}"
          {{ end }}
    spec:
      restartPolicy: Never
      serviceAccountName: sso-vault
      containers:
        - name: configure
          image: python:3.11-alpine
          env:
            # Read by the script as os.environ["KEYCLOAK_SERVER"] / ["KEYCLOAK_REALM"].
            - name: KEYCLOAK_SERVER
              value: http://keycloak.sso.svc.cluster.local
            - name: KEYCLOAK_REALM
              value: atlas
          # NOTE(review): "set -o pipefail" is not POSIX; this relies on the
          # image's /bin/sh accepting it - verify if the base image changes.
          command: ["/bin/sh", "-c"]
          args:
            - |
              set -euo pipefail
              . /vault/secrets/keycloak-env.sh
              python - <<'PY'
import json
import os
import urllib.parse
import urllib.error
import urllib.request
# Keycloak endpoint (trailing slashes dropped so URL paths can be appended).
base_url = os.environ["KEYCLOAK_SERVER"].rstrip("/")

# Remaining settings are mandatory too: a missing variable raises KeyError
# and stops the script before any request is made.
(
    realm,
    admin_user,
    admin_password,
    e2e_client_id,
    e2e_client_secret,
) = (
    os.environ[name]
    for name in (
        "KEYCLOAK_REALM",
        "KEYCLOAK_ADMIN_USER",
        "KEYCLOAK_ADMIN_PASSWORD",
        "PORTAL_E2E_CLIENT_ID",
        "PORTAL_E2E_CLIENT_SECRET",
    )
)
def http_json(method: str, url: str, token: str, payload=None):
    """Send one bearer-authenticated HTTP request and decode the JSON reply.

    Args:
        method: HTTP verb, e.g. "GET", "POST" or "PUT".
        url: Absolute URL to call.
        token: Access token, sent as "Authorization: Bearer <token>".
        payload: Optional JSON-serialisable body. When not None it is encoded
            with json.dumps and sent with "Content-Type: application/json".

    Returns:
        A (status, body) tuple. body is the decoded JSON document, or None
        when the response body is empty. HTTP error statuses are returned,
        not raised; an error body that is not valid JSON is returned as
        {"raw": <decoded text>}.

    Raises:
        ValueError: a successful response carried a non-empty body that is
            not valid JSON (json.JSONDecodeError / UnicodeDecodeError).
        urllib.error.URLError: transport-level failures other than HTTPError
            are not handled here and propagate to the caller.
    """
    data = None
    headers = {"Authorization": f"Bearer {token}"}
    if payload is not None:
        data = json.dumps(payload).encode()
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            body = resp.read()
            if not body:
                return resp.status, None
            return resp.status, json.loads(body.decode())
    except urllib.error.HTTPError as exc:
        # 4xx/5xx: hand the status back so callers can decide what to do.
        raw = exc.read()
        if not raw:
            return exc.code, None
        try:
            return exc.code, json.loads(raw.decode())
        except Exception:
            # Error pages are not always JSON; keep the text for diagnostics.
            return exc.code, {"raw": raw.decode(errors="replace")}
def get_admin_token() -> str:
    """Obtain an access token from the master realm for the admin user.

    Performs a password grant with the "admin-cli" client against
    {base_url}/realms/master/protocol/openid-connect/token, using the
    module-level admin_user and admin_password.

    Returns:
        The "access_token" value from the token response.

    Raises:
        SystemExit: the token endpoint answered with an HTTP error status, or
            its reply did not contain an access token.
    """
    form = urllib.parse.urlencode(
        {
            "grant_type": "password",
            "client_id": "admin-cli",
            "username": admin_user,
            "password": admin_password,
        }
    ).encode()
    req = urllib.request.Request(
        f"{base_url}/realms/master/protocol/openid-connect/token",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            body = json.loads(resp.read().decode())
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode(errors="replace")
        raise SystemExit(f"Token request failed: status={exc.code} body={raw}")
    access_token = body.get("access_token") if isinstance(body, dict) else None
    if not access_token:
        # The reply body is deliberately not echoed: it may hold other tokens.
        raise SystemExit("Token response did not contain an access_token")
    return access_token
token = get_admin_token()

# Ensure the confidential client for E2E token exchange exists with service accounts enabled.
clients_url = f"{base_url}/admin/realms/{realm}/clients?clientId={urllib.parse.quote(e2e_client_id)}"


def _matching_client_uuid(candidates, wanted_client_id):
    """Return the "id" of the first entry whose clientId equals wanted_client_id, else None."""
    for item in candidates:
        if isinstance(item, dict) and item.get("clientId") == wanted_client_id:
            return item.get("id")
    return None


status, clients = http_json("GET", clients_url, token)
if status != 200 or not isinstance(clients, list):
    raise SystemExit(f"Unexpected clients lookup response: {status}")
client_uuid = _matching_client_uuid(clients, e2e_client_id)

# Target settings; also used as the creation payload when the client is missing.
desired_rep = {
    "clientId": e2e_client_id,
    "enabled": True,
    "protocol": "openid-connect",
    "publicClient": False,
    "serviceAccountsEnabled": True,
    "standardFlowEnabled": False,
    "directAccessGrantsEnabled": False,
    "implicitFlowEnabled": False,
    "secret": e2e_client_secret,
    "attributes": {
        "oauth2.device.authorization.grant.enabled": "false",
        "oauth2.token.exchange.grant.enabled": "true",
    },
}

if not client_uuid:
    status, resp = http_json(
        "POST",
        f"{base_url}/admin/realms/{realm}/clients",
        token,
        desired_rep,
    )
    if status not in (201, 204):
        raise SystemExit(f"Client create failed (status={status}) resp={resp}")
    # Look the client up again to learn the id assigned to it.
    status, clients = http_json("GET", clients_url, token)
    if status != 200 or not isinstance(clients, list) or not clients:
        raise SystemExit("Unable to refetch client after creation")
    # Apply the same clientId match as the first lookup and stop here if no
    # id came back, so later requests never target ".../clients/None".
    client_uuid = _matching_client_uuid(clients, e2e_client_id)
    if not client_uuid:
        raise SystemExit("Unable to refetch client after creation")
# Update existing client with desired settings (idempotent).
client_url = f"{base_url}/admin/realms/{realm}/clients/{client_uuid}"
status, client_rep = http_json("GET", client_url, token)
if status != 200 or not isinstance(client_rep, dict):
    raise SystemExit(f"Unable to fetch client representation (status={status})")

# Patch the fetched representation in place and remember whether anything
# differed, so the PUT below is skipped when the client already matches.
updated = False
for key in ("enabled", "serviceAccountsEnabled", "standardFlowEnabled", "directAccessGrantsEnabled", "implicitFlowEnabled"):
    if client_rep.get(key) != desired_rep.get(key):
        client_rep[key] = desired_rep.get(key)
        updated = True
if client_rep.get("publicClient") is not False:
    client_rep["publicClient"] = False
    updated = True
if client_rep.get("secret") != desired_rep.get("secret"):
    client_rep["secret"] = desired_rep.get("secret")
    updated = True

# Only the desired attribute keys are touched; other attributes are kept.
attrs = client_rep.get("attributes") or {}
for k, v in desired_rep["attributes"].items():
    if attrs.get(k) != v:
        attrs[k] = v
        updated = True
client_rep["attributes"] = attrs

if updated:
    status, resp = http_json("PUT", client_url, token, client_rep)
    if status not in (200, 204):
        raise SystemExit(f"Client update failed (status={status}) resp={resp}")
# Give the service account user minimal realm-management roles for impersonation + user lookup.
status, svc_user = http_json(
    "GET",
    f"{base_url}/admin/realms/{realm}/clients/{client_uuid}/service-account-user",
    token,
)
if status != 200 or not isinstance(svc_user, dict) or not svc_user.get("id"):
    raise SystemExit(f"Unable to fetch service account user (status={status})")
svc_user_id = svc_user["id"]

# The roles below are client roles of the realm's "realm-management" client.
status, rm_clients = http_json(
    "GET",
    f"{base_url}/admin/realms/{realm}/clients?clientId=realm-management",
    token,
)
if status != 200 or not isinstance(rm_clients, list) or not rm_clients:
    raise SystemExit("Unable to find realm-management client")
rm_uuid = rm_clients[0].get("id")
if not rm_uuid:
    raise SystemExit("realm-management client has no id")

# Resolve every wanted role to its {id, name} pair for the mapping payload.
wanted_roles = ("query-users", "view-users", "manage-users", "impersonation")
role_reps = []
for role_name in wanted_roles:
    status, role = http_json(
        "GET",
        f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/roles/{urllib.parse.quote(role_name)}",
        token,
    )
    if status != 200 or not isinstance(role, dict):
        raise SystemExit(f"Unable to fetch role {role_name} (status={status})")
    role_reps.append({"id": role.get("id"), "name": role.get("name")})

role_mappings_url = f"{base_url}/admin/realms/{realm}/users/{svc_user_id}/role-mappings/clients/{rm_uuid}"
status, assigned = http_json("GET", role_mappings_url, token)
# If the current mappings cannot be read, treat none as assigned and let the
# POST below submit every wanted role.
assigned_names = set()
if status == 200 and isinstance(assigned, list):
    assigned_names = {r["name"] for r in assigned if isinstance(r, dict) and r.get("name")}

missing = [r for r in role_reps if r.get("name") and r["name"] not in assigned_names]
if missing:
    status, resp = http_json("POST", role_mappings_url, token, missing)
    if status not in (200, 204):
        raise SystemExit(f"Role mapping update failed (status={status}) resp={resp}")
PY
volumeMounts:
volumes: