titan-iac/scripts/test_vaultwarden_user_cleanup.py

319 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""Clean up Vaultwarden test users and invites (manual-only).
This script deletes Vaultwarden rows directly from the Postgres database. It is
intended only for removing test fallout (e.g. e2e-*, test-*) and is deliberately
conservative:
- Requires one or more explicit email prefixes (repeatable).
- Dry-run by default; --apply requires an exact --confirm guard.
- Refuses to delete any user with dependent data in Vaultwarden tables.
- Supports a protected email allowlist to prevent catastrophic mistakes.
Example (dry-run):
scripts/test_vaultwarden_user_cleanup.py --prefix e2e-
Example (apply):
scripts/test_vaultwarden_user_cleanup.py --prefix e2e- --apply --confirm e2e-
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from typing import Iterable, Sequence
_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")
_UUID_RE = re.compile(r"^[0-9a-fA-F-]{32,36}$")
@dataclass(frozen=True)
class VaultwardenUser:
uuid: str
email: str
dependent_rows: int
def _run(cmd: Sequence[str], *, input_bytes: bytes | None = None) -> str:
proc = subprocess.run(
list(cmd),
input=input_bytes,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if proc.returncode != 0:
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}")
return proc.stdout.decode("utf-8", errors="replace")
def _kubectl_first_pod(namespace: str) -> str:
raw = _run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"])
data = json.loads(raw)
items = data.get("items") or []
if not isinstance(items, list) or not items:
raise RuntimeError(f"no pods found in namespace {namespace}")
name = items[0].get("metadata", {}).get("name")
if not isinstance(name, str) or not name:
raise RuntimeError(f"unexpected pod list in namespace {namespace}")
return name
def _psql(sql: str) -> str:
pod = _kubectl_first_pod("postgres")
return _run(
[
"kubectl",
"-n",
"postgres",
"exec",
"-i",
pod,
"--",
"psql",
"-U",
"postgres",
"-d",
"vaultwarden",
"-At",
"-F",
"\t",
"-c",
sql,
]
)
def _validate_prefixes(prefixes: Iterable[str]) -> list[str]:
cleaned: list[str] = []
for prefix in prefixes:
prefix = prefix.strip()
if not prefix:
continue
if not _SAFE_PREFIX_RE.match(prefix):
raise SystemExit(
f"invalid prefix '{prefix}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
)
if not prefix.endswith("-"):
raise SystemExit(f"refusing prefix '{prefix}': must end with '-' for safety")
cleaned.append(prefix)
if not cleaned:
raise SystemExit("at least one --prefix is required")
return sorted(set(cleaned))
def _parse_rows(tsv: str) -> list[list[str]]:
rows: list[list[str]] = []
for line in tsv.splitlines():
line = line.strip()
if not line:
continue
rows.append(line.split("\t"))
return rows
def _sql_or_email_prefixes(prefixes: list[str]) -> str:
# prefixes validated to safe charset; safe to interpolate.
clauses = [f"email LIKE '{p}%'" for p in prefixes]
return " OR ".join(clauses) if clauses else "FALSE"
def _sql_quote(value: str) -> str:
return "'" + value.replace("'", "''") + "'"
def _sql_text_array(values: Iterable[str]) -> str:
items = ",".join(_sql_quote(v) for v in values)
return f"ARRAY[{items}]::text[]"
def _list_users(prefixes: list[str], protected: set[str]) -> list[VaultwardenUser]:
clause = _sql_or_email_prefixes(prefixes)
sql = f"""
WITH candidates AS (
SELECT uuid, email
FROM users
WHERE enabled
AND ({clause})
AND email <> ALL({_sql_text_array(sorted(protected))})
)
SELECT
candidates.uuid,
candidates.email,
(
(SELECT COUNT(*) FROM auth_requests WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM ciphers WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM devices WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM emergency_access WHERE grantor_uuid = candidates.uuid OR grantee_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM favorites WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM folders WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM sends WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM twofactor WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM twofactor_incomplete WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM users_collections WHERE user_uuid = candidates.uuid) +
(SELECT COUNT(*) FROM users_organizations WHERE user_uuid = candidates.uuid)
) AS dependent_rows
FROM candidates
ORDER BY candidates.email;
"""
out = _psql(sql)
users: list[VaultwardenUser] = []
for row in _parse_rows(out):
if len(row) < 3:
continue
uuid, email, dep_raw = row[0].strip(), row[1].strip(), row[2].strip()
if not uuid or not email:
continue
if not _UUID_RE.match(uuid):
continue
try:
dep = int(dep_raw)
except ValueError:
dep = 0
users.append(VaultwardenUser(uuid=uuid, email=email, dependent_rows=dep))
return users
def _list_invitations(prefixes: list[str], protected: set[str]) -> list[str]:
clause = _sql_or_email_prefixes(prefixes)
protected_clause = ""
if protected:
protected_clause = f"AND email <> ALL({_sql_text_array(sorted(protected))})"
sql = f"SELECT email FROM invitations WHERE ({clause}) {protected_clause} ORDER BY email;"
out = _psql(sql)
invites: list[str] = []
for row in _parse_rows(out):
if not row:
continue
email = row[0].strip()
if email:
invites.append(email)
return invites
def _delete_invitations(emails: list[str]) -> int:
if not emails:
return 0
email_list = ",".join(_sql_quote(e) for e in emails)
sql = f"DELETE FROM invitations WHERE email IN ({email_list});"
out = _psql(sql)
match = re.search(r"DELETE\s+(\d+)", out)
return int(match.group(1)) if match else 0
def _delete_users(uuids: list[str]) -> int:
if not uuids:
return 0
uuid_list = ",".join(_sql_quote(u) for u in uuids)
sql = f"DELETE FROM users WHERE uuid IN ({uuid_list});"
out = _psql(sql)
match = re.search(r"DELETE\s+(\d+)", out)
return int(match.group(1)) if match else 0
def _parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="test_vaultwarden_user_cleanup",
description="Manual-only cleanup for Vaultwarden test users/invites (DB-level).",
)
parser.add_argument(
"--prefix",
action="append",
required=True,
help="Email prefix to target (repeatable). Example: --prefix e2e-",
)
parser.add_argument(
"--apply",
action="store_true",
help="Apply deletions (default is dry-run). Requires --confirm.",
)
parser.add_argument(
"--confirm",
default="",
help="Required when using --apply. Must exactly equal the comma-separated prefix list.",
)
parser.add_argument(
"--protect-email",
action="append",
default=[],
help="Vaultwarden emails that must never be deleted (repeatable).",
)
parser.add_argument(
"--verbose",
action="store_true",
help="List matched emails (and invitation emails).",
)
return parser.parse_args(argv)
def main(argv: list[str]) -> int:
args = _parse_args(argv)
prefixes = _validate_prefixes(args.prefix)
expected_confirm = ",".join(prefixes)
protected = {e.strip() for e in args.protect_email if e.strip()}
protected |= {
"brad@bstein.dev",
"edstein87@outlook.com",
"indifox8@gmail.com",
"mgs.stein@gmail.com",
"patriot87@gmail.com",
}
if args.apply and args.confirm != expected_confirm:
print(
f"error: refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')",
file=sys.stderr,
)
return 2
users = _list_users(prefixes, protected=protected)
invites = _list_invitations(prefixes, protected=protected)
print(f"prefixes: {expected_confirm}")
print(f"mode: {'APPLY' if args.apply else 'DRY-RUN'}")
if protected:
print(f"protected emails: {', '.join(sorted(protected))}")
print(f"vaultwarden users matched: {len(users)}")
print(f"vaultwarden invitations matched: {len(invites)}")
if args.verbose:
for user in users[: min(100, len(users))]:
print(f" user: {user.email} (deps={user.dependent_rows})")
if len(users) > 100:
print(f" ... and {len(users) - 100} more users")
for email in invites[: min(100, len(invites))]:
print(f" invite: {email}")
if len(invites) > 100:
print(f" ... and {len(invites) - 100} more invitations")
unsafe = [u for u in users if u.dependent_rows > 0]
if unsafe:
print("refusing to delete users with dependent data:", file=sys.stderr)
for user in unsafe[: min(50, len(unsafe))]:
print(f" - {user.email} deps={user.dependent_rows}", file=sys.stderr)
if len(unsafe) > 50:
print(f" ... and {len(unsafe) - 50} more", file=sys.stderr)
return 2
if not args.apply:
print("dry-run complete (no changes made)")
return 0
deleted_invites = _delete_invitations(invites)
deleted_users = _delete_users([u.uuid for u in users])
print(f"deleted vaultwarden invitations: {deleted_invites}")
print(f"deleted vaultwarden users: {deleted_users}")
print("done")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))