#!/usr/bin/env python3
"""Clean up Atlas test users and portal requests (manual-only).

Default behavior is DRY RUN. This script is intended for operators to clean up
test accounts created via the bstein-dev-home onboarding portal.

Targets (best-effort):
  - Keycloak users in realm "atlas"
  - Atlas portal Postgres rows (access_requests + dependent tables)
  - Vaultwarden users/invites created by the portal

Safety:
  - Requires an explicit username prefix (e.g. "test-")
  - Dry-run unless --apply is set
  - Validates prefixes to a conservative charset
"""

from __future__ import annotations

import argparse
import base64
import http.client
import json
import os
import re
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Iterable


# Prefixes are interpolated into SQL text, so only this charset is accepted.
_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")

# Escape character for SQL LIKE patterns. It is outside the safe prefix
# charset, so it never needs escaping itself.
_LIKE_ESCAPE = "!"

# Page size used when listing Keycloak users.
_KEYCLOAK_PAGE_SIZE = 1000

# How many matched entries are printed per target before summarising.
_PREVIEW_LIMIT = 50

# Local port used for the temporary Vaultwarden port-forward.
_VAULTWARDEN_LOCAL_PORT = 18081


@dataclass(frozen=True)
class KeycloakUser:
    # Fields are copied from the Keycloak admin "users" listing.
    user_id: str
    username: str
    email: str


@dataclass(frozen=True)
class PortalRequestRow:
    # One row of the portal's access_requests table (selected columns only).
    request_code: str
    username: str
    status: str


@dataclass(frozen=True)
class VaultwardenUser:
    # status is the "_status" field of the admin listing, or -1 when it is
    # missing or not an integer.
    user_id: str
    email: str
    status: int


def _redact(text: str, secrets: Iterable[str]) -> str:
    """Return *text* with every non-empty string in *secrets* replaced by ``***``."""
    for secret in secrets:
        if secret:
            text = text.replace(secret, "***")
    return text


