titan-iac/scripts/test_atlas_user_cleanup.py

761 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""Clean up Atlas test users and portal requests (manual-only).
Default behavior is DRY RUN. This script is intended for operators to clean up
test accounts created via the bstein-dev-home onboarding portal.
Targets (best-effort):
- Keycloak users in realm "atlas"
- Atlas portal Postgres rows (access_requests + dependent tables)
- Mailu mailboxes created for test users
- Nextcloud Mail accounts created for test users
- Vaultwarden users/invites created by the portal
Safety:
- Requires an explicit username prefix (e.g. "test-")
- Dry-run unless --apply is set
- --apply requires an explicit --confirm guard
- Validates prefixes to a conservative charset
"""
from __future__ import annotations

import argparse
import base64
import json
import os
import re
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Iterable
# Allow-list for --prefix values: one leading ASCII letter/digit followed by at
# most 63 more characters drawn from letters, digits, ".", "_" and "-".
_SAFE_PREFIX_RE = re.compile(
    r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,63}$"
)
@dataclass(frozen=True)
class KeycloakUser:
    """Subset of a Keycloak user entry from the realm admin users endpoint."""

    user_id: str  # Keycloak "id" of the user
    username: str
    email: str  # "" when Keycloak returned no email
@dataclass(frozen=True)
class PortalRequestRow:
    """One row selected from the portal's ``access_requests`` table."""

    request_code: str  # access_requests.request_code
    username: str  # access_requests.username
    status: str  # access_requests.status, as text
@dataclass(frozen=True)
class VaultwardenUser:
    """A user entry from the Vaultwarden ``/admin/users`` JSON listing."""

    user_id: str  # Vaultwarden "id"
    email: str
    status: int  # Vaultwarden "_status" value, or -1 when absent / not an int
@dataclass(frozen=True)
class MailuUser:
    """A mailbox row from the Mailu database's ``"user"`` table."""

    email: str  # full address
    localpart: str  # part before "@"
    domain: str  # "user".domain_name
@dataclass(frozen=True)
class NextcloudMailAccount:
    """A Nextcloud Mail account parsed from ``occ mail:account:export`` output."""

    account_id: str  # numeric id from the "Account <id>:" header, kept as text
    email: str  # first address found after that header
