236 lines
11 KiB
YAML
236 lines
11 KiB
YAML
|
|
# services/keycloak/portal-e2e-token-exchange-permissions-job.yaml
#
# One-shot Job that runs an inline Python script against the Keycloak admin
# REST API. The script itself follows in the heredoc opened at the end of
# the container args.
apiVersion: batch/v1
kind: Job
metadata:
  name: keycloak-portal-e2e-token-exchange-permissions-1
  namespace: sso
spec:
  # No retries: a failed run is left failed (restartPolicy is Never as well).
  backoffLimit: 0
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: configure
          image: python:3.11-alpine
          env:
            # Keycloak endpoint and realm the script operates on.
            - name: KEYCLOAK_SERVER
              value: http://keycloak.sso.svc.cluster.local
            - name: KEYCLOAK_REALM
              value: atlas
            # Admin credentials are read from the keycloak-admin Secret.
            - name: KEYCLOAK_ADMIN_USER
              valueFrom:
                secretKeyRef:
                  name: keycloak-admin
                  key: username
            - name: KEYCLOAK_ADMIN_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: keycloak-admin
                  key: password
            # Client that is granted the permissions, and the client it targets.
            - name: PORTAL_E2E_CLIENT_ID
              value: test-portal-e2e
            - name: TARGET_CLIENT_ID
              value: bstein-dev-home
          command:
            - "/bin/sh"
            - "-c"
          args:
            - |
              set -euo pipefail
              python - <<'PY'
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import urllib.parse
|
||
|
|
import urllib.error
|
||
|
|
import urllib.request
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
# Runtime configuration, read entirely from the process environment.
# A missing variable raises KeyError here, before any request is sent.
# Trailing slashes are stripped from the server URL so paths can be appended.
base_url = os.environ["KEYCLOAK_SERVER"].rstrip("/")
(
    realm,
    admin_user,
    admin_password,
    e2e_client_id,
    target_client_id,
) = (
    os.environ[var_name]
    for var_name in (
        "KEYCLOAK_REALM",
        "KEYCLOAK_ADMIN_USER",
        "KEYCLOAK_ADMIN_PASSWORD",
        "PORTAL_E2E_CLIENT_ID",
        "TARGET_CLIENT_ID",
    )
)
|
||
|
|
|
||
|
|
# Canonical 8-4-4-4-12 hexadecimal UUID layout, matched case-insensitively.
uuid_re = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.IGNORECASE)


def is_uuid(value: str) -> bool:
    """Return True when *value* is exactly a canonical UUID string.

    ``fullmatch`` is used rather than ``match`` because ``$`` also matches
    just before a trailing newline, so ``match`` would accept ``"<uuid>\\n"``.
    """
    return uuid_re.fullmatch(value) is not None
