#!/usr/bin/env python3 import json import os import sys import time import urllib.error import urllib.parse import urllib.request def _require_env(name: str) -> str: value = os.environ.get(name) if not value: raise SystemExit(f"missing required env var: {name}") return value def _post_form(url: str, data: dict[str, str], timeout_s: int = 30) -> dict: body = urllib.parse.urlencode(data).encode() req = urllib.request.Request( url, data=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: payload = resp.read().decode() return json.loads(payload) if payload else {} except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") raise SystemExit(f"HTTP {exc.code} from {url}: {raw}") def _request_json(method: str, url: str, token: str, payload: object | None = None, timeout_s: int = 30) -> tuple[int, object | None]: data = None headers = {"Authorization": f"Bearer {token}"} if payload is not None: data = json.dumps(payload).encode() headers["Content-Type"] = "application/json" req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: body = resp.read() if not body: return resp.status, None return resp.status, json.loads(body.decode()) except urllib.error.HTTPError as exc: raw = exc.read().decode(errors="replace") try: return exc.code, json.loads(raw) if raw else None except json.JSONDecodeError: return exc.code, raw def main() -> int: keycloak_base = _require_env("KEYCLOAK_SERVER").rstrip("/") realm = os.environ.get("KEYCLOAK_REALM", "atlas") client_id = _require_env("PORTAL_E2E_CLIENT_ID") client_secret = _require_env("PORTAL_E2E_CLIENT_SECRET") probe_username = os.environ.get("E2E_PROBE_USERNAME", "e2e-smtp-probe") probe_email = os.environ.get("E2E_PROBE_EMAIL", "e2e-smtp-probe@bstein.dev") execute_client_id = os.environ.get("EXECUTE_ACTIONS_CLIENT_ID", "bstein-dev-home") execute_redirect_uri = os.environ.get("EXECUTE_ACTIONS_REDIRECT_URI", "https://bstein.dev/") token_url = f"{keycloak_base}/realms/{realm}/protocol/openid-connect/token" admin_users_url = f"{keycloak_base}/admin/realms/{realm}/users" def get_access_token() -> str: token_payload = _post_form( token_url, {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret}, timeout_s=30, ) access_token = token_payload.get("access_token") if not isinstance(access_token, str) or not access_token: raise SystemExit("client credentials token missing access_token") return access_token access_token = get_access_token() users: list | None = None search_url = f"{admin_users_url}?{urllib.parse.urlencode({'username': probe_username, 'exact': 'true'})}" for attempt in range(1, 11): status, body = _request_json("GET", search_url, access_token, timeout_s=30) if status == 200 and isinstance(body, list): users = body break if status == 403 and attempt < 10: time.sleep(3) access_token = get_access_token() continue raise SystemExit(f"unexpected admin API response when searching for probe user (status={status} body={body})") if users is None: raise SystemExit("probe user search did not return a list response") if not users: create_payload = { "username": probe_username, "enabled": True, "email": probe_email, "emailVerified": True, } for attempt in range(1, 6): status, body = _request_json("POST", admin_users_url, access_token, create_payload, timeout_s=30) if status in (201, 204): break if status == 403 and attempt < 5: time.sleep(3) access_token = get_access_token() continue raise SystemExit(f"unexpected status creating probe user: {status} body={body}") # Refetch. for attempt in range(1, 11): status, body = _request_json("GET", search_url, access_token, timeout_s=30) if status == 200 and isinstance(body, list) and body: users = body break if status == 403 and attempt < 10: time.sleep(3) access_token = get_access_token() continue raise SystemExit(f"failed to refetch probe user after creation (status={status} body={body})") user_id = users[0].get("id") if not isinstance(user_id, str) or not user_id: raise SystemExit("probe user missing id") # execute-actions-email requires the user to be enabled and have an email configured. user_url = f"{admin_users_url}/{urllib.parse.quote(user_id)}" user: dict | None = None for attempt in range(1, 6): status, body = _request_json("GET", user_url, access_token, timeout_s=30) if status == 200 and isinstance(body, dict): user = body break if status == 403 and attempt < 5: time.sleep(3) access_token = get_access_token() continue raise SystemExit(f"unexpected status fetching probe user: {status} body={body}") if user is None: raise SystemExit("probe user fetch did not return a user object") needs_update = False if user.get("enabled") is False: user["enabled"] = True needs_update = True if user.get("email") != probe_email: user["email"] = probe_email needs_update = True if user.get("emailVerified") is not True: user["emailVerified"] = True needs_update = True if needs_update: for attempt in range(1, 6): status, body = _request_json("PUT", user_url, access_token, user, timeout_s=30) if status in (200, 204): break if status == 403 and attempt < 5: time.sleep(3) access_token = get_access_token() continue raise SystemExit(f"unexpected status updating probe user: {status} body={body}") # Trigger an email to validate Keycloak SMTP integration. query = urllib.parse.urlencode( { "client_id": execute_client_id, "redirect_uri": execute_redirect_uri, "lifespan": "600", } ) url = f"{admin_users_url}/{urllib.parse.quote(user_id)}/execute-actions-email?{query}" for attempt in range(1, 6): status, body = _request_json("PUT", url, access_token, ["UPDATE_PASSWORD"], timeout_s=30) if status == 204: break if status == 403 and attempt < 5: time.sleep(3) access_token = get_access_token() continue raise SystemExit(f"unexpected status from execute-actions-email: {status} body={body}") print("PASS: Keycloak execute-actions-email succeeded") return 0 if __name__ == "__main__": sys.exit(main())