#!/usr/bin/env python3
"""Clean up Vaultwarden test users and invites (manual-only).
|
|
|
|
This script deletes Vaultwarden rows directly from the Postgres database. It is
|
|
intended only for removing test fallout (e.g. e2e-*, test-*) and is deliberately
|
|
conservative:
|
|
|
|
- Requires one or more explicit email prefixes (repeatable).
|
|
- Dry-run by default; --apply requires an exact --confirm guard.
|
|
- Refuses to delete any user with dependent data in Vaultwarden tables.
|
|
- Supports a protected email allowlist to prevent catastrophic mistakes.
|
|
|
|
Example (dry-run):
|
|
scripts/test_vaultwarden_user_cleanup.py --prefix e2e-
|
|
|
|
Example (apply):
|
|
scripts/test_vaultwarden_user_cleanup.py --prefix e2e- --apply --confirm e2e-
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, Sequence
|
|
|
|
|
|
_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")
|
|
_UUID_RE = re.compile(r"^[0-9a-fA-F-]{32,36}$")
|
|
|
|
|
|
@dataclass(frozen=True)
class VaultwardenUser:
    """Immutable record describing one user row selected for cleanup."""

    # Value of the `uuid` column for the user.
    uuid: str
    # Value of the `email` column for the user.
    email: str
    # Number of rows in related tables that reference this user.
    dependent_rows: int
|
|
|
|
|
|
def _run(cmd: Sequence[str], *, input_bytes: bytes | None = None) -> str:
|
|
proc = subprocess.run(
|
|
list(cmd),
|
|
input=input_bytes,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
check=False,
|
|
)
|
|
if proc.returncode != 0:
|
|
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
|
|
raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}")
|
|
return proc.stdout.decode("utf-8", errors="replace")
|
|
|
|
|
|
def _kubectl_first_pod(namespace: str) -> str:
    """Return the name of the first pod that kubectl lists in *namespace*.

    Raises:
        RuntimeError: If the namespace has no pods, or the first entry does
            not carry a non-empty string name.
    """
    listing = json.loads(_run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"]))
    pods = listing.get("items") or []
    if not (isinstance(pods, list) and pods):
        raise RuntimeError(f"no pods found in namespace {namespace}")

    pod_name = pods[0].get("metadata", {}).get("name")
    if isinstance(pod_name, str) and pod_name:
        return pod_name
    raise RuntimeError(f"unexpected pod list in namespace {namespace}")
|
|
|
|
|
|
def _psql(sql: str) -> str:
    """Run *sql* against the `vaultwarden` database and return psql's stdout.

    The statement is executed with `psql -c` inside the first pod of the
    `postgres` namespace, via `kubectl exec`. Output is requested unaligned,
    tuples-only and tab-separated (`-At -F "\\t"`).
    """
    kubectl_exec = ["kubectl", "-n", "postgres", "exec", "-i", _kubectl_first_pod("postgres"), "--"]
    psql_cmd = ["psql", "-U", "postgres", "-d", "vaultwarden", "-At", "-F", "\t", "-c", sql]
    return _run(kubectl_exec + psql_cmd)
|
|
|
|
|
|
def _validate_prefixes(prefixes: Iterable[str]) -> list[str]:
    """Normalise and vet the email prefixes supplied on the command line.

    Each prefix is whitespace-trimmed; empty entries are ignored. Every
    remaining prefix must match ``_SAFE_PREFIX_RE`` and end with ``-``.

    Returns:
        The distinct accepted prefixes in sorted order.

    Raises:
        SystemExit: If a prefix is malformed, does not end with ``-``, or no
            usable prefix remains.
    """
    accepted: set[str] = set()
    for raw in prefixes:
        candidate = raw.strip()
        if not candidate:
            continue
        if _SAFE_PREFIX_RE.match(candidate) is None:
            raise SystemExit(
                f"invalid prefix '{candidate}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
            )
        if candidate[-1] != "-":
            raise SystemExit(f"refusing prefix '{candidate}': must end with '-' for safety")
        accepted.add(candidate)

    if not accepted:
        raise SystemExit("at least one --prefix is required")
    return sorted(accepted)
|
|
|
|
|
|
def _parse_rows(tsv: str) -> list[list[str]]:
|
|
rows: list[list[str]] = []
|
|
for line in tsv.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
rows.append(line.split("\t"))
|
|
return rows
|
|
|
|
|
|
def _sql_or_email_prefixes(prefixes: list[str]) -> str:
|
|
# prefixes validated to safe charset; safe to interpolate.
|
|
clauses = [f"email LIKE '{p}%'" for p in prefixes]
|
|
return " OR ".join(clauses) if clauses else "FALSE"
|
|
|
|
|
|
def _sql_quote(value: str) -> str:
|
|
return "'" + value.replace("'", "''") + "'"
|
|
|
|
|
|
def _sql_text_array(values: Iterable[str]) -> str:
    """Render *values* as an ``ARRAY[...]::text[]`` SQL expression.

    Every element is quoted with ``_sql_quote``; an empty iterable yields
    ``ARRAY[]::text[]``.
    """
    quoted = [_sql_quote(item) for item in values]
    return "ARRAY[" + ",".join(quoted) + "]::text[]"
|
|
|
|
|
|
def _list_users(prefixes: list[str], protected: set[str]) -> list[VaultwardenUser]:
    """Query enabled users whose email matches a prefix and is not protected.

    For every candidate the query also sums the rows that reference the user
    in the related tables named in the SQL below; the total is stored in
    ``dependent_rows``.

    Args:
        prefixes: Validated email prefixes to match.
        protected: Emails that must be excluded from the result.

    Returns:
        Matching users ordered by email. Rows with fewer than three columns,
        an empty uuid/email, or a uuid that fails ``_UUID_RE`` are skipped.

    Raises:
        RuntimeError: If a row's dependent-row count is not an integer. The
            count guards deletion, so garbage must not be read as ``0``
            ("no dependents, safe to delete").
    """
    clause = _sql_or_email_prefixes(prefixes)
    protected_array = _sql_text_array(sorted(protected))
    sql = f"""
    WITH candidates AS (
      SELECT uuid, email
      FROM users
      WHERE enabled
        AND ({clause})
        AND email <> ALL({protected_array})
    )
    SELECT
      candidates.uuid,
      candidates.email,
      (
        (SELECT COUNT(*) FROM auth_requests WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM ciphers WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM devices WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM emergency_access WHERE grantor_uuid = candidates.uuid OR grantee_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM favorites WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM folders WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM sends WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM twofactor WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM twofactor_incomplete WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM users_collections WHERE user_uuid = candidates.uuid) +
        (SELECT COUNT(*) FROM users_organizations WHERE user_uuid = candidates.uuid)
      ) AS dependent_rows
    FROM candidates
    ORDER BY candidates.email;
    """

    users: list[VaultwardenUser] = []
    for row in _parse_rows(_psql(sql)):
        if len(row) < 3:
            continue
        uuid, email, dep_raw = (cell.strip() for cell in row[:3])
        # Skipping a malformed row is the safe direction: it is never deleted.
        if not uuid or not email or not _UUID_RE.match(uuid):
            continue
        try:
            dep = int(dep_raw)
        except ValueError as exc:
            # Fail closed: an unreadable count must never pass the deletion guard.
            raise RuntimeError(
                f"unexpected dependent row count {dep_raw!r} for user {email}"
            ) from exc
        users.append(VaultwardenUser(uuid=uuid, email=email, dependent_rows=dep))
    return users
|
|
|
|
|
|
def _list_invitations(prefixes: list[str], protected: set[str]) -> list[str]:
    """Return invitation emails that match a prefix and are not protected.

    Args:
        prefixes: Validated email prefixes to match.
        protected: Emails to exclude; when empty, no exclusion is added.

    Returns:
        Non-empty invitation emails in the order returned by the query
        (``ORDER BY email``).
    """
    where_prefix = _sql_or_email_prefixes(prefixes)
    exclusion = f"AND email <> ALL({_sql_text_array(sorted(protected))})" if protected else ""
    query = f"SELECT email FROM invitations WHERE ({where_prefix}) {exclusion} ORDER BY email;"

    first_cells = (row[0].strip() for row in _parse_rows(_psql(query)) if row)
    return [address for address in first_cells if address]
|
|
|
|
|
|
def _delete_invitations(emails: list[str]) -> int:
    """Delete the invitations whose email is in *emails*.

    Returns:
        The row count parsed from psql's ``DELETE <n>`` output, or 0 when
        *emails* is empty or no such output is found.
    """
    if not emails:
        return 0

    quoted = ",".join(_sql_quote(address) for address in emails)
    output = _psql(f"DELETE FROM invitations WHERE email IN ({quoted});")
    tag = re.search(r"DELETE\s+(\d+)", output)
    if tag is None:
        return 0
    return int(tag.group(1))
|
|
|
|
|
|
def _delete_users(uuids: list[str]) -> int:
    """Delete the users whose uuid is in *uuids*.

    Returns:
        The row count parsed from psql's ``DELETE <n>`` output, or 0 when
        *uuids* is empty or no such output is found.
    """
    if not uuids:
        return 0

    quoted = ",".join(_sql_quote(identifier) for identifier in uuids)
    output = _psql(f"DELETE FROM users WHERE uuid IN ({quoted});")
    tag = re.search(r"DELETE\s+(\d+)", output)
    if tag is None:
        return 0
    return int(tag.group(1))
|
|
|
|
|
|
def _parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
prog="test_vaultwarden_user_cleanup",
|
|
description="Manual-only cleanup for Vaultwarden test users/invites (DB-level).",
|
|
)
|
|
parser.add_argument(
|
|
"--prefix",
|
|
action="append",
|
|
required=True,
|
|
help="Email prefix to target (repeatable). Example: --prefix e2e-",
|
|
)
|
|
parser.add_argument(
|
|
"--apply",
|
|
action="store_true",
|
|
help="Apply deletions (default is dry-run). Requires --confirm.",
|
|
)
|
|
parser.add_argument(
|
|
"--confirm",
|
|
default="",
|
|
help="Required when using --apply. Must exactly equal the comma-separated prefix list.",
|
|
)
|
|
parser.add_argument(
|
|
"--protect-email",
|
|
action="append",
|
|
default=[],
|
|
help="Vaultwarden emails that must never be deleted (repeatable).",
|
|
)
|
|
parser.add_argument(
|
|
"--verbose",
|
|
action="store_true",
|
|
help="List matched emails (and invitation emails).",
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Run the cleanup and return the process exit status.

    Steps: parse and validate arguments, build the protected-email set, check
    the ``--confirm`` guard when ``--apply`` is given, list matching users and
    invitations, print a summary, refuse if any matched user has dependent
    rows, and delete invitations then users only in apply mode.

    Returns:
        0 on success (dry-run or applied); 2 when the confirmation guard
        fails or a matched user has dependent data.
    """
    args = _parse_args(argv)
    prefixes = _validate_prefixes(args.prefix)
    expected_confirm = ",".join(prefixes)

    # Built-in allowlist, merged with any --protect-email values.
    builtin_protected = {
        "brad@bstein.dev",
        "edstein87@outlook.com",
        "indifox8@gmail.com",
        "mgs.stein@gmail.com",
        "patriot87@gmail.com",
    }
    protected = builtin_protected | {addr.strip() for addr in args.protect_email if addr.strip()}

    # The guard is checked before any database access.
    if args.apply and args.confirm != expected_confirm:
        print(
            f"error: refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')",
            file=sys.stderr,
        )
        return 2

    users = _list_users(prefixes, protected=protected)
    invites = _list_invitations(prefixes, protected=protected)

    mode = "APPLY" if args.apply else "DRY-RUN"
    print(f"prefixes: {expected_confirm}")
    print(f"mode: {mode}")
    if protected:
        print(f"protected emails: {', '.join(sorted(protected))}")
    print(f"vaultwarden users matched: {len(users)}")
    print(f"vaultwarden invitations matched: {len(invites)}")

    if args.verbose:
        # Detailed listings are capped at 100 entries each.
        for user in users[:100]:
            print(f" user: {user.email} (deps={user.dependent_rows})")
        if len(users) > 100:
            print(f" ... and {len(users) - 100} more users")
        for email in invites[:100]:
            print(f" invite: {email}")
        if len(invites) > 100:
            print(f" ... and {len(invites) - 100} more invitations")

    # Any matched user with dependent rows aborts the run, dry-run included.
    unsafe = [candidate for candidate in users if candidate.dependent_rows > 0]
    if unsafe:
        print("refusing to delete users with dependent data:", file=sys.stderr)
        for user in unsafe[:50]:
            print(f" - {user.email} deps={user.dependent_rows}", file=sys.stderr)
        if len(unsafe) > 50:
            print(f" ... and {len(unsafe) - 50} more", file=sys.stderr)
        return 2

    if not args.apply:
        print("dry-run complete (no changes made)")
        return 0

    deleted_invites = _delete_invitations(invites)
    deleted_users = _delete_users([user.uuid for user in users])
    print(f"deleted vaultwarden invitations: {deleted_invites}")
    print(f"deleted vaultwarden users: {deleted_users}")
    print("done")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main(sys.argv[1:]))
|