scripts: add vaultwarden test cleanup
This commit is contained in:
parent
28a5d53c98
commit
a4105c68db
318
scripts/test_vaultwarden_user_cleanup.py
Executable file
318
scripts/test_vaultwarden_user_cleanup.py
Executable file
@ -0,0 +1,318 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Clean up Vaultwarden test users and invites (manual-only).
|
||||
|
||||
This script deletes Vaultwarden rows directly from the Postgres database. It is
|
||||
intended only for removing test fallout (e.g. e2e-*, test-*) and is deliberately
|
||||
conservative:
|
||||
|
||||
- Requires one or more explicit email prefixes (repeatable).
|
||||
- Dry-run by default; --apply requires an exact --confirm guard.
|
||||
- Refuses to delete any user with dependent data in Vaultwarden tables.
|
||||
- Supports a protected email allowlist to prevent catastrophic mistakes.
|
||||
|
||||
Example (dry-run):
|
||||
scripts/test_vaultwarden_user_cleanup.py --prefix e2e-
|
||||
|
||||
Example (apply):
|
||||
scripts/test_vaultwarden_user_cleanup.py --prefix e2e- --apply --confirm e2e-
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, Sequence
|
||||
|
||||
|
||||
_SAFE_PREFIX_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$")
|
||||
_UUID_RE = re.compile(r"^[0-9a-fA-F-]{32,36}$")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class VaultwardenUser:
|
||||
uuid: str
|
||||
email: str
|
||||
dependent_rows: int
|
||||
|
||||
|
||||
def _run(cmd: Sequence[str], *, input_bytes: bytes | None = None) -> str:
|
||||
proc = subprocess.run(
|
||||
list(cmd),
|
||||
input=input_bytes,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
check=False,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
|
||||
raise RuntimeError(f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}")
|
||||
return proc.stdout.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def _kubectl_first_pod(namespace: str) -> str:
|
||||
raw = _run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"])
|
||||
data = json.loads(raw)
|
||||
items = data.get("items") or []
|
||||
if not isinstance(items, list) or not items:
|
||||
raise RuntimeError(f"no pods found in namespace {namespace}")
|
||||
name = items[0].get("metadata", {}).get("name")
|
||||
if not isinstance(name, str) or not name:
|
||||
raise RuntimeError(f"unexpected pod list in namespace {namespace}")
|
||||
return name
|
||||
|
||||
|
||||
def _psql(sql: str) -> str:
|
||||
pod = _kubectl_first_pod("postgres")
|
||||
return _run(
|
||||
[
|
||||
"kubectl",
|
||||
"-n",
|
||||
"postgres",
|
||||
"exec",
|
||||
"-i",
|
||||
pod,
|
||||
"--",
|
||||
"psql",
|
||||
"-U",
|
||||
"postgres",
|
||||
"-d",
|
||||
"vaultwarden",
|
||||
"-At",
|
||||
"-F",
|
||||
"\t",
|
||||
"-c",
|
||||
sql,
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _validate_prefixes(prefixes: Iterable[str]) -> list[str]:
|
||||
cleaned: list[str] = []
|
||||
for prefix in prefixes:
|
||||
prefix = prefix.strip()
|
||||
if not prefix:
|
||||
continue
|
||||
if not _SAFE_PREFIX_RE.match(prefix):
|
||||
raise SystemExit(
|
||||
f"invalid prefix '{prefix}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
|
||||
)
|
||||
if not prefix.endswith("-"):
|
||||
raise SystemExit(f"refusing prefix '{prefix}': must end with '-' for safety")
|
||||
cleaned.append(prefix)
|
||||
if not cleaned:
|
||||
raise SystemExit("at least one --prefix is required")
|
||||
return sorted(set(cleaned))
|
||||
|
||||
|
||||
def _parse_rows(tsv: str) -> list[list[str]]:
|
||||
rows: list[list[str]] = []
|
||||
for line in tsv.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
rows.append(line.split("\t"))
|
||||
return rows
|
||||
|
||||
|
||||
def _sql_or_email_prefixes(prefixes: list[str]) -> str:
|
||||
# prefixes validated to safe charset; safe to interpolate.
|
||||
clauses = [f"email LIKE '{p}%'" for p in prefixes]
|
||||
return " OR ".join(clauses) if clauses else "FALSE"
|
||||
|
||||
|
||||
def _sql_quote(value: str) -> str:
|
||||
return "'" + value.replace("'", "''") + "'"
|
||||
|
||||
|
||||
def _sql_text_array(values: Iterable[str]) -> str:
|
||||
items = ",".join(_sql_quote(v) for v in values)
|
||||
return f"ARRAY[{items}]::text[]"
|
||||
|
||||
|
||||
def _list_users(prefixes: list[str], protected: set[str]) -> list[VaultwardenUser]:
|
||||
clause = _sql_or_email_prefixes(prefixes)
|
||||
sql = f"""
|
||||
WITH candidates AS (
|
||||
SELECT uuid, email
|
||||
FROM users
|
||||
WHERE enabled
|
||||
AND ({clause})
|
||||
AND email <> ALL({_sql_text_array(sorted(protected))})
|
||||
)
|
||||
SELECT
|
||||
candidates.uuid,
|
||||
candidates.email,
|
||||
(
|
||||
(SELECT COUNT(*) FROM auth_requests WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM ciphers WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM devices WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM emergency_access WHERE grantor_uuid = candidates.uuid OR grantee_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM favorites WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM folders WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM sends WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM twofactor WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM twofactor_incomplete WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM users_collections WHERE user_uuid = candidates.uuid) +
|
||||
(SELECT COUNT(*) FROM users_organizations WHERE user_uuid = candidates.uuid)
|
||||
) AS dependent_rows
|
||||
FROM candidates
|
||||
ORDER BY candidates.email;
|
||||
"""
|
||||
out = _psql(sql)
|
||||
users: list[VaultwardenUser] = []
|
||||
for row in _parse_rows(out):
|
||||
if len(row) < 3:
|
||||
continue
|
||||
uuid, email, dep_raw = row[0].strip(), row[1].strip(), row[2].strip()
|
||||
if not uuid or not email:
|
||||
continue
|
||||
if not _UUID_RE.match(uuid):
|
||||
continue
|
||||
try:
|
||||
dep = int(dep_raw)
|
||||
except ValueError:
|
||||
dep = 0
|
||||
users.append(VaultwardenUser(uuid=uuid, email=email, dependent_rows=dep))
|
||||
return users
|
||||
|
||||
|
||||
def _list_invitations(prefixes: list[str], protected: set[str]) -> list[str]:
|
||||
clause = _sql_or_email_prefixes(prefixes)
|
||||
protected_clause = ""
|
||||
if protected:
|
||||
protected_clause = f"AND email <> ALL({_sql_text_array(sorted(protected))})"
|
||||
sql = f"SELECT email FROM invitations WHERE ({clause}) {protected_clause} ORDER BY email;"
|
||||
out = _psql(sql)
|
||||
invites: list[str] = []
|
||||
for row in _parse_rows(out):
|
||||
if not row:
|
||||
continue
|
||||
email = row[0].strip()
|
||||
if email:
|
||||
invites.append(email)
|
||||
return invites
|
||||
|
||||
|
||||
def _delete_invitations(emails: list[str]) -> int:
|
||||
if not emails:
|
||||
return 0
|
||||
email_list = ",".join(_sql_quote(e) for e in emails)
|
||||
sql = f"DELETE FROM invitations WHERE email IN ({email_list});"
|
||||
out = _psql(sql)
|
||||
match = re.search(r"DELETE\s+(\d+)", out)
|
||||
return int(match.group(1)) if match else 0
|
||||
|
||||
|
||||
def _delete_users(uuids: list[str]) -> int:
|
||||
if not uuids:
|
||||
return 0
|
||||
uuid_list = ",".join(_sql_quote(u) for u in uuids)
|
||||
sql = f"DELETE FROM users WHERE uuid IN ({uuid_list});"
|
||||
out = _psql(sql)
|
||||
match = re.search(r"DELETE\s+(\d+)", out)
|
||||
return int(match.group(1)) if match else 0
|
||||
|
||||
|
||||
def _parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="test_vaultwarden_user_cleanup",
|
||||
description="Manual-only cleanup for Vaultwarden test users/invites (DB-level).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--prefix",
|
||||
action="append",
|
||||
required=True,
|
||||
help="Email prefix to target (repeatable). Example: --prefix e2e-",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--apply",
|
||||
action="store_true",
|
||||
help="Apply deletions (default is dry-run). Requires --confirm.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--confirm",
|
||||
default="",
|
||||
help="Required when using --apply. Must exactly equal the comma-separated prefix list.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--protect-email",
|
||||
action="append",
|
||||
default=[],
|
||||
help="Vaultwarden emails that must never be deleted (repeatable).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="List matched emails (and invitation emails).",
|
||||
)
|
||||
return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: list[str]) -> int:
|
||||
args = _parse_args(argv)
|
||||
prefixes = _validate_prefixes(args.prefix)
|
||||
expected_confirm = ",".join(prefixes)
|
||||
|
||||
protected = {e.strip() for e in args.protect_email if e.strip()}
|
||||
protected |= {
|
||||
"brad@bstein.dev",
|
||||
"edstein87@outlook.com",
|
||||
"indifox8@gmail.com",
|
||||
"mgs.stein@gmail.com",
|
||||
"patriot87@gmail.com",
|
||||
}
|
||||
|
||||
if args.apply and args.confirm != expected_confirm:
|
||||
print(
|
||||
f"error: refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 2
|
||||
|
||||
users = _list_users(prefixes, protected=protected)
|
||||
invites = _list_invitations(prefixes, protected=protected)
|
||||
|
||||
print(f"prefixes: {expected_confirm}")
|
||||
print(f"mode: {'APPLY' if args.apply else 'DRY-RUN'}")
|
||||
if protected:
|
||||
print(f"protected emails: {', '.join(sorted(protected))}")
|
||||
print(f"vaultwarden users matched: {len(users)}")
|
||||
print(f"vaultwarden invitations matched: {len(invites)}")
|
||||
|
||||
if args.verbose:
|
||||
for user in users[: min(100, len(users))]:
|
||||
print(f" user: {user.email} (deps={user.dependent_rows})")
|
||||
if len(users) > 100:
|
||||
print(f" ... and {len(users) - 100} more users")
|
||||
for email in invites[: min(100, len(invites))]:
|
||||
print(f" invite: {email}")
|
||||
if len(invites) > 100:
|
||||
print(f" ... and {len(invites) - 100} more invitations")
|
||||
|
||||
unsafe = [u for u in users if u.dependent_rows > 0]
|
||||
if unsafe:
|
||||
print("refusing to delete users with dependent data:", file=sys.stderr)
|
||||
for user in unsafe[: min(50, len(unsafe))]:
|
||||
print(f" - {user.email} deps={user.dependent_rows}", file=sys.stderr)
|
||||
if len(unsafe) > 50:
|
||||
print(f" ... and {len(unsafe) - 50} more", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
if not args.apply:
|
||||
print("dry-run complete (no changes made)")
|
||||
return 0
|
||||
|
||||
deleted_invites = _delete_invitations(invites)
|
||||
deleted_users = _delete_users([u.uuid for u in users])
|
||||
print(f"deleted vaultwarden invitations: {deleted_invites}")
|
||||
print(f"deleted vaultwarden users: {deleted_users}")
|
||||
print("done")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main(sys.argv[1:]))
|
||||
15
scripts/test_vaultwarden_user_cleanup.sh
Executable file
15
scripts/test_vaultwarden_user_cleanup.sh
Executable file
@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
# Manual-only helper to clean Vaultwarden test users and invites from Postgres.
|
||||
#
|
||||
# Usage (dry-run):
|
||||
# scripts/test_vaultwarden_user_cleanup.sh --prefix e2e-
|
||||
#
|
||||
# Usage (apply):
|
||||
# scripts/test_vaultwarden_user_cleanup.sh --prefix e2e- --apply --confirm e2e-
|
||||
|
||||
SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
|
||||
|
||||
python3 "${SCRIPT_DIR}/test_vaultwarden_user_cleanup.py" "$@"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user