#!/usr/bin/env python3 """Clean up Vaultwarden test users and invites (manual-only). This script deletes Vaultwarden rows directly from the Postgres database. It is intended only for removing test fallout (e.g. e2e-*, test-*) and is deliberately conservative: - Requires one or more explicit email prefixes (repeatable). - Dry-run by default; --apply requires an exact --confirm guard. - Refuses to delete any user with dependent data in Vaultwarden tables. - Supports a protected email allowlist to prevent catastrophic mistakes. Example (dry-run): scripts/test_vaultwarden_user_cleanup.py --prefix e2e- Example (apply): scripts/test_vaultwarden_user_cleanup.py --prefix e2e- --apply --confirm e2e- """ from __future__ import annotations import argparse import json import re import subprocess import sys from dataclasses import dataclass from typing import Iterable, Sequence _SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$") _UUID_RE = re.compile(r"^[0-9a-fA-F-]{32,36}$") @dataclass(frozen=True) class VaultwardenUser: uuid: str email: str dependent_rows: int def _run(cmd: Sequence[str], *, input_bytes: bytes | None = None) -> str: proc = subprocess.run( list(cmd), input=input_bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if proc.returncode != 0: stderr = proc.stderr.decode("utf-8", errors="replace").strip() raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}") return proc.stdout.decode("utf-8", errors="replace") def _kubectl_first_pod(namespace: str) -> str: raw = _run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"]) data = json.loads(raw) items = data.get("items") or [] if not isinstance(items, list) or not items: raise RuntimeError(f"no pods found in namespace {namespace}") name = items[0].get("metadata", {}).get("name") if not isinstance(name, str) or not name: raise RuntimeError(f"unexpected pod list in namespace {namespace}") return name def _psql(sql: str) -> str: pod = _kubectl_first_pod("postgres") return _run( [ "kubectl", "-n", "postgres", "exec", "-i", pod, "--", "psql", "-U", "postgres", "-d", "vaultwarden", "-At", "-F", "\t", "-c", sql, ] ) def _validate_prefixes(prefixes: Iterable[str]) -> list[str]: cleaned: list[str] = [] for prefix in prefixes: prefix = prefix.strip() if not prefix: continue if not _SAFE_PREFIX_RE.match(prefix): raise SystemExit( f"invalid prefix '{prefix}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)" ) if not prefix.endswith("-"): raise SystemExit(f"refusing prefix '{prefix}': must end with '-' for safety") cleaned.append(prefix) if not cleaned: raise SystemExit("at least one --prefix is required") return sorted(set(cleaned)) def _parse_rows(tsv: str) -> list[list[str]]: rows: list[list[str]] = [] for line in tsv.splitlines(): line = line.strip() if not line: continue rows.append(line.split("\t")) return rows def _sql_or_email_prefixes(prefixes: list[str]) -> str: # prefixes validated to safe charset; safe to interpolate. clauses = [f"email LIKE '{p}%'" for p in prefixes] return " OR ".join(clauses) if clauses else "FALSE" def _sql_quote(value: str) -> str: return "'" + value.replace("'", "''") + "'" def _sql_text_array(values: Iterable[str]) -> str: items = ",".join(_sql_quote(v) for v in values) return f"ARRAY[{items}]::text[]" def _list_users(prefixes: list[str], protected: set[str]) -> list[VaultwardenUser]: clause = _sql_or_email_prefixes(prefixes) sql = f""" WITH candidates AS ( SELECT uuid, email FROM users WHERE enabled AND ({clause}) AND email <> ALL({_sql_text_array(sorted(protected))}) ) SELECT candidates.uuid, candidates.email, ( (SELECT COUNT(*) FROM auth_requests WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM ciphers WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM devices WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM emergency_access WHERE grantor_uuid = candidates.uuid OR grantee_uuid = candidates.uuid) + (SELECT COUNT(*) FROM favorites WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM folders WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM sends WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM twofactor WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM twofactor_incomplete WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM users_collections WHERE user_uuid = candidates.uuid) + (SELECT COUNT(*) FROM users_organizations WHERE user_uuid = candidates.uuid) ) AS dependent_rows FROM candidates ORDER BY candidates.email; """ out = _psql(sql) users: list[VaultwardenUser] = [] for row in _parse_rows(out): if len(row) < 3: continue uuid, email, dep_raw = row[0].strip(), row[1].strip(), row[2].strip() if not uuid or not email: continue if not _UUID_RE.match(uuid): continue try: dep = int(dep_raw) except ValueError: dep = 0 users.append(VaultwardenUser(uuid=uuid, email=email, dependent_rows=dep)) return users def _list_invitations(prefixes: list[str], protected: set[str]) -> list[str]: clause = _sql_or_email_prefixes(prefixes) protected_clause = "" if protected: protected_clause = f"AND email <> ALL({_sql_text_array(sorted(protected))})" sql = f"SELECT email FROM invitations WHERE ({clause}) {protected_clause} ORDER BY email;" out = _psql(sql) invites: list[str] = [] for row in _parse_rows(out): if not row: continue email = row[0].strip() if email: invites.append(email) return invites def _delete_invitations(emails: list[str]) -> int: if not emails: return 0 email_list = ",".join(_sql_quote(e) for e in emails) sql = f"DELETE FROM invitations WHERE email IN ({email_list});" out = _psql(sql) match = re.search(r"DELETE\s+(\d+)", out) return int(match.group(1)) if match else 0 def _delete_users(uuids: list[str]) -> int: if not uuids: return 0 uuid_list = ",".join(_sql_quote(u) for u in uuids) sql = f"DELETE FROM users WHERE uuid IN ({uuid_list});" out = _psql(sql) match = re.search(r"DELETE\s+(\d+)", out) return int(match.group(1)) if match else 0 def _parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser( prog="test_vaultwarden_user_cleanup", description="Manual-only cleanup for Vaultwarden test users/invites (DB-level).", ) parser.add_argument( "--prefix", action="append", required=True, help="Email prefix to target (repeatable). Example: --prefix e2e-", ) parser.add_argument( "--apply", action="store_true", help="Apply deletions (default is dry-run). Requires --confirm.", ) parser.add_argument( "--confirm", default="", help="Required when using --apply. Must exactly equal the comma-separated prefix list.", ) parser.add_argument( "--protect-email", action="append", default=[], help="Vaultwarden emails that must never be deleted (repeatable).", ) parser.add_argument( "--verbose", action="store_true", help="List matched emails (and invitation emails).", ) return parser.parse_args(argv) def main(argv: list[str]) -> int: args = _parse_args(argv) prefixes = _validate_prefixes(args.prefix) expected_confirm = ",".join(prefixes) protected = {e.strip() for e in args.protect_email if e.strip()} protected |= { "brad@bstein.dev", "edstein87@outlook.com", "indifox8@gmail.com", "mgs.stein@gmail.com", "patriot87@gmail.com", } if args.apply and args.confirm != expected_confirm: print( f"error: refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')", file=sys.stderr, ) return 2 users = _list_users(prefixes, protected=protected) invites = _list_invitations(prefixes, protected=protected) print(f"prefixes: {expected_confirm}") print(f"mode: {'APPLY' if args.apply else 'DRY-RUN'}") if protected: print(f"protected emails: {', '.join(sorted(protected))}") print(f"vaultwarden users matched: {len(users)}") print(f"vaultwarden invitations matched: {len(invites)}") if args.verbose: for user in users[: min(100, len(users))]: print(f" user: {user.email} (deps={user.dependent_rows})") if len(users) > 100: print(f" ... and {len(users) - 100} more users") for email in invites[: min(100, len(invites))]: print(f" invite: {email}") if len(invites) > 100: print(f" ... and {len(invites) - 100} more invitations") unsafe = [u for u in users if u.dependent_rows > 0] if unsafe: print("refusing to delete users with dependent data:", file=sys.stderr) for user in unsafe[: min(50, len(unsafe))]: print(f" - {user.email} deps={user.dependent_rows}", file=sys.stderr) if len(unsafe) > 50: print(f" ... and {len(unsafe) - 50} more", file=sys.stderr) return 2 if not args.apply: print("dry-run complete (no changes made)") return 0 deleted_invites = _delete_invitations(invites) deleted_users = _delete_users([u.uuid for u in users]) print(f"deleted vaultwarden invitations: {deleted_invites}") print(f"deleted vaultwarden users: {deleted_users}") print("done") return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))