titan-iac/scripts/test_atlas_user_cleanup.py

435 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""Clean up Atlas test users and portal requests (manual-only).
Default behavior is DRY RUN. This script is intended for operators to clean up
test accounts created via the bstein-dev-home onboarding portal.
Targets (best-effort):
- Keycloak users in realm "atlas"
- Atlas portal Postgres rows (access_requests + dependent tables)
- Vaultwarden users/invites created by the portal
Safety:
- Requires an explicit username prefix (e.g. "test-")
- Dry-run unless --apply is set
- Validates prefixes to a conservative charset
"""
from __future__ import annotations

import argparse
import base64
import json
import os
import re
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Iterable
# Conservative charset for username prefixes: one leading alphanumeric, then up
# to 63 more characters drawn from alphanumerics, ".", "_" and "-".
_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")
@dataclass(frozen=True)
class KeycloakUser:
    """Immutable record describing one user returned by the Keycloak admin API."""

    # Value of the "id" field in the Keycloak user representation.
    user_id: str
    # Value of the "username" field.
    username: str
    # Value of the "email" field.
    email: str
@dataclass(frozen=True)
class PortalRequestRow:
    """Immutable record for one row selected from the portal access_requests table."""

    # "request_code" column.
    request_code: str
    # "username" column.
    username: str
    # "status" column, kept as the text psql printed.
    status: str
@dataclass(frozen=True)
class VaultwardenUser:
    """Immutable record describing one user from the Vaultwarden admin user list."""

    # Value of the "id" field.
    user_id: str
    # Value of the "email" field.
    email: str
    # Value of the "_status" field.
    status: int
def _run(cmd: list[str], *, input_bytes: bytes | None = None) -> str:
    """Execute *cmd* and return its standard output decoded as UTF-8.

    Args:
        cmd: Argument vector passed to ``subprocess.run`` (no shell).
        input_bytes: Optional bytes fed to the child's standard input.

    Returns:
        The child's stdout, with undecodable bytes replaced.

    Raises:
        RuntimeError: If the child exits with a non-zero status; the message
            carries the exit code, the joined command line and stripped stderr.
    """
    completed = subprocess.run(
        cmd,
        input=input_bytes,
        capture_output=True,
        check=False,
    )
    if completed.returncode == 0:
        return completed.stdout.decode("utf-8", errors="replace")

    err_text = completed.stderr.decode("utf-8", errors="replace").strip()
    raise RuntimeError(f"command failed ({completed.returncode}): {' '.join(cmd)}\n{err_text}")
def _kubectl_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Fetch one key of a Kubernetes Secret through kubectl and decode it.

    Args:
        namespace: Namespace holding the Secret.
        name: Secret name.
        key: Key under ``.data`` to read.

    Returns:
        The base64-decoded value as UTF-8 text with surrounding whitespace
        stripped.

    Raises:
        RuntimeError: If kubectl fails or the key yields no data.
    """
    jsonpath = "jsonpath={.data." + key + "}"
    argv = ["kubectl", "-n", namespace, "get", "secret", name, "-o", jsonpath]
    encoded = _run(argv).strip()
    if encoded:
        return base64.b64decode(encoded).decode("utf-8").strip()
    raise RuntimeError(f"secret {namespace}/{name} key {key} is empty")
