titan-iac/services/comms/bstein-force-leave-job.yaml

189 lines
8.7 KiB
YAML

# services/comms/bstein-force-leave-job.yaml
# One-shot Job that makes the Matrix user named by TARGET_USERNAME leave and
# forget the rooms listed in TARGET_ROOMS. The inline Python program obtains a
# MAS admin token, creates a short-lived personal session for the user, calls
# the Matrix client API with that session's token, then revokes the session.
apiVersion: batch/v1
kind: Job
metadata:
  name: bstein-leave-rooms-8
  namespace: comms
spec:
  # No pod retries: a failed run must be inspected, not silently repeated.
  backoffLimit: 0
  template:
    metadata:
      annotations:
        # Vault Agent injector: render only the client_secret field of the MAS
        # admin client into the file "mas-admin-secret"; the script reads it
        # through MAS_ADMIN_CLIENT_SECRET_FILE below.
        vault.hashicorp.com/agent-inject: "true"
        vault.hashicorp.com/agent-pre-populate-only: "true"
        vault.hashicorp.com/role: "comms"
        vault.hashicorp.com/agent-inject-secret-mas-admin-secret: "kv/data/atlas/comms/mas-admin-client-runtime"
        vault.hashicorp.com/agent-inject-template-mas-admin-secret: |
          {{- with secret "kv/data/atlas/comms/mas-admin-client-runtime" -}}{{ .Data.data.client_secret }}{{- end -}}
    spec:
      restartPolicy: Never
      serviceAccountName: comms-vault
      containers:
        - name: leave
          image: python:3.11-slim
          env:
            - name: MAS_ADMIN_CLIENT_ID
              value: "01KDXMVQBQ5JNY6SEJPZW6Z8BM"
            - name: MAS_ADMIN_CLIENT_SECRET_FILE
              value: /vault/secrets/mas-admin-secret
            - name: MAS_TOKEN_URL
              value: http://matrix-authentication-service:8080/oauth2/token
            - name: MAS_ADMIN_API_BASE
              value: http://matrix-authentication-service:8081/api/admin/v1
            # NOTE(review): this is the same host:port as MAS_TOKEN_URL, yet the
            # script sends /_matrix/client/v3/... requests to it. Confirm that
            # this endpoint really serves (or proxies) the Matrix client API.
            - name: SYNAPSE_BASE
              value: http://matrix-authentication-service:8080
            - name: TARGET_USERNAME
              value: bstein
            # Comma-separated room IDs; quoted because they start with "!".
            - name: TARGET_ROOMS
              value: "!OkltaJguODUnZrbcUp:live.bstein.dev,!pMKAVvSRheIOCPIjDM:live.bstein.dev"
          command:
            - /bin/sh
            - -c
            - |
              # python:3.11-slim is Debian-based, where /bin/sh is dash; dash
              # rejects `set -o pipefail` and would abort here. The wrapper has
              # no pipelines, so -e and -u are all that is needed.
              set -eu
              # Quoted heredoc delimiter: the shell performs no expansion inside
              # the Python program that follows.
              python - <<'PY'
