titan-iac/scripts/tests/test_keycloak_execute_actions_email.py

166 lines
6.3 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
def _require_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        SystemExit: If the variable is unset or set to an empty string.
    """
    if found := os.getenv(name):
        return found
    raise SystemExit(f"missing required env var: {name}")
def _post_form(url: str, data: dict[str, str], timeout_s: int = 30) -> dict:
    """POST *data* as a URL-encoded form and return the decoded JSON object.

    Args:
        url: Endpoint to post to.
        data: Form fields, encoded with ``urllib.parse.urlencode``.
        timeout_s: Timeout in seconds handed to ``urllib.request.urlopen``.

    Returns:
        The response body parsed as a JSON object, or ``{}`` when the body
        is empty.

    Raises:
        SystemExit: On an HTTP error status (the message carries the status
            code and response body), on a transport-level failure, or when
            the body is not a JSON object.
    """
    body = urllib.parse.urlencode(data).encode()
    req = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            payload = resp.read().decode(errors="replace")
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode(errors="replace")
        raise SystemExit(f"HTTP {exc.code} from {url}: {raw}") from exc
    except (urllib.error.URLError, TimeoutError) as exc:
        # HTTPError is a URLError subclass, so it must be handled first above.
        raise SystemExit(f"request to {url} failed: {exc}") from exc

    if not payload:
        return {}
    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"non-JSON response from {url}: {exc}") from exc
    if not isinstance(parsed, dict):
        # Callers use dict methods on the result; fail here with a clear
        # message. The payload itself is left out of the message because it
        # may carry credentials.
        raise SystemExit(
            f"expected a JSON object from {url}, got {type(parsed).__name__}"
        )
    return parsed
def _request_json(method: str, url: str, token: str, payload: object | None = None, timeout_s: int = 30) -> tuple[int, object | None]:
data = None
headers = {"Authorization": f"Bearer {token}"}
if payload is not None:
data = json.dumps(payload).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
body = resp.read()
if not body:
return resp.status, None
return resp.status, json.loads(body.decode())
except urllib.error.HTTPError as exc:
raw = exc.read().decode(errors="replace")
try:
return exc.code, json.loads(raw) if raw else None
except json.JSONDecodeError:
return exc.code, raw
def main() -> int:
    """Probe Keycloak's SMTP integration via ``execute-actions-email``.

    Environment variables:
        KEYCLOAK_SERVER: Base URL of the Keycloak server (required).
        KEYCLOAK_REALM: Realm name; defaults to ``atlas``.
        PORTAL_E2E_CLIENT_ID, PORTAL_E2E_CLIENT_SECRET: Credentials for the
            client-credentials grant (required).
        E2E_PROBE_USERNAME, E2E_PROBE_EMAIL: Identity of the probe user.
        EXECUTE_ACTIONS_CLIENT_ID, EXECUTE_ACTIONS_REDIRECT_URI: Query
            parameters passed to the execute-actions-email endpoint.

    Steps: obtain an access token, look up the probe user by exact username
    (creating it when the search returns an empty list), re-enable the user
    if it is disabled, then request an ``UPDATE_PASSWORD`` action email.

    Returns:
        ``0`` once the endpoint answers 204.

    Raises:
        SystemExit: On missing configuration or any unexpected response.
    """
    keycloak_base = _require_env("KEYCLOAK_SERVER").rstrip("/")
    realm = os.environ.get("KEYCLOAK_REALM", "atlas")
    client_id = _require_env("PORTAL_E2E_CLIENT_ID")
    client_secret = _require_env("PORTAL_E2E_CLIENT_SECRET")
    probe_username = os.environ.get("E2E_PROBE_USERNAME", "e2e-smtp-probe")
    probe_email = os.environ.get("E2E_PROBE_EMAIL", "e2e-smtp-probe@bstein.dev")
    execute_client_id = os.environ.get("EXECUTE_ACTIONS_CLIENT_ID", "bstein-dev-home")
    execute_redirect_uri = os.environ.get("EXECUTE_ACTIONS_REDIRECT_URI", "https://bstein.dev/")

    token_url = f"{keycloak_base}/realms/{realm}/protocol/openid-connect/token"
    admin_users_url = f"{keycloak_base}/admin/realms/{realm}/users"

    def get_access_token() -> str:
        """Fetch a fresh access token using the client-credentials grant."""
        token_payload = _post_form(
            token_url,
            {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret},
            timeout_s=30,
        )
        access_token = token_payload.get("access_token")
        if not isinstance(access_token, str) or not access_token:
            raise SystemExit("client credentials token missing access_token")
        return access_token

    access_token = get_access_token()

    def admin_call(method: str, url: str, payload: object | None = None, *, attempts: int = 1) -> tuple[int, object | None]:
        """Call the admin API, retrying with a new token while it answers 403.

        At most *attempts* requests are sent (always at least one). Between
        tries the helper sleeps 3 seconds and replaces the shared token. The
        first non-403 response, or the final 403, is returned for the caller
        to judge.
        """
        nonlocal access_token
        attempt = 1
        while True:
            status, body = _request_json(method, url, access_token, payload, timeout_s=30)
            if status != 403 or attempt >= attempts:
                return status, body
            attempt += 1
            time.sleep(3)
            access_token = get_access_token()

    # Look up the probe user by exact username.
    search_url = f"{admin_users_url}?{urllib.parse.urlencode({'username': probe_username, 'exact': 'true'})}"
    status, body = admin_call("GET", search_url, attempts=10)
    if status != 200 or not isinstance(body, list):
        raise SystemExit(f"unexpected admin API response when searching for probe user (status={status} body={body})")
    users = body

    if not users:
        create_payload = {
            "username": probe_username,
            "enabled": True,
            "email": probe_email,
            "emailVerified": True,
        }
        status, body = admin_call("POST", admin_users_url, create_payload, attempts=5)
        if status not in (201, 204):
            raise SystemExit(f"unexpected status creating probe user: {status} body={body}")

        # Refetch: the user id is taken from the search result.
        status, body = admin_call("GET", search_url, attempts=10)
        if status != 200 or not isinstance(body, list) or not body:
            raise SystemExit(f"failed to refetch probe user after creation (status={status} body={body})")
        users = body

    first_match = users[0]
    # Guard against a malformed search entry instead of failing on `.get`.
    user_id = first_match.get("id") if isinstance(first_match, dict) else None
    if not isinstance(user_id, str) or not user_id:
        raise SystemExit("probe user missing id")
    user_url = f"{admin_users_url}/{urllib.parse.quote(user_id)}"

    # execute-actions-email requires the user to be enabled. The lookup is a
    # single attempt and best-effort: a failed GET is not fatal here, only a
    # failed update is.
    status, user = admin_call("GET", user_url)
    if status == 200 and isinstance(user, dict) and user.get("enabled") is False:
        user["enabled"] = True
        status, body = admin_call("PUT", user_url, user)
        if status not in (200, 204):
            raise SystemExit(f"unexpected status enabling probe user: {status} body={body}")

    # Trigger an email to validate Keycloak SMTP integration.
    query = urllib.parse.urlencode(
        {
            "client_id": execute_client_id,
            "redirect_uri": execute_redirect_uri,
            "lifespan": "600",
        }
    )
    url = f"{user_url}/execute-actions-email?{query}"
    status, body = admin_call("PUT", url, ["UPDATE_PASSWORD"], attempts=5)
    if status != 204:
        raise SystemExit(f"unexpected status from execute-actions-email: {status} body={body}")

    print("PASS: Keycloak execute-actions-email succeeded")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())