def _kubectl_first_pod(namespace: str) -> str:
    """Return the name of the first pod kubectl lists in *namespace*.

    No filtering by phase or label is applied; whichever pod appears first in
    the ``kubectl get pods -o json`` output is chosen.

    Raises:
        RuntimeError: If kubectl fails, the namespace has no pods, or the first
            entry carries no usable name.
    """
    listing = json.loads(_run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"]))
    pods = listing.get("items") or []
    if not (isinstance(pods, list) and pods):
        raise RuntimeError(f"no pods found in namespace {namespace}")

    name = pods[0].get("metadata", {}).get("name")
    if isinstance(name, str) and name:
        return name
    raise RuntimeError(f"unexpected pod list in namespace {namespace}")
def _validate_prefixes(prefixes: list[str]) -> list[str]:
    """Strip, filter and validate the user-supplied username prefixes.

    Blank entries are dropped. Every remaining entry must match
    ``_SAFE_PREFIX_RE``.

    Returns:
        The stripped, non-empty prefixes in their original order.

    Raises:
        SystemExit: On the first prefix that fails validation, or when no
            usable prefix remains.
    """
    candidates = [text for text in (raw.strip() for raw in prefixes) if text]
    for candidate in candidates:
        if _SAFE_PREFIX_RE.match(candidate) is None:
            raise SystemExit(
                f"invalid prefix '{candidate}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
            )
    if candidates:
        return candidates
    raise SystemExit("at least one --prefix is required")
def _starts_with_any(value: str, prefixes: Iterable[str]) -> bool:
    """Return True when *value* begins with at least one of *prefixes*."""
    # str.startswith accepts a tuple of candidates and tests them in one call.
    return value.startswith(tuple(prefixes))
def _keycloak_token(server: str, realm: str, client_id: str, client_secret: str) -> str:
    """Obtain an access token from Keycloak using the client-credentials grant.

    Args:
        server: Keycloak base URL without a trailing slash.
        realm: Realm whose token endpoint is called.
        client_id: Client identifier sent in the form body.
        client_secret: Client secret sent in the form body.

    Returns:
        The ``access_token`` string from the JSON response.

    Raises:
        RuntimeError: If the response has no non-empty string ``access_token``.
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    req = urllib.request.Request(
        f"{server}/realms/{realm}/protocol/openid-connect/token",
        data=urllib.parse.urlencode(form).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=15) as resp:
        body = resp.read()

    access_token = json.loads(body.decode("utf-8")).get("access_token")
    if isinstance(access_token, str) and access_token:
        return access_token
    raise RuntimeError("failed to obtain keycloak access token")
def _keycloak_list_users(server: str, realm: str, token: str, search: str) -> list[KeycloakUser]:
    """Query the Keycloak admin users endpoint with a search term.

    The request asks for at most 1000 results (``max=1000``) and does not
    paginate. Entries that are not objects, lack a non-empty string ``id``, or
    carry a non-string ``username`` are skipped.

    Raises:
        RuntimeError: If the response body is not a JSON list.
    """
    params = urllib.parse.urlencode({"max": "1000", "search": search})
    req = urllib.request.Request(
        f"{server}/admin/realms/{realm}/users?{params}",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        body = resp.read()

    payload = json.loads(body.decode("utf-8"))
    if not isinstance(payload, list):
        raise RuntimeError("unexpected keycloak users response")

    result: list[KeycloakUser] = []
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        uid = entry.get("id")
        name = entry.get("username") or ""
        if isinstance(uid, str) and uid and isinstance(name, str):
            mail = entry.get("email") or ""
            result.append(KeycloakUser(user_id=uid, username=name, email=str(mail)))
    return result
def _keycloak_delete_user(server: str, realm: str, token: str, user_id: str) -> None:
    """Delete one Keycloak user by id; an HTTP 404 is treated as already gone.

    Raises:
        urllib.error.HTTPError: For any HTTP error status other than 404.
    """
    req = urllib.request.Request(
        f"{server}/admin/realms/{realm}/users/{user_id}",
        headers={"Authorization": f"Bearer {token}"},
        method="DELETE",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp.read()
    except urllib.error.HTTPError as exc:
        if exc.code != 404:
            raise
def _psql_json(portal_db_url: str, sql: str) -> list[dict[str, Any]]:
    """Run *sql* through psql inside the first pod of the ``postgres`` namespace.

    psql is invoked in unaligned, tuples-only mode with a tab field separator.
    Despite the name, the output is not JSON: each output line becomes a dict
    of the form ``{"cols": [<field>, ...]}``.

    Args:
        portal_db_url: Connection URL handed to psql as its database argument.
        sql: Statement passed via ``-c``.

    Returns:
        One dict per line of psql output.
    """
    pod = _kubectl_first_pod("postgres")
    argv = [
        "kubectl", "-n", "postgres", "exec", "-i", pod, "--",
        "psql", portal_db_url, "-At", "-F", "\t", "-c", sql,
    ]
    output = _run(argv)
    return [{"cols": line.split("\t")} for line in output.splitlines()]
def _portal_list_requests(portal_db_url: str, prefixes: list[str]) -> list[PortalRequestRow]:
    """List access_requests rows whose username starts with any of *prefixes*.

    Prefixes are matched literally. ``_`` and ``%`` are LIKE wildcards in SQL,
    and ``_`` is a permitted prefix character, so each prefix is escaped and the
    pattern carries an explicit ``ESCAPE`` clause. Without that, ``test_`` would
    also match ``testX...``.

    Args:
        portal_db_url: Connection URL handed to psql.
        prefixes: Non-empty list of username prefixes.

    Returns:
        Matching rows, newest first (``ORDER BY created_at DESC``). Output
        lines with fewer than three fields are ignored.

    Raises:
        ValueError: If *prefixes* is empty (the WHERE clause would be invalid).
    """
    if not prefixes:
        raise ValueError("at least one prefix is required")

    # "!" is the LIKE escape character; single quotes are doubled so a prefix
    # can never terminate the SQL string literal.
    like_escapes = str.maketrans({"!": "!!", "%": "!%", "_": "!_", "'": "''"})
    clauses = " OR ".join(
        f"username LIKE '{prefix.translate(like_escapes)}%' ESCAPE '!'" for prefix in prefixes
    )
    sql = (
        "SELECT request_code, username, status "
        "FROM access_requests "
        f"WHERE {clauses} "
        "ORDER BY created_at DESC;"
    )

    parsed: list[PortalRequestRow] = []
    for row in _psql_json(portal_db_url, sql):
        cols = row.get("cols") or []
        if len(cols) < 3:
            continue
        parsed.append(PortalRequestRow(request_code=cols[0], username=cols[1], status=cols[2]))
    return parsed
def _portal_delete_requests(portal_db_url: str, prefixes: list[str]) -> int:
    """Delete access_requests rows whose username starts with any of *prefixes*.

    Prefixes are matched literally. ``_`` and ``%`` are LIKE wildcards in SQL,
    and ``_`` is a permitted prefix character, so each prefix is escaped and the
    pattern carries an explicit ``ESCAPE`` clause. This keeps the destructive
    statement from touching usernames that merely resemble the prefix.

    Args:
        portal_db_url: Connection URL handed to psql.
        prefixes: Non-empty list of username prefixes.

    Returns:
        The row count from psql's ``DELETE <n>`` command tag, or 0 when no tag
        is found in the output.

    Raises:
        ValueError: If *prefixes* is empty (the WHERE clause would be invalid).
    """
    if not prefixes:
        raise ValueError("at least one prefix is required")

    # "!" is the LIKE escape character; single quotes are doubled so a prefix
    # can never terminate the SQL string literal.
    like_escapes = str.maketrans({"!": "!!", "%": "!%", "_": "!_", "'": "''"})
    clauses = " OR ".join(
        f"username LIKE '{prefix.translate(like_escapes)}%' ESCAPE '!'" for prefix in prefixes
    )
    sql = f"DELETE FROM access_requests WHERE {clauses};"

    postgres_pod = _kubectl_first_pod("postgres")
    out = _run(
        [
            "kubectl",
            "-n",
            "postgres",
            "exec",
            "-i",
            postgres_pod,
            "--",
            "psql",
            portal_db_url,
            "-c",
            sql,
        ]
    )
    # psql prints "DELETE <n>". The pattern must use single backslashes inside
    # the raw string; doubled ones would look for a literal backslash and the
    # count would always come back as 0.
    match = re.search(r"DELETE\s+(\d+)", out)
    return int(match.group(1)) if match else 0
def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
    """Log in to the Vaultwarden admin page and return the session cookie pair.

    The admin token is POSTed as a form field to ``{base_url}/admin``. The
    returned value is the part of the ``Set-Cookie`` header before the first
    ``;``.

    Raises:
        RuntimeError: If the response carries no usable ``Set-Cookie`` header.
    """
    form = urllib.parse.urlencode({"token": admin_token}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/admin",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        header = resp.headers.get("Set-Cookie") or ""

    pair, _, _attrs = header.partition(";")
    pair = pair.strip()
    if pair:
        return pair
    raise RuntimeError("vaultwarden admin cookie missing")
def _vaultwarden_list_users(base_url: str, cookie: str) -> list[VaultwardenUser]:
    """Fetch every user from the Vaultwarden admin API.

    Entries that are not objects, or that lack a non-empty string ``id`` or
    ``email``, are skipped. A ``_status`` that is not an int is recorded as -1.

    Raises:
        RuntimeError: If the response body is not a JSON list.
    """
    req = urllib.request.Request(
        f"{base_url}/admin/users",
        headers={"Cookie": cookie},
        method="GET",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        body = resp.read()

    payload = json.loads(body.decode("utf-8"))
    if not isinstance(payload, list):
        raise RuntimeError("unexpected vaultwarden /admin/users response")

    result: list[VaultwardenUser] = []
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        uid = entry.get("id")
        mail = entry.get("email")
        if not (isinstance(uid, str) and uid and isinstance(mail, str) and mail):
            continue
        raw_status = entry.get("_status")
        state = raw_status if isinstance(raw_status, int) else -1
        result.append(VaultwardenUser(user_id=uid, email=mail, status=state))
    return result
def _vaultwarden_delete_user(base_url: str, cookie: str, user_id: str) -> None:
    """Delete one Vaultwarden user through the admin API.

    An HTTP 404 is treated as already deleted.

    Raises:
        RuntimeError: If the admin endpoint answers HTTP 429.
        urllib.error.HTTPError: For any other HTTP error status.
    """
    req = urllib.request.Request(
        f"{base_url}/admin/users/{user_id}",
        headers={"Cookie": cookie},
        method="DELETE",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp.read()
    except urllib.error.HTTPError as exc:
        status = exc.code
        if status == 404:
            return
        if status == 429:
            raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from exc
        raise
def _port_forward(namespace: str, target: str, local_port: int, remote_port: int) -> subprocess.Popen[bytes]:
    """Start ``kubectl port-forward`` bound to 127.0.0.1 and return the process.

    The caller owns the returned process and is responsible for stopping it.
    """
    argv = [
        "kubectl",
        "-n",
        namespace,
        "port-forward",
        target,
        f"{local_port}:{remote_port}",
        "--address",
        "127.0.0.1",
    ]
    # Both output streams are discarded so internal details do not end up in
    # the script's own output.
    return subprocess.Popen(argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def _print_preview(lines: list[str], limit: int = 50) -> None:
    """Print at most *limit* lines, then a summary of how many were omitted."""
    for line in lines[:limit]:
        print(line)
    if len(lines) > limit:
        print(f" ... and {len(lines) - limit} more")


def _wait_for_port_forward(
    pf: subprocess.Popen[bytes], url: str, *, attempts: int = 30, delay: float = 0.2
) -> None:
    """Poll *url* until the port-forward answers or *attempts* probes have failed.

    Running out of attempts is not an error: the caller proceeds best-effort,
    and its own requests use longer timeouts.

    Raises:
        RuntimeError: If the kubectl process has already exited. Continuing
            would send the admin token to whatever else listens on the port.
    """
    for _ in range(attempts):
        if pf.poll() is not None:
            raise RuntimeError(f"kubectl port-forward exited early (code {pf.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1) as resp:
                resp.read(1)
            return
        except urllib.error.HTTPError as exc:
            # Any HTTP status, even an error one, proves the tunnel is up.
            exc.close()
            return
        except Exception:
            # Deliberately broad: this is only a readiness probe, and failures
            # are expected while the forward is still starting.
            time.sleep(delay)
    if pf.poll() is not None:
        raise RuntimeError(f"kubectl port-forward exited early (code {pf.returncode})")


def _cleanup_portal_db(prefixes: list[str], apply: bool) -> None:
    """Report matching portal access_requests and, when *apply* is set, delete them."""
    portal_db_url = _kubectl_get_secret_value("bstein-dev-home", "atlas-portal-db", "PORTAL_DATABASE_URL")
    requests = _portal_list_requests(portal_db_url, prefixes)
    print(f"Portal DB: {len(requests)} access_requests matched")
    _print_preview([f" {row.request_code}\t{row.status}\t{row.username}" for row in requests])
    if apply and requests:
        deleted = _portal_delete_requests(portal_db_url, prefixes)
        print(f"Portal DB: deleted {deleted} access_requests (cascade removes tasks/steps/artifacts).")
    print()


def _cleanup_keycloak(prefixes: list[str], apply: bool) -> None:
    """Report matching Keycloak users and, when *apply* is set, delete them."""
    kc_server = os.getenv("KEYCLOAK_PUBLIC_URL", "https://sso.bstein.dev").rstrip("/")
    kc_realm = os.getenv("KEYCLOAK_REALM", "atlas")
    kc_client_id = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "bstein-dev-home-admin")
    kc_client_secret = _kubectl_get_secret_value(
        "bstein-dev-home", "bstein-dev-home-keycloak-admin", "client_secret"
    )
    token = _keycloak_token(kc_server, kc_realm, kc_client_id, kc_client_secret)

    # The search results are re-checked against the prefixes locally, and
    # de-duplicated by user id in case several prefixes return the same user.
    found: dict[str, KeycloakUser] = {}
    for prefix in prefixes:
        for user in _keycloak_list_users(kc_server, kc_realm, token, prefix):
            if _starts_with_any(user.username, prefixes):
                found[user.user_id] = user
    users = sorted(found.values(), key=lambda u: u.username)

    print(f"Keycloak: {len(users)} users matched")
    _print_preview([f" {user.username}\t{user.email or '-'}\t{user.user_id}" for user in users])
    if apply and users:
        for user in users:
            _keycloak_delete_user(kc_server, kc_realm, token, user.user_id)
        print(f"Keycloak: deleted {len(users)} users.")
    print()


def _cleanup_vaultwarden(prefixes: list[str], apply: bool) -> None:
    """Report matching Vaultwarden users and, when *apply* is set, delete them.

    Vaultwarden is reached through a temporary kubectl port-forward on
    127.0.0.1:18081, which is always torn down before returning.
    """
    base_url = "http://127.0.0.1:18081"
    pf = _port_forward("vaultwarden", "svc/vaultwarden-service", 18081, 80)
    try:
        _wait_for_port_forward(pf, f"{base_url}/")
        admin_token = _kubectl_get_secret_value("vaultwarden", "vaultwarden-admin", "ADMIN_TOKEN")
        cookie = _vaultwarden_admin_cookie(admin_token, base_url)

        # Users are matched on the local part of their e-mail address.
        matched = sorted(
            (
                user
                for user in _vaultwarden_list_users(base_url, cookie)
                if _starts_with_any(user.email.split("@", 1)[0], prefixes)
            ),
            key=lambda u: u.email,
        )
        print(f"Vaultwarden: {len(matched)} users matched")
        _print_preview([f" {user.email}\tstatus={user.status}\t{user.user_id}" for user in matched])
        if apply and matched:
            for user in matched:
                _vaultwarden_delete_user(base_url, cookie, user.user_id)
            print(f"Vaultwarden: deleted {len(matched)} users.")
        print()
    finally:
        pf.terminate()
        try:
            pf.wait(timeout=3)
        except subprocess.TimeoutExpired:
            pf.kill()
            pf.wait()  # reap the killed child so it does not linger as a zombie


def main() -> int:
    """Parse the command line and run the selected cleanup stages.

    Stages run in the order portal DB, Keycloak, Vaultwarden. Each one can be
    skipped with its ``--skip-*`` flag. Nothing is deleted unless ``--apply``
    is given.

    Returns:
        0 on completion; failures propagate as exceptions.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--prefix",
        action="append",
        default=[],
        help="Username prefix to match (repeatable). Example: --prefix test-",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually delete; otherwise dry-run only.",
    )
    parser.add_argument("--skip-keycloak", action="store_true", help="Skip Keycloak user deletion.")
    parser.add_argument("--skip-portal-db", action="store_true", help="Skip portal DB cleanup.")
    parser.add_argument("--skip-vaultwarden", action="store_true", help="Skip Vaultwarden cleanup.")
    args = parser.parse_args()

    prefixes = _validate_prefixes(args.prefix)
    apply = bool(args.apply)

    print("Atlas test-user cleanup")
    print("prefixes:", ", ".join(prefixes))
    print("mode:", "APPLY (destructive)" if apply else "DRY RUN (no changes)")
    print()

    if not args.skip_portal_db:
        _cleanup_portal_db(prefixes, apply)
    if not args.skip_keycloak:
        _cleanup_keycloak(prefixes, apply)
    if not args.skip_vaultwarden:
        _cleanup_vaultwarden(prefixes, apply)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())