def _run(
    cmd: list[str],
    *,
    input_bytes: bytes | None = None,
    redact: Iterable[str] = (),
) -> str:
    """Run *cmd* and return its stdout decoded as UTF-8.

    Args:
        cmd: Argument vector (no shell is involved).
        input_bytes: Optional bytes fed to the child's stdin.
        redact: Strings (e.g. a database URL carrying a password) that must
            not appear in the error message if the command fails.

    Raises:
        RuntimeError: If the command exits non-zero. The message contains the
            exit code, the command line and stderr, with *redact* applied.
    """
    proc = subprocess.run(
        cmd,
        input=input_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        details = _redact(f"{' '.join(cmd)}\n{stderr}", redact)
        raise RuntimeError(f"command failed ({proc.returncode}): {details}")
    return proc.stdout.decode("utf-8", errors="replace")


def _kubectl_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Read one key of a Kubernetes secret via kubectl and return it decoded.

    Raises:
        RuntimeError: If kubectl fails or the key is empty/missing.
    """
    # Dots inside a key (e.g. "tls.crt") must be escaped in a jsonpath.
    escaped_key = key.replace(".", r"\.")
    raw_b64 = _run(
        [
            "kubectl",
            "-n",
            namespace,
            "get",
            "secret",
            name,
            "-o",
            f"jsonpath={{.data.{escaped_key}}}",
        ]
    ).strip()
    if not raw_b64:
        raise RuntimeError(f"secret {namespace}/{name} key {key} is empty")
    return base64.b64decode(raw_b64).decode("utf-8").strip()


def _pod_is_running(item: Any) -> bool:
    """Return True if a pod list entry reports ``status.phase == "Running"``."""
    if not isinstance(item, dict):
        return False
    status = item.get("status")
    return isinstance(status, dict) and status.get("phase") == "Running"


def _kubectl_first_pod(namespace: str) -> str:
    """Return the name of a pod in *namespace* suitable for ``kubectl exec``.

    The first pod in phase "Running" is preferred; if none reports that phase
    the first listed pod is returned.

    Raises:
        RuntimeError: If the namespace has no pods or the listing is malformed.
    """
    raw = _run(
        [
            "kubectl",
            "-n",
            namespace,
            "get",
            "pods",
            "-o",
            "json",
        ]
    )
    data = json.loads(raw)
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        raise RuntimeError(f"no pods found in namespace {namespace}")
    chosen = next((item for item in items if _pod_is_running(item)), items[0])
    pod_name = chosen.get("metadata", {}).get("name") if isinstance(chosen, dict) else None
    if not isinstance(pod_name, str) or not pod_name:
        raise RuntimeError(f"unexpected pod list in namespace {namespace}")
    return pod_name


def _validate_prefixes(prefixes: list[str]) -> list[str]:
    """Strip, validate and return the username prefixes given on the CLI.

    Blank entries are dropped.

    Raises:
        SystemExit: If a prefix falls outside the safe charset or no usable
            prefix remains.
    """
    cleaned: list[str] = []
    for prefix in prefixes:
        prefix = prefix.strip()
        if not prefix:
            continue
        if not _SAFE_PREFIX_RE.fullmatch(prefix):
            raise SystemExit(
                f"invalid prefix '{prefix}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
            )
        cleaned.append(prefix)
    if not cleaned:
        raise SystemExit("at least one --prefix is required")
    return cleaned


def _starts_with_any(value: str, prefixes: Iterable[str]) -> bool:
    """Return True if *value* starts with at least one of *prefixes*."""
    return value.startswith(tuple(prefixes))


def _keycloak_token(server: str, realm: str, client_id: str, client_secret: str) -> str:
    """Obtain an access token using the client-credentials grant.

    Raises:
        RuntimeError: If the response carries no usable ``access_token``.
        urllib.error.URLError: On transport or HTTP failure.
    """
    data = urllib.parse.urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        }
    ).encode("utf-8")
    req = urllib.request.Request(
        f"{server}/realms/{realm}/protocol/openid-connect/token",
        data=data,
        method="POST",
    )
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    with urllib.request.urlopen(req, timeout=15) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    token = payload.get("access_token")
    if not isinstance(token, str) or not token:
        raise RuntimeError("failed to obtain keycloak access token")
    return token


def _keycloak_list_users(server: str, realm: str, token: str, search: str) -> list[KeycloakUser]:
    """List Keycloak users returned for the server-side *search* term.

    Results are fetched page by page so realms with more than one page of
    hits are not silently truncated. The server decides what *search*
    matches; the caller in this file re-checks the username prefix locally.

    Raises:
        RuntimeError: If a page is not a JSON list.
        urllib.error.URLError: On transport or HTTP failure.
    """
    users: list[KeycloakUser] = []
    seen_ids: set[str] = set()
    first = 0
    while True:
        query = urllib.parse.urlencode(
            {"first": str(first), "max": str(_KEYCLOAK_PAGE_SIZE), "search": search}
        )
        req = urllib.request.Request(f"{server}/admin/realms/{realm}/users?{query}", method="GET")
        req.add_header("Authorization", f"Bearer {token}")
        with urllib.request.urlopen(req, timeout=30) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        if not isinstance(payload, list):
            raise RuntimeError("unexpected keycloak users response")

        added = 0
        for item in payload:
            if not isinstance(item, dict):
                continue
            user_id = item.get("id")
            username = item.get("username") or ""
            email = item.get("email") or ""
            if not isinstance(user_id, str) or not user_id:
                continue
            if not isinstance(username, str):
                continue
            if user_id in seen_ids:
                continue
            seen_ids.add(user_id)
            users.append(KeycloakUser(user_id=user_id, username=username, email=str(email)))
            added += 1

        # A short page is the last one. A page that adds nothing new means the
        # server ignored the offset, so stop instead of looping forever.
        if len(payload) < _KEYCLOAK_PAGE_SIZE or added == 0:
            break
        first += len(payload)
    return users


def _keycloak_delete_user(server: str, realm: str, token: str, user_id: str) -> None:
    """Delete one Keycloak user by id; an HTTP 404 is treated as success."""
    quoted_id = urllib.parse.quote(user_id, safe="")
    req = urllib.request.Request(f"{server}/admin/realms/{realm}/users/{quoted_id}", method="DELETE")
    req.add_header("Authorization", f"Bearer {token}")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            _ = resp.read()
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return
        raise


def _psql_exec(portal_db_url: str, sql: str, psql_args: list[str]) -> str:
    """Run *sql* through psql inside a pod of the "postgres" namespace.

    The database URL is passed on the psql command line, so it is redacted
    from any error message raised by ``_run``.
    """
    postgres_pod = _kubectl_first_pod("postgres")
    return _run(
        [
            "kubectl",
            "-n",
            "postgres",
            "exec",
            "-i",
            postgres_pod,
            "--",
            "psql",
            portal_db_url,
            *psql_args,
            "-c",
            sql,
        ],
        redact=[portal_db_url],
    )


def _psql_json(portal_db_url: str, sql: str) -> list[dict[str, Any]]:
    """Run a query and return one ``{"cols": [...]}`` dict per output line.

    Despite the name, the output is psql's unaligned, tuples-only format with
    a tab separator, not JSON; every column value is a string.
    """
    out = _psql_exec(portal_db_url, sql, ["-At", "-F", "\t"])
    return [{"cols": line.split("\t")} for line in out.splitlines()]


def _prefix_like_clauses(prefixes: list[str]) -> str:
    """Build the ``username LIKE ...`` predicate matching any of *prefixes*.

    ``_`` is a single-character wildcard in LIKE, so it is escaped to keep the
    match a literal prefix match, consistent with ``_starts_with_any``.

    Raises:
        ValueError: If *prefixes* is empty or a prefix is outside the safe
            charset (the value is interpolated into SQL text).
    """
    if not prefixes:
        raise ValueError("at least one prefix is required")
    clauses: list[str] = []
    for prefix in prefixes:
        if not _SAFE_PREFIX_RE.fullmatch(prefix):
            raise ValueError(f"unsafe prefix for SQL: {prefix!r}")
        # "%" and the escape character are outside the safe charset, so "_"
        # is the only LIKE metacharacter that can occur.
        pattern = prefix.replace("_", f"{_LIKE_ESCAPE}_")
        clauses.append(f"username LIKE '{pattern}%' ESCAPE '{_LIKE_ESCAPE}'")
    return " OR ".join(clauses)


def _portal_list_requests(portal_db_url: str, prefixes: list[str]) -> list[PortalRequestRow]:
    """Return access_requests rows whose username starts with any prefix."""
    clauses = _prefix_like_clauses(prefixes)
    sql = (
        "SELECT request_code, username, status "
        "FROM access_requests "
        f"WHERE {clauses} "
        "ORDER BY created_at DESC;"
    )
    parsed: list[PortalRequestRow] = []
    for row in _psql_json(portal_db_url, sql):
        cols = row.get("cols") or []
        if len(cols) < 3:
            continue
        parsed.append(PortalRequestRow(request_code=cols[0], username=cols[1], status=cols[2]))
    return parsed


def _parse_delete_count(output: str) -> int:
    """Extract N from psql's ``DELETE N`` command tag; 0 if it is absent."""
    match = re.search(r"DELETE\s+(\d+)", output)
    return int(match.group(1)) if match else 0


def _portal_delete_requests(portal_db_url: str, prefixes: list[str]) -> int:
    """Delete matching access_requests rows and return how many were removed."""
    clauses = _prefix_like_clauses(prefixes)
    sql = f"DELETE FROM access_requests WHERE {clauses};"
    out = _psql_exec(portal_db_url, sql, [])
    # psql prints "DELETE <count>"
    return _parse_delete_count(out)


def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
    """Log in to the Vaultwarden admin page and return the session cookie.

    Returns:
        The ``name=value`` part of the response's Set-Cookie header.

    Raises:
        RuntimeError: If the response carries no cookie.
    """
    data = urllib.parse.urlencode({"token": admin_token}).encode("utf-8")
    req = urllib.request.Request(f"{base_url}/admin", data=data, method="POST")
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    with urllib.request.urlopen(req, timeout=10) as resp:
        set_cookie = resp.headers.get("Set-Cookie") or ""
    cookie = set_cookie.split(";", 1)[0].strip()
    if not cookie:
        raise RuntimeError("vaultwarden admin cookie missing")
    return cookie


def _vaultwarden_list_users(base_url: str, cookie: str) -> list[VaultwardenUser]:
    """Return all users from the Vaultwarden admin listing.

    Entries without a string id or email are skipped.

    Raises:
        RuntimeError: If the response is not a JSON list.
    """
    req = urllib.request.Request(f"{base_url}/admin/users", method="GET")
    req.add_header("Cookie", cookie)
    with urllib.request.urlopen(req, timeout=30) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    if not isinstance(payload, list):
        raise RuntimeError("unexpected vaultwarden /admin/users response")
    users: list[VaultwardenUser] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        user_id = item.get("id")
        email = item.get("email")
        status = item.get("_status")
        if not isinstance(user_id, str) or not user_id:
            continue
        if not isinstance(email, str) or not email:
            continue
        if not isinstance(status, int):
            status = -1
        users.append(VaultwardenUser(user_id=user_id, email=email, status=status))
    return users


def _vaultwarden_delete_user(base_url: str, cookie: str, user_id: str) -> None:
    """Delete one Vaultwarden user by id; an HTTP 404 is treated as success.

    Raises:
        RuntimeError: If the admin endpoint answers HTTP 429.
        urllib.error.HTTPError: For any other HTTP failure.
    """
    quoted_id = urllib.parse.quote(user_id, safe="")
    req = urllib.request.Request(f"{base_url}/admin/users/{quoted_id}", method="DELETE")
    req.add_header("Cookie", cookie)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            _ = resp.read()
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return
        if exc.code == 429:
            raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from exc
        raise


def _port_forward(namespace: str, target: str, local_port: int, remote_port: int) -> subprocess.Popen[bytes]:
    """Start ``kubectl port-forward`` bound to 127.0.0.1 and return the process.

    The caller is responsible for terminating the returned process.
    """
    # Keep stdout/stderr muted to avoid leaking internal details in output.
    return subprocess.Popen(
        [
            "kubectl",
            "-n",
            namespace,
            "port-forward",
            target,
            f"{local_port}:{remote_port}",
            "--address",
            "127.0.0.1",
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def _wait_for_port_forward(
    pf: subprocess.Popen[bytes],
    url: str,
    *,
    attempts: int = 30,
    delay: float = 0.2,
) -> bool:
    """Poll *url* until the forwarded port answers.

    Returns:
        True once any HTTP response (including an error status) is received,
        False if every attempt failed at the connection level.

    Raises:
        RuntimeError: If the port-forward process has already exited.
    """
    for _ in range(attempts):
        if pf.poll() is not None:
            raise RuntimeError(f"kubectl port-forward exited early (code {pf.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1) as resp:
                resp.read(1)
            return True
        except urllib.error.HTTPError:
            # The server answered, so the tunnel is up even if "/" errors.
            return True
        except (OSError, http.client.HTTPException):
            time.sleep(delay)
    return False


def _print_matches(lines: list[str], limit: int = _PREVIEW_LIMIT) -> None:
    """Print at most *limit* lines followed by a count of the omitted rest."""
    for line in lines[:limit]:
        print(line)
    if len(lines) > limit:
        print(f" ... and {len(lines) - limit} more")


def _cleanup_portal_db(prefixes: list[str], apply: bool) -> None:
    """List, and with *apply* delete, matching portal access_requests rows."""
    portal_db_url = _kubectl_get_secret_value("bstein-dev-home", "atlas-portal-db", "PORTAL_DATABASE_URL")
    rows = _portal_list_requests(portal_db_url, prefixes)
    print(f"Portal DB: {len(rows)} access_requests matched")
    _print_matches([f" {row.request_code}\t{row.status}\t{row.username}" for row in rows])
    if apply and rows:
        deleted = _portal_delete_requests(portal_db_url, prefixes)
        print(f"Portal DB: deleted {deleted} access_requests (cascade removes tasks/steps/artifacts).")
    print()


def _cleanup_keycloak(prefixes: list[str], apply: bool) -> None:
    """List, and with *apply* delete, Keycloak users matching the prefixes."""
    kc_server = os.getenv("KEYCLOAK_PUBLIC_URL", "https://sso.bstein.dev").rstrip("/")
    kc_realm = os.getenv("KEYCLOAK_REALM", "atlas")
    kc_client_id = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "bstein-dev-home-admin")
    kc_client_secret = _kubectl_get_secret_value(
        "bstein-dev-home", "bstein-dev-home-keycloak-admin", "client_secret"
    )
    token = _keycloak_token(kc_server, kc_realm, kc_client_id, kc_client_secret)

    # Keyed by id so a user returned for several prefixes is handled once.
    found: dict[str, KeycloakUser] = {}
    for prefix in prefixes:
        for user in _keycloak_list_users(kc_server, kc_realm, token, prefix):
            # Re-check locally: only usernames that really start with a
            # requested prefix may be deleted.
            if _starts_with_any(user.username, prefixes):
                found[user.user_id] = user
    kc_users = sorted(found.values(), key=lambda u: u.username)

    print(f"Keycloak: {len(kc_users)} users matched")
    _print_matches([f" {user.username}\t{user.email or '-'}\t{user.user_id}" for user in kc_users])
    if apply and kc_users:
        for user in kc_users:
            _keycloak_delete_user(kc_server, kc_realm, token, user.user_id)
        print(f"Keycloak: deleted {len(kc_users)} users.")
    print()


def _cleanup_vaultwarden(prefixes: list[str], apply: bool) -> None:
    """List, and with *apply* delete, Vaultwarden users matching the prefixes.

    A user matches when the part of the email before "@" starts with one of
    the prefixes. The admin API is reached through a temporary port-forward
    that is always torn down.
    """
    base_url = f"http://127.0.0.1:{_VAULTWARDEN_LOCAL_PORT}"
    pf = _port_forward("vaultwarden", "svc/vaultwarden-service", _VAULTWARDEN_LOCAL_PORT, 80)
    try:
        # wait briefly for the port-forward to come up
        if not _wait_for_port_forward(pf, f"{base_url}/"):
            print("Vaultwarden: port-forward not confirmed ready; continuing anyway", file=sys.stderr)

        admin_token = _kubectl_get_secret_value("vaultwarden", "vaultwarden-admin", "ADMIN_TOKEN")
        cookie = _vaultwarden_admin_cookie(admin_token, base_url)
        matched = sorted(
            (
                user
                for user in _vaultwarden_list_users(base_url, cookie)
                if _starts_with_any(user.email.split("@", 1)[0], prefixes)
            ),
            key=lambda u: u.email,
        )
        print(f"Vaultwarden: {len(matched)} users matched")
        _print_matches([f" {user.email}\tstatus={user.status}\t{user.user_id}" for user in matched])
        if apply and matched:
            for user in matched:
                _vaultwarden_delete_user(base_url, cookie, user.user_id)
            print(f"Vaultwarden: deleted {len(matched)} users.")
        print()
    finally:
        pf.terminate()
        try:
            pf.wait(timeout=3)
        except subprocess.TimeoutExpired:
            pf.kill()
            pf.wait()  # reap the killed child


def main(argv: list[str] | None = None) -> int:
    """Parse arguments and run the selected cleanup stages.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        0 on success. Failures surface as exceptions or ``SystemExit``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--prefix",
        action="append",
        default=[],
        help="Username prefix to match (repeatable). Example: --prefix test-",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually delete; otherwise dry-run only.",
    )
    parser.add_argument("--skip-keycloak", action="store_true", help="Skip Keycloak user deletion.")
    parser.add_argument("--skip-portal-db", action="store_true", help="Skip portal DB cleanup.")
    parser.add_argument("--skip-vaultwarden", action="store_true", help="Skip Vaultwarden cleanup.")
    args = parser.parse_args(argv)

    prefixes = _validate_prefixes(args.prefix)
    apply = bool(args.apply)

    print("Atlas test-user cleanup")
    print("prefixes:", ", ".join(prefixes))
    print("mode:", "APPLY (destructive)" if apply else "DRY RUN (no changes)")
    print()

    if not args.skip_portal_db:
        _cleanup_portal_db(prefixes, apply)
    if not args.skip_keycloak:
        _cleanup_keycloak(prefixes, apply)
    if not args.skip_vaultwarden:
        _cleanup_vaultwarden(prefixes, apply)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())