#!/usr/bin/env python3
"""Clean up Atlas test users and portal requests (manual-only).

Default behavior is DRY RUN. This script is intended for operators to clean up
test accounts created via the bstein-dev-home onboarding portal.

Targets (best-effort):
  - Keycloak users in realm "atlas"
  - Atlas portal Postgres rows (access_requests + dependent tables)
  - Vaultwarden users/invites created by the portal

Safety:
  - Requires an explicit username prefix (e.g. "test-")
  - Dry-run unless --apply is set
  - Validates prefixes to a conservative charset
"""

from __future__ import annotations

import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Iterable

_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")

# psql reports a successful DELETE as "DELETE <row count>".
_DELETE_COUNT_RE = re.compile(r"DELETE\s+(\d+)")

# Number of matched entries echoed per backend before the listing is truncated.
_PREVIEW_LIMIT = 50


@dataclass(frozen=True)
class KeycloakUser:
    """A Keycloak user as returned by the admin users endpoint."""

    user_id: str
    username: str
    email: str


@dataclass(frozen=True)
class PortalRequestRow:
    """One row of the portal ``access_requests`` table (selected columns)."""

    request_code: str
    username: str
    status: str


@dataclass(frozen=True)
class VaultwardenUser:
    """A Vaultwarden user as returned by ``/admin/users``."""

    user_id: str
    email: str
    status: int  # value of the "_status" field; -1 when absent or not an int


def _run(
    cmd: list[str],
    *,
    input_bytes: bytes | None = None,
    redact: Iterable[str] = (),
) -> str:
    """Run *cmd* and return its stdout decoded as UTF-8.

    Args:
        cmd: Argument vector; executed without a shell.
        input_bytes: Optional bytes fed to the process on stdin.
        redact: Substrings (e.g. connection URLs) replaced with
            ``<redacted>`` in the error message, so that secrets passed on
            the command line do not end up in operator-visible output.

    Raises:
        RuntimeError: If the process exits with a non-zero status. The message
            contains the exit code, the command line and stderr.
    """
    proc = subprocess.run(
        cmd,
        input=input_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        message = f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}"
        for secret in redact:
            if secret:
                message = message.replace(secret, "<redacted>")
        raise RuntimeError(message)
    return proc.stdout.decode("utf-8", errors="replace")


def _kubectl_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Return the decoded, stripped value of *key* in secret *namespace*/*name*.

    Raises:
        RuntimeError: If kubectl fails or the key is missing/empty.
    """
    raw_b64 = _run(
        [
            "kubectl",
            "-n",
            namespace,
            "get",
            "secret",
            name,
            "-o",
            f"jsonpath={{.data.{key}}}",
        ]
    ).strip()
    if not raw_b64:
        raise RuntimeError(f"secret {namespace}/{name} key {key} is empty")
    return base64.b64decode(raw_b64).decode("utf-8").strip()


def _kubectl_first_pod(namespace: str) -> str:
    """Return the name of the first pod kubectl lists in *namespace*.

    The pod's phase/readiness is not inspected; the first listed item wins.

    Raises:
        RuntimeError: If no pods are listed or the first item has no name.
    """
    raw = _run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"])
    data = json.loads(raw)
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        raise RuntimeError(f"no pods found in namespace {namespace}")
    pod_name = items[0].get("metadata", {}).get("name")
    if not isinstance(pod_name, str) or not pod_name:
        raise RuntimeError(f"unexpected pod list in namespace {namespace}")
    return pod_name


def _validate_prefixes(prefixes: list[str]) -> list[str]:
    """Strip, validate and return the non-empty prefixes.

    Raises:
        SystemExit: If a prefix does not match ``_SAFE_PREFIX_RE`` or if no
            usable prefix remains.
    """
    cleaned: list[str] = []
    for prefix in prefixes:
        prefix = prefix.strip()
        if not prefix:
            continue
        if not _SAFE_PREFIX_RE.match(prefix):
            raise SystemExit(
                f"invalid prefix '{prefix}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
            )
        cleaned.append(prefix)
    if not cleaned:
        raise SystemExit("at least one --prefix is required")
    return cleaned


def _starts_with_any(value: str, prefixes: Iterable[str]) -> bool:
    """Return True if *value* starts with at least one of *prefixes*."""
    return value.startswith(tuple(prefixes))


def _keycloak_token(server: str, realm: str, client_id: str, client_secret: str) -> str:
    """Obtain an access token via the client-credentials grant.

    Raises:
        RuntimeError: If the token response carries no usable ``access_token``.
    """
    data = urllib.parse.urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        }
    ).encode("utf-8")
    req = urllib.request.Request(
        f"{server}/realms/{realm}/protocol/openid-connect/token",
        data=data,
        method="POST",
    )
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    with urllib.request.urlopen(req, timeout=15) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    token = payload.get("access_token")
    if not isinstance(token, str) or not token:
        raise RuntimeError("failed to obtain keycloak access token")
    return token


def _keycloak_list_users(server: str, realm: str, token: str, search: str) -> list[KeycloakUser]:
    """Return the users the admin API reports for *search*.

    A single request with ``max=1000`` is issued and no pagination is done, so
    callers must treat the result as a candidate list and filter it themselves.

    Raises:
        RuntimeError: If the response body is not a JSON list.
    """
    query = urllib.parse.urlencode({"max": "1000", "search": search})
    req = urllib.request.Request(f"{server}/admin/realms/{realm}/users?{query}", method="GET")
    req.add_header("Authorization", f"Bearer {token}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    if not isinstance(payload, list):
        raise RuntimeError("unexpected keycloak users response")

    users: list[KeycloakUser] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        user_id = item.get("id")
        username = item.get("username") or ""
        email = item.get("email") or ""
        if not isinstance(user_id, str) or not user_id:
            continue
        if not isinstance(username, str):
            continue
        users.append(KeycloakUser(user_id=user_id, username=username, email=str(email)))
    return users


def _keycloak_delete_user(server: str, realm: str, token: str, user_id: str) -> None:
    """Delete one Keycloak user; an HTTP 404 is treated as already deleted."""
    req = urllib.request.Request(f"{server}/admin/realms/{realm}/users/{user_id}", method="DELETE")
    req.add_header("Authorization", f"Bearer {token}")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp.read()
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return
        raise


def _psql(portal_db_url: str, psql_args: list[str]) -> str:
    """Run psql against *portal_db_url* inside the first pod of ``postgres``.

    The connection URL is redacted from any error raised by the command.
    """
    postgres_pod = _kubectl_first_pod("postgres")
    return _run(
        [
            "kubectl",
            "-n",
            "postgres",
            "exec",
            "-i",
            postgres_pod,
            "--",
            "psql",
            portal_db_url,
            *psql_args,
        ],
        redact=[portal_db_url],
    )


def _psql_json(portal_db_url: str, sql: str) -> list[dict[str, Any]]:
    """Run *sql* and return one ``{"cols": [...]}`` dict per output line.

    psql is invoked unaligned, tuples-only, with a tab field separator; each
    output line is split on tabs. (Despite the name, no JSON is involved.)
    """
    out = _psql(portal_db_url, ["-At", "-F", "\t", "-c", sql])
    return [{"cols": line.split("\t")} for line in out.splitlines()]


def _prefix_where_clause(prefixes: Iterable[str], column: str = "username") -> str:
    """Build an OR-ed SQL condition matching *column* values by literal prefix.

    ``left(column, n) = 'prefix'`` is used instead of ``LIKE 'prefix%'``
    because ``_`` is permitted in prefixes and is a single-character wildcard
    in LIKE patterns, which would make a destructive statement match (and
    delete) more rows than the prefix denotes.

    Raises:
        ValueError: If a prefix does not satisfy ``_SAFE_PREFIX_RE`` (it is
            interpolated into SQL, so nothing unvalidated may pass) or if no
            prefix is given.
    """
    clauses: list[str] = []
    for prefix in prefixes:
        if not _SAFE_PREFIX_RE.fullmatch(prefix):
            raise ValueError(f"refusing to build SQL for unvalidated prefix {prefix!r}")
        clauses.append(f"left({column}, {len(prefix)}) = '{prefix}'")
    if not clauses:
        raise ValueError("at least one prefix is required to build a SQL filter")
    return " OR ".join(clauses)


def _parse_delete_count(psql_output: str) -> int:
    """Return the row count from psql's ``DELETE <n>`` tag, or 0 if absent."""
    match = _DELETE_COUNT_RE.search(psql_output)
    return int(match.group(1)) if match else 0


def _portal_list_requests(portal_db_url: str, prefixes: list[str]) -> list[PortalRequestRow]:
    """Return access_requests rows whose username starts with any prefix.

    Rows are ordered newest first; output lines with fewer than three columns
    are skipped.
    """
    sql = (
        "SELECT request_code, username, status "
        "FROM access_requests "
        f"WHERE {_prefix_where_clause(prefixes)} "
        "ORDER BY created_at DESC;"
    )
    parsed: list[PortalRequestRow] = []
    for row in _psql_json(portal_db_url, sql):
        cols = row.get("cols") or []
        if len(cols) < 3:
            continue
        parsed.append(PortalRequestRow(request_code=cols[0], username=cols[1], status=cols[2]))
    return parsed


def _portal_delete_requests(portal_db_url: str, prefixes: list[str]) -> int:
    """Delete access_requests rows matching any prefix; return the row count.

    The count is parsed from psql's command tag and is 0 when no tag is found.
    """
    sql = f"DELETE FROM access_requests WHERE {_prefix_where_clause(prefixes)};"
    out = _psql(portal_db_url, ["-c", sql])
    return _parse_delete_count(out)


def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
    """Log in to the Vaultwarden admin page and return the session cookie pair.

    Only the leading ``name=value`` part of the ``Set-Cookie`` header is kept.

    Raises:
        RuntimeError: If the response carries no cookie.
    """
    data = urllib.parse.urlencode({"token": admin_token}).encode("utf-8")
    req = urllib.request.Request(f"{base_url}/admin", data=data, method="POST")
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    with urllib.request.urlopen(req, timeout=10) as resp:
        set_cookie = resp.headers.get("Set-Cookie") or ""
    cookie = set_cookie.split(";", 1)[0].strip()
    if not cookie:
        raise RuntimeError("vaultwarden admin cookie missing")
    return cookie


def _vaultwarden_list_users(base_url: str, cookie: str) -> list[VaultwardenUser]:
    """Return all users reported by ``/admin/users``.

    Entries without a string ``id`` or ``email`` are skipped; a missing or
    non-integer ``_status`` is recorded as -1.

    Raises:
        RuntimeError: If the response body is not a JSON list.
    """
    req = urllib.request.Request(f"{base_url}/admin/users", method="GET")
    req.add_header("Cookie", cookie)
    with urllib.request.urlopen(req, timeout=30) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    if not isinstance(payload, list):
        raise RuntimeError("unexpected vaultwarden /admin/users response")

    users: list[VaultwardenUser] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        user_id = item.get("id")
        email = item.get("email")
        status = item.get("_status")
        if not isinstance(user_id, str) or not user_id:
            continue
        if not isinstance(email, str) or not email:
            continue
        if not isinstance(status, int):
            status = -1
        users.append(VaultwardenUser(user_id=user_id, email=email, status=status))
    return users


def _vaultwarden_delete_user(base_url: str, cookie: str, user_id: str) -> None:
    """Delete one Vaultwarden user; an HTTP 404 is treated as already deleted.

    Raises:
        RuntimeError: On HTTP 429 (admin endpoint rate limited).
        urllib.error.HTTPError: On any other HTTP error status.
    """
    req = urllib.request.Request(f"{base_url}/admin/users/{user_id}", method="DELETE")
    req.add_header("Cookie", cookie)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp.read()
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return
        if exc.code == 429:
            raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from exc
        raise


def _port_forward(namespace: str, target: str, local_port: int, remote_port: int) -> subprocess.Popen[bytes]:
    """Start ``kubectl port-forward`` bound to 127.0.0.1 and return the process.

    The caller owns the process and must terminate it.
    """
    # Keep stdout/stderr muted to avoid leaking internal details in output.
    return subprocess.Popen(
        [
            "kubectl",
            "-n",
            namespace,
            "port-forward",
            target,
            f"{local_port}:{remote_port}",
            "--address",
            "127.0.0.1",
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def _wait_for_port_forward(
    pf: subprocess.Popen[bytes],
    url: str,
    *,
    attempts: int = 30,
    delay: float = 0.2,
) -> None:
    """Poll *url* until the forwarded port answers or *attempts* run out.

    Running out of attempts is not an error here: the first real request made
    afterwards will surface the connection failure.

    Raises:
        RuntimeError: If the port-forward process has already exited.
    """
    for _ in range(attempts):
        if pf.poll() is not None:
            raise RuntimeError(f"kubectl port-forward exited early (code {pf.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1) as resp:
                resp.read(1)
            return
        except urllib.error.HTTPError:
            # Any HTTP status means the tunnel is up and the server answered.
            return
        except Exception:
            # Connection refused/reset/timeout while the tunnel is starting.
            time.sleep(delay)


def _print_preview(lines: list[str]) -> None:
    """Print up to ``_PREVIEW_LIMIT`` lines followed by a truncation notice."""
    for line in lines[:_PREVIEW_LIMIT]:
        print(line)
    if len(lines) > _PREVIEW_LIMIT:
        print(f" ... and {len(lines) - _PREVIEW_LIMIT} more")


def _cleanup_portal_db(prefixes: list[str], apply: bool) -> None:
    """List (and, when *apply* is set, delete) matching portal access requests."""
    portal_db_url = _kubectl_get_secret_value("bstein-dev-home", "atlas-portal-db", "PORTAL_DATABASE_URL")
    requests = _portal_list_requests(portal_db_url, prefixes)
    print(f"Portal DB: {len(requests)} access_requests matched")
    _print_preview([f" {row.request_code}\t{row.status}\t{row.username}" for row in requests])
    if apply and requests:
        deleted = _portal_delete_requests(portal_db_url, prefixes)
        print(f"Portal DB: deleted {deleted} access_requests (cascade removes tasks/steps/artifacts).")
    print()


def _cleanup_keycloak(prefixes: list[str], apply: bool) -> None:
    """List (and, when *apply* is set, delete) matching Keycloak users."""
    kc_server = os.getenv("KEYCLOAK_PUBLIC_URL", "https://sso.bstein.dev").rstrip("/")
    kc_realm = os.getenv("KEYCLOAK_REALM", "atlas")
    kc_client_id = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "bstein-dev-home-admin")
    kc_client_secret = _kubectl_get_secret_value(
        "bstein-dev-home", "bstein-dev-home-keycloak-admin", "client_secret"
    )
    token = _keycloak_token(kc_server, kc_realm, kc_client_id, kc_client_secret)

    # The search results are only candidates; keep just the users whose
    # username really starts with a prefix, de-duplicated by id.
    found: dict[str, KeycloakUser] = {}
    for prefix in prefixes:
        for user in _keycloak_list_users(kc_server, kc_realm, token, prefix):
            if _starts_with_any(user.username, prefixes):
                found[user.user_id] = user
    users = sorted(found.values(), key=lambda u: u.username)

    print(f"Keycloak: {len(users)} users matched")
    _print_preview([f" {user.username}\t{user.email or '-'}\t{user.user_id}" for user in users])
    if apply and users:
        for user in users:
            _keycloak_delete_user(kc_server, kc_realm, token, user.user_id)
        print(f"Keycloak: deleted {len(users)} users.")
    print()


def _cleanup_vaultwarden(prefixes: list[str], apply: bool) -> None:
    """List (and, when *apply* is set, delete) matching Vaultwarden users.

    A user matches when the local part of the email starts with a prefix.
    """
    base_url = "http://127.0.0.1:18081"
    pf = _port_forward("vaultwarden", "svc/vaultwarden-service", 18081, 80)
    try:
        # wait briefly for the port-forward to come up
        _wait_for_port_forward(pf, f"{base_url}/")

        admin_token = _kubectl_get_secret_value("vaultwarden", "vaultwarden-admin", "ADMIN_TOKEN")
        cookie = _vaultwarden_admin_cookie(admin_token, base_url)
        matched = sorted(
            (
                user
                for user in _vaultwarden_list_users(base_url, cookie)
                if _starts_with_any(user.email.split("@", 1)[0], prefixes)
            ),
            key=lambda u: u.email,
        )

        print(f"Vaultwarden: {len(matched)} users matched")
        _print_preview([f" {user.email}\tstatus={user.status}\t{user.user_id}" for user in matched])
        if apply and matched:
            for user in matched:
                _vaultwarden_delete_user(base_url, cookie, user.user_id)
            print(f"Vaultwarden: deleted {len(matched)} users.")
        print()
    finally:
        pf.terminate()
        try:
            pf.wait(timeout=3)
        except subprocess.TimeoutExpired:
            pf.kill()


def main() -> int:
    """Parse arguments and run the selected cleanup stages; return exit code 0.

    Stages run in the order portal DB, Keycloak, Vaultwarden. Nothing is
    deleted unless ``--apply`` is given.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--prefix",
        action="append",
        default=[],
        help="Username prefix to match (repeatable). Example: --prefix test-",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually delete; otherwise dry-run only.",
    )
    parser.add_argument("--skip-keycloak", action="store_true", help="Skip Keycloak user deletion.")
    parser.add_argument("--skip-portal-db", action="store_true", help="Skip portal DB cleanup.")
    parser.add_argument("--skip-vaultwarden", action="store_true", help="Skip Vaultwarden cleanup.")
    args = parser.parse_args()

    prefixes = _validate_prefixes(args.prefix)
    apply = bool(args.apply)

    print("Atlas test-user cleanup")
    print("prefixes:", ", ".join(prefixes))
    print("mode:", "APPLY (destructive)" if apply else "DRY RUN (no changes)")
    print()

    if not args.skip_portal_db:
        _cleanup_portal_db(prefixes, apply)
    if not args.skip_keycloak:
        _cleanup_keycloak(prefixes, apply)
    if not args.skip_vaultwarden:
        _cleanup_vaultwarden(prefixes, apply)

    return 0


if __name__ == "__main__":
    raise SystemExit(main())