titan-iac/services/bstein-dev-home/scripts/test_portal_onboarding_flow.py

455 lines
18 KiB
Python

#!/usr/bin/env python3
import email
import http.client
import imaplib
import json
import os
import re
import ssl
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
def _env(name: str, default: str | None = None) -> str:
value = os.environ.get(name, default)
if value is None or value == "":
raise SystemExit(f"missing required env var: {name}")
return value
def _post_json(url: str, payload: dict, timeout_s: int = 30) -> dict:
    """POST *payload* to *url* as a JSON document and decode the JSON reply.

    An empty response body yields ``{}``.

    Raises:
        SystemExit: on an HTTP error status; the message carries the status
            code and the response body. Other errors raised by ``urlopen``
            are not caught here and propagate to the caller.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as response:
            text = response.read().decode()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode(errors="replace")
        raise SystemExit(f"HTTP {exc.code} from {url}: {detail}")
    if not text:
        return {}
    return json.loads(text)
def _post_form(url: str, data: dict[str, str], timeout_s: int = 30) -> dict:
    """POST *data* to *url* as a URL-encoded form and decode the JSON reply.

    An empty response body yields ``{}``.

    Raises:
        SystemExit: on an HTTP error status; the message carries the status
            code and the response body. Other ``urlopen`` errors propagate.
    """
    encoded_form = urllib.parse.urlencode(data).encode()
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    request = urllib.request.Request(url, data=encoded_form, headers=form_headers, method="POST")
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as response:
            text = response.read().decode()
            if text:
                return json.loads(text)
            return {}
    except urllib.error.HTTPError as http_error:
        detail = http_error.read().decode(errors="replace")
        raise SystemExit(f"HTTP {http_error.code} from {url}: {detail}")
def _get_json(url: str, headers: dict[str, str] | None = None, timeout_s: int = 30) -> object:
req = urllib.request.Request(url, headers=headers or {}, method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode()
return json.loads(raw) if raw else None
except urllib.error.HTTPError as exc:
raw = exc.read().decode(errors="replace")
raise SystemExit(f"HTTP {exc.code} from {url}: {raw}")
def _wait_for_portal_ready(base_url: str, timeout_s: int = 60) -> None:
    """Block until ``<base_url>/api/healthz`` answers with HTTP 200.

    The endpoint is probed with a 10-second request timeout, sleeping two
    seconds after every unsuccessful probe, until *timeout_s* seconds have
    elapsed on the monotonic clock.

    Raises:
        SystemExit: when the deadline passes without a 200 response; the
            message includes the text of the last exception seen, if any.
    """
    probe_url = base_url.rstrip("/") + "/api/healthz"
    give_up_at = time.monotonic() + timeout_s
    failure = None
    while time.monotonic() < give_up_at:
        try:
            probe = urllib.request.Request(probe_url, method="GET")
            with urllib.request.urlopen(probe, timeout=10) as reply:
                healthy = reply.status == 200
        except Exception as exc:
            # Any failure counts as "not ready yet"; keep it for the final message.
            failure = str(exc)
        else:
            if healthy:
                return
        time.sleep(2)
    message = "portal health check timed out"
    if failure:
        message += f" (last_error={failure})"
    raise SystemExit(message)
def _request_json(
method: str,
url: str,
token: str,
payload: dict | None = None,
timeout_s: int = 30,
) -> dict:
data = None
headers = {"Authorization": f"Bearer {token}"}
if payload is not None:
data = json.dumps(payload).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode()
return json.loads(raw) if raw else {}
except urllib.error.HTTPError as exc:
raw = exc.read().decode(errors="replace")
raise SystemExit(f"HTTP {exc.code} from {url}: {raw}")
def _keycloak_client_token(keycloak_base: str, realm: str, client_id: str, client_secret: str) -> str:
    """Fetch an access token for a client via the ``client_credentials`` grant.

    Posts to the realm's OpenID Connect token endpoint under *keycloak_base*.

    Raises:
        SystemExit: when the response has no non-empty string ``access_token``
            (or when the underlying form POST fails with an HTTP error).
    """
    endpoint = keycloak_base.rstrip("/") + f"/realms/{realm}/protocol/openid-connect/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    response = _post_form(endpoint, form, timeout_s=20)
    access_token = response.get("access_token")
    if isinstance(access_token, str) and access_token:
        return access_token
    raise SystemExit("keycloak token response missing access_token")
def _keycloak_token_exchange(
    *,
    keycloak_base: str,
    realm: str,
    client_id: str,
    client_secret: str,
    subject_token: str,
    requested_subject: str,
    audience: str,
) -> str:
    """Exchange *subject_token* for a token issued for *requested_subject*.

    Posts a ``urn:ietf:params:oauth:grant-type:token-exchange`` grant to the
    realm's OpenID Connect token endpoint, authenticating as *client_id* and
    requesting *audience*.

    Raises:
        SystemExit: when the response has no non-empty string ``access_token``
            (or when the underlying form POST fails with an HTTP error).
    """
    endpoint = keycloak_base.rstrip("/") + f"/realms/{realm}/protocol/openid-connect/token"
    form = {
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "client_id": client_id,
        "client_secret": client_secret,
        "subject_token": subject_token,
        "requested_subject": requested_subject,
        "audience": audience,
    }
    response = _post_form(endpoint, form, timeout_s=20)
    exchanged = response.get("access_token")
    if isinstance(exchanged, str) and exchanged:
        return exchanged
    raise SystemExit("keycloak token exchange response missing access_token")
def _keycloak_find_user(keycloak_base: str, realm: str, token: str, username: str) -> dict | None:
    """Look up a user by exact username through the Keycloak admin API.

    The query asks for an exact match and at most one result. Returns the
    first entry of the response when it is a dict, otherwise ``None``.
    """
    query = urllib.parse.urlencode({"username": username, "exact": "true", "max": "1"})
    url = f"{keycloak_base.rstrip('/')}/admin/realms/{realm}/users?{query}"
    matches = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20)
    if isinstance(matches, list) and matches:
        first = matches[0]
        if isinstance(first, dict):
            return first
    return None
def _keycloak_get_user(keycloak_base: str, realm: str, token: str, user_id: str) -> dict:
    """Fetch the full user representation for *user_id* from the admin API.

    *user_id* is percent-encoded before being placed in the URL path.

    Raises:
        SystemExit: when the response is not a JSON object.
    """
    encoded_id = urllib.parse.quote(user_id, safe="")
    url = f"{keycloak_base.rstrip('/')}/admin/realms/{realm}/users/{encoded_id}"
    representation = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20)
    if isinstance(representation, dict):
        return representation
    raise SystemExit("unexpected keycloak user payload")
def _extract_attr(attributes: object, key: str) -> str:
    """Return the string stored under *key* in a user attribute mapping.

    Accepts either a plain string value or a non-empty list whose first item
    is a string (the first item is returned). Anything else, including a
    non-dict *attributes*, yields ``""``.
    """
    if isinstance(attributes, dict):
        raw = attributes.get(key)
        if isinstance(raw, str):
            return raw
        if isinstance(raw, list) and raw and isinstance(raw[0], str):
            return raw[0]
    return ""
def _plain_text_body(raw_message: bytes) -> str:
    """Return the decoded body of an RFC 822 message, or ``""`` if none is found.

    For multipart messages the first ``text/plain`` part with a byte payload
    is used; for single-part messages the message payload itself is used.
    """
    message = email.message_from_bytes(raw_message)
    if message.is_multipart():
        candidates = [part for part in message.walk() if part.get_content_type() == "text/plain"]
    else:
        candidates = [message]
    for part in candidates:
        payload = part.get_payload(decode=True)
        if isinstance(payload, (bytes, bytearray)):
            return payload.decode(errors="replace")
    return ""


def _verify_token_from_text(body: str) -> str:
    """Return the ``verify`` query parameter of the verification URL in *body*, or ``""``.

    A line that starts with ``http`` and contains ``verify=`` is preferred;
    otherwise the first URL anywhere in the text that contains ``verify=``
    is used.
    """
    url = None
    for line in body.splitlines():
        candidate = line.strip()
        if candidate.startswith("http") and "verify=" in candidate:
            url = candidate
            break
    if url is None:
        # Single backslashes: inside a raw string "\\S" would match a literal
        # backslash followed by "S", so the fallback could never match a URL.
        match = re.search(r"https?://\S+verify=\S+", body)
        if match is None:
            return ""
        url = match.group(0)
    query = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
    return query.get("verify", [""])[0]


def _fetch_verify_token(client: imaplib.IMAP4, request_code: str) -> str:
    """Search the selected mailbox for *request_code* and return the token from the newest hit, or ``""``."""
    status, data = client.search(None, "TEXT", request_code)
    if status != "OK" or not data or not data[0]:
        return ""
    ids = data[0].split()
    if not ids:
        return ""
    fetch_status, msg_data = client.fetch(ids[-1], "(RFC822)")
    if fetch_status != "OK" or not msg_data:
        return ""
    first = msg_data[0]
    raw = first[1] if isinstance(first, tuple) and len(first) > 1 else None
    if not isinstance(raw, (bytes, bytearray)):
        return ""
    body = _plain_text_body(raw)
    return _verify_token_from_text(body) if body else ""


def _imap_wait_for_verify_token(
    *,
    host: str,
    port: int,
    username: str,
    password: str,
    request_code: str,
    deadline_sec: int,
) -> str:
    """Poll an IMAP inbox until the verification email arrives and return its token.

    Logs in over IMAP-over-TLS, selects ``INBOX`` and, every two seconds,
    searches for messages containing *request_code*. The newest match is
    fetched and the ``verify`` query parameter of its verification URL is
    returned.

    Raises:
        SystemExit: when no token has been found after *deadline_sec* seconds.
    """
    # NOTE(review): certificate verification is disabled here and a private
    # ssl helper is used; presumably the mail host presents a certificate the
    # runner cannot validate. Confirm this is intended before reusing.
    ssl_context = ssl._create_unverified_context()
    deadline_at = time.monotonic() + deadline_sec
    with imaplib.IMAP4_SSL(host, port, ssl_context=ssl_context) as client:
        client.login(username, password)
        client.select("INBOX")
        while time.monotonic() < deadline_at:
            token = _fetch_verify_token(client, request_code)
            if token:
                return token
            time.sleep(2)
    raise SystemExit("verification email not found before deadline")
def main() -> int:
    """Run the portal onboarding end-to-end check and return the exit status.

    Steps, in order:

    1. Read configuration from environment variables.
    2. Get a Keycloak admin-client token and read the mailbox user's
       ``mailu_email`` / ``mailu_app_password`` attributes.
    3. Submit an access request to the portal (retrying transport failures).
    4. Wait for the verification email over IMAP and verify the request.
    5. Exchange the E2E client's token for one issued for the portal admin
       user and approve the request.
    6. Poll the request status until it is ``awaiting_onboarding`` or ``ready``.
    7. Check the created Keycloak user: no ``UPDATE_PASSWORD`` /
       ``VERIFY_EMAIL`` / ``CONFIGURE_TOTP`` required actions,
       ``emailVerified`` true, and a matching email address.

    Returns:
        0 on success.

    Raises:
        SystemExit: with a descriptive message for every failure detected.
    """
    # --- Configuration -----------------------------------------------------
    portal_base = _env("PORTAL_BASE_URL").rstrip("/")
    portal_ready_timeout = int(os.environ.get("E2E_PORTAL_READY_TIMEOUT_SECONDS", "60"))
    keycloak_base = _env("KEYCLOAK_ADMIN_URL").rstrip("/")
    realm = _env("KEYCLOAK_REALM", "atlas")
    kc_admin_client_id = _env("KEYCLOAK_ADMIN_CLIENT_ID")
    kc_admin_client_secret = _env("KEYCLOAK_ADMIN_CLIENT_SECRET")
    portal_e2e_client_id = _env("PORTAL_E2E_CLIENT_ID")
    portal_e2e_client_secret = _env("PORTAL_E2E_CLIENT_SECRET")
    portal_target_client_id = os.environ.get("PORTAL_TARGET_CLIENT_ID", "bstein-dev-home").strip() or "bstein-dev-home"
    portal_admin_username = os.environ.get("E2E_PORTAL_ADMIN_USERNAME", "bstein").strip() or "bstein"
    contact_email = os.environ.get("E2E_CONTACT_EMAIL", "robotuser@bstein.dev").strip()
    if not contact_email:
        raise SystemExit("E2E_CONTACT_EMAIL must not be empty")
    imap_host = os.environ.get("E2E_IMAP_HOST", "mail.bstein.dev").strip()
    imap_port = int(os.environ.get("E2E_IMAP_PORT", "993"))
    imap_keycloak_username = os.environ.get("E2E_IMAP_KEYCLOAK_USERNAME", "robotuser").strip()
    imap_wait_sec = int(os.environ.get("E2E_IMAP_WAIT_SECONDS", "90"))

    # --- Mailbox credentials from Keycloak user attributes -------------------
    try:
        token = _keycloak_client_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
    except SystemExit as exc:
        raise SystemExit(f"failed to fetch keycloak token for admin client {kc_admin_client_id!r}: {exc}")
    mailbox_user = _keycloak_find_user(keycloak_base, realm, token, imap_keycloak_username)
    if not mailbox_user:
        raise SystemExit(f"unable to locate Keycloak mailbox user {imap_keycloak_username!r}")
    mailbox_user_id = mailbox_user.get("id")
    if not isinstance(mailbox_user_id, str) or not mailbox_user_id:
        raise SystemExit("mailbox user missing id")
    mailbox_full = _keycloak_get_user(keycloak_base, realm, token, mailbox_user_id)
    mailbox_attrs = mailbox_full.get("attributes")
    mailu_email = _extract_attr(mailbox_attrs, "mailu_email")
    if not mailu_email:
        # No dedicated mailbox address on the user: log in with the contact address.
        mailu_email = contact_email
    mailu_password = _extract_attr(mailbox_attrs, "mailu_app_password")
    if not mailu_password:
        raise SystemExit(f"Keycloak user {imap_keycloak_username!r} missing mailu_app_password attribute")

    # --- Submit the access request -----------------------------------------
    _wait_for_portal_ready(portal_base, timeout_s=portal_ready_timeout)
    username_prefix = os.environ.get("E2E_USERNAME_PREFIX", "e2e-user")
    now = int(time.time())
    # The epoch-seconds suffix gives each run its own username.
    username = f"{username_prefix}-{now}"
    submit_url = f"{portal_base}/api/access/request"
    submit_payload = {"username": username, "email": contact_email, "note": "portal onboarding e2e"}
    submit = None
    for attempt in range(1, 6):
        try:
            submit = _post_json(submit_url, submit_payload, timeout_s=20)
            break
        except (http.client.RemoteDisconnected, TimeoutError, urllib.error.URLError) as exc:
            if attempt == 5:
                raise SystemExit(f"portal submit failed after {attempt} attempts: {exc}")
            time.sleep(2)
    if not isinstance(submit, dict):
        raise SystemExit("portal submit did not return json")
    request_code = submit.get("request_code")
    if not isinstance(request_code, str) or not request_code:
        raise SystemExit(f"request submit did not return request_code: {submit}")

    # --- Email verification ------------------------------------------------
    verify_token = _imap_wait_for_verify_token(
        host=imap_host,
        port=imap_port,
        username=mailu_email,
        password=mailu_password,
        request_code=request_code,
        deadline_sec=imap_wait_sec,
    )
    verify_resp = _post_json(
        f"{portal_base}/api/access/request/verify",
        {"request_code": request_code, "token": verify_token},
        timeout_s=30,
    )
    if not isinstance(verify_resp, dict) or verify_resp.get("ok") is not True:
        raise SystemExit(f"unexpected verify response: {verify_resp}")

    # --- Approve the request with a token exchanged for the portal admin ----
    portal_admin = _keycloak_find_user(keycloak_base, realm, token, portal_admin_username)
    if not portal_admin:
        raise SystemExit(f"unable to locate portal admin user {portal_admin_username!r} via Keycloak admin API")
    portal_admin_user_id = portal_admin.get("id")
    if not isinstance(portal_admin_user_id, str) or not portal_admin_user_id:
        raise SystemExit("portal admin user missing id")
    try:
        e2e_subject_token = _keycloak_client_token(keycloak_base, realm, portal_e2e_client_id, portal_e2e_client_secret)
    except SystemExit as exc:
        raise SystemExit(f"failed to fetch keycloak token for E2E client {portal_e2e_client_id!r}: {exc}")
    try:
        portal_bearer = _keycloak_token_exchange(
            keycloak_base=keycloak_base,
            realm=realm,
            client_id=portal_e2e_client_id,
            client_secret=portal_e2e_client_secret,
            subject_token=e2e_subject_token,
            requested_subject=portal_admin_user_id,
            audience=portal_target_client_id,
        )
    except SystemExit as exc:
        raise SystemExit(f"failed to exchange token for portal approval as {portal_admin_username!r}: {exc}")
    _wait_for_portal_ready(portal_base, timeout_s=portal_ready_timeout)
    approve_url = f"{portal_base}/api/admin/access/requests/{urllib.parse.quote(username, safe='')}/approve"
    approve_timeout_s = int(os.environ.get("E2E_APPROVE_TIMEOUT_SECONDS", "180"))
    approve_attempts = int(os.environ.get("E2E_APPROVE_ATTEMPTS", "3"))
    approve_resp = None
    approve_error = None
    for attempt in range(1, approve_attempts + 1):
        try:
            approve_resp = _request_json("POST", approve_url, portal_bearer, payload=None, timeout_s=approve_timeout_s)
            approve_error = None
            break
        except (http.client.RemoteDisconnected, TimeoutError, urllib.error.URLError) as exc:
            approve_error = str(exc)
            # Best effort: give the portal a chance to come back before the
            # next attempt; a failed health wait is deliberately ignored.
            try:
                _wait_for_portal_ready(portal_base, timeout_s=min(30, portal_ready_timeout))
            except SystemExit:
                pass
            if attempt == approve_attempts:
                break
            time.sleep(3)
    if approve_resp is None:
        # A lost approval response is not fatal: the status poll below decides
        # whether the request went through.
        print(
            "WARNING: portal approval request did not return a response; "
            f"continuing to poll status (last_error={approve_error})"
        )
    elif not isinstance(approve_resp, dict) or approve_resp.get("ok") is not True:
        raise SystemExit(f"unexpected approval response: {approve_resp}")

    # --- Wait for provisioning ---------------------------------------------
    status_url = f"{portal_base}/api/access/request/status"
    deadline_s = int(os.environ.get("E2E_DEADLINE_SECONDS", "600"))
    interval_s = int(os.environ.get("E2E_POLL_SECONDS", "10"))
    deadline_at = time.monotonic() + deadline_s
    last_status = None
    last_error = None
    while True:
        try:
            status_payload = _post_json(status_url, {"request_code": request_code}, timeout_s=60)
            last_error = None
        except (http.client.RemoteDisconnected, TimeoutError, urllib.error.URLError) as exc:
            last_error = str(exc)
            if time.monotonic() >= deadline_at:
                raise SystemExit(f"timed out waiting for provisioning to finish (last error={last_error})")
            time.sleep(interval_s)
            continue
        # Same shape check as the submit/verify/approve responses; without it
        # a non-object JSON reply would crash on .get() with AttributeError.
        if not isinstance(status_payload, dict):
            raise SystemExit(f"unexpected status response: {status_payload}")
        status = status_payload.get("status")
        if isinstance(status, str):
            last_status = status
        if status in ("awaiting_onboarding", "ready"):
            break
        if status in ("denied", "unknown"):
            raise SystemExit(f"request transitioned to unexpected terminal status: {status_payload}")
        if time.monotonic() >= deadline_at:
            # NOTE(review): last_error was reset to None by the successful poll
            # above, so this suffix is always empty on this path.
            suffix = f" (last error={last_error})" if last_error else ""
            raise SystemExit(f"timed out waiting for provisioning to finish (last status={last_status}){suffix}")
        time.sleep(interval_s)

    # --- Check the provisioned Keycloak user ---------------------------------
    # Refresh admin token (it may expire during the provisioning wait).
    token = _keycloak_client_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
    user = _keycloak_find_user(keycloak_base, realm, token, username)
    if not user:
        raise SystemExit("expected Keycloak user was not created")
    user_id = user.get("id")
    if not isinstance(user_id, str) or not user_id:
        raise SystemExit("created user missing id")
    full = _keycloak_get_user(keycloak_base, realm, token, user_id)
    required_actions = full.get("requiredActions") or []
    required: set[str] = set()
    if isinstance(required_actions, list):
        required = {a for a in required_actions if isinstance(a, str)}
    unexpected = sorted(required.intersection({"UPDATE_PASSWORD", "VERIFY_EMAIL", "CONFIGURE_TOTP"}))
    if unexpected:
        raise SystemExit(
            "Keycloak user should not require actions at first login "
            f"(Vaultwarden-first onboarding): unexpected requiredActions={unexpected} full={sorted(required)}"
        )
    email_verified = full.get("emailVerified")
    if email_verified is not True:
        raise SystemExit(f"Keycloak user should have emailVerified=true: emailVerified={email_verified!r}")
    kc_email = full.get("email")
    if isinstance(kc_email, str) and contact_email and kc_email != contact_email:
        raise SystemExit(f"Keycloak user email mismatch: expected {contact_email!r} got {kc_email!r}")
    print(f"PASS: onboarding provisioning completed for {request_code} ({username})")
    return 0
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())