|
||
|
|
|
||
|
|
def http_json(method: str, url: str, token: str, payload: Any | None = None):
|
||
|
|
data = None
|
||
|
|
headers = {"Authorization": f"Bearer {token}"}
|
||
|
|
if payload is not None:
|
||
|
|
data = json.dumps(payload).encode()
|
||
|
|
headers["Content-Type"] = "application/json"
|
||
|
|
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
||
|
|
try:
|
||
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
||
|
|
body = resp.read()
|
||
|
|
if not body:
|
||
|
|
return resp.status, None
|
||
|
|
return resp.status, json.loads(body.decode())
|
||
|
|
except urllib.error.HTTPError as exc:
|
||
|
|
raw = exc.read()
|
||
|
|
if not raw:
|
||
|
|
return exc.code, None
|
||
|
|
try:
|
||
|
|
return exc.code, json.loads(raw.decode())
|
||
|
|
except Exception:
|
||
|
|
return exc.code, {"raw": raw.decode(errors="replace")}
|
||
|
|
|
||
|
|
def get_admin_token() -> str:
    """Obtain an access token from the master realm with the admin credentials.

    Uses the password grant with the ``admin-cli`` client against
    ``base_url``. Exits the script with a descriptive message when the
    token endpoint answers with an HTTP error status.
    """
    form_fields = {
        "grant_type": "password",
        "client_id": "admin-cli",
        "username": admin_user,
        "password": admin_password,
    }
    form_body = urllib.parse.urlencode(form_fields).encode()
    token_request = urllib.request.Request(
        f"{base_url}/realms/master/protocol/openid-connect/token",
        data=form_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(token_request, timeout=15) as response:
            token_response = json.loads(response.read().decode())
    except urllib.error.HTTPError as err:
        detail = err.read().decode(errors="replace")
        raise SystemExit(f"Token request failed: status={err.code} body={detail}")
    return token_response["access_token"]
|
||
|
|
|
||
|
|
def find_client_uuid(token: str, client_id: str) -> str:
    """Return the ``id`` of the realm client whose ``clientId`` equals *client_id*.

    Exits the script when the lookup does not return status 200 with a
    non-empty list, or when no returned entry has a matching ``clientId``
    and a non-empty ``id``.
    """
    lookup_url = f"{base_url}/admin/realms/{realm}/clients?clientId={urllib.parse.quote(client_id)}"
    status, clients = http_json("GET", lookup_url, token)
    if not (status == 200 and isinstance(clients, list) and clients):
        raise SystemExit(f"Unable to find client {client_id!r} (status={status})")

    # Re-check clientId on each entry instead of trusting the server-side filter.
    matching_ids = (
        entry["id"]
        for entry in clients
        if isinstance(entry, dict) and entry.get("clientId") == client_id and entry.get("id")
    )
    found = next(matching_ids, None)
    if found is None:
        raise SystemExit(f"Client {client_id!r} has no id")
    return found
|
||
|
|
|
||
|
|
# Authenticate once, then resolve the internal ids of the three clients
# involved: realm-management, the E2E client, and the target client.
token = get_admin_token()
rm_uuid, e2e_uuid, target_uuid = (
    find_client_uuid(token, wanted_client)
    for wanted_client in ("realm-management", e2e_client_id, target_client_id)
)
|
||
|
|
|
||
|
|
def enable_and_get_permissions(url: str) -> dict[str, Any]:
    """Enable permissions at *url* and return the resulting permissions document.

    Sends ``{"enabled": True}`` with PUT, then reads the same URL back with
    GET. Exits the script when the PUT does not answer 200/204, or when the
    GET does not answer 200 with a JSON object.
    """
    put_status, put_body = http_json("PUT", url, token, {"enabled": True})
    if put_status not in (200, 204):
        raise SystemExit(f"Failed enabling permissions at {url} (status={put_status}) resp={put_body}")

    get_status, current = http_json("GET", url, token)
    if get_status == 200 and isinstance(current, dict):
        return current
    raise SystemExit(f"Failed reading permissions at {url} (status={get_status}) resp={current}")
|
||
|
|
|
||
|
|
# Enable users-management permissions and pick out the impersonation scope
# permission id. Both "impersonate" and "impersonation" are tried, in that order.
users_perms_url = f"{base_url}/admin/realms/{realm}/users-management-permissions"
users_perms = enable_and_get_permissions(users_perms_url)
users_scope_perms = users_perms.get("scopePermissions") or {}
if not isinstance(users_scope_perms, dict):
    raise SystemExit("Users management permissions missing scopePermissions")

impersonate_perm_id = None
for scope_key in ("impersonate", "impersonation"):
    impersonate_perm_id = users_scope_perms.get(scope_key)
    if impersonate_perm_id:
        break
if not impersonate_perm_id:
    keys = sorted(users_scope_perms)
    raise SystemExit(f"Users permissions missing impersonate scope (have: {keys})")
|
||
|
|
|
||
|
|
# Enable management permissions on the target client and pick out the
# "token-exchange" scope permission id.
target_perms_url = f"{base_url}/admin/realms/{realm}/clients/{target_uuid}/management/permissions"
target_perms = enable_and_get_permissions(target_perms_url)
target_scope_perms = target_perms.get("scopePermissions") or {}
if not isinstance(target_scope_perms, dict):
    raise SystemExit("Target client permissions missing scopePermissions")

token_exchange_perm_id = target_scope_perms.get("token-exchange")
if not token_exchange_perm_id:
    keys = sorted(target_scope_perms)
    raise SystemExit(f"Target client permissions missing token-exchange scope (have: {keys})")
|
||
|
|
|
||
|
|
# Ensure a client policy naming the E2E client exists under realm-management:
# reuse one found by exact name, otherwise create it.
policy_name = "test-portal-e2e-token-exchange"
policy_api = f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy"

search_status, search_hits = http_json(
    "GET",
    f"{policy_api}/search?name={urllib.parse.quote(policy_name)}&fields=id,name,type,config",
    token,
)
policy = None
if search_status == 200 and isinstance(search_hits, list):
    # Only an exact name match counts; anything else falls through to creation.
    policy = next(
        (hit for hit in search_hits if isinstance(hit, dict) and hit.get("name") == policy_name),
        None,
    )

if policy is None:
    new_policy: dict[str, Any] = {
        "name": policy_name,
        "type": "client",
        "logic": "POSITIVE",
        "decisionStrategy": "UNANIMOUS",
        # The client list is sent as a JSON-encoded string inside config.
        "config": {"clients": json.dumps([e2e_uuid])},
    }
    create_status, created = http_json("POST", policy_api, token, new_policy)
    if not (create_status == 201 and isinstance(created, dict) and created.get("id")):
        raise SystemExit(f"Failed creating policy {policy_name!r} (status={create_status}) resp={created}")
    policy = created

policy_id = policy.get("id")
if not (isinstance(policy_id, str) and policy_id):
    raise SystemExit(f"Policy {policy_name!r} missing id")
|
||
|
|
|
||
|
|
def patch_permission(permission_id: str):
    """Attach the module-level policy to the permission *permission_id*.

    The permission is fetched from the scope, resource, and generic
    permission endpoints in that order; the first that answers 200 with a
    JSON object is used for the update as well. Nothing is sent when the
    policy is already listed. The policy is referenced by id when any
    existing entry looks like a UUID, otherwise by name; if that update is
    rejected, one retry is made with the other form. Exits the script when
    the permission cannot be fetched or updated.
    """
    permission_api = f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/permission"

    perm = None
    url_used = None
    for path_prefix in ("scope/", "resource/", ""):
        candidate_url = f"{permission_api}/{path_prefix}{permission_id}"
        fetch_status, fetched = http_json("GET", candidate_url, token)
        if fetch_status == 200 and isinstance(fetched, dict):
            perm, url_used = fetched, candidate_url
            break
    if perm is None or url_used is None:
        raise SystemExit(f"Unable to fetch permission {permission_id} via expected endpoints")

    # Keep only string entries; anything else in "policies" is dropped.
    existing = perm.get("policies")
    attached = [p for p in existing if isinstance(p, str)] if isinstance(existing, list) else []

    # Follow the identifier form already present in the permission.
    preferred = policy_id if any(map(is_uuid, attached)) else policy_name
    if preferred in attached:
        return

    attached.append(preferred)
    perm["policies"] = attached
    first_status, first_body = http_json("PUT", url_used, token, perm)
    if first_status in (200, 204):
        return

    # Retry once with the other identifier form.
    fallback = policy_name if preferred == policy_id else policy_id
    if fallback in attached:
        raise SystemExit(f"Failed updating permission {permission_id} (status={first_status}) resp={first_body}")

    perm["policies"] = [p for p in attached if p != preferred] + [fallback]
    second_status, second_body = http_json("PUT", url_used, token, perm)
    if second_status in (200, 204):
        return
    raise SystemExit(f"Failed updating permission {permission_id} (status={second_status}) resp={second_body}")
|
||
|
|
|
||
|
|
# Attach the policy to both scope permissions: impersonation first, then
# token exchange on the target client.
for scope_permission_id in (impersonate_perm_id, token_exchange_perm_id):
    patch_permission(str(scope_permission_id))

print("OK: configured token exchange permissions for portal E2E client")
|
||
|
|
PY
|