import base64
import json
import os
import urllib.error
import urllib.parse
import urllib.request
import time
# Runtime configuration. Every value is mandatory: indexing os.environ raises
# KeyError immediately when a variable is missing from the container env.
_env = os.environ
MAS_ADMIN_CLIENT_ID = _env["MAS_ADMIN_CLIENT_ID"]
MAS_ADMIN_CLIENT_SECRET_FILE = _env["MAS_ADMIN_CLIENT_SECRET_FILE"]
MAS_TOKEN_URL = _env["MAS_TOKEN_URL"]
# Base URLs lose any trailing "/" so that paths can be appended with "/...".
MAS_ADMIN_API_BASE = _env["MAS_ADMIN_API_BASE"].rstrip("/")
SYNAPSE_BASE = _env["SYNAPSE_BASE"].rstrip("/")
TARGET_USERNAME = _env["TARGET_USERNAME"]
# Comma-separated list; surrounding whitespace is trimmed and empty entries
# (for example from a trailing comma) are dropped.
TARGET_ROOMS = list(filter(None, map(str.strip, _env["TARGET_ROOMS"].split(","))))
def http_json(method, url, *, headers=None, json_body=None, form=None, timeout=30):
    """Send one HTTP request and return ``(status, payload)``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        url: Absolute request URL.
        headers: Optional mapping of extra request headers (copied, not mutated).
        json_body: Optional object sent as a JSON body
            (``Content-Type: application/json`` unless the caller set one).
        form: Optional mapping sent URL-encoded
            (``Content-Type: application/x-www-form-urlencoded`` unless set).
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        ``(status, payload)`` where ``status`` is the HTTP status code, or ``0``
        when no HTTP response was obtained (connection failure, DNS error,
        timeout), and ``payload`` is the decoded JSON body, or ``None`` when the
        body is empty or is not valid JSON.

    Raises:
        ValueError: if both ``json_body`` and ``form`` are supplied.
    """
    if json_body is not None and form is not None:
        raise ValueError("choose json_body or form, not both")

    req_headers = dict(headers or {})
    data = None
    if json_body is not None:
        data = json.dumps(json_body).encode()
        req_headers.setdefault("Content-Type", "application/json")
    elif form is not None:
        data = urllib.parse.urlencode(form).encode()
        req_headers.setdefault("Content-Type", "application/x-www-form-urlencoded")

    req = urllib.request.Request(url, data=data, method=method, headers=req_headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status, _decode_json_body(resp.read())
    except urllib.error.HTTPError as e:
        # Must precede the OSError handler: HTTPError is an OSError subclass
        # but still carries a usable status code and body.
        return e.code, _decode_json_body(e.read())
    except OSError:
        # Covers URLError as well as timeouts and connection resets raised
        # while reading the response, so callers' retry loops see status 0
        # instead of an uncaught exception.
        return 0, None


def _decode_json_body(raw):
    """Decode a response body as UTF-8 JSON; None if it is empty or not JSON."""
    if not raw:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return None
# Load the MAS admin client secret from the file named by
# MAS_ADMIN_CLIENT_SECRET_FILE; an empty file is a hard error.
with open(MAS_ADMIN_CLIENT_SECRET_FILE, "r", encoding="utf-8") as f:
    mas_admin_client_secret = f.read().strip()
if not mas_admin_client_secret:
    raise RuntimeError("MAS admin client secret file is empty")

# HTTP Basic credentials ("client_id:client_secret") for the token endpoint.
basic = base64.b64encode(f"{MAS_ADMIN_CLIENT_ID}:{mas_admin_client_secret}".encode()).decode()

# Request an admin-scoped token with the client_credentials grant. Up to five
# attempts with a linearly growing pause (2 s, 4 s, ...) between them; there is
# no pause after the final attempt because nothing follows it but the failure.
TOKEN_ATTEMPTS = 5
token_status = 0
token_payload = None
for attempt in range(1, TOKEN_ATTEMPTS + 1):
    token_status, token_payload = http_json(
        "POST",
        MAS_TOKEN_URL,
        headers={"Authorization": f"Basic {basic}"},
        form={"grant_type": "client_credentials", "scope": "urn:mas:admin"},
        timeout=30,
    )
    if token_status == 200 and token_payload and "access_token" in token_payload:
        break
    if attempt < TOKEN_ATTEMPTS:
        time.sleep(attempt * 2)
else:
    # Loop finished without `break`: every attempt failed.
    raise RuntimeError(f"MAS admin token request failed (HTTP {token_status})")
mas_admin_token = token_payload["access_token"]
# Resolve TARGET_USERNAME to its MAS user id through the admin API; the id is
# needed to create a personal session on that user's behalf.
lookup_url = f"{MAS_ADMIN_API_BASE}/users/by-username/{urllib.parse.quote(TARGET_USERNAME)}"
user_status, user_payload = http_json(
    "GET",
    lookup_url,
    headers={"Authorization": f"Bearer {mas_admin_token}"},
    timeout=30,
)
# The response must be a 200 whose body carries data.id.
lookup_ok = (
    user_status == 200
    and bool(user_payload)
    and "data" in user_payload
    and "id" in user_payload["data"]
)
if not lookup_ok:
    raise RuntimeError(f"MAS user lookup failed (HTTP {user_status})")
actor_user_id = user_payload["data"]["id"]
# Create a personal session acting as the target user, scoped to the Matrix
# client API and requested with a 300-second lifetime.
sess_status, sess_payload = http_json(
    "POST",
    f"{MAS_ADMIN_API_BASE}/personal-sessions",
    headers={"Authorization": f"Bearer {mas_admin_token}"},
    json_body={
        "actor_user_id": actor_user_id,
        "human_name": "bstein room cleanup",
        "scope": "urn:matrix:client:api:*",
        "expires_in": 300,
    },
    timeout=30,
)
if sess_status != 201 or not sess_payload or "data" not in sess_payload:
    raise RuntimeError(f"MAS personal session create failed (HTTP {sess_status})")

sess_data = sess_payload["data"] or {}
personal_session_id = sess_data.get("id")
if not personal_session_id:
    # Report a malformed response clearly instead of surfacing a bare KeyError.
    raise RuntimeError("MAS personal session response did not include a session id")

personal_token = (sess_data.get("attributes") or {}).get("access_token")
if not personal_token:
    # The session exists but is unusable. Revoke it (best effort, result
    # ignored) so it does not linger until it expires, then fail.
    http_json(
        "POST",
        f"{MAS_ADMIN_API_BASE}/personal-sessions/{urllib.parse.quote(personal_session_id)}/revoke",
        headers={"Authorization": f"Bearer {mas_admin_token}"},
        json_body={},
        timeout=30,
    )
    raise RuntimeError("MAS personal session did not return an access token")
# Per-room outcome plus the status of the final session revoke; printed as JSON.
results = {"rooms": {}, "revoke": None}
failures = []

# Statuses treated as success for both leave and forget. 404 is accepted
# alongside 200 (presumably "room unknown / nothing to do" - confirm).
OK_STATUSES = (200, 404)
ROOM_ATTEMPTS = 5
session_headers = {"Authorization": f"Bearer {personal_token}"}

try:
    for room_id in TARGET_ROOMS:
        # Room IDs contain "!" and ":"; encode everything for use in a path.
        room_q = urllib.parse.quote(room_id, safe="")
        leave_status = 0
        forget_status = 0
        room_done = False
        for attempt in range(1, ROOM_ATTEMPTS + 1):
            leave_status, _ = http_json(
                "POST",
                f"{SYNAPSE_BASE}/_matrix/client/v3/rooms/{room_q}/leave",
                headers=session_headers,
                json_body={},
                timeout=30,
            )
            forget_status, _ = http_json(
                "POST",
                f"{SYNAPSE_BASE}/_matrix/client/v3/rooms/{room_q}/forget",
                headers=session_headers,
                json_body={},
                timeout=30,
            )
            room_done = leave_status in OK_STATUSES and forget_status in OK_STATUSES
            if room_done:
                break
            # Linear back-off between attempts; no pause after the last one,
            # since nothing is retried afterwards.
            if attempt < ROOM_ATTEMPTS:
                time.sleep(attempt * 2)
        results["rooms"][room_id] = {"leave": leave_status, "forget": forget_status}
        if not room_done:
            failures.append(room_id)
finally:
    # Always revoke the personal session, even if the loop above raised.
    # The status is recorded for the report but does not fail the job.
    revoke_status, _ = http_json(
        "POST",
        f"{MAS_ADMIN_API_BASE}/personal-sessions/{urllib.parse.quote(personal_session_id)}/revoke",
        headers={"Authorization": f"Bearer {mas_admin_token}"},
        json_body={},
        timeout=30,
    )
    results["revoke"] = revoke_status

print(json.dumps(results, indent=2, sort_keys=True))
if failures:
    # SystemExit with a message exits non-zero, marking the Job as failed.
    raise SystemExit(f"failed to leave/forget rooms: {', '.join(failures)}")
PY