test(portal): approve requests via admin API

This commit is contained in:
Brad Stein 2026-01-04 02:58:44 -03:00
parent 0b96894e7a
commit c298946ce0
2 changed files with 95 additions and 25 deletions

View File

@ -12,8 +12,6 @@ import urllib.error
import urllib.parse
import urllib.request
import psycopg
def _env(name: str, default: str | None = None) -> str:
value = os.environ.get(name, default)
@ -67,7 +65,29 @@ def _get_json(url: str, headers: dict[str, str] | None = None, timeout_s: int =
raise SystemExit(f"HTTP {exc.code} from {url}: {raw}")
def _keycloak_admin_token(keycloak_base: str, realm: str, client_id: str, client_secret: str) -> str:
def _request_json(
method: str,
url: str,
token: str,
payload: dict | None = None,
timeout_s: int = 30,
) -> dict:
data = None
headers = {"Authorization": f"Bearer {token}"}
if payload is not None:
data = json.dumps(payload).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode()
return json.loads(raw) if raw else {}
except urllib.error.HTTPError as exc:
raw = exc.read().decode(errors="replace")
raise SystemExit(f"HTTP {exc.code} from {url}: {raw}")
def _keycloak_client_token(keycloak_base: str, realm: str, client_id: str, client_secret: str) -> str:
token_url = f"{keycloak_base.rstrip('/')}/realms/{realm}/protocol/openid-connect/token"
payload = _post_form(
token_url,
@ -84,6 +104,35 @@ def _keycloak_admin_token(keycloak_base: str, realm: str, client_id: str, client
return token
def _keycloak_token_exchange(
*,
keycloak_base: str,
realm: str,
client_id: str,
client_secret: str,
subject_token: str,
requested_subject: str,
audience: str,
) -> str:
token_url = f"{keycloak_base.rstrip('/')}/realms/{realm}/protocol/openid-connect/token"
payload = _post_form(
token_url,
{
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": client_id,
"client_secret": client_secret,
"subject_token": subject_token,
"requested_subject": requested_subject,
"audience": audience,
},
timeout_s=20,
)
token = payload.get("access_token")
if not isinstance(token, str) or not token:
raise SystemExit("keycloak token exchange response missing access_token")
return token
def _keycloak_find_user(keycloak_base: str, realm: str, token: str, username: str) -> dict | None:
url = f"{keycloak_base.rstrip('/')}/admin/realms/{realm}/users?{urllib.parse.urlencode({'username': username, 'exact': 'true', 'max': '1'})}"
users = _get_json(url, headers={"Authorization": f"Bearer {token}"}, timeout_s=20)
@ -186,12 +235,15 @@ def _imap_wait_for_verify_token(
def main() -> int:
portal_base = _env("PORTAL_BASE_URL").rstrip("/")
db_url = _env("PORTAL_DATABASE_URL")
keycloak_base = _env("KEYCLOAK_ADMIN_URL").rstrip("/")
realm = _env("KEYCLOAK_REALM", "atlas")
kc_admin_client_id = _env("KEYCLOAK_ADMIN_CLIENT_ID")
kc_admin_client_secret = _env("KEYCLOAK_ADMIN_CLIENT_SECRET")
portal_e2e_client_id = _env("PORTAL_E2E_CLIENT_ID")
portal_e2e_client_secret = _env("PORTAL_E2E_CLIENT_SECRET")
portal_target_client_id = os.environ.get("PORTAL_TARGET_CLIENT_ID", "bstein-dev-home").strip() or "bstein-dev-home"
portal_admin_username = os.environ.get("E2E_PORTAL_ADMIN_USERNAME", "bstein").strip() or "bstein"
contact_email = os.environ.get("E2E_CONTACT_EMAIL", "robotuser@bstein.dev").strip()
if not contact_email:
@ -202,7 +254,7 @@ def main() -> int:
imap_keycloak_username = os.environ.get("E2E_IMAP_KEYCLOAK_USERNAME", "robotuser").strip()
imap_wait_sec = int(os.environ.get("E2E_IMAP_WAIT_SECONDS", "90"))
token = _keycloak_admin_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
token = _keycloak_client_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
mailbox_user = _keycloak_find_user(keycloak_base, realm, token, imap_keycloak_username)
if not mailbox_user:
raise SystemExit(f"unable to locate Keycloak mailbox user {imap_keycloak_username!r}")
@ -257,18 +309,28 @@ def main() -> int:
if not isinstance(verify_resp, dict) or verify_resp.get("ok") is not True:
raise SystemExit(f"unexpected verify response: {verify_resp}")
with psycopg.connect(db_url, autocommit=True) as conn:
# Simulate admin approval.
conn.execute(
"""
UPDATE access_requests
SET status = 'accounts_building',
decided_at = NOW(),
decided_by = 'portal-e2e'
WHERE request_code = %s AND status = 'pending'
""",
(request_code,),
)
portal_admin = _keycloak_find_user(keycloak_base, realm, token, portal_admin_username)
if not portal_admin:
raise SystemExit(f"unable to locate portal admin user {portal_admin_username!r} via Keycloak admin API")
portal_admin_user_id = portal_admin.get("id")
if not isinstance(portal_admin_user_id, str) or not portal_admin_user_id:
raise SystemExit("portal admin user missing id")
e2e_subject_token = _keycloak_client_token(keycloak_base, realm, portal_e2e_client_id, portal_e2e_client_secret)
portal_bearer = _keycloak_token_exchange(
keycloak_base=keycloak_base,
realm=realm,
client_id=portal_e2e_client_id,
client_secret=portal_e2e_client_secret,
subject_token=e2e_subject_token,
requested_subject=portal_admin_user_id,
audience=portal_target_client_id,
)
approve_url = f"{portal_base}/api/admin/access/requests/{urllib.parse.quote(username, safe='')}/approve"
approve_resp = _request_json("POST", approve_url, portal_bearer, payload=None, timeout_s=60)
if not isinstance(approve_resp, dict) or approve_resp.get("ok") is not True:
raise SystemExit(f"unexpected approval response: {approve_resp}")
status_url = f"{portal_base}/api/access/request/status"
deadline_s = int(os.environ.get("E2E_DEADLINE_SECONDS", "600"))
@ -301,7 +363,7 @@ def main() -> int:
time.sleep(interval_s)
# Refresh admin token (it may expire during the provisioning wait).
token = _keycloak_admin_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
token = _keycloak_client_token(keycloak_base, realm, kc_admin_client_id, kc_admin_client_secret)
user = _keycloak_find_user(keycloak_base, realm, token, username)
if not user:

View File

@ -2,7 +2,7 @@
apiVersion: batch/v1
kind: Job
metadata:
name: portal-onboarding-e2e-test-8
name: portal-onboarding-e2e-test-9
namespace: bstein-dev-home
spec:
backoffLimit: 0
@ -15,11 +15,6 @@ spec:
env:
- name: PORTAL_BASE_URL
value: http://bstein-dev-home-backend.bstein-dev-home.svc.cluster.local
- name: PORTAL_DATABASE_URL
valueFrom:
secretKeyRef:
name: atlas-portal-db
key: PORTAL_DATABASE_URL
- name: KEYCLOAK_ADMIN_URL
value: http://keycloak.sso.svc.cluster.local
- name: KEYCLOAK_REALM
@ -31,6 +26,20 @@ spec:
secretKeyRef:
name: bstein-dev-home-keycloak-admin
key: client_secret
- name: PORTAL_E2E_CLIENT_ID
valueFrom:
secretKeyRef:
name: portal-e2e-client
key: client_id
- name: PORTAL_E2E_CLIENT_SECRET
valueFrom:
secretKeyRef:
name: portal-e2e-client
key: client_secret
- name: PORTAL_TARGET_CLIENT_ID
value: bstein-dev-home
- name: E2E_PORTAL_ADMIN_USERNAME
value: bstein
- name: E2E_USERNAME_PREFIX
value: e2e-portal
- name: E2E_CONTACT_EMAIL
@ -45,7 +54,6 @@ spec:
args:
- |
set -euo pipefail
python -m pip install --no-cache-dir 'psycopg[binary]==3.2.5'
python /scripts/test_portal_onboarding_flow.py
volumeMounts:
- name: tests