# services/comms/oneoffs/synapse-admin-ensure-job.yaml
# One-off job for comms/synapse-admin-ensure-4.
# Purpose: make sure a Synapse admin account exists and that its credentials
# and access token are stored in Vault at kv/atlas/comms/synapse-admin.
# The job logs in to Vault with its service-account JWT, generates a password
# if none is stored, registers the admin user through Synapse's shared-secret
# registration endpoint, logs in via MAS_AUTH_URL and writes the resulting
# access token back to Vault. It exits early when a token is already stored.
# Run by setting spec.suspend to false, reconcile, then set it back to true.
# Safe to delete the finished Job/pod; it should not run continuously.
# NOTE(review): spec.suspend is currently false, i.e. the job runs on
# reconcile - confirm this is intended before merging.
apiVersion: batch/v1
kind: Job
metadata:
  name: synapse-admin-ensure-4
  namespace: comms
spec:
  suspend: false
  backoffLimit: 0
  ttlSecondsAfterFinished: 3600
  template:
    spec:
      serviceAccountName: comms-secrets-ensure
      restartPolicy: Never
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-role.kubernetes.io/worker
                    operator: Exists
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              preference:
                matchExpressions:
                  - key: kubernetes.io/arch
                    operator: In
                    values: ["arm64"]
      containers:
        - name: ensure
          image: python:3.11-slim
          env:
            - name: VAULT_ADDR
              value: http://vault.vault.svc.cluster.local:8200
            - name: VAULT_ROLE
              value: comms-secrets
            - name: SYNAPSE_ADMIN_URL
              value: http://othrys-synapse-matrix-synapse.comms.svc.cluster.local:8008
            - name: MAS_AUTH_URL
              value: http://matrix-authentication-service.comms.svc.cluster.local:8080
          command:
            - /bin/sh
            - -c
            - |
              # /bin/sh in Debian-based images such as python:3.11-slim is
              # dash, which is expected to reject `set -o pipefail`; the
              # script contains no pipelines, so -eu gives the same safety.
              set -eu
              python - <<'PY'
              import hashlib
              import hmac
              import json
              import os
              import secrets
              import string
              import urllib.error
              import urllib.request

              VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://vault.vault.svc.cluster.local:8200").rstrip("/")
              VAULT_ROLE = os.environ.get("VAULT_ROLE", "comms-secrets")
              SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
              SYNAPSE_ADMIN_URL = os.environ.get("SYNAPSE_ADMIN_URL", "").rstrip("/")
              MAS_AUTH_URL = os.environ.get("MAS_AUTH_URL", SYNAPSE_ADMIN_URL).rstrip("/")

              def log(msg: str) -> None:
                  """Print a message and flush so it shows up in pod logs immediately."""
                  print(msg, flush=True)

              def request_json(url: str, payload: dict | None = None, method: str | None = None) -> tuple[int, dict]:
                  """Send an HTTP request with an optional JSON body.

                  Returns (status_code, decoded_json_body). HTTP error responses are
                  not raised: their status is returned together with the decoded
                  body, or {} when the error body is empty or not valid JSON.
                  The method defaults to POST when a body is sent, GET otherwise.
                  """
                  data = None
                  headers = {"Content-Type": "application/json"}
                  if payload is not None:
                      data = json.dumps(payload).encode("utf-8")
                  req = urllib.request.Request(
                      url,
                      data=data,
                      headers=headers,
                      method=method or ("POST" if data else "GET"),
                  )
                  try:
                      with urllib.request.urlopen(req, timeout=30) as resp:
                          body = resp.read().decode("utf-8")
                          return resp.getcode(), json.loads(body) if body else {}
                  except urllib.error.HTTPError as exc:
                      body = exc.read().decode("utf-8")
                      try:
                          error_payload = json.loads(body) if body else {}
                      except json.JSONDecodeError:
                          error_payload = {}
                      return exc.code, error_payload

              def vault_login() -> str:
                  """Exchange the pod's service-account JWT for a Vault client token.

                  Raises RuntimeError when the response carries no client token.
                  """
                  with open(SA_TOKEN_PATH, "r", encoding="utf-8") as f:
                      jwt = f.read().strip()
                  payload = {"jwt": jwt, "role": VAULT_ROLE}
                  # request_json returns (status, body); the body holds the token.
                  _status, resp = request_json(f"{VAULT_ADDR}/v1/auth/kubernetes/login", payload)
                  token = (resp.get("auth") or {}).get("client_token")
                  if not token:
                      raise RuntimeError("vault login failed")
                  return token

              def vault_get(token: str, path: str) -> dict:
                  """Read kv/data/atlas/<path> and return its inner data mapping.

                  Returns {} when Vault answers 404; other HTTP errors propagate.
                  """
                  req = urllib.request.Request(
                      f"{VAULT_ADDR}/v1/kv/data/atlas/{path}",
                      headers={"X-Vault-Token": token},
                  )
                  try:
                      with urllib.request.urlopen(req, timeout=30) as resp:
                          payload = json.loads(resp.read().decode("utf-8"))
                      return payload.get("data", {}).get("data", {})
                  except urllib.error.HTTPError as exc:
                      if exc.code == 404:
                          return {}
                      raise

              def vault_put(token: str, path: str, data: dict) -> None:
                  """Write `data` to kv/data/atlas/<path>; HTTP errors propagate."""
                  payload = {"data": data}
                  req = urllib.request.Request(
                      f"{VAULT_ADDR}/v1/kv/data/atlas/{path}",
                      data=json.dumps(payload).encode("utf-8"),
                      headers={"X-Vault-Token": token, "Content-Type": "application/json"},
                      method="POST",
                  )
                  with urllib.request.urlopen(req, timeout=30) as resp:
                      resp.read()

              def random_password(length: int = 32) -> str:
                  """Return a random alphanumeric password of `length` characters."""
                  alphabet = string.ascii_letters + string.digits
                  return "".join(secrets.choice(alphabet) for _ in range(length))

              def ensure_admin_creds(token: str) -> dict:
                  """Load the admin credentials from Vault, creating a password if absent.

                  The returned dict always has a non-empty "username" (defaulting to
                  "synapse-admin") and "password". Vault is only written to when a
                  new password had to be generated.
                  """
                  data = vault_get(token, "comms/synapse-admin")
                  username = (data.get("username") or "").strip() or "synapse-admin"
                  password = (data.get("password") or "").strip()
                  # Always expose the resolved username: a stored record that has a
                  # password but no username must not break admin_data["username"].
                  data["username"] = username
                  if not password:
                      data["password"] = random_password()
                      vault_put(token, "comms/synapse-admin", data)
                  return data

              def registration_mac(nonce: str, username: str, password: str, admin: bool, shared_secret: str) -> str:
                  """Compute the HMAC-SHA1 hex digest sent as "mac" when registering.

                  The message is nonce, username, password and the admin marker
                  joined by NUL bytes, keyed with the registration shared secret.
                  """
                  admin_value = "admin" if admin else "notadmin"
                  msg = "\x00".join([nonce, username, password, admin_value]).encode("utf-8")
                  return hmac.new(shared_secret.encode("utf-8"), msg=msg, digestmod=hashlib.sha1).hexdigest()

              def synapse_register(shared_secret: str, username: str, password: str, admin: bool) -> None:
                  """Register the user through /_synapse/admin/v1/register.

                  Fetches a nonce, then posts the signed registration. An
                  M_USER_IN_USE error is treated as success so reruns are harmless;
                  any other failure raises RuntimeError.
                  """
                  status, nonce_payload = request_json(
                      f"{SYNAPSE_ADMIN_URL}/_synapse/admin/v1/register",
                      method="GET",
                  )
                  if status != 200:
                      raise RuntimeError(f"register nonce failed: {status}")
                  nonce = (nonce_payload or {}).get("nonce")
                  if not nonce:
                      raise RuntimeError("register nonce missing")
                  mac = registration_mac(nonce, username, password, admin, shared_secret)
                  payload = {
                      "nonce": nonce,
                      "username": username,
                      "password": password,
                      "admin": admin,
                      "mac": mac,
                  }
                  status, resp = request_json(
                      f"{SYNAPSE_ADMIN_URL}/_synapse/admin/v1/register",
                      payload,
                      method="POST",
                  )
                  if status in (200, 201):
                      return
                  if resp.get("errcode") == "M_USER_IN_USE":
                      return
                  raise RuntimeError(f"register failed: {status} {resp}")

              def synapse_login(username: str, password: str) -> str:
                  """Password-login via MAS_AUTH_URL and return the access token.

                  Raises RuntimeError on a non-2xx status or a missing token.
                  """
                  payload = {
                      "type": "m.login.password",
                      "identifier": {"type": "m.id.user", "user": username},
                      "password": password,
                  }
                  status, resp = request_json(f"{MAS_AUTH_URL}/_matrix/client/v3/login", payload, method="POST")
                  if status not in (200, 201):
                      raise RuntimeError(f"login failed: {status} {resp}")
                  token = resp.get("access_token")
                  if not token:
                      raise RuntimeError("login returned no access token")
                  return token

              vault_token = vault_login()
              admin_data = ensure_admin_creds(vault_token)
              # Nothing to do when an earlier run already stored a token.
              if admin_data.get("access_token"):
                  log("synapse admin token already present")
                  raise SystemExit(0)

              reg_secret = vault_get(vault_token, "comms/synapse-registration")
              shared_secret = (reg_secret.get("registration_shared_secret") or "").strip()
              if not shared_secret:
                  raise RuntimeError("registration shared secret missing")

              synapse_register(shared_secret, admin_data["username"], admin_data["password"], True)
              token_value = synapse_login(admin_data["username"], admin_data["password"])
              admin_data["access_token"] = token_value
              vault_put(vault_token, "comms/synapse-admin", admin_data)
              log("synapse admin token stored")
              PY