# services/communication/guest-register-configmap.yaml apiVersion: v1 kind: ConfigMap metadata: name: matrix-guest-register data: server.py: | import base64 import json import os import random import secrets from http.server import BaseHTTPRequestHandler, HTTPServer from urllib import error, parse, request SYNAPSE_BASE = os.environ.get("SYNAPSE_BASE", "http://othrys-synapse-matrix-synapse:8008").rstrip("/") AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080").rstrip("/") SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev") SEEDER_USER = os.environ["SEEDER_USER"] SEEDER_PASS = os.environ["SEEDER_PASS"] # Basic rate limiting (best-effort) to avoid accidental abuse. # Count requests per client IP over a short window. RATE_WINDOW_SEC = int(os.environ.get("RATE_WINDOW_SEC", "60")) RATE_MAX = int(os.environ.get("RATE_MAX", "30")) _rate = {} # ip -> [window_start, count] ADJ = ["brisk", "calm", "eager", "gentle", "merry", "nifty", "rapid", "sunny", "witty", "zesty"] NOUN = ["otter", "falcon", "comet", "ember", "grove", "harbor", "meadow", "raven", "river", "summit"] def _json(method, url, *, token=None, body=None, timeout=20): headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" data = None if body is not None: data = json.dumps(body).encode() req = request.Request(url, data=data, headers=headers, method=method) try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read() payload = json.loads(raw.decode()) if raw else {} return resp.status, payload except error.HTTPError as e: raw = e.read() try: payload = json.loads(raw.decode()) if raw else {} except Exception: payload = {} return e.code, payload _seeder_token = None _seeder_token_at = 0.0 def _login(user, password): status, payload = _json( "POST", f"{AUTH_BASE}/_matrix/client/v3/login", body={ "type": "m.login.password", "identifier": {"type": "m.id.user", "user": user}, "password": password, }, timeout=20, ) if status != 200 or "access_token" not in payload: raise RuntimeError("login_failed") return payload def _seeder_access_token(now): global _seeder_token, _seeder_token_at if _seeder_token and (now - _seeder_token_at) < 300: return _seeder_token payload = _login(SEEDER_USER, SEEDER_PASS) _seeder_token = payload["access_token"] _seeder_token_at = now return _seeder_token def _create_user(admin_token, localpart, password, displayname): user_id = f"@{localpart}:{SERVER_NAME}" status, payload = _json( "PUT", f"{SYNAPSE_BASE}/_synapse/admin/v2/users/{parse.quote(user_id)}", token=admin_token, body={ "password": password, "admin": False, "deactivated": False, "displayname": displayname, }, timeout=25, ) if status not in (200, 201): raise RuntimeError("user_create_failed") return user_id def _generate_localpart(): return "guest-" + secrets.token_hex(6) def _generate_displayname(): return f"{random.choice(ADJ)}-{random.choice(NOUN)}" def _rate_check(ip, now): win, cnt = _rate.get(ip, (now, 0)) if now - win > RATE_WINDOW_SEC: _rate[ip] = (now, 1) return True if cnt >= RATE_MAX: return False _rate[ip] = (win, cnt + 1) return True class Handler(BaseHTTPRequestHandler): server_version = "matrix-guest-register" def _send_json(self, code, payload): body = json.dumps(payload).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_OPTIONS(self): # noqa: N802 self.send_response(204) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") self.end_headers() def do_GET(self): # noqa: N802 if self.path in ("/healthz", "/"): return self._send_json(200, {"ok": True}) return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"}) def do_POST(self): # noqa: N802 # We only implement guest registration (used by Element Web "Join as guest"). parsed = parse.urlparse(self.path) if parsed.path not in ("/_matrix/client/v3/register", "/_matrix/client/r0/register"): return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"}) qs = parse.parse_qs(parsed.query) kind = (qs.get("kind") or ["user"])[0] if kind != "guest": return self._send_json( 403, { "errcode": "M_FORBIDDEN", "error": "Registration is disabled; use https://bstein.dev/request-access for accounts.", }, ) # Best-effort client IP from X-Forwarded-For (Traefik). xfwd = self.headers.get("x-forwarded-for", "") ip = (xfwd.split(",")[0].strip() if xfwd else "") or self.client_address[0] now = __import__("time").time() if not _rate_check(ip, now): return self._send_json(429, {"errcode": "M_LIMIT_EXCEEDED", "error": "rate_limited"}) # Consume request body (Element may send fields; we ignore). length = int(self.headers.get("content-length", "0") or "0") _ = self.rfile.read(length) if length else b"{}" # Create a short-lived "guest" account by provisioning a normal user with a random password. # This keeps MAS/OIDC intact while restoring a no-signup guest UX. try: admin_token = _seeder_access_token(now) displayname = _generate_displayname() localpart = _generate_localpart() password = base64.urlsafe_b64encode(secrets.token_bytes(24)).decode().rstrip("=") user_id = _create_user(admin_token, localpart, password, displayname) login_payload = _login(localpart, password) except Exception: return self._send_json(502, {"errcode": "M_UNKNOWN", "error": "guest_provision_failed"}) resp = { "user_id": login_payload.get("user_id") or user_id, "access_token": login_payload.get("access_token"), "device_id": login_payload.get("device_id"), "home_server": SERVER_NAME, } # Do not expose refresh tokens for guests. return self._send_json(200, resp) def main(): port = int(os.environ.get("PORT", "8080")) HTTPServer(("0.0.0.0", port), Handler).serve_forever() if __name__ == "__main__": main()