scripts: harden atlas cleanup script

This commit is contained in:
Brad Stein 2026-01-05 12:24:12 -03:00
parent 55b25fbfd6
commit 89d47cba79

View File

@ -12,6 +12,7 @@ Targets (best-effort):
Safety:
- Requires an explicit username prefix (e.g. "test-")
- Dry-run unless --apply is set
- --apply requires an explicit --confirm guard
- Validates prefixes to a conservative charset
"""
@ -259,8 +260,13 @@ def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
data = urllib.parse.urlencode({"token": admin_token}).encode("utf-8")
req = urllib.request.Request(f"{base_url}/admin", data=data, method="POST")
req.add_header("Content-Type", "application/x-www-form-urlencoded")
with urllib.request.urlopen(req, timeout=10) as resp:
set_cookie = resp.headers.get("Set-Cookie") or ""
try:
with urllib.request.urlopen(req, timeout=10) as resp:
set_cookie = resp.headers.get("Set-Cookie") or ""
except urllib.error.HTTPError as exc:
if exc.code == 429:
raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from exc
raise
cookie = set_cookie.split(";", 1)[0].strip()
if not cookie:
raise RuntimeError("vaultwarden admin cookie missing")
@ -270,8 +276,13 @@ def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
def _vaultwarden_list_users(base_url: str, cookie: str) -> list[VaultwardenUser]:
req = urllib.request.Request(f"{base_url}/admin/users", method="GET")
req.add_header("Cookie", cookie)
with urllib.request.urlopen(req, timeout=30) as resp:
payload = json.loads(resp.read().decode("utf-8"))
try:
with urllib.request.urlopen(req, timeout=30) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
if exc.code == 429:
raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from exc
raise
if not isinstance(payload, list):
raise RuntimeError("unexpected vaultwarden /admin/users response")
users: list[VaultwardenUser] = []
@ -336,17 +347,49 @@ def main() -> int:
action="store_true",
help="Actually delete; otherwise dry-run only.",
)
parser.add_argument(
"--confirm",
default="",
help=(
"Required when using --apply. Must exactly equal the comma-separated "
"sorted prefix list (e.g. 'atlas-,bob-,e2e-,test-')."
),
)
parser.add_argument("--skip-keycloak", action="store_true", help="Skip Keycloak user deletion.")
parser.add_argument("--skip-portal-db", action="store_true", help="Skip portal DB cleanup.")
parser.add_argument("--skip-vaultwarden", action="store_true", help="Skip Vaultwarden cleanup.")
parser.add_argument(
"--protect-keycloak-username",
action="append",
default=[],
help="Keycloak usernames that must never be deleted (repeatable).",
)
parser.add_argument(
"--protect-vaultwarden-email",
action="append",
default=[],
help="Vaultwarden emails that must never be deleted (repeatable).",
)
args = parser.parse_args()
prefixes = _validate_prefixes(args.prefix)
prefixes = sorted(set(_validate_prefixes(args.prefix)))
apply = bool(args.apply)
expected_confirm = ",".join(prefixes)
protected_keycloak = {"bstein", "robotuser", *[u.strip() for u in args.protect_keycloak_username if u.strip()]}
protected_vaultwarden = {e.strip() for e in args.protect_vaultwarden_email if e.strip()}
if apply and args.confirm != expected_confirm:
raise SystemExit(
f"refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')"
)
print("Atlas test-user cleanup")
print("prefixes:", ", ".join(prefixes))
print("prefixes:", expected_confirm)
print("mode:", "APPLY (destructive)" if apply else "DRY RUN (no changes)")
if protected_keycloak:
print("protected keycloak usernames:", ", ".join(sorted(protected_keycloak)))
if protected_vaultwarden:
print("protected vaultwarden emails:", ", ".join(sorted(protected_vaultwarden)))
print()
if not args.skip_portal_db:
@ -375,6 +418,8 @@ def main() -> int:
for user in _keycloak_list_users(kc_server, kc_realm, token, prefix):
if not _starts_with_any(user.username, prefixes):
continue
if user.username in protected_keycloak:
continue
found[user.user_id] = user
users = list(found.values())
users.sort(key=lambda u: u.username)
@ -403,12 +448,42 @@ def main() -> int:
admin_token = _kubectl_get_secret_value("vaultwarden", "vaultwarden-admin", "ADMIN_TOKEN")
base_url = "http://127.0.0.1:18081"
cookie = _vaultwarden_admin_cookie(admin_token, base_url)
users = _vaultwarden_list_users(base_url, cookie)
try:
cookie = ""
for attempt in range(7):
try:
cookie = _vaultwarden_admin_cookie(admin_token, base_url)
break
except RuntimeError as exc:
if "rate limited" in str(exc).lower():
time.sleep(min(60.0, 2.0**attempt))
continue
raise
if not cookie:
raise RuntimeError("vaultwarden admin login repeatedly rate limited")
users: list[VaultwardenUser] = []
for attempt in range(7):
try:
users = _vaultwarden_list_users(base_url, cookie)
break
except RuntimeError as exc:
if "rate limited" in str(exc).lower():
time.sleep(min(60.0, 2.0**attempt))
continue
raise
if not users:
raise RuntimeError("vaultwarden user list unavailable (possibly rate limited)")
except RuntimeError as exc:
print(f"Vaultwarden: ERROR: {exc}")
print()
return 1
matched: list[VaultwardenUser] = []
for user in users:
local = user.email.split("@", 1)[0]
if _starts_with_any(local, prefixes):
if user.email in protected_vaultwarden:
continue
matched.append(user)
matched.sort(key=lambda u: u.email)
print(f"Vaultwarden: {len(matched)} users matched")