titan-iac/scripts/tests/test_keycloak_execute_actions_email.py

128 lines
4.6 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
def _require_env(name: str) -> str:
value = os.environ.get(name)
if not value:
raise SystemExit(f"missing required env var: {name}")
return value
def _post_form(url: str, data: dict[str, str], timeout_s: int = 30) -> dict:
body = urllib.parse.urlencode(data).encode()
req = urllib.request.Request(
url,
data=body,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
payload = resp.read().decode()
return json.loads(payload) if payload else {}
except urllib.error.HTTPError as exc:
raw = exc.read().decode(errors="replace")
raise SystemExit(f"HTTP {exc.code} from {url}: {raw}")
def _request_json(method: str, url: str, token: str, payload: object | None = None, timeout_s: int = 30) -> tuple[int, object | None]:
data = None
headers = {"Authorization": f"Bearer {token}"}
if payload is not None:
data = json.dumps(payload).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
body = resp.read()
if not body:
return resp.status, None
return resp.status, json.loads(body.decode())
except urllib.error.HTTPError as exc:
raw = exc.read().decode(errors="replace")
raise SystemExit(f"HTTP {exc.code} from {url}: {raw}")
def _search_probe_user(admin_users_url: str, username: str, token: str) -> tuple[int, object | None]:
    """Query the admin users endpoint for an exact match on *username*."""
    query = urllib.parse.urlencode({"username": username, "exact": "true"})
    return _request_json("GET", f"{admin_users_url}?{query}", token, timeout_s=30)


def main() -> int:
    """Trigger a Keycloak execute-actions email for a probe user.

    Configuration is read from the environment:

    * ``KEYCLOAK_SERVER``, ``PORTAL_E2E_CLIENT_ID``, ``PORTAL_E2E_CLIENT_SECRET``
      are required.
    * ``KEYCLOAK_REALM``, ``E2E_PROBE_USERNAME``, ``E2E_PROBE_EMAIL``,
      ``EXECUTE_ACTIONS_CLIENT_ID`` and ``EXECUTE_ACTIONS_REDIRECT_URI`` are
      optional and fall back to the defaults below.

    The function obtains a client-credentials token, looks up the probe user
    (creating it when the search returns nothing), then issues a PUT to the
    user's ``execute-actions-email`` endpoint with ``["UPDATE_PASSWORD"]``.

    Returns:
        ``0`` after printing a PASS line.

    Raises:
        SystemExit: when configuration is missing or any step returns an
            unexpected status or payload.
    """
    keycloak_base = _require_env("KEYCLOAK_SERVER").rstrip("/")
    realm = os.environ.get("KEYCLOAK_REALM", "atlas")
    client_id = _require_env("PORTAL_E2E_CLIENT_ID")
    client_secret = _require_env("PORTAL_E2E_CLIENT_SECRET")
    probe_username = os.environ.get("E2E_PROBE_USERNAME", "e2e-smtp-probe")
    probe_email = os.environ.get("E2E_PROBE_EMAIL", "e2e-smtp-probe@bstein.dev")
    execute_client_id = os.environ.get("EXECUTE_ACTIONS_CLIENT_ID", "bstein-dev-home")
    execute_redirect_uri = os.environ.get("EXECUTE_ACTIONS_REDIRECT_URI", "https://bstein.dev/")

    token_url = f"{keycloak_base}/realms/{realm}/protocol/openid-connect/token"
    admin_users_url = f"{keycloak_base}/admin/realms/{realm}/users"

    token_payload = _post_form(
        token_url,
        {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret},
        timeout_s=30,
    )
    # Guard the lookup so a non-object token response fails with the message
    # below rather than an AttributeError.
    access_token = token_payload.get("access_token") if isinstance(token_payload, dict) else None
    if not isinstance(access_token, str) or not access_token:
        raise SystemExit("client credentials token missing access_token")

    status, users = _search_probe_user(admin_users_url, probe_username, access_token)
    if status != 200 or not isinstance(users, list):
        raise SystemExit("unexpected admin API response when searching for probe user")

    if not users:
        # NOTE(review): the probe user is created disabled; confirm that
        # Keycloak accepts execute-actions-email for a disabled user.
        create_payload = {
            "username": probe_username,
            "enabled": False,
            "email": probe_email,
            "emailVerified": True,
        }
        status, _ = _request_json("POST", admin_users_url, access_token, create_payload, timeout_s=30)
        if status not in (201, 204):
            raise SystemExit(f"unexpected status creating probe user: {status}")
        # The create call's body is not used, so search again to learn the id.
        status, users = _search_probe_user(admin_users_url, probe_username, access_token)
        if status != 200 or not isinstance(users, list) or not users:
            raise SystemExit("failed to refetch probe user after creation")

    first_user = users[0]
    # A non-object entry is treated the same as an entry without an id.
    user_id = first_user.get("id") if isinstance(first_user, dict) else None
    if not isinstance(user_id, str) or not user_id:
        raise SystemExit("probe user missing id")

    # Trigger an email to validate Keycloak SMTP integration.
    query = urllib.parse.urlencode(
        {
            "client_id": execute_client_id,
            "redirect_uri": execute_redirect_uri,
            "lifespan": "600",
        }
    )
    # safe="" keeps the id a single path segment even if it contains "/".
    user_segment = urllib.parse.quote(user_id, safe="")
    url = f"{admin_users_url}/{user_segment}/execute-actions-email?{query}"
    status, _ = _request_json("PUT", url, access_token, ["UPDATE_PASSWORD"], timeout_s=30)
    if status != 204:
        raise SystemExit(f"unexpected status from execute-actions-email: {status}")

    print("PASS: Keycloak execute-actions-email succeeded")
    return 0
if __name__ == "__main__":
    # Run the probe only when executed as a script; main()'s return value
    # becomes the process exit status.
    exit_code = main()
    sys.exit(exit_code)