#!/usr/bin/env python3 import email import http.client import imaplib import json import os import re import ssl import sys import time import urllib.error import urllib.parse import urllib.request def _env(name: str, default: str | None = None) -> str: value = os.environ.get(name, default) if value is None or value == "": raise SystemExit(f"missing required env var: {name}") return value def _post_json(url: str, payload: dict, timeout_s: int = 30) -> dict: body = json.dumps(payload).encode() req = urllib.request.Request( url, data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: raw = resp.read().decode() return json.loads(raw) if raw else {} except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") raise SystemExit(f"HTTP {exc.code} from {url}: {raw}") def _post_form(url: str, data: dict[str, str], timeout_s: int = 30) -> dict: body = urllib.parse.urlencode(data).encode() req = urllib.request.Request( url, data=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: raw = resp.read().decode() return json.loads(raw) if raw else {} except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") raise SystemExit(f"HTTP {exc.code} from {url}: {raw}") def _get_json(url: str, headers: dict[str, str] | None = None, timeout_s: int = 30) -> object: req = urllib.request.Request(url, headers=headers or {}, method="GET") try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: raw = resp.read().decode() return json.loads(raw) if raw else None except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") raise SystemExit(f"HTTP {exc.code} from {url}: {raw}") def _request_json( method: str, url: str, token: str, payload: dict | None = None, timeout_s: int = 30, ) -> dict: data = None headers = {"Authorization": f"Bearer {token}"} if payload is not None: data = json.dumps(payload).encode() headers["Content-Type"] = "application/json" req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: raw = resp.read().decode() return json.loads(raw) if raw else {} except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") raise SystemExit(f"HTTP {exc.code} from {url}: {raw}") def _keycloak_client_token(keycloak_base: str, realm: str, client_id: str, client_secret: str) -> str: token_url = f"{keycloak_base.rstrip('/')}/realms/{realm}/protocol/openid-connect/token" payload = _post_form( token_url, { "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, }, timeout_s=20, ) token = payload.get("access_token") if not isinstance(token, str) or not token: raise SystemExit("keycloak token response missing access_token") return token def _keycloak_token_exchange( *, keycloak_base: str, realm: str, client_id: str, client_secret: str, subject_token: str, requested_subject: str, audience: str, ) -> str: token_url = f"{keycloak_base.rstrip('/')}/realms/{realm}/protocol/openid-connect/token" payload = _post_form( token_url, { "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", "client_id": client_id, "client_secret": client_secret, "subject_token": subject_token, "requested_subject": requested_subject, "audience": audience, }, timeout_s=20, ) token = payload.get("access_token") if not isinstance(token, str) or not token: raise SystemExit("keycloak token exchange response missing access_token") return token def _keycloak_find_user(keycloak_base: str, realm: str, token: str, username: str) -> dict | None: url = f"{keycloak_base.rstrip('/')}/admin/realms/{realm}/users?{urllib.parse.urlencode({'username': username, 'exact': 'true', 'max': '1'})}" users = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20) if not isinstance(users, list) or not users: return None user = users[0] return user if isinstance(user, dict) else None def _keycloak_get_user(keycloak_base: str, realm: str, token: str, user_id: str) -> dict: url = f"{keycloak_base.rstrip('/')}/admin/realms/{realm}/users/{urllib.parse.quote(user_id, safe='')}" data = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20) if not isinstance(data, dict): raise SystemExit("unexpected keycloak user payload") return data def _extract_attr(attributes: object, key: str) -> str: if not isinstance(attributes, dict): return "" value = attributes.get(key) if isinstance(value, list) and value and isinstance(value[0], str): return value[0] if isinstance(value, str): return value return "" def _imap_wait_for_verify_token( *, host: str, port: int, username: str, password: str, request_code: str, deadline_sec: int, ) -> str: ssl_context = ssl._create_unverified_context() deadline_at = time.monotonic() + deadline_sec with imaplib.IMAP4_SSL(host, port, ssl_context=ssl_context) as client: client.login(username, password) client.select("INBOX") while time.monotonic() < deadline_at: status, data = client.search(None, "TEXT", request_code) if status == "OK" and data and data[0]: ids = data[0].split() msg_id = ids[-1] fetch_status, msg_data = client.fetch(msg_id, "(RFC822)") if fetch_status != "OK" or not msg_data: time.sleep(2) continue raw = msg_data[0][1] if isinstance(msg_data[0], tuple) and len(msg_data[0]) > 1 else None if not isinstance(raw, (bytes, bytearray)): time.sleep(2) continue message = email.message_from_bytes(raw) body = None if message.is_multipart(): for part in message.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) if isinstance(payload, (bytes, bytearray)): body = payload.decode(errors="replace") break else: payload = message.get_payload(decode=True) if isinstance(payload, (bytes, bytearray)): body = payload.decode(errors="replace") if not body: time.sleep(2) continue url = None for line in body.splitlines(): candidate = line.strip() if "verify=" in candidate and candidate.startswith("http"): url = candidate break if not url: match = re.search(r"https?://\\S+verify=\\S+", body) url = match.group(0) if match else None if not url: time.sleep(2) continue parsed = urllib.parse.urlparse(url) query = urllib.parse.parse_qs(parsed.query) token = query.get("verify", [""])[0] if isinstance(token, str) and token: return token time.sleep(2) raise SystemExit("verification email not found before deadline") def main() -> int: portal_base = _env("PORTAL_BASE_URL").rstrip("/") keycloak_base = _env("KEYCLOAK_ADMIN_URL").rstrip("/") realm = _env("KEYCLOAK_REALM", "atlas") kc_admin_client_id = _env("KEYCLOAK_ADMIN_CLIENT_ID") kc_admin_client_secret = _env("KEYCLOAK_ADMIN_CLIENT_SECRET") portal_e2e_client_id = _env("PORTAL_E2E_CLIENT_ID") portal_e2e_client_secret = _env("PORTAL_E2E_CLIENT_SECRET") portal_target_client_id = os.environ.get("PORTAL_TARGET_CLIENT_ID", "bstein-dev-home").strip() or "bstein-dev-home" portal_admin_username = os.environ.get("E2E_PORTAL_ADMIN_USERNAME", "bstein").strip() or "bstein" contact_email = os.environ.get("E2E_CONTACT_EMAIL", "robotuser@bstein.dev").strip() if not contact_email: raise SystemExit("E2E_CONTACT_EMAIL must not be empty") imap_host = os.environ.get("E2E_IMAP_HOST", "mailu-front.mailu-mailserver.svc.cluster.local").strip() imap_port = int(os.environ.get("E2E_IMAP_PORT", "993")) imap_keycloak_username = os.environ.get("E2E_IMAP_KEYCLOAK_USERNAME", "robotuser").strip() imap_wait_sec = int(os.environ.get("E2E_IMAP_WAIT_SECONDS", "90")) try: token = _keycloak_client_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret) except SystemExit as exc: raise SystemExit(f"failed to fetch keycloak token for admin client {kc_admin_client_id!r}: {exc}") mailbox_user = _keycloak_find_user(keycloak_base, realm, token, imap_keycloak_username) if not mailbox_user: raise SystemExit(f"unable to locate Keycloak mailbox user {imap_keycloak_username!r}") mailbox_user_id = mailbox_user.get("id") if not isinstance(mailbox_user_id, str) or not mailbox_user_id: raise SystemExit("mailbox user missing id") mailbox_full = _keycloak_get_user(keycloak_base, realm, token, mailbox_user_id) mailbox_attrs = mailbox_full.get("attributes") mailu_email = _extract_attr(mailbox_attrs, "mailu_email") if not mailu_email: mailu_email = contact_email mailu_password = _extract_attr(mailbox_attrs, "mailu_app_password") if not mailu_password: raise SystemExit(f"Keycloak user {imap_keycloak_username!r} missing mailu_app_password attribute") username_prefix = os.environ.get("E2E_USERNAME_PREFIX", "e2e-user") now = int(time.time()) username = f"{username_prefix}-{now}" submit_url = f"{portal_base}/api/access/request" submit_payload = {"username": username, "email": contact_email, "note": "portal onboarding e2e"} submit = None for attempt in range(1, 6): try: submit = _post_json(submit_url, submit_payload, timeout_s=20) break except (http.client.RemoteDisconnected, TimeoutError, urllib.error.URLError) as exc: if attempt == 5: raise SystemExit(f"portal submit failed after {attempt} attempts: {exc}") time.sleep(2) if not isinstance(submit, dict): raise SystemExit("portal submit did not return json") request_code = submit.get("request_code") if not isinstance(request_code, str) or not request_code: raise SystemExit(f"request submit did not return request_code: {submit}") verify_token = _imap_wait_for_verify_token( host=imap_host, port=imap_port, username=mailu_email, password=mailu_password, request_code=request_code, deadline_sec=imap_wait_sec, ) verify_resp = _post_json( f"{portal_base}/api/access/request/verify", {"request_code": request_code, "token": verify_token}, timeout_s=30, ) if not isinstance(verify_resp, dict) or verify_resp.get("ok") is not True: raise SystemExit(f"unexpected verify response: {verify_resp}") portal_admin = _keycloak_find_user(keycloak_base, realm, token, portal_admin_username) if not portal_admin: raise SystemExit(f"unable to locate portal admin user {portal_admin_username!r} via Keycloak admin API") portal_admin_user_id = portal_admin.get("id") if not isinstance(portal_admin_user_id, str) or not portal_admin_user_id: raise SystemExit("portal admin user missing id") try: e2e_subject_token = _keycloak_client_token(keycloak_base, realm, portal_e2e_client_id, portal_e2e_client_secret) except SystemExit as exc: raise SystemExit(f"failed to fetch keycloak token for E2E client {portal_e2e_client_id!r}: {exc}") try: portal_bearer = _keycloak_token_exchange( keycloak_base=keycloak_base, realm=realm, client_id=portal_e2e_client_id, client_secret=portal_e2e_client_secret, subject_token=e2e_subject_token, requested_subject=portal_admin_user_id, audience=portal_target_client_id, ) except SystemExit as exc: raise SystemExit(f"failed to exchange token for portal approval as {portal_admin_username!r}: {exc}") approve_url = f"{portal_base}/api/admin/access/requests/{urllib.parse.quote(username, safe='')}/approve" approve_timeout_s = int(os.environ.get("E2E_APPROVE_TIMEOUT_SECONDS", "180")) approve_attempts = int(os.environ.get("E2E_APPROVE_ATTEMPTS", "3")) approve_resp = None approve_error = None for attempt in range(1, approve_attempts + 1): try: approve_resp = _request_json("POST", approve_url, portal_bearer, payload=None, timeout_s=approve_timeout_s) approve_error = None break except (http.client.RemoteDisconnected, TimeoutError, urllib.error.URLError) as exc: approve_error = str(exc) if attempt == approve_attempts: break time.sleep(3) if approve_resp is None: print( "WARNING: portal approval request did not return a response; " f"continuing to poll status (last_error={approve_error})" ) elif not isinstance(approve_resp, dict) or approve_resp.get("ok") is not True: raise SystemExit(f"unexpected approval response: {approve_resp}") status_url = f"{portal_base}/api/access/request/status" deadline_s = int(os.environ.get("E2E_DEADLINE_SECONDS", "600")) interval_s = int(os.environ.get("E2E_POLL_SECONDS", "10")) deadline_at = time.monotonic() + deadline_s last_status = None last_error = None while True: try: status_payload = _post_json(status_url, {"request_code": request_code}, timeout_s=60) last_error = None except (http.client.RemoteDisconnected, TimeoutError, urllib.error.URLError) as exc: last_error = str(exc) if time.monotonic() >= deadline_at: raise SystemExit(f"timed out waiting for provisioning to finish (last error={last_error})") time.sleep(interval_s) continue status = status_payload.get("status") if isinstance(status, str): last_status = status if status in ("awaiting_onboarding", "ready"): break if status in ("denied", "unknown"): raise SystemExit(f"request transitioned to unexpected terminal status: {status_payload}") if time.monotonic() >= deadline_at: suffix = f" (last error={last_error})" if last_error else "" raise SystemExit(f"timed out waiting for provisioning to finish (last status={last_status}){suffix}") time.sleep(interval_s) # Refresh admin token (it may expire during the provisioning wait). token = _keycloak_client_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret) user = _keycloak_find_user(keycloak_base, realm, token, username) if not user: raise SystemExit("expected Keycloak user was not created") user_id = user.get("id") if not isinstance(user_id, str) or not user_id: raise SystemExit("created user missing id") full = _keycloak_get_user(keycloak_base, realm, token, user_id) required_actions = full.get("requiredActions") or [] required: set[str] = set() if isinstance(required_actions, list): required = {a for a in required_actions if isinstance(a, str)} unexpected = sorted(required.intersection({"UPDATE_PASSWORD", "VERIFY_EMAIL", "CONFIGURE_TOTP"})) if unexpected: raise SystemExit( "Keycloak user should not require actions at first login " f"(Vaultwarden-first onboarding): unexpected requiredActions={unexpected} full={sorted(required)}" ) email_verified = full.get("emailVerified") if email_verified is not True: raise SystemExit(f"Keycloak user should have emailVerified=true: emailVerified={email_verified!r}") kc_email = full.get("email") if isinstance(kc_email, str) and contact_email and kc_email != contact_email: raise SystemExit(f"Keycloak user email mismatch: expected {contact_email!r} got {kc_email!r}") print(f"PASS: onboarding provisioning completed for {request_code} ({username})") return 0 if __name__ == "__main__": sys.exit(main())