# services/communication/pin-othrys-job.yaml apiVersion: batch/v1 kind: Job metadata: name: pin-othrys-invite namespace: communication spec: ttlSecondsAfterFinished: 3600 template: spec: restartPolicy: OnFailure containers: - name: pin image: python:3.11-slim env: - name: SYNAPSE_BASE value: http://othrys-synapse-matrix-synapse:8008 - name: SEEDER_USER value: othrys-seeder - name: SEEDER_PASS valueFrom: secretKeyRef: name: atlasbot-credentials key: seeder-password command: - /bin/sh - -c - | set -euo pipefail pip install --no-cache-dir requests >/dev/null python - <<'PY' import requests, urllib.parse, os BASE = os.environ["SYNAPSE_BASE"] def login(user, password): r = requests.post(f"{BASE}/_matrix/client/v3/login", json={ "type": "m.login.password", "identifier": {"type": "m.id.user", "user": user}, "password": password, }) r.raise_for_status() return r.json()["access_token"] def resolve(alias, token): enc = urllib.parse.quote(alias) r = requests.get(f"{BASE}/_matrix/client/v3/directory/room/{enc}", headers={"Authorization": f"Bearer {token}"}) r.raise_for_status() return r.json()["room_id"] def send(room_id, token, body): r = requests.post(f"{BASE}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/send/m.room.message", headers={"Authorization": f"Bearer {token}"}, json={"msgtype": "m.text", "body": body}) r.raise_for_status() return r.json()["event_id"] def pin(room_id, token, event_id): r = requests.put(f"{BASE}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events", headers={"Authorization": f"Bearer {token}"}, json={"pinned": [event_id]}) r.raise_for_status() token = login(os.environ["SEEDER_USER"], os.environ["SEEDER_PASS"]) room_id = resolve("#othrys:live.bstein.dev", token) msg = "Invite guests: share https://live.bstein.dev/#/room/#othrys:live.bstein.dev?action=join and choose 'Continue' -> 'Join as guest'." eid = send(room_id, token, msg) pin(room_id, token, eid) PY