#!/usr/bin/env python3
"""End-to-end check of the portal onboarding provisioning flow.

The script:

1. submits an access request to ``{PORTAL_BASE_URL}/api/access/request``;
2. updates the ``access_requests`` table directly to mark the request as
   email-verified and then approved;
3. polls ``{PORTAL_BASE_URL}/api/access/request/status`` until the request
   reaches ``awaiting_onboarding`` or ``ready``;
4. looks the user up through the Keycloak admin API and checks that the
   ``UPDATE_PASSWORD`` and ``CONFIGURE_TOTP`` required actions are set.

Required environment variables: ``PORTAL_BASE_URL``, ``PORTAL_DATABASE_URL``,
``KEYCLOAK_ADMIN_URL``, ``KEYCLOAK_ADMIN_CLIENT_ID``,
``KEYCLOAK_ADMIN_CLIENT_SECRET``.
Optional: ``KEYCLOAK_REALM`` (default ``atlas``), ``E2E_USERNAME_PREFIX``
(default ``e2e-user``), ``E2E_DEADLINE_SECONDS`` (default 600),
``E2E_POLL_SECONDS`` (default 10).

Every failure terminates the process via ``SystemExit`` with a message.
"""

import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

import psycopg

# Statuses reported by the status endpoint that end the polling loop.
_DONE_STATUSES = ("awaiting_onboarding", "ready")
_FAILED_STATUSES = ("denied", "unknown")

# Keycloak required actions the provisioned user must carry.
_EXPECTED_REQUIRED_ACTIONS = ("UPDATE_PASSWORD", "CONFIGURE_TOTP")


def _env(name: str, default: str | None = None) -> str:
    """Return environment variable *name*, falling back to *default*.

    Raises:
        SystemExit: if the resulting value is missing or an empty string.
    """
    value = os.environ.get(name, default)
    if value is None or value == "":
        raise SystemExit(f"missing required env var: {name}")
    return value


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an int, or *default*.

    Raises:
        SystemExit: if the variable is set but is not a valid integer.
    """
    raw = os.environ.get(name, str(default))
    try:
        return int(raw)
    except ValueError as exc:
        raise SystemExit(f"env var {name} must be an integer, got {raw!r}") from exc


def _request_json(
    url: str,
    *,
    method: str,
    timeout_s: int,
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
) -> object:
    """Send one HTTP request and return the decoded JSON body.

    Returns ``None`` when the response body is empty.

    Raises:
        SystemExit: on an HTTP error status, a connection failure or timeout,
            or a body that is not valid JSON.
    """
    req = urllib.request.Request(url, data=data, headers=headers or {}, method=method)
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read().decode()
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before URLError (it is a subclass).
        detail = exc.read().decode(errors="replace")
        raise SystemExit(f"HTTP {exc.code} from {url}: {detail}") from exc
    except (urllib.error.URLError, TimeoutError) as exc:
        raise SystemExit(f"request to {url} failed: {exc}") from exc

    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"invalid JSON from {url}: {exc}") from exc


def _as_object(parsed: object, url: str) -> dict:
    """Coerce a decoded JSON body to a dict; an empty/null body becomes ``{}``.

    Raises:
        SystemExit: if the body is any other JSON type.
    """
    if parsed is None:
        return {}
    if not isinstance(parsed, dict):
        raise SystemExit(f"unexpected non-object JSON response from {url}: {parsed!r}")
    return parsed


def _post_json(url: str, payload: dict, timeout_s: int = 30) -> dict:
    """POST *payload* as JSON to *url* and return the JSON object reply.

    An empty response body yields ``{}``.

    Raises:
        SystemExit: on any transport/HTTP/JSON failure or a non-object reply.
    """
    parsed = _request_json(
        url,
        method="POST",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        timeout_s=timeout_s,
    )
    return _as_object(parsed, url)


def _post_form(url: str, data: dict[str, str], timeout_s: int = 30) -> dict:
    """POST *data* form-urlencoded to *url* and return the JSON object reply.

    An empty response body yields ``{}``.

    Raises:
        SystemExit: on any transport/HTTP/JSON failure or a non-object reply.
    """
    parsed = _request_json(
        url,
        method="POST",
        data=urllib.parse.urlencode(data).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout_s=timeout_s,
    )
    return _as_object(parsed, url)


def _get_json(url: str, headers: dict[str, str] | None = None, timeout_s: int = 30) -> object:
    """GET *url* and return the decoded JSON body (``None`` if it is empty).

    Raises:
        SystemExit: on any transport/HTTP/JSON failure.
    """
    return _request_json(url, method="GET", headers=headers, timeout_s=timeout_s)


def _realm_path(realm: str) -> str:
    """Return *realm* percent-encoded for use as a single URL path segment."""
    return urllib.parse.quote(realm, safe="")


def _keycloak_admin_token(keycloak_base: str, realm: str, client_id: str, client_secret: str) -> str:
    """Obtain an access token using the ``client_credentials`` grant.

    Raises:
        SystemExit: if the request fails or the reply has no non-empty
            string ``access_token``.
    """
    token_url = (
        f"{keycloak_base.rstrip('/')}/realms/{_realm_path(realm)}"
        "/protocol/openid-connect/token"
    )
    payload = _post_form(
        token_url,
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        },
        timeout_s=20,
    )
    token = payload.get("access_token")
    if not isinstance(token, str) or not token:
        raise SystemExit("keycloak admin token response missing access_token")
    return token


def _keycloak_find_user(keycloak_base: str, realm: str, token: str, username: str) -> dict | None:
    """Return the first user matching *username* exactly, or ``None``.

    ``None`` is returned when the reply is not a non-empty list or its first
    element is not a JSON object.
    """
    query = urllib.parse.urlencode({"username": username, "exact": "true", "max": "1"})
    url = f"{keycloak_base.rstrip('/')}/admin/realms/{_realm_path(realm)}/users?{query}"
    users = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20)
    if not isinstance(users, list) or not users:
        return None
    user = users[0]
    return user if isinstance(user, dict) else None


def _keycloak_get_user(keycloak_base: str, realm: str, token: str, user_id: str) -> dict:
    """Fetch the user representation for *user_id*.

    Raises:
        SystemExit: if the request fails or the reply is not a JSON object.
    """
    url = (
        f"{keycloak_base.rstrip('/')}/admin/realms/{_realm_path(realm)}"
        f"/users/{urllib.parse.quote(user_id, safe='')}"
    )
    data = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20)
    if not isinstance(data, dict):
        raise SystemExit("unexpected keycloak user payload")
    return data


def _verify_and_approve(db_url: str, request_code: str) -> None:
    """Mark the request as email-verified, then as approved, directly in the DB."""
    with psycopg.connect(db_url, autocommit=True) as conn:
        # Bypass the emailed token by marking the request as verified and
        # pending (same as /api/access/request/verify).
        conn.execute(
            """
            UPDATE access_requests
            SET status = 'pending',
                email_verified_at = NOW(),
                email_verification_token_hash = NULL
            WHERE request_code = %s
              AND status = 'pending_email_verification'
            """,
            (request_code,),
        )
        # Simulate admin approval.
        approved = conn.execute(
            """
            UPDATE access_requests
            SET status = 'accounts_building',
                decided_at = NOW(),
                decided_by = 'portal-e2e'
            WHERE request_code = %s
              AND status = 'pending'
            """,
            (request_code,),
        )
        if approved.rowcount == 0:
            # Not fatal: keep polling, but leave a hint in case the wait
            # below times out because the approval never took effect.
            print(
                f"WARNING: approval UPDATE matched no rows for {request_code} "
                "(request was not in 'pending' status)",
                file=sys.stderr,
            )


def _wait_for_provisioning(status_url: str, request_code: str, deadline_s: int, interval_s: int) -> None:
    """Poll the status endpoint until the request reaches a done status.

    Raises:
        SystemExit: if a failed status is reported or *deadline_s* elapses.
    """
    deadline_at = time.monotonic() + deadline_s
    last_status = None
    while True:
        status_payload = _post_json(status_url, {"request_code": request_code}, timeout_s=60)
        status = status_payload.get("status")
        if isinstance(status, str):
            last_status = status
        if status in _DONE_STATUSES:
            return
        if status in _FAILED_STATUSES:
            raise SystemExit(f"request transitioned to unexpected terminal status: {status_payload}")

        remaining = deadline_at - time.monotonic()
        if remaining <= 0:
            raise SystemExit(f"timed out waiting for provisioning to finish (last status={last_status})")
        # Never sleep past the deadline, and never pass a negative value to
        # time.sleep (which would raise ValueError).
        time.sleep(min(max(interval_s, 0), remaining))


def _check_required_actions(user: dict) -> None:
    """Exit unless *user* lists every expected Keycloak required action."""
    required_actions = user.get("requiredActions") or []
    required: set[str] = set()
    if isinstance(required_actions, list):
        required = {a for a in required_actions if isinstance(a, str)}
    missing = [name for name in _EXPECTED_REQUIRED_ACTIONS if name not in required]
    if missing:
        raise SystemExit(
            f"Keycloak user missing required actions {missing}: requiredActions={sorted(required)}"
        )


def main() -> int:
    """Run the onboarding end-to-end check; return 0 on success.

    Raises:
        SystemExit: with a descriptive message on any failed step.
    """
    portal_base = _env("PORTAL_BASE_URL").rstrip("/")
    db_url = _env("PORTAL_DATABASE_URL")
    keycloak_base = _env("KEYCLOAK_ADMIN_URL").rstrip("/")
    realm = _env("KEYCLOAK_REALM", "atlas")
    kc_admin_client_id = _env("KEYCLOAK_ADMIN_CLIENT_ID")
    kc_admin_client_secret = _env("KEYCLOAK_ADMIN_CLIENT_SECRET")
    deadline_s = _env_int("E2E_DEADLINE_SECONDS", 600)
    interval_s = _env_int("E2E_POLL_SECONDS", 10)

    # A timestamp suffix gives each run its own username.
    username_prefix = os.environ.get("E2E_USERNAME_PREFIX", "e2e-user")
    now = int(time.time())
    username = f"{username_prefix}-{now}"
    email = f"{username}@example.invalid"

    submit = _post_json(
        f"{portal_base}/api/access/request",
        {"username": username, "email": email, "note": "portal onboarding e2e"},
        timeout_s=20,
    )
    request_code = submit.get("request_code")
    if not isinstance(request_code, str) or not request_code:
        raise SystemExit(f"request submit did not return request_code: {submit}")

    _verify_and_approve(db_url, request_code)
    _wait_for_provisioning(
        f"{portal_base}/api/access/request/status",
        request_code,
        deadline_s,
        interval_s,
    )

    token = _keycloak_admin_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
    user = _keycloak_find_user(keycloak_base, realm, token, username)
    if not user:
        raise SystemExit("expected Keycloak user was not created")
    user_id = user.get("id")
    if not isinstance(user_id, str) or not user_id:
        raise SystemExit("created user missing id")

    full = _keycloak_get_user(keycloak_base, realm, token, user_id)
    _check_required_actions(full)

    print(f"PASS: onboarding provisioning completed for {request_code} ({username})")
    return 0


if __name__ == "__main__":
    sys.exit(main())