# services/comms/oneoffs/synapse-admin-ensure-job.yaml
# One-off Job: comms/synapse-admin-ensure-14.
# Purpose: make sure the Vault KV entry atlas/comms/synapse-admin (mount "kv")
# holds username "othrys-seeder" and a working Synapse admin access token.
# The token is read from the Synapse Postgres database (access_tokens table),
# checked against the Synapse admin API, and only then written to Vault.
# To run: set spec.suspend to false, reconcile, then set it back to true.
# Safe to delete the finished Job/pod; it should not run continuously.
# NOTE: a Job's pod template cannot be edited in place; when changing the
# container spec, bump the numeric suffix of metadata.name.
---
apiVersion: batch/v1
kind: Job
metadata:
  name: synapse-admin-ensure-14
  namespace: comms
spec:
  suspend: true
  backoffLimit: 0
  ttlSecondsAfterFinished: 3600
  template:
    spec:
      serviceAccountName: comms-secrets-ensure
      restartPolicy: Never
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-role.kubernetes.io/worker
                    operator: Exists
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              preference:
                matchExpressions:
                  - key: kubernetes.io/arch
                    operator: In
                    values: ["arm64"]
      containers:
        - name: ensure
          image: registry.bstein.dev/infra/synapse-admin-ensure:0.1.0
          imagePullPolicy: Always
          env:
            - name: VAULT_ADDR
              value: http://vault.vault.svc.cluster.local:8200
            - name: VAULT_ROLE
              value: comms-secrets
            - name: SYNAPSE_ADMIN_URL
              value: http://othrys-synapse-matrix-synapse.comms.svc.cluster.local:8008
          # The script runs under /bin/sh and contains no pipelines, so plain
          # "set -eu" is used: "-o pipefail" is not POSIX and some sh
          # implementations reject it, which would abort the job immediately.
          command:
            - /bin/sh
            - -c
            - |
              set -eu
              python - <<'PY'
              # Ensure Vault holds a validated Synapse admin token for othrys-seeder.
              import json
              import os
              import urllib.error
              import urllib.parse
              import urllib.request

              import psycopg2

              VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://vault.vault.svc.cluster.local:8200").rstrip("/")
              VAULT_ROLE = os.environ.get("VAULT_ROLE", "comms-secrets")
              SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
              SYNAPSE_ADMIN_URL = os.environ.get("SYNAPSE_ADMIN_URL", "").rstrip("/")
              # Matrix server name used to build the admin user's MXID.
              SYNAPSE_SERVER_NAME = os.environ.get("SYNAPSE_SERVER_NAME", "live.bstein.dev")
              ADMIN_USERNAME = "othrys-seeder"
              # Paths below are relative to kv/data/atlas/ (see vault_get/vault_put).
              VAULT_ADMIN_PATH = "comms/synapse-admin"
              VAULT_DB_PATH = "comms/synapse-db"
              PGHOST = "postgres-service.postgres.svc.cluster.local"
              PGPORT = 5432
              PGDATABASE = "synapse"
              PGUSER = "synapse"


              def log(msg: str) -> None:
                  """Print msg and flush immediately so it appears in the pod log."""
                  print(msg, flush=True)


              def request_json(url: str, payload: dict | None = None) -> dict:
                  """POST payload as JSON (or GET when payload is None) and decode the JSON reply.

                  urllib raises HTTPError for non-2xx responses; it is not caught here.
                  """
                  data = None
                  headers = {"Content-Type": "application/json"}
                  if payload is not None:
                      data = json.dumps(payload).encode("utf-8")
                  req = urllib.request.Request(url, data=data, headers=headers, method="POST" if data else "GET")
                  with urllib.request.urlopen(req, timeout=30) as resp:
                      return json.loads(resp.read().decode("utf-8"))


              def vault_login() -> str:
                  """Log in to Vault's kubernetes auth method with the pod's SA token.

                  Returns the Vault client token; raises RuntimeError if none is returned.
                  """
                  with open(SA_TOKEN_PATH, "r", encoding="utf-8") as f:
                      jwt = f.read().strip()
                  payload = {"jwt": jwt, "role": VAULT_ROLE}
                  resp = request_json(f"{VAULT_ADDR}/v1/auth/kubernetes/login", payload)
                  token = resp.get("auth", {}).get("client_token")
                  if not token:
                      raise RuntimeError("vault login failed")
                  return token


              def vault_get(token: str, path: str) -> dict:
                  """Read kv/data/atlas/<path> and return its data dict ({} if the path is absent)."""
                  req = urllib.request.Request(
                      f"{VAULT_ADDR}/v1/kv/data/atlas/{path}",
                      headers={"X-Vault-Token": token},
                  )
                  try:
                      with urllib.request.urlopen(req, timeout=30) as resp:
                          payload = json.loads(resp.read().decode("utf-8"))
                          return payload.get("data", {}).get("data", {})
                  except urllib.error.HTTPError as exc:
                      if exc.code == 404:
                          return {}
                      raise


              def vault_put(token: str, path: str, data: dict) -> None:
                  """Write data as the new contents of kv/data/atlas/<path>."""
                  payload = {"data": data}
                  req = urllib.request.Request(
                      f"{VAULT_ADDR}/v1/kv/data/atlas/{path}",
                      data=json.dumps(payload).encode("utf-8"),
                      headers={"X-Vault-Token": token, "Content-Type": "application/json"},
                      method="POST",
                  )
                  with urllib.request.urlopen(req, timeout=30) as resp:
                      resp.read()


              def ensure_admin_creds(token: str) -> dict:
                  """Return the admin Vault entry, forcing username to ADMIN_USERNAME.

                  If the stored username differs, any stored access_token belonged to
                  another user, so it is dropped and the corrected entry is written back.
                  """
                  data = vault_get(token, VAULT_ADMIN_PATH)
                  if data.get("username") != ADMIN_USERNAME:
                      data["username"] = ADMIN_USERNAME
                      data.pop("access_token", None)
                      vault_put(token, VAULT_ADMIN_PATH, data)
                  return data


              def get_cols(cur):
                  """Return {column_name: {nullable, default, type}} for public.users."""
                  cur.execute(
                      """
                      SELECT column_name, is_nullable, column_default, data_type
                      FROM information_schema.columns
                      WHERE table_schema = 'public' AND table_name = 'users'
                      """
                  )
                  cols = {}
                  for name, is_nullable, default, data_type in cur.fetchall():
                      cols[name] = {
                          "nullable": is_nullable == "YES",
                          "default": default,
                          "type": data_type,
                      }
                  return cols


              def admin_token_valid(token: str, user_id: str) -> bool:
                  """Check whether token is accepted by the Synapse admin user endpoint.

                  Returns False when token or SYNAPSE_ADMIN_URL is empty, or on 401/403.
                  A 404 is treated as valid (presumably the request passed the admin
                  auth check and only the user lookup missed). Other HTTP errors propagate.
                  """
                  if not token or not SYNAPSE_ADMIN_URL:
                      return False
                  encoded = urllib.parse.quote(user_id, safe="")
                  url = f"{SYNAPSE_ADMIN_URL}/_synapse/admin/v2/users/{encoded}"
                  req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
                  try:
                      with urllib.request.urlopen(req, timeout=30) as resp:
                          resp.read()
                      return True
                  except urllib.error.HTTPError as exc:
                      if exc.code == 404:
                          return True
                      if exc.code in (401, 403):
                          return False
                      raise


              def fetch_admin_token(vault_token: str, user_id: str) -> str:
                  """Read the newest non-expiring access token for user_id from the Synapse DB.

                  Raises RuntimeError if the DB password is missing from Vault, the
                  users table has no admin column, the user is missing or not an
                  admin, or the user has no access token with valid_until_ms NULL.
                  """
                  synapse_db = vault_get(vault_token, VAULT_DB_PATH)
                  pg_password = synapse_db.get("POSTGRES_PASSWORD")
                  if not pg_password:
                      raise RuntimeError("synapse db password missing")

                  conn = psycopg2.connect(
                      host=PGHOST,
                      port=PGPORT,
                      dbname=PGDATABASE,
                      user=PGUSER,
                      password=pg_password,
                  )
                  try:
                      with conn:
                          with conn.cursor() as cur:
                              cols = get_cols(cur)
                              if "admin" not in cols:
                                  raise RuntimeError("users.admin column missing")
                              # Fail with a specific reason instead of a generic
                              # validation error when the account is missing or not admin.
                              cur.execute("SELECT admin FROM users WHERE name = %s", (user_id,))
                              user_row = cur.fetchone()
                              if not user_row:
                                  raise RuntimeError(f"synapse user {user_id} not found")
                              if not user_row[0]:
                                  raise RuntimeError(f"synapse user {user_id} is not an admin")
                              cur.execute(
                                  """
                                  SELECT token FROM access_tokens
                                  WHERE user_id = %s AND valid_until_ms IS NULL
                                  ORDER BY id DESC LIMIT 1
                                  """,
                                  (user_id,),
                              )
                              row = cur.fetchone()
                              if not row:
                                  raise RuntimeError(f"no access token found for {user_id}")
                              return row[0]
                  finally:
                      conn.close()


              def main() -> None:
                  """Keep a valid admin token in Vault, rotating it from the DB when needed."""
                  vault_token = vault_login()
                  admin_data = ensure_admin_creds(vault_token)
                  user_id = f"@{admin_data['username']}:{SYNAPSE_SERVER_NAME}"

                  existing_token = admin_data.get("access_token")
                  if existing_token and admin_token_valid(existing_token, user_id):
                      log("synapse admin token already present and valid")
                      return
                  if existing_token:
                      log("synapse admin token invalid; rotating")
                      admin_data.pop("access_token", None)
                      vault_put(vault_token, VAULT_ADMIN_PATH, admin_data)

                  token_value = fetch_admin_token(vault_token, user_id)
                  # Validate before writing so Vault never holds a token the admin API rejects.
                  if not admin_token_valid(token_value, user_id):
                      raise RuntimeError("synapse admin token validation failed")
                  admin_data["access_token"] = token_value
                  vault_put(vault_token, VAULT_ADMIN_PATH, admin_data)
                  log("synapse admin token stored")


              main()
              PY