titan-iac/services/comms/synapse-admin-ensure-job.yaml

215 lines
8.9 KiB
YAML

# services/comms/synapse-admin-ensure-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: synapse-admin-ensure-3
namespace: comms
spec:
backoffLimit: 0
ttlSecondsAfterFinished: 3600
template:
spec:
serviceAccountName: comms-secrets-ensure
restartPolicy: Never
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node-role.kubernetes.io/worker
operator: Exists
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: kubernetes.io/arch
operator: In
values: ["arm64"]
containers:
- name: ensure
image: python:3.11-slim
env:
- name: VAULT_ADDR
value: http://vault.vault.svc.cluster.local:8200
- name: VAULT_ROLE
value: comms-secrets
- name: SYNAPSE_ADMIN_URL
value: http://othrys-synapse-matrix-synapse.comms.svc.cluster.local:8008
command:
- /bin/sh
- -c
- |
set -euo pipefail
pip install --no-cache-dir psycopg2-binary bcrypt
python - <<'PY'
import json
import os
import secrets
import string
import time
import urllib.error
import urllib.request
import bcrypt
import psycopg2
VAULT_ADDR = os.environ.get("VAULT_ADDR", "http://vault.vault.svc.cluster.local:8200").rstrip("/")
VAULT_ROLE = os.environ.get("VAULT_ROLE", "comms-secrets")
SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
PGHOST = "postgres-service.postgres.svc.cluster.local"
PGPORT = 5432
PGDATABASE = "synapse"
PGUSER = "synapse"
def log(msg: str) -> None:
print(msg, flush=True)
def request_json(url: str, payload: dict | None = None) -> dict:
data = None
headers = {"Content-Type": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, headers=headers, method="POST" if data else "GET")
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def vault_login() -> str:
with open(SA_TOKEN_PATH, "r", encoding="utf-8") as f:
jwt = f.read().strip()
payload = {"jwt": jwt, "role": VAULT_ROLE}
resp = request_json(f"{VAULT_ADDR}/v1/auth/kubernetes/login", payload)
token = resp.get("auth", {}).get("client_token")
if not token:
raise RuntimeError("vault login failed")
return token
def vault_get(token: str, path: str) -> dict:
req = urllib.request.Request(
f"{VAULT_ADDR}/v1/kv/data/atlas/{path}",
headers={"X-Vault-Token": token},
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
payload = json.loads(resp.read().decode("utf-8"))
return payload.get("data", {}).get("data", {})
except urllib.error.HTTPError as exc:
if exc.code == 404:
return {}
raise
def vault_put(token: str, path: str, data: dict) -> None:
payload = {"data": data}
req = urllib.request.Request(
f"{VAULT_ADDR}/v1/kv/data/atlas/{path}",
data=json.dumps(payload).encode("utf-8"),
headers={"X-Vault-Token": token, "Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=30) as resp:
resp.read()
def random_password(length: int = 32) -> str:
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
def ensure_admin_creds(token: str) -> dict:
data = vault_get(token, "comms/synapse-admin")
username = (data.get("username") or "").strip() or "synapse-admin"
password = (data.get("password") or "").strip()
if not password:
password = random_password()
data["username"] = username
data["password"] = password
vault_put(token, "comms/synapse-admin", data)
return data
def ensure_user(cur, cols, user_id, password, admin):
now_ms = int(time.time() * 1000)
values = {
"name": user_id,
"password_hash": bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode(),
"creation_ts": now_ms,
}
def add_flag(name, flag):
if name not in cols:
return
if cols[name]["type"] in ("smallint", "integer"):
values[name] = int(flag)
else:
values[name] = bool(flag)
add_flag("admin", admin)
add_flag("deactivated", False)
add_flag("shadow_banned", False)
add_flag("is_guest", False)
columns = list(values.keys())
placeholders = ", ".join(["%s"] * len(columns))
updates = ", ".join([f"{col}=EXCLUDED.{col}" for col in columns if col != "name"])
query = f"INSERT INTO users ({', '.join(columns)}) VALUES ({placeholders}) ON CONFLICT (name) DO UPDATE SET {updates};"
cur.execute(query, [values[c] for c in columns])
def get_cols(cur):
cur.execute(
"""
SELECT column_name, is_nullable, column_default, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'users'
"""
)
cols = {}
for name, is_nullable, default, data_type in cur.fetchall():
cols[name] = {
"nullable": is_nullable == "YES",
"default": default,
"type": data_type,
}
return cols
def ensure_access_token(cur, user_id, token_value):
cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM access_tokens")
token_id = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO access_tokens (id, user_id, token, device_id, valid_until_ms)
VALUES (%s, %s, %s, %s, NULL)
ON CONFLICT (token) DO NOTHING
""",
(token_id, user_id, token_value, "ariadne-admin"),
)
vault_token = vault_login()
admin_data = ensure_admin_creds(vault_token)
if admin_data.get("access_token"):
log("synapse admin token already present")
raise SystemExit(0)
synapse_db = vault_get(vault_token, "comms/synapse-db")
pg_password = synapse_db.get("POSTGRES_PASSWORD")
if not pg_password:
raise RuntimeError("synapse db password missing")
user_id = f"@{admin_data['username']}:live.bstein.dev"
conn = psycopg2.connect(
host=PGHOST,
port=PGPORT,
dbname=PGDATABASE,
user=PGUSER,
password=pg_password,
)
token_value = secrets.token_urlsafe(32)
try:
with conn:
with conn.cursor() as cur:
cols = get_cols(cur)
ensure_user(cur, cols, user_id, admin_data["password"], True)
ensure_access_token(cur, user_id, token_value)
finally:
conn.close()
admin_data["access_token"] = token_value
vault_put(vault_token, "comms/synapse-admin", admin_data)
log("synapse admin token stored")
PY