def _run(cmd: list[str], *, input_bytes: bytes | None = None) -> str:
    """Run *cmd* (argv list, no shell) and return its stdout as text.

    Args:
        cmd: program and arguments.
        input_bytes: optional bytes written to the process's stdin.

    Returns:
        stdout decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        RuntimeError: when the command exits non-zero. The message contains the
            command line and stderr, with URL userinfo (``scheme://user:pass@``)
            masked as ``scheme://***@``. Some callers pass database connection
            URLs as arguments, and those must not be echoed to the terminal.
    """
    proc = subprocess.run(
        cmd,
        input=input_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        message = f"command failed ({proc.returncode}): {' '.join(cmd)}\n{stderr}"
        # Mask credentials embedded in URLs before they reach logs/tracebacks.
        message = re.sub(r"([A-Za-z][A-Za-z0-9+.-]*://)[^\s/]+@", r"\1***@", message)
        raise RuntimeError(message)
    return proc.stdout.decode("utf-8", errors="replace")
def _run_capture(cmd: list[str], *, input_bytes: bytes | None = None) -> tuple[int, str, str]:
    """Run *cmd* without raising on failure.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8
        (undecodable bytes replaced).
    """
    completed = subprocess.run(cmd, input=input_bytes, capture_output=True, check=False)

    def _text(raw: bytes) -> str:
        return raw.decode("utf-8", errors="replace")

    return completed.returncode, _text(completed.stdout), _text(completed.stderr)
def _kubectl_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Read ``data.<key>`` of Secret *namespace*/*name* via kubectl and decode it.

    The base64 payload is decoded as UTF-8 and surrounding whitespace is stripped.

    Raises:
        RuntimeError: if kubectl fails (via _run) or the value is empty/missing.
    """
    selector = "jsonpath={.data." + key + "}"
    argv = ["kubectl", "-n", namespace, "get", "secret", name, "-o", selector]
    encoded = _run(argv).strip()
    if encoded:
        return base64.b64decode(encoded).decode("utf-8").strip()
    raise RuntimeError(f"secret {namespace}/{name} key {key} is empty")
def _kubectl_first_pod(namespace: str) -> str:
    """Return the name of a pod in *namespace* to use as a ``kubectl exec`` target.

    The first listed pod whose ``status.phase`` is ``Running`` and that has no
    ``metadata.deletionTimestamp`` is preferred. If none qualifies, the first
    listed pod is returned, so callers still get a target to try.

    Raises:
        RuntimeError: if kubectl fails (via _run), the namespace has no pods,
            or the fallback pod has no usable name.
    """
    raw = _run(["kubectl", "-n", namespace, "get", "pods", "-o", "json"])
    data = json.loads(raw)
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        raise RuntimeError(f"no pods found in namespace {namespace}")

    def _section(item: Any, key: str) -> dict[str, Any]:
        # Tolerate malformed entries instead of raising AttributeError.
        value = item.get(key) if isinstance(item, dict) else None
        return value if isinstance(value, dict) else {}

    def _name_of(item: Any) -> str | None:
        name = _section(item, "metadata").get("name")
        return name if isinstance(name, str) and name else None

    for item in items:
        # NOTE: the namespace may also hold pods that are not running (e.g.
        # finished Job pods) or are being deleted. Exec into those fails, so
        # skip them when a running pod exists.
        if _section(item, "status").get("phase") != "Running":
            continue
        if _section(item, "metadata").get("deletionTimestamp"):
            continue
        name = _name_of(item)
        if name is not None:
            return name

    fallback = _name_of(items[0])
    if fallback is None:
        raise RuntimeError(f"unexpected pod list in namespace {namespace}")
    return fallback
def _kubectl_exec(namespace: str, target: str, cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* inside *target* (a pod name or ``kind/name``) with stdin attached.

    Returns ``(returncode, stdout, stderr)`` and never raises on a non-zero exit.
    """
    argv = ["kubectl", "-n", namespace, "exec", "-i", target, "--"]
    argv.extend(cmd)
    return _run_capture(argv)
def _validate_prefixes(prefixes: list[str]) -> list[str]:
    """Strip each --prefix, drop blank ones, and validate the rest.

    Returns:
        The stripped, non-blank prefixes in input order.

    Raises:
        SystemExit: if a prefix does not match ``_SAFE_PREFIX_RE``, or if no
            non-blank prefix remains.
    """
    cleaned: list[str] = []
    for candidate in (raw.strip() for raw in prefixes):
        if candidate == "":
            continue
        if _SAFE_PREFIX_RE.match(candidate) is None:
            raise SystemExit(
                f"invalid prefix '{candidate}': must match {_SAFE_PREFIX_RE.pattern} (alnum plus ._-)"
            )
        cleaned.append(candidate)
    if cleaned:
        return cleaned
    raise SystemExit("at least one --prefix is required")
def _starts_with_any(value: str, prefixes: Iterable[str]) -> bool:
    """Return True when *value* begins with at least one of *prefixes* (False if none given)."""
    return value.startswith(tuple(prefixes))
def _keycloak_token(server: str, realm: str, client_id: str, client_secret: str) -> str:
    """Obtain an access token from *realm* using the OAuth2 client-credentials grant.

    Raises:
        RuntimeError: if the token response has no non-empty ``access_token``.
        urllib.error.URLError / HTTPError: on transport or HTTP failures.
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    token_url = f"{server}/realms/{realm}/protocol/openid-connect/token"
    request = urllib.request.Request(
        token_url,
        data=urllib.parse.urlencode(form).encode("utf-8"),
        method="POST",
    )
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    with urllib.request.urlopen(request, timeout=15) as response:
        body = json.loads(response.read().decode("utf-8"))
    access_token = body.get("access_token")
    if isinstance(access_token, str) and access_token:
        return access_token
    raise RuntimeError("failed to obtain keycloak access token")
def _keycloak_list_users(
    server: str,
    realm: str,
    token: str,
    search: str,
    *,
    page_size: int = 1000,
) -> list[KeycloakUser]:
    """Return users of *realm* matching Keycloak's ``search`` query, across all pages.

    Pages are requested with ``first``/``max`` until a page shorter than
    *page_size* comes back, so matches beyond the first page are not silently
    dropped. Fetching also stops if a page contributes no new user ids, which
    guards against a server that ignores ``first``. Entries without a
    non-empty string ``id`` or with a non-string ``username`` are skipped.
    Results are not assumed to be strict prefix matches; the caller re-filters
    usernames itself.

    Args:
        page_size: users requested per page (minimum 1).

    Raises:
        RuntimeError: if a response body is not a JSON list.
        urllib.error.URLError / HTTPError: on transport or HTTP failures.
    """
    page_size = max(1, int(page_size))
    users: list[KeycloakUser] = []
    seen_ids: set[str] = set()
    first = 0
    while True:
        query = urllib.parse.urlencode(
            {"first": str(first), "max": str(page_size), "search": search}
        )
        req = urllib.request.Request(f"{server}/admin/realms/{realm}/users?{query}", method="GET")
        req.add_header("Authorization", f"Bearer {token}")
        with urllib.request.urlopen(req, timeout=30) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        if not isinstance(payload, list):
            raise RuntimeError("unexpected keycloak users response")
        new_on_page = 0
        for item in payload:
            if not isinstance(item, dict):
                continue
            user_id = item.get("id")
            username = item.get("username") or ""
            email = item.get("email") or ""
            if not isinstance(user_id, str) or not user_id or user_id in seen_ids:
                continue
            seen_ids.add(user_id)
            new_on_page += 1
            if not isinstance(username, str):
                continue
            users.append(KeycloakUser(user_id=user_id, username=username, email=str(email)))
        if len(payload) < page_size or new_on_page == 0:
            break
        first += len(payload)
    return users
def _keycloak_delete_user(server: str, realm: str, token: str, user_id: str) -> None:
    """Delete Keycloak user *user_id* from *realm*.

    An HTTP 404 is treated as success (the user is already gone); any other
    HTTP error is re-raised.
    """
    target = f"{server}/admin/realms/{realm}/users/{user_id}"
    request = urllib.request.Request(target, method="DELETE")
    request.add_header("Authorization", f"Bearer {token}")
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            response.read()
    except urllib.error.HTTPError as err:
        if err.code != 404:
            raise
def _sql_quote(value: str) -> str:
    """Render *value* as a single-quoted SQL string literal, doubling embedded quotes."""
    escaped = value.replace("'", "''")
    return f"'{escaped}'"
def _psql_exec(db_name: str, sql: str, *, user: str = "postgres") -> str:
    """Run *sql* via ``psql -c`` in a pod chosen by _kubectl_first_pod("postgres").

    Returns psql's raw stdout (e.g. command tags like ``DELETE 3``).
    Raises RuntimeError (from _run) when kubectl/psql exits non-zero.
    """
    pod = _kubectl_first_pod("postgres")
    psql_argv = ["psql", "-U", user, "-d", db_name, "-c", sql]
    return _run(["kubectl", "-n", "postgres", "exec", "-i", pod, "--", *psql_argv])
def _psql_tsv(db_name: str, sql: str, *, user: str = "postgres") -> list[list[str]]:
    """Run a query with unaligned, tuples-only psql output and split each row on tabs.

    Rows are split naively on "\\t", so values containing tabs or newlines
    would not round-trip.
    """
    pod = _kubectl_first_pod("postgres")
    psql_argv = ["psql", "-U", user, "-d", db_name, "-At", "-F", "\t", "-c", sql]
    output = _run(["kubectl", "-n", "postgres", "exec", "-i", pod, "--", *psql_argv])
    return [line.split("\t") for line in output.splitlines()]
def _psql_json(portal_db_url: str, sql: str) -> list[dict[str, Any]]:
    """Run *sql* against the database at *portal_db_url* and return tab-split rows.

    Despite the name, psql output is unaligned/tuples-only text, not JSON.
    Each output line becomes ``{"cols": [field, ...]}``.

    NOTE(review): the connection URL (with credentials) is passed as a command
    argument to kubectl/psql; confirm that is acceptable in this environment.
    """
    pod = _kubectl_first_pod("postgres")
    argv = [
        "kubectl", "-n", "postgres", "exec", "-i", pod, "--",
        "psql", portal_db_url, "-At", "-F", "\t", "-c", sql,
    ]
    return [{"cols": line.split("\t")} for line in _run(argv).splitlines()]
def _sql_like_prefix_clause(column: str, prefixes: list[str]) -> str:
    """Build an OR-joined SQL condition: *column* starts with any of *prefixes*, literally.

    ``_`` and ``%`` are LIKE wildcards, and ``_`` is allowed in --prefix values.
    They are therefore escaped with ``!`` (declared via ``ESCAPE '!'``) so that
    e.g. ``e2e_`` cannot also match ``e2eX...``. The pattern is emitted as a
    quoted SQL string literal. Returns "" for an empty *prefixes* list.
    """
    clauses: list[str] = []
    for prefix in prefixes:
        # Escape the escape character first, then the wildcards.
        pattern = prefix.replace("!", "!!").replace("%", "!%").replace("_", "!_") + "%"
        literal = "'" + pattern.replace("'", "''") + "'"
        clauses.append(f"{column} LIKE {literal} ESCAPE '!'")
    return " OR ".join(clauses)


def _psql_delete_count(output: str) -> int:
    """Return the row count from psql's ``DELETE <n>`` command tag, or 0 if absent."""
    match = re.search(r"\bDELETE\s+(\d+)", output)
    return int(match.group(1)) if match else 0


def _portal_list_requests(portal_db_url: str, prefixes: list[str]) -> list[PortalRequestRow]:
    """List access_requests whose username starts with one of *prefixes*, newest first.

    Rows with fewer than three columns are skipped. Returns [] when *prefixes*
    is empty.
    """
    if not prefixes:
        return []
    sql = (
        "SELECT request_code, username, status "
        "FROM access_requests "
        f"WHERE ({_sql_like_prefix_clause('username', prefixes)}) "
        "ORDER BY created_at DESC;"
    )
    parsed: list[PortalRequestRow] = []
    for row in _psql_json(portal_db_url, sql):
        cols = row.get("cols") or []
        if len(cols) < 3:
            continue
        parsed.append(PortalRequestRow(request_code=cols[0], username=cols[1], status=cols[2]))
    return parsed


def _portal_delete_requests(portal_db_url: str, prefixes: list[str]) -> int:
    """Delete access_requests whose username starts with one of *prefixes*.

    Returns the row count psql reports (0 if the tag cannot be parsed, or when
    *prefixes* is empty, in which case nothing is executed). Removal of
    dependent rows relies on the database schema. NOTE(review): main's message
    assumes ON DELETE CASCADE; this function does not verify it.
    """
    if not prefixes:
        return 0
    sql = f"DELETE FROM access_requests WHERE ({_sql_like_prefix_clause('username', prefixes)});"
    postgres_pod = _kubectl_first_pod("postgres")
    out = _run(
        [
            "kubectl",
            "-n",
            "postgres",
            "exec",
            "-i",
            postgres_pod,
            "--",
            "psql",
            portal_db_url,
            "-c",
            sql,
        ]
    )
    return _psql_delete_count(out)


def _mailu_list_users(prefixes: list[str], domain: str, db_name: str, protected: set[str]) -> list[MailuUser]:
    """List Mailu mailboxes in *domain* whose localpart starts with one of *prefixes*.

    Emails in *protected* (exact match) are excluded. Returns [] when
    *prefixes* or *domain* is empty.
    """
    if not prefixes or not domain:
        return []
    sql = (
        'SELECT email, localpart, domain_name '
        'FROM "user" '
        f"WHERE domain_name = {_sql_quote(domain)} AND ({_sql_like_prefix_clause('localpart', prefixes)}) "
        "ORDER BY email;"
    )
    users: list[MailuUser] = []
    for row in _psql_tsv(db_name, sql):
        if len(row) < 3:
            continue
        email = row[0].strip()
        if not email or email in protected:
            continue
        users.append(MailuUser(email=email, localpart=row[1].strip(), domain=row[2].strip()))
    return users


def _mailu_delete_users(db_name: str, emails: list[str]) -> int:
    """Delete the Mailu ``"user"`` rows with exactly these *emails*; return psql's count."""
    if not emails:
        return 0
    email_list = ",".join(_sql_quote(e) for e in emails)
    out = _psql_exec(db_name, f'DELETE FROM "user" WHERE email IN ({email_list});')
    return _psql_delete_count(out)
# Header line opening each account in `occ mail:account:export` output, e.g.
# "Account 12:". Group 1 is the numeric account id. (Raw string: single
# backslashes, so \s and \d are regex classes, not literal backslashes.)
_NEXTCLOUD_ACCOUNT_RE = re.compile(r"^Account\s+(\d+):")
# Loose email matcher used to pull the address out of an export detail line.
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+")
def _nextcloud_exec(cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* inside the Nextcloud workload; returns ``(rc, stdout, stderr)``.

    The namespace comes from NEXTCLOUD_NAMESPACE (default "nextcloud") and the
    exec target from NEXTCLOUD_EXEC_TARGET (default "deploy/nextcloud"). Blank
    values fall back to the defaults.
    """
    def _env(var: str, fallback: str) -> str:
        return os.getenv(var, fallback).strip() or fallback

    ns = _env("NEXTCLOUD_NAMESPACE", "nextcloud")
    exec_target = _env("NEXTCLOUD_EXEC_TARGET", "deploy/nextcloud")
    return _kubectl_exec(ns, exec_target, cmd)
def _parse_nextcloud_mail_accounts(export_output: str) -> list[NextcloudMailAccount]:
    """Extract (account id, email) pairs from ``occ mail:account:export`` text.

    A line matching ``_NEXTCLOUD_ACCOUNT_RE`` opens an account. The first later
    line that contains "@" and an ``_EMAIL_RE`` match supplies that account's
    email, after which the parser waits for the next header. Lines are
    stripped; blank lines and lines outside an open account are ignored.
    """
    found: list[NextcloudMailAccount] = []
    pending_id: str | None = None
    for raw_line in export_output.splitlines():
        text = raw_line.strip()
        if not text:
            continue
        header = _NEXTCLOUD_ACCOUNT_RE.match(text)
        if header is not None:
            pending_id = header.group(1)
        elif pending_id and "@" in text:
            address = _EMAIL_RE.search(text)
            if address is not None:
                found.append(NextcloudMailAccount(account_id=pending_id, email=address.group(0)))
                pending_id = None
    return found
def _nextcloud_list_mail_accounts(username: str) -> list[NextcloudMailAccount]:
    """Export *username*'s Nextcloud Mail accounts via ``occ mail:account:export``.

    Returns [] when occ's error text indicates the user does not exist.

    Raises:
        RuntimeError: for any other non-zero occ exit.
    """
    occ_path = os.getenv("NEXTCLOUD_OCC_PATH", "/var/www/html/occ").strip() or "/var/www/html/occ"
    returncode, stdout, stderr = _nextcloud_exec(["php", occ_path, "mail:account:export", username])
    if returncode == 0:
        return _parse_nextcloud_mail_accounts(stdout)
    message = (stderr or stdout).strip()
    missing_user_markers = ("not found", "does not exist", "no such user", "unknown user")
    if any(marker in message.lower() for marker in missing_user_markers):
        return []
    raise RuntimeError(f"nextcloud mail export failed for {username}: {message}")
def _nextcloud_delete_mail_account(account_id: str) -> None:
    """Delete Nextcloud Mail account *account_id* via ``occ mail:account:delete -q``.

    Raises:
        RuntimeError: when occ exits non-zero (message includes its output).
    """
    occ = os.getenv("NEXTCLOUD_OCC_PATH", "/var/www/html/occ").strip() or "/var/www/html/occ"
    rc, stdout, stderr = _nextcloud_exec(["php", occ, "mail:account:delete", "-q", account_id])
    if rc == 0:
        return
    detail = (stderr or stdout).strip()
    raise RuntimeError(f"nextcloud mail delete failed for account {account_id}: {detail}")
def _vaultwarden_admin_cookie(admin_token: str, base_url: str) -> str:
    """Log in to ``{base_url}/admin`` with the admin token and return the session cookie.

    Only the ``name=value`` part of the first Set-Cookie header is returned.

    Raises:
        RuntimeError: on HTTP 429 (message contains "rate limited") or when no
            cookie is returned.
        urllib.error.HTTPError: for other HTTP error statuses.
    """
    body = urllib.parse.urlencode({"token": admin_token}).encode("utf-8")
    login = urllib.request.Request(f"{base_url}/admin", data=body, method="POST")
    login.add_header("Content-Type", "application/x-www-form-urlencoded")
    try:
        with urllib.request.urlopen(login, timeout=10) as response:
            header_value = response.headers.get("Set-Cookie") or ""
    except urllib.error.HTTPError as err:
        if err.code != 429:
            raise
        raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from err
    cookie = header_value.partition(";")[0].strip()
    if not cookie:
        raise RuntimeError("vaultwarden admin cookie missing")
    return cookie
def _vaultwarden_list_users(base_url: str, cookie: str) -> list[VaultwardenUser]:
    """Fetch ``{base_url}/admin/users`` (JSON) using the admin session *cookie*.

    Entries lacking a non-empty string ``id`` or ``email`` are skipped. A
    missing or non-int ``_status`` is reported as -1.

    Raises:
        RuntimeError: on HTTP 429 (message contains "rate limited") or when the
            body is not a JSON list.
        urllib.error.HTTPError: for other HTTP error statuses.
    """
    request = urllib.request.Request(f"{base_url}/admin/users", method="GET")
    request.add_header("Cookie", cookie)
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        if err.code != 429:
            raise
        raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from err
    if not isinstance(payload, list):
        raise RuntimeError("unexpected vaultwarden /admin/users response")

    def _convert(entry: Any) -> VaultwardenUser | None:
        if not isinstance(entry, dict):
            return None
        uid = entry.get("id")
        mail = entry.get("email")
        raw_status = entry.get("_status")
        if not (isinstance(uid, str) and uid):
            return None
        if not (isinstance(mail, str) and mail):
            return None
        return VaultwardenUser(
            user_id=uid,
            email=mail,
            status=raw_status if isinstance(raw_status, int) else -1,
        )

    converted = (_convert(entry) for entry in payload)
    return [user for user in converted if user is not None]
def _vaultwarden_delete_user(base_url: str, cookie: str, user_id: str) -> None:
    """Delete Vaultwarden user *user_id* through the admin API.

    Tries ``POST {base_url}/admin/users/<id>/delete`` (JSON content type, the
    route used by the Vaultwarden admin UI) first. If that route answers
    404/405, it falls back to ``DELETE {base_url}/admin/users/<id>``. A 404 on
    the final attempt is treated as "already deleted".

    NOTE(review): the POST route reflects the Vaultwarden admin page at the time
    of writing. The fallback keeps servers that accept DELETE working; confirm
    against the deployed version.

    Raises:
        RuntimeError: on HTTP 429 (message contains "rate limited").
        urllib.error.HTTPError: for any other unexpected HTTP error status.
    """
    quoted_id = urllib.parse.quote(user_id, safe="")
    attempts = (
        ("POST", f"{base_url}/admin/users/{quoted_id}/delete", b""),
        ("DELETE", f"{base_url}/admin/users/{quoted_id}", None),
    )
    for index, (method, url, data) in enumerate(attempts):
        req = urllib.request.Request(url, data=data, method=method)
        req.add_header("Cookie", cookie)
        if data is not None:
            req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                resp.read()
            return
        except urllib.error.HTTPError as exc:
            if exc.code == 429:
                raise RuntimeError("vaultwarden admin rate limited (HTTP 429)") from exc
            is_last = index == len(attempts) - 1
            if exc.code in (404, 405) and not is_last:
                continue  # route not served by this version; try the next form
            if exc.code == 404:
                return
            raise
def _port_forward(namespace: str, target: str, local_port: int, remote_port: int) -> subprocess.Popen[bytes]:
    """Start ``kubectl port-forward`` on 127.0.0.1:*local_port* and return the process.

    The caller owns the returned process and must terminate it. stdout/stderr
    go to DEVNULL so kubectl's diagnostics do not leak internal details into
    this script's output.
    """
    argv = [
        "kubectl", "-n", namespace, "port-forward", target,
        f"{local_port}:{remote_port}",
        "--address", "127.0.0.1",
    ]
    return subprocess.Popen(argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_PREVIEW_LIMIT = 50
_VAULTWARDEN_LOCAL_PORT = 18081
_RATE_LIMIT_ATTEMPTS = 7


def _print_preview(lines: list[str]) -> None:
    """Print up to _PREVIEW_LIMIT lines (one-space indent), then a "... and N more" tail."""
    for line in lines[:_PREVIEW_LIMIT]:
        print(f" {line}")
    if len(lines) > _PREVIEW_LIMIT:
        print(f" ... and {len(lines) - _PREVIEW_LIMIT} more")


def _build_arg_parser() -> argparse.ArgumentParser:
    """Return the CLI parser; the module docstring is used as its description."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--prefix",
        action="append",
        default=[],
        help="Username prefix to match (repeatable). Example: --prefix test-",
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually delete; otherwise dry-run only.",
    )
    parser.add_argument(
        "--confirm",
        default="",
        help=(
            "Required when using --apply. Must exactly equal the comma-separated "
            "sorted prefix list (e.g. 'atlas-,bob-,e2e-,test-')."
        ),
    )
    parser.add_argument("--skip-keycloak", action="store_true", help="Skip Keycloak user deletion.")
    parser.add_argument("--skip-mailu", action="store_true", help="Skip Mailu mailbox cleanup.")
    parser.add_argument("--skip-nextcloud-mail", action="store_true", help="Skip Nextcloud Mail account cleanup.")
    parser.add_argument("--skip-portal-db", action="store_true", help="Skip portal DB cleanup.")
    parser.add_argument("--skip-vaultwarden", action="store_true", help="Skip Vaultwarden cleanup.")
    parser.add_argument(
        "--protect-keycloak-username",
        action="append",
        default=[],
        help="Keycloak usernames that must never be deleted (repeatable).",
    )
    parser.add_argument(
        "--protect-mailu-email",
        action="append",
        default=[],
        help="Mailu emails that must never be deleted (repeatable).",
    )
    parser.add_argument(
        "--protect-nextcloud-username",
        action="append",
        default=[],
        help="Nextcloud usernames that must never be touched (repeatable).",
    )
    parser.add_argument(
        "--protect-vaultwarden-email",
        action="append",
        default=[],
        help="Vaultwarden emails that must never be deleted (repeatable).",
    )
    return parser


def _stripped_set(values: list[str]) -> set[str]:
    """Return the non-blank entries of *values*, stripped, as a set."""
    return {v.strip() for v in values if v.strip()}


def _cleanup_portal_db(prefixes: list[str], apply: bool) -> list[PortalRequestRow]:
    """Report (and with *apply*, delete) matching portal access_requests; return the matches."""
    portal_db_url = _kubectl_get_secret_value("bstein-dev-home", "atlas-portal-db", "PORTAL_DATABASE_URL")
    portal_requests = _portal_list_requests(portal_db_url, prefixes)
    print(f"Portal DB: {len(portal_requests)} access_requests matched")
    _print_preview([f"{row.request_code}\t{row.status}\t{row.username}" for row in portal_requests])
    if apply and portal_requests:
        deleted = _portal_delete_requests(portal_db_url, prefixes)
        print(f"Portal DB: deleted {deleted} access_requests (cascade removes tasks/steps/artifacts).")
    print()
    return portal_requests


def _cleanup_keycloak(prefixes: list[str], protected: set[str], apply: bool) -> list[KeycloakUser]:
    """Report (and with *apply*, delete) prefix-matching Keycloak users; return them sorted by username."""
    kc_server = os.getenv("KEYCLOAK_PUBLIC_URL", "https://sso.bstein.dev").rstrip("/")
    kc_realm = os.getenv("KEYCLOAK_REALM", "atlas")
    kc_client_id = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "bstein-dev-home-admin")
    kc_client_secret = _kubectl_get_secret_value(
        "bstein-dev-home", "bstein-dev-home-keycloak-admin", "client_secret"
    )
    token = _keycloak_token(kc_server, kc_realm, kc_client_id, kc_client_secret)
    found: dict[str, KeycloakUser] = {}
    for prefix in prefixes:
        for user in _keycloak_list_users(kc_server, kc_realm, token, prefix):
            # Keycloak's search is broader than a prefix match; re-check here.
            if _starts_with_any(user.username, prefixes) and user.username not in protected:
                found[user.user_id] = user
    keycloak_users = sorted(found.values(), key=lambda u: u.username)
    print(f"Keycloak: {len(keycloak_users)} users matched")
    _print_preview([f"{u.username}\t{u.email or '-'}\t{u.user_id}" for u in keycloak_users])
    if apply and keycloak_users:
        for user in keycloak_users:
            _keycloak_delete_user(kc_server, kc_realm, token, user.user_id)
        print(f"Keycloak: deleted {len(keycloak_users)} users.")
    print()
    return keycloak_users


def _cleanup_mailu(prefixes: list[str], domain: str, db_name: str, protected: set[str], apply: bool) -> None:
    """Report (and with *apply*, delete) prefix-matching Mailu mailboxes in *domain*."""
    mailu_users = _mailu_list_users(prefixes, domain, db_name, protected)
    print(f"Mailu: {len(mailu_users)} mailboxes matched (domain={domain})")
    _print_preview([f"{u.email}\t{u.localpart}\t{u.domain}" for u in mailu_users])
    if apply and mailu_users:
        deleted = _mailu_delete_users(db_name, [u.email for u in mailu_users])
        print(f"Mailu: deleted {deleted} mailboxes.")
    print()


def _cleanup_nextcloud_mail(
    candidates: set[str],
    prefixes: list[str],
    mailu_domain: str,
    protected_usernames: set[str],
    protected_emails: set[str],
    apply: bool,
) -> None:
    """Report (and with *apply*, delete) Nextcloud Mail accounts of *candidates*.

    Only accounts whose address is in *mailu_domain* with a prefix-matching
    localpart are selected; *protected_emails* are skipped.
    """
    usernames = sorted(
        u for u in candidates
        if u and _starts_with_any(u, prefixes) and u not in protected_usernames
    )
    domain_suffix = f"@{mailu_domain.lower()}"
    matches: list[tuple[str, NextcloudMailAccount]] = []
    for username in usernames:
        for account in _nextcloud_list_mail_accounts(username):
            email = account.email.strip()
            if not email or not email.lower().endswith(domain_suffix):
                continue
            if not _starts_with_any(email.split("@", 1)[0], prefixes):
                continue
            if email in protected_emails:
                continue
            matches.append((username, account))
    print(f"Nextcloud Mail: {len(matches)} accounts matched")
    _print_preview([f"{username}\t{account.account_id}\t{account.email}" for username, account in matches])
    if apply and matches:
        for _, account in matches:
            _nextcloud_delete_mail_account(account.account_id)
        print(f"Nextcloud Mail: deleted {len(matches)} accounts.")
    print()


def _local_port_is_free(port: int) -> bool:
    """Return True if 127.0.0.1:*port* can be bound right now (nothing listening on it)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind(("127.0.0.1", port))
        except OSError:
            return False
    return True


def _wait_for_port_forward(pf: subprocess.Popen[bytes], url: str, attempts: int = 30) -> None:
    """Poll *url* until the port-forward answers, for at most *attempts* tries.

    Any HTTP response, including an error status, means the tunnel is up.
    Transport failures are retried every 0.2 s. If they persist, the function
    returns and the next request reports the failure.

    Raises:
        RuntimeError: if the kubectl process exits while waiting.
    """
    for _ in range(attempts):
        if pf.poll() is not None:
            raise RuntimeError(f"kubectl port-forward exited early (code {pf.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1) as resp:
                resp.read(1)
            return
        except urllib.error.HTTPError as err:
            err.close()
            return  # the server answered, so the tunnel is up
        except Exception:  # refused/reset/timeout/bad status while the tunnel starts
            time.sleep(0.2)


def _retry_when_rate_limited(action: Any, exhausted_message: str) -> Any:
    """Call *action* until it succeeds, retrying "rate limited" RuntimeErrors.

    Up to _RATE_LIMIT_ATTEMPTS calls are made, with exponential backoff of
    min(60, 2**attempt) seconds between them (no sleep after the last one).

    Raises:
        RuntimeError: *exhausted_message* once every attempt was rate limited.
            Other RuntimeErrors propagate immediately.
    """
    for attempt in range(_RATE_LIMIT_ATTEMPTS):
        try:
            return action()
        except RuntimeError as exc:
            if "rate limited" not in str(exc).lower():
                raise
            if attempt + 1 < _RATE_LIMIT_ATTEMPTS:
                time.sleep(min(60.0, 2.0**attempt))
    raise RuntimeError(exhausted_message)


def _cleanup_vaultwarden(prefixes: list[str], protected: set[str], apply: bool) -> int:
    """Report (and with *apply*, delete) Vaultwarden users whose email localpart matches.

    The service is reached through a temporary ``kubectl port-forward`` on
    127.0.0.1:_VAULTWARDEN_LOCAL_PORT, which is always terminated afterwards.

    Returns:
        1 after printing "Vaultwarden: ERROR: ..." when the local port is busy
        or the tunnel, secret lookup, login, or user listing fails with
        RuntimeError; otherwise 0.
    """
    port = _VAULTWARDEN_LOCAL_PORT
    base_url = f"http://127.0.0.1:{port}"
    # If something already listens on the port, kubectl cannot bind it, and the
    # admin token would be POSTed to that unknown process instead.
    if not _local_port_is_free(port):
        print(f"Vaultwarden: ERROR: local port {port} is already in use")
        print()
        return 1
    pf = _port_forward("vaultwarden", "svc/vaultwarden-service", port, 80)
    try:
        try:
            _wait_for_port_forward(pf, f"{base_url}/")
            admin_token = _kubectl_get_secret_value("vaultwarden", "vaultwarden-admin", "ADMIN_TOKEN")
            cookie = _retry_when_rate_limited(
                lambda: _vaultwarden_admin_cookie(admin_token, base_url),
                "vaultwarden admin login repeatedly rate limited",
            )
            # An empty list is a valid answer; only exhausted retries are errors.
            users = _retry_when_rate_limited(
                lambda: _vaultwarden_list_users(base_url, cookie),
                "vaultwarden user list unavailable (possibly rate limited)",
            )
        except RuntimeError as exc:
            print(f"Vaultwarden: ERROR: {exc}")
            print()
            return 1
        matched = sorted(
            (
                user
                for user in users
                if _starts_with_any(user.email.split("@", 1)[0], prefixes)
                and user.email not in protected
            ),
            key=lambda u: u.email,
        )
        print(f"Vaultwarden: {len(matched)} users matched")
        _print_preview([f"{u.email}\tstatus={u.status}\t{u.user_id}" for u in matched])
        if apply and matched:
            for user in matched:
                _vaultwarden_delete_user(base_url, cookie, user.user_id)
            print(f"Vaultwarden: deleted {len(matched)} users.")
        print()
        return 0
    finally:
        pf.terminate()
        try:
            pf.wait(timeout=3)
        except subprocess.TimeoutExpired:
            pf.kill()
            pf.wait()


def main() -> int:
    """Parse flags, enforce the --confirm guard, then run each cleanup stage.

    Stages run in order: portal DB, Keycloak, Mailu, Nextcloud Mail,
    Vaultwarden. Each can be skipped with its --skip-* flag. Nextcloud Mail
    candidates are the usernames matched by the portal DB and Keycloak stages,
    so skipping those stages narrows it. Nothing is deleted unless --apply is
    given together with --confirm equal to the sorted, comma-joined prefixes.

    Returns:
        The Vaultwarden stage's status (1 on failure) when it runs, else 0.

    Raises:
        SystemExit: on invalid/missing prefixes or a --confirm mismatch.
    """
    args = _build_arg_parser().parse_args()
    prefixes = sorted(set(_validate_prefixes(args.prefix)))
    apply = bool(args.apply)
    expected_confirm = ",".join(prefixes)
    protected_keycloak = {"bstein", "robotuser"} | _stripped_set(args.protect_keycloak_username)
    protected_mailu = _stripped_set(args.protect_mailu_email)
    protected_nextcloud = _stripped_set(args.protect_nextcloud_username)
    protected_vaultwarden = _stripped_set(args.protect_vaultwarden_email)
    mailu_domain = os.getenv("MAILU_DOMAIN", "bstein.dev").strip() or "bstein.dev"
    mailu_db_name = os.getenv("MAILU_DB_NAME", "mailu").strip() or "mailu"
    if apply and args.confirm != expected_confirm:
        raise SystemExit(
            f"refusing to apply without --confirm '{expected_confirm}' (got '{args.confirm}')"
        )

    print("Atlas test-user cleanup")
    print("prefixes:", expected_confirm)
    print("mode:", "APPLY (destructive)" if apply else "DRY RUN (no changes)")
    for label, protected in (
        ("protected keycloak usernames:", protected_keycloak),
        ("protected mailu emails:", protected_mailu),
        ("protected nextcloud usernames:", protected_nextcloud),
        ("protected vaultwarden emails:", protected_vaultwarden),
    ):
        if protected:
            print(label, ", ".join(sorted(protected)))
    print()

    portal_requests: list[PortalRequestRow] = []
    if not args.skip_portal_db:
        portal_requests = _cleanup_portal_db(prefixes, apply)

    keycloak_users: list[KeycloakUser] = []
    if not args.skip_keycloak:
        keycloak_users = _cleanup_keycloak(prefixes, protected_keycloak, apply)

    if not args.skip_mailu:
        _cleanup_mailu(prefixes, mailu_domain, mailu_db_name, protected_mailu, apply)

    if not args.skip_nextcloud_mail:
        candidates = {row.username for row in portal_requests} | {u.username for u in keycloak_users}
        _cleanup_nextcloud_mail(
            candidates, prefixes, mailu_domain, protected_nextcloud, protected_mailu, apply
        )

    if not args.skip_vaultwarden:
        return _cleanup_vaultwarden(prefixes, protected_vaultwarden, apply)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())