titan-iac/services/keycloak/portal-e2e-token-exchange-permissions-job.yaml

309 lines
15 KiB
YAML

# services/keycloak/portal-e2e-token-exchange-permissions-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: keycloak-portal-e2e-token-exchange-permissions-9
namespace: sso
spec:
backoffLimit: 6
template:
metadata:
annotations:
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/agent-pre-populate-only: "true"
vault.hashicorp.com/role: "sso"
vault.hashicorp.com/agent-inject-secret-keycloak-env.sh: "kv/data/atlas/shared/keycloak-admin"
vault.hashicorp.com/agent-inject-template-keycloak-env.sh: |
{{ with secret "kv/data/atlas/shared/keycloak-admin" }}
export KEYCLOAK_ADMIN="{{ .Data.data.username }}"
export KEYCLOAK_ADMIN_USER="{{ .Data.data.username }}"
export KEYCLOAK_ADMIN_PASSWORD="{{ .Data.data.password }}"
{{ end }}
{{ with secret "kv/data/atlas/sso/keycloak-db" }}
export KC_DB_URL_DATABASE="{{ .Data.data.POSTGRES_DATABASE }}"
export KC_DB_USERNAME="{{ .Data.data.POSTGRES_USER }}"
export KC_DB_PASSWORD="{{ .Data.data.POSTGRES_PASSWORD }}"
{{ end }}
{{ with secret "kv/data/atlas/shared/portal-e2e-client" }}
export PORTAL_E2E_CLIENT_ID="{{ .Data.data.client_id }}"
export PORTAL_E2E_CLIENT_SECRET="{{ .Data.data.client_secret }}"
{{ end }}
{{ with secret "kv/data/atlas/sso/openldap-admin" }}
export LDAP_ADMIN_PASSWORD="{{ .Data.data.LDAP_ADMIN_PASSWORD }}"
export LDAP_CONFIG_PASSWORD="{{ .Data.data.LDAP_CONFIG_PASSWORD }}"
export LDAP_BIND_PASSWORD="${LDAP_ADMIN_PASSWORD}"
{{ end }}
{{ with secret "kv/data/atlas/shared/postmark-relay" }}
export KEYCLOAK_SMTP_USER="{{ index .Data.data "relay-username" }}"
export KEYCLOAK_SMTP_PASSWORD="{{ index .Data.data "relay-password" }}"
{{ end }}
spec:
restartPolicy: Never
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node-role.kubernetes.io/worker
operator: Exists
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: kubernetes.io/arch
operator: In
values: ["arm64"]
serviceAccountName: sso-vault
containers:
- name: configure
image: python:3.11-alpine
env:
- name: KEYCLOAK_SERVER
value: http://keycloak.sso.svc.cluster.local
- name: KEYCLOAK_REALM
value: atlas
- name: PORTAL_E2E_CLIENT_ID
value: test-portal-e2e
- name: TARGET_CLIENT_ID
value: bstein-dev-home
command: ["/bin/sh", "-c"]
args:
- |
set -euo pipefail
. /vault/secrets/keycloak-env.sh
python - <<'PY'
import json
import os
import re
import time
import urllib.parse
import urllib.error
import urllib.request
from typing import Any
base_url = os.environ["KEYCLOAK_SERVER"].rstrip("/")
realm = os.environ["KEYCLOAK_REALM"]
admin_user = os.environ["KEYCLOAK_ADMIN_USER"]
admin_password = os.environ["KEYCLOAK_ADMIN_PASSWORD"]
e2e_client_id = os.environ["PORTAL_E2E_CLIENT_ID"]
target_client_id = os.environ["TARGET_CLIENT_ID"]
uuid_re = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.IGNORECASE)
def is_uuid(value: str) -> bool:
return bool(uuid_re.match(value))
def http_json(method: str, url: str, token: str, payload: Any | None = None):
data = None
headers = {"Authorization": f"Bearer {token}"}
if payload is not None:
data = json.dumps(payload).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
body = resp.read()
if not body:
return resp.status, None
return resp.status, json.loads(body.decode())
except urllib.error.HTTPError as exc:
raw = exc.read()
if not raw:
return exc.code, None
try:
return exc.code, json.loads(raw.decode())
except Exception:
return exc.code, {"raw": raw.decode(errors="replace")}
def get_admin_token() -> str:
last_error: str | None = None
token_data = urllib.parse.urlencode(
{
"grant_type": "password",
"client_id": "admin-cli",
"username": admin_user,
"password": admin_password,
}
).encode()
req = urllib.request.Request(
f"{base_url}/realms/master/protocol/openid-connect/token",
data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
for attempt in range(1, 61):
try:
with urllib.request.urlopen(req, timeout=15) as resp:
body = json.loads(resp.read().decode())
token = body.get("access_token")
if isinstance(token, str) and token:
return token
last_error = "missing access_token"
except urllib.error.HTTPError as exc:
# Treat transient startup errors as retryable.
if exc.code in (404, 429, 500, 502, 503, 504):
last_error = f"http {exc.code}"
else:
raise SystemExit(f"Token request failed: status={exc.code}")
except urllib.error.URLError as exc:
last_error = str(exc.reason)
time.sleep(2)
raise SystemExit(f"Token request failed after retries: {last_error}")
def find_client_uuid(token: str, client_id: str) -> str:
status, clients = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/clients?clientId={urllib.parse.quote(client_id)}",
token,
)
if status != 200 or not isinstance(clients, list) or not clients:
raise SystemExit(f"Unable to find client {client_id!r} (status={status})")
for item in clients:
if isinstance(item, dict) and item.get("clientId") == client_id and item.get("id"):
return item["id"]
raise SystemExit(f"Client {client_id!r} has no id")
token = get_admin_token()
rm_uuid = find_client_uuid(token, "realm-management")
e2e_uuid = find_client_uuid(token, e2e_client_id)
target_uuid = find_client_uuid(token, target_client_id)
def enable_and_get_permissions(url: str) -> dict[str, Any]:
status, resp = http_json("PUT", url, token, {"enabled": True})
if status not in (200, 204):
raise SystemExit(f"Failed enabling permissions at {url} (status={status}) resp={resp}")
status, perms = http_json("GET", url, token)
if status != 200 or not isinstance(perms, dict):
raise SystemExit(f"Failed reading permissions at {url} (status={status}) resp={perms}")
return perms
users_perms = enable_and_get_permissions(f"{base_url}/admin/realms/{realm}/users-management-permissions")
users_scope_perms = users_perms.get("scopePermissions") or {}
if not isinstance(users_scope_perms, dict):
raise SystemExit("Users management permissions missing scopePermissions")
impersonate_perm_id = users_scope_perms.get("impersonate") or users_scope_perms.get("impersonation")
if not impersonate_perm_id:
keys = sorted(k for k in users_scope_perms.keys())
raise SystemExit(f"Users permissions missing impersonate scope (have: {keys})")
target_perms = enable_and_get_permissions(
f"{base_url}/admin/realms/{realm}/clients/{target_uuid}/management/permissions"
)
target_scope_perms = target_perms.get("scopePermissions") or {}
if not isinstance(target_scope_perms, dict):
raise SystemExit("Target client permissions missing scopePermissions")
token_exchange_perm_id = target_scope_perms.get("token-exchange")
if not token_exchange_perm_id:
keys = sorted(k for k in target_scope_perms.keys())
raise SystemExit(f"Target client permissions missing token-exchange scope (have: {keys})")
policy_name = "test-portal-e2e-token-exchange"
policy_base_url = f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy"
def find_policy_by_name(name: str):
urls = [
f"{policy_base_url}/search?name={urllib.parse.quote(name)}&fields=id,name,type,config",
f"{policy_base_url}/search?name={urllib.parse.quote(name)}",
policy_base_url,
]
for url in urls:
st, body = http_json("GET", url, token)
if st != 200:
continue
items = None
if isinstance(body, list):
items = body
elif isinstance(body, dict):
for key in ("policies", "items", "data"):
value = body.get(key)
if isinstance(value, list):
items = value
break
if not isinstance(items, list):
continue
for item in items:
if isinstance(item, dict) and item.get("name") == name and item.get("id"):
return item
return None
policy = find_policy_by_name(policy_name)
if policy is None:
create_rep: dict[str, Any] = {
"name": policy_name,
"type": "client",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"config": {"clients": json.dumps([e2e_uuid])},
}
status, created = http_json(
"POST",
policy_base_url,
token,
create_rep,
)
if status == 201 and isinstance(created, dict) and created.get("id"):
policy = created
elif status == 409:
policy = find_policy_by_name(policy_name)
if policy is None:
raise SystemExit(f"Policy {policy_name!r} exists but could not be retrieved")
else:
raise SystemExit(f"Failed creating policy {policy_name!r} (status={status}) resp={created}")
policy_id = policy.get("id")
if not isinstance(policy_id, str) or not policy_id:
raise SystemExit(f"Policy {policy_name!r} missing id")
def patch_permission(permission_id: str):
candidates = [
f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission/scope/{permission_id}",
f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission/resource/{permission_id}",
f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission/{permission_id}",
]
perm = None
url_used = None
for url in candidates:
st, body = http_json("GET", url, token)
if st == 200 and isinstance(body, dict):
perm = body
url_used = url
break
if perm is None or url_used is None:
raise SystemExit(f"Unable to fetch permission {permission_id} via expected endpoints")
policies_field = perm.get("policies")
if isinstance(policies_field, list):
policies_list = [p for p in policies_field if isinstance(p, str)]
else:
policies_list = []
use_ids = any(is_uuid(p) for p in policies_list)
entry = policy_id if use_ids else policy_name
if entry in policies_list:
return
policies_list.append(entry)
perm["policies"] = policies_list
st, body = http_json("PUT", url_used, token, perm)
if st in (200, 201, 204):
return
# Retry once with the other identifier form.
alt_entry = policy_name if entry == policy_id else policy_id
if alt_entry not in policies_list:
perm["policies"] = [p for p in policies_list if p != entry] + [alt_entry]
st2, body2 = http_json("PUT", url_used, token, perm)
if st2 in (200, 201, 204):
return
raise SystemExit(f"Failed updating permission {permission_id} (status={st2}) resp={body2}")
raise SystemExit(f"Failed updating permission {permission_id} (status={st}) resp={body}")
patch_permission(str(impersonate_perm_id))
patch_permission(str(token_exchange_perm_id))
print("OK: configured token exchange permissions for portal E2E client")
PY
volumeMounts: