titan-iac/scripts/test_atlas_user_cleanup.py

510 lines
18 KiB
Python
Raw Permalink Normal View History

2026-01-05 00:25:22 -03:00
#!/usr/bin/env python3
"""Clean up Atlas test users and portal requests (manual-only).
Default behavior is DRY RUN. This script is intended for operators to clean up
test accounts created via the bstein-dev-home onboarding portal.
Targets (best-effort):
- Keycloak users in realm "atlas"
- Atlas portal Postgres rows (access_requests + dependent tables)
- Vaultwarden users/invites created by the portal
Safety:
- Requires an explicit username prefix (e.g. "test-")
- Dry-run unless --apply is set
- --apply requires an explicit --confirm guard
- Validates prefixes to a conservative charset
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Iterable
# Shape every --prefix value must have: one leading ASCII letter or digit, then up to
# 63 more characters drawn from letters, digits, ".", "_" and "-" (1-64 chars total).
_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")
@dataclass(frozen=True)
class KeycloakUser:
    """Immutable record for one entry of the Keycloak admin users listing."""

    user_id: str  # "id" field of the API entry
    username: str  # "username" field of the API entry ("" when missing)
    email: str  # "email" field of the API entry ("" when missing)
@dataclass(frozen=True)
class PortalRequestRow:
    """Immutable record for one row selected from the portal ``access_requests`` table."""

    request_code: str  # "request_code" column
    username: str  # "username" column (the value matched against the prefixes)
    status: str  # "status" column, kept as the raw text psql printed
@dataclass(frozen=True)
class VaultwardenUser:
    """Immutable record for one entry of the Vaultwarden ``/admin/users`` listing."""

    user_id: str  # "id" field of the API entry
    email: str  # "email" field of the API entry
    status: int  # "_status" field of the API entry; -1 when it was not an integer
def _run(cmd: list[str], *, input_bytes: bytes | None = None) -> str:
    """Run *cmd* without a shell and return its stdout as text.

    Args:
        cmd: Argument vector handed to ``subprocess.run``.
        input_bytes: Optional bytes written to the child's stdin.

    Returns:
        The child's stdout decoded as UTF-8 (undecodable bytes are replaced).

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            contains the exit code, the command line and the stripped stderr.
            Passwords embedded in URL-style text (``scheme://user:password@host``)
            are masked before the message is built.
    """
    proc = subprocess.run(
        cmd,
        input=input_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        message = f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}"
        # Callers pass connection URLs as argv entries; those can carry credentials,
        # so never echo the password part back to the operator's terminal or logs.
        message = re.sub(r"(://[^/\s:@]*:)[^/\s@]+@", r"\1***@", message)
        raise RuntimeError(message)
    return proc.stdout.decode("utf-8", errors="replace")
def _kubectl_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Read one key of a Kubernetes Secret through ``kubectl`` and decode it.

    Args:
        namespace: Namespace holding the Secret.
        name: Name of the Secret.
        key: Key under ``.data`` to extract.

    Returns:
        The base64-decoded value as UTF-8 text with surrounding whitespace removed.

    Raises:
        RuntimeError: If kubectl fails or prints nothing for the requested key.
    """
    jsonpath = f"jsonpath={{.data.{key}}}"
    command = ["kubectl", "-n", namespace, "get", "secret", name, "-o", jsonpath]
    encoded = _run(command).strip()
    if encoded:
        return base64.b64decode(encoded).decode("utf-8").strip()
    raise RuntimeError(f"secret {namespace}/{name} key {key} is empty")
def _kubectl_first_pod(namespace: str) -> str:
    """Return the name of the first pod ``kubectl get pods`` lists in *namespace*.

    The pod is taken purely by position in the listing; its phase or readiness
    is not inspected.

    Raises:
        RuntimeError: If kubectl fails, the namespace has no pods, or the first
            entry carries no usable ``metadata.name``.
    """
    listing = json.loads(_run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"]))
    pods = listing.get("items") or []
    if not (isinstance(pods, list) and pods):
        raise RuntimeError(f"no pods found in namespace {namespace}")
    name = pods[0].get("metadata", {}).get("name")
    if isinstance(name, str) and name:
        return name
    raise RuntimeError(f"unexpected pod list in namespace {namespace}")
def _validate_prefixes(prefixes: list[str]) -> list[str]:
    """Strip, filter and validate the raw ``--prefix`` values.

    Blank entries are dropped; every remaining entry must match
    ``_SAFE_PREFIX_RE``.

    Returns:
        The stripped prefixes in their original order (duplicates are kept).

    Raises:
        SystemExit: If an entry fails validation, or no non-blank entry remains.
    """
    accepted: list[str] = []
    for raw in prefixes:
        candidate = raw.strip()
        if candidate == "":
            continue
        if _SAFE_PREFIX_RE.match(candidate) is None:
            raise SystemExit(
                f"invalid prefix '{candidate}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
            )
        accepted.append(candidate)
    if accepted:
        return accepted
    raise SystemExit("at least one --prefix is required")
def _starts_with_any(value: str, prefixes: Iterable[str]) -> bool:
    """Report whether *value* begins with at least one of *prefixes*.

    An empty *prefixes* iterable yields ``False``.
    """
    # str.startswith accepts a tuple of candidates and checks them in one call.
    return value.startswith(tuple(prefixes))
def _keycloak_token(server: str, realm: str, client_id: str, client_secret: str) -> str:
    """Obtain an access token from Keycloak with the client-credentials grant.

    Args:
        server: Base URL of the Keycloak server (no trailing slash expected).
        realm: Realm whose token endpoint is called.
        client_id: Client identifier sent in the form body.
        client_secret: Client secret sent in the form body.

    Returns:
        The ``access_token`` string from the JSON response.

    Raises:
        RuntimeError: If the response has no non-empty string ``access_token``.
        urllib.error.URLError: Propagated from the HTTP call (15 s timeout).
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    request = urllib.request.Request(
        f"{server}/realms/{realm}/protocol/openid-connect/token",
        data=urllib.parse.urlencode(form).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=15) as response:
        body = response.read()
    reply = json.loads(body.decode("utf-8"))
    access_token = reply.get("access_token")
    if isinstance(access_token, str) and access_token:
        return access_token
    raise RuntimeError("failed to obtain keycloak access token")
def _keycloak_list_users(server: str, realm: str, token: str, search: str) -> list[KeycloakUser]:
    """Query the Keycloak admin users endpoint and return the usable entries.

    The request asks for at most 1000 results (``max=1000``) for the given
    ``search`` term; no further pages are fetched.

    Args:
        server: Base URL of the Keycloak server.
        realm: Realm to query.
        token: Bearer token sent in the ``Authorization`` header.
        search: Value of the ``search`` query parameter.

    Returns:
        One ``KeycloakUser`` per dict entry that has a non-empty string ``id``
        and a string (possibly empty) ``username``; other entries are skipped.

    Raises:
        RuntimeError: If the response body is not a JSON list.
        urllib.error.URLError: Propagated from the HTTP call (30 s timeout).
    """
    params = urllib.parse.urlencode({"max": "1000", "search": search})
    request = urllib.request.Request(
        f"{server}/admin/realms/{realm}/users?{params}",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        body = response.read()
    entries = json.loads(body.decode("utf-8"))
    if not isinstance(entries, list):
        raise RuntimeError("unexpected keycloak users response")

    result: list[KeycloakUser] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        uid = entry.get("id")
        name = entry.get("username") or ""
        if isinstance(uid, str) and uid and isinstance(name, str):
            mail = entry.get("email") or ""
            result.append(KeycloakUser(user_id=uid, username=name, email=str(mail)))
    return result
def _keycloak_delete_user(server: str, realm: str, token: str, user_id: str) -> None:
    """Delete one Keycloak user by id; a 404 response is treated as success.

    Raises:
        urllib.error.HTTPError: For any HTTP error status other than 404.
    """
    request = urllib.request.Request(
        f"{server}/admin/realms/{realm}/users/{user_id}",
        headers={"Authorization": f"Bearer {token}"},
        method="DELETE",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            response.read()
    except urllib.error.HTTPError as err:
        # 404 means the user is already gone, which is the desired end state.
        if err.code != 404:
            raise
def _psql_json(portal_db_url: str, sql: str) -> list[dict[str, Any]]:
    """Run *sql* with ``psql`` inside the first pod of the ``postgres`` namespace.

    Despite the name, no JSON is involved: psql is invoked with ``-At -F <TAB>``
    and every output line is split on tab characters.

    Args:
        portal_db_url: Connection URL handed to psql as its database argument.
        sql: Statement passed via ``-c``.

    Returns:
        One ``{"cols": [...]}`` dict per output line, in output order.
    """
    pod = _kubectl_first_pod("postgres")
    command = [
        "kubectl", "-n", "postgres", "exec", "-i", pod, "--",
        "psql", portal_db_url, "-At", "-F", "\t", "-c", sql,
    ]
    output = _run(command)
    return [{"cols": line.split("\t")} for line in output.splitlines()]
def _prefix_match_clauses(prefixes: list[str]) -> str:
    """Build an OR-joined SQL predicate matching usernames that literally start with a prefix.

    Raises:
        ValueError: If *prefixes* is empty (an empty predicate would be invalid SQL).
    """
    if not prefixes:
        raise ValueError("at least one prefix is required")
    clauses: list[str] = []
    for prefix in prefixes:
        # "_" and "%" are LIKE wildcards. "_" is a legal prefix character, so without
        # escaping, a prefix such as "test_" would also match "testX...". "!" is used
        # as the escape character so the result does not depend on how the server
        # treats backslashes in string literals.
        pattern = prefix.replace("!", "!!").replace("%", "!%").replace("_", "!_")
        # Defence in depth: validated prefixes never contain a quote, but double any
        # that slip through so the literal cannot be terminated early.
        pattern = pattern.replace("'", "''")
        clauses.append(f"username LIKE '{pattern}%' ESCAPE '!'")
    return " OR ".join(clauses)


def _portal_list_requests(portal_db_url: str, prefixes: list[str]) -> list[PortalRequestRow]:
    """List ``access_requests`` rows whose username starts with one of *prefixes*.

    Args:
        portal_db_url: Connection URL for the portal database.
        prefixes: Username prefixes, matched literally (LIKE wildcards are escaped).

    Returns:
        Matching rows, newest first (``ORDER BY created_at DESC``). Output lines
        with fewer than three columns are skipped.
    """
    clauses = _prefix_match_clauses(prefixes)
    sql = (
        "SELECT request_code, username, status "
        "FROM access_requests "
        f"WHERE {clauses} "
        "ORDER BY created_at DESC;"
    )
    raw_rows = _psql_json(portal_db_url, sql)
    parsed: list[PortalRequestRow] = []
    for row in raw_rows:
        cols = row.get("cols") or []
        if len(cols) < 3:
            continue
        parsed.append(PortalRequestRow(request_code=cols[0], username=cols[1], status=cols[2]))
    return parsed


def _portal_delete_requests(portal_db_url: str, prefixes: list[str]) -> int:
    """Delete ``access_requests`` rows whose username starts with one of *prefixes*.

    Uses the same predicate as ``_portal_list_requests`` so that what a dry run
    lists is exactly what an apply run deletes.

    Args:
        portal_db_url: Connection URL for the portal database.
        prefixes: Username prefixes, matched literally (LIKE wildcards are escaped).

    Returns:
        The row count parsed from psql's ``DELETE <n>`` command tag, or 0 when
        no such tag is found in the output.
    """
    clauses = _prefix_match_clauses(prefixes)
    sql = f"DELETE FROM access_requests WHERE {clauses};"
    postgres_pod = _kubectl_first_pod("postgres")
    out = _run(
        [
            "kubectl",
            "-n",
            "postgres",
            "exec",
            "-i",
            postgres_pod,
            "--",
            "psql",
            portal_db_url,
            "-c",
            sql,
        ]
    )
    # psql prints "DELETE <n>". The pattern is a raw string, so the escapes must be
    # single backslashes; doubled ones would look for a literal backslash instead.
    match = re.search(r"DELETE\s+(\d+)", out)
    return int(match.group(1)) if match else 0
def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
    """Log in to the Vaultwarden admin page and return the session cookie pair.

    Args:
        admin_token: Token posted as the ``token`` form field to ``/admin``.
        base_url: Base URL of the Vaultwarden instance.

    Returns:
        The first ``name=value`` segment of the response's ``Set-Cookie`` header.

    Raises:
        RuntimeError: On HTTP 429, or when the response carries no cookie.
        urllib.error.HTTPError: For any other HTTP error status.
    """
    form = urllib.parse.urlencode({"token": admin_token}).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url}/admin",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            header = response.headers.get("Set-Cookie") or ""
    except urllib.error.HTTPError as err:
        if err.code != 429:
            raise
        raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from err
    # Keep only "name=value"; attributes after the first ";" are not sent back.
    session = header.partition(";")[0].strip()
    if session:
        return session
    raise RuntimeError("vaultwarden admin cookie missing")
def _vaultwarden_list_users(base_url: str, cookie: str) -> list[VaultwardenUser]:
    """Fetch ``/admin/users`` from Vaultwarden and return the usable entries.

    Args:
        base_url: Base URL of the Vaultwarden instance.
        cookie: Value sent verbatim in the ``Cookie`` request header.

    Returns:
        One ``VaultwardenUser`` per dict entry with a non-empty string ``id`` and
        ``email``. A non-integer ``_status`` is recorded as ``-1``.

    Raises:
        RuntimeError: On HTTP 429, or when the body is not a JSON list.
        urllib.error.HTTPError: For any other HTTP error status.
    """
    request = urllib.request.Request(
        f"{base_url}/admin/users",
        headers={"Cookie": cookie},
        method="GET",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            entries = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        if err.code != 429:
            raise
        raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from err
    if not isinstance(entries, list):
        raise RuntimeError("unexpected vaultwarden /admin/users response")

    result: list[VaultwardenUser] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        uid = entry.get("id")
        mail = entry.get("email")
        if not (isinstance(uid, str) and uid):
            continue
        if not (isinstance(mail, str) and mail):
            continue
        state = entry.get("_status")
        result.append(
            VaultwardenUser(
                user_id=uid,
                email=mail,
                status=state if isinstance(state, int) else -1,
            )
        )
    return result
def _vaultwarden_delete_user(base_url: str, cookie: str, user_id: str) -> None:
    """Delete one Vaultwarden user through the admin API; 404 counts as success.

    Raises:
        RuntimeError: On HTTP 429.
        urllib.error.HTTPError: For any HTTP error status other than 404 and 429.
    """
    request = urllib.request.Request(
        f"{base_url}/admin/users/{user_id}",
        headers={"Cookie": cookie},
        method="DELETE",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            response.read()
    except urllib.error.HTTPError as err:
        if err.code == 404:
            # Already gone: nothing left to delete.
            return
        if err.code == 429:
            raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from err
        raise
def _port_forward(namespace: str, target: str, local_port: int, remote_port: int) -> subprocess.Popen[bytes]:
    """Start ``kubectl port-forward`` bound to 127.0.0.1 and return the process.

    The call returns as soon as the process is spawned; it does not wait for the
    tunnel to accept connections. The caller owns the process and must stop it.

    Args:
        namespace: Namespace of the forwarded resource.
        target: Resource reference understood by kubectl (e.g. ``svc/<name>``).
        local_port: Port opened on the loopback interface.
        remote_port: Port on the target to forward to.
    """
    command = [
        "kubectl",
        "-n",
        namespace,
        "port-forward",
        target,
        f"{local_port}:{remote_port}",
        "--address",
        "127.0.0.1",
    ]
    # Keep stdout/stderr muted to avoid leaking internal details in output.
    silenced = subprocess.DEVNULL
    return subprocess.Popen(command, stdout=silenced, stderr=silenced)
def main() -> int:
    """Parse CLI arguments and run the portal DB, Keycloak and Vaultwarden cleanups.

    Each backend is listed first (at most 50 matches are printed) and only
    modified when ``--apply`` is given together with a ``--confirm`` value equal
    to the comma-joined, sorted, de-duplicated prefix list.

    Returns:
        0 on success; 1 when the Vaultwarden login or user listing fails with a
        ``RuntimeError`` (including repeated rate limiting).

    Raises:
        SystemExit: For invalid/missing prefixes or a missing/mismatched
            ``--confirm`` value when ``--apply`` is set.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--prefix",
        action="append",
        default=[],
        help="Username prefix to match (repeatable). Example: --prefix test-",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually delete; otherwise dry-run only.",
    )
    parser.add_argument(
        "--confirm",
        default="",
        help=(
            "Required when using --apply. Must exactly equal the comma-separated "
            "sorted prefix list (e.g. 'atlas-,bob-,e2e-,test-')."
        ),
    )
    parser.add_argument("--skip-keycloak", action="store_true", help="Skip Keycloak user deletion.")
    parser.add_argument("--skip-portal-db", action="store_true", help="Skip portal DB cleanup.")
    parser.add_argument("--skip-vaultwarden", action="store_true", help="Skip Vaultwarden cleanup.")
    parser.add_argument(
        "--protect-keycloak-username",
        action="append",
        default=[],
        help="Keycloak usernames that must never be deleted (repeatable).",
    )
    parser.add_argument(
        "--protect-vaultwarden-email",
        action="append",
        default=[],
        help="Vaultwarden emails that must never be deleted (repeatable).",
    )
    args = parser.parse_args()

    prefixes = sorted(set(_validate_prefixes(args.prefix)))
    apply = bool(args.apply)
    expected_confirm = ",".join(prefixes)
    # "bstein" and "robotuser" are always protected, whatever the CLI says.
    protected_keycloak = {
        "bstein",
        "robotuser",
        *[u.strip() for u in args.protect_keycloak_username if u.strip()],
    }
    protected_vaultwarden = {e.strip() for e in args.protect_vaultwarden_email if e.strip()}

    # Destructive runs must repeat the exact prefix list as a typo guard.
    if apply and args.confirm != expected_confirm:
        raise SystemExit(
            f"refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')"
        )

    print("Atlas test-user cleanup")
    print("prefixes:", expected_confirm)
    print("mode:", "APPLY (destructive)" if apply else "DRY RUN (no changes)")
    if protected_keycloak:
        print("protected keycloak usernames:", ", ".join(sorted(protected_keycloak)))
    if protected_vaultwarden:
        print("protected vaultwarden emails:", ", ".join(sorted(protected_vaultwarden)))
    print()

    if not args.skip_portal_db:
        portal_db_url = _kubectl_get_secret_value("bstein-dev-home", "atlas-portal-db", "PORTAL_DATABASE_URL")
        requests = _portal_list_requests(portal_db_url, prefixes)
        print(f"Portal DB: {len(requests)} access_requests matched")
        for row in requests[:50]:
            print(f" {row.request_code}\t{row.status}\t{row.username}")
        if len(requests) > 50:
            print(f" ... and {len(requests) - 50} more")
        if apply and requests:
            deleted = _portal_delete_requests(portal_db_url, prefixes)
            print(f"Portal DB: deleted {deleted} access_requests (cascade removes tasks/steps/artifacts).")
        print()

    if not args.skip_keycloak:
        kc_server = os.getenv("KEYCLOAK_PUBLIC_URL", "https://sso.bstein.dev").rstrip("/")
        kc_realm = os.getenv("KEYCLOAK_REALM", "atlas")
        kc_client_id = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "bstein-dev-home-admin")
        kc_client_secret = _kubectl_get_secret_value(
            "bstein-dev-home", "bstein-dev-home-keycloak-admin", "client_secret"
        )
        token = _keycloak_token(kc_server, kc_realm, kc_client_id, kc_client_secret)
        # Keyed by user id so a user returned for several prefixes is counted once.
        found: dict[str, KeycloakUser] = {}
        for prefix in prefixes:
            for user in _keycloak_list_users(kc_server, kc_realm, token, prefix):
                # The search result is re-checked locally: only usernames that
                # really start with a prefix are kept.
                if not _starts_with_any(user.username, prefixes):
                    continue
                if user.username in protected_keycloak:
                    continue
                found[user.user_id] = user
        kc_users = list(found.values())
        kc_users.sort(key=lambda u: u.username)
        print(f"Keycloak: {len(kc_users)} users matched")
        for user in kc_users[:50]:
            email = user.email or "-"
            print(f" {user.username}\t{email}\t{user.user_id}")
        if len(kc_users) > 50:
            print(f" ... and {len(kc_users) - 50} more")
        if apply and kc_users:
            for user in kc_users:
                _keycloak_delete_user(kc_server, kc_realm, token, user.user_id)
            print(f"Keycloak: deleted {len(kc_users)} users.")
        print()

    if not args.skip_vaultwarden:
        pf = _port_forward("vaultwarden", "svc/vaultwarden-service", 18081, 80)
        try:
            # wait briefly for the port-forward to come up
            # NOTE(review): if all 30 probes fail the code carries on regardless and
            # the first real request reports the connection problem.
            for _ in range(30):
                try:
                    urllib.request.urlopen("http://127.0.0.1:18081/", timeout=1).read(1)
                    break
                except Exception:
                    time.sleep(0.2)
            admin_token = _kubectl_get_secret_value("vaultwarden", "vaultwarden-admin", "ADMIN_TOKEN")
            base_url = "http://127.0.0.1:18081"
            try:
                # Retry with exponential backoff (capped at 60 s) while rate limited.
                cookie = ""
                for attempt in range(7):
                    try:
                        cookie = _vaultwarden_admin_cookie(admin_token, base_url)
                        break
                    except RuntimeError as exc:
                        if "rate limited" in str(exc).lower():
                            time.sleep(min(60.0, 2.0**attempt))
                            continue
                        raise
                if not cookie:
                    raise RuntimeError("vaultwarden admin login repeatedly rate limited")
                # None means "never fetched". An empty list is a valid answer and
                # must not be mistaken for exhausted retries.
                vw_users: list[VaultwardenUser] | None = None
                for attempt in range(7):
                    try:
                        vw_users = _vaultwarden_list_users(base_url, cookie)
                        break
                    except RuntimeError as exc:
                        if "rate limited" in str(exc).lower():
                            time.sleep(min(60.0, 2.0**attempt))
                            continue
                        raise
                if vw_users is None:
                    raise RuntimeError("vaultwarden user list unavailable (possibly rate limited)")
            except RuntimeError as exc:
                print(f"Vaultwarden: ERROR: {exc}")
                print()
                return 1
            matched: list[VaultwardenUser] = []
            for user in vw_users:
                # Prefixes are compared against the part of the email before "@".
                local = user.email.split("@", 1)[0]
                if _starts_with_any(local, prefixes):
                    if user.email in protected_vaultwarden:
                        continue
                    matched.append(user)
            matched.sort(key=lambda u: u.email)
            print(f"Vaultwarden: {len(matched)} users matched")
            for user in matched[:50]:
                print(f" {user.email}\tstatus={user.status}\t{user.user_id}")
            if len(matched) > 50:
                print(f" ... and {len(matched) - 50} more")
            if apply and matched:
                for user in matched:
                    _vaultwarden_delete_user(base_url, cookie, user.user_id)
                print(f"Vaultwarden: deleted {len(matched)} users.")
            print()
        finally:
            # Always tear the port-forward down, escalating to kill if it lingers.
            pf.terminate()
            try:
                pf.wait(timeout=3)
            except Exception:
                pf.kill()
    return 0
# Script entry point: the value returned by main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())