# services/game-stream/wolf-gatekeeper-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: wolf-gatekeeper
  namespace: game-stream
data:
  wolf_gatekeeper.py: |
    #!/usr/bin/env python3
    """HTTP front-end for the ``inet wolf_gatekeeper`` nftables table.

    The service maintains an input chain that accepts loopback and private
    source ranges, accepts sources listed in the ``allowed_v4``/``allowed_v6``
    timeout sets on the configured TCP/UDP ports, and drops everything else
    aimed at those ports.

    Endpoints:
        GET  /healthz  re-applies the rules and returns ``{"ok": true}``.
        GET  /status   re-applies the rules and lists the active unlocks.
        POST /unlock   JSON ``{"ip", "ttl_seconds", "actor", "target_user"}``.
        POST /revoke   JSON ``{"ip"}``.
    """
    from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
    import ipaddress
    import json
    import os
    import re
    import subprocess
    import threading

    LISTEN = ("0.0.0.0", int(os.environ.get("GATEKEEPER_HTTP_PORT", "8087")))
    HOST_ROOT = os.environ.get("HOST_ROOT", "/host")
    NFT_PATH = os.environ.get("NFT_PATH", "/sbin/nft")
    MAX_TTL_SECONDS = int(os.environ.get("MAX_TTL_SECONDS", "28800"))
    # Upper bound on accepted request bodies; the expected payloads are tiny.
    MAX_BODY_BYTES = 65536
    TCP_PORTS = ["47984", "47989", "48010"]
    UDP_PORTS = ["47999", "48100", "48200"]
    PRIVATE_V4 = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.0/8"]
    PRIVATE_V6 = ["::1/128", "fc00::/7", "fe80::/10"]

    # Candidate-address patterns for `nft list set` output. Every match is
    # additionally validated with `ipaddress` in `_entries`.
    V4_PATTERN = r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
    # Requires at least one colon so duration tokens such as "1d" are never
    # mistaken for addresses, and allows a leading or trailing "::".
    V6_PATTERN = r"(?<![\w:.])[0-9a-fA-F:.]*:[0-9a-fA-F:.]*(?![\w:.])"

    _CHAIN = ["inet", "wolf_gatekeeper", "input"]

    # The server handles requests on multiple threads; this lock keeps
    # multi-step nft sequences from interleaving with each other.
    _NFT_LOCK = threading.Lock()


    def _json(handler, status, payload):
        """Send `payload` as a JSON response with the given HTTP status."""
        body = json.dumps(payload).encode("utf-8")
        handler.send_response(status)
        handler.send_header("content-type", "application/json")
        handler.send_header("content-length", str(len(body)))
        handler.end_headers()
        handler.wfile.write(body)


    def _nft(args, check=True):
        """Run nft inside a chroot at HOST_ROOT and return the completed process.

        Args:
            args: nft arguments (nft joins them with spaces before parsing).
            check: when true, a non-zero exit raises RuntimeError.

        Raises:
            RuntimeError: nft failed and `check` is true; the message is nft's
                stderr, falling back to stdout, then "nft failed".
        """
        command = ["chroot", HOST_ROOT, NFT_PATH, *args]
        proc = subprocess.run(command, check=False, capture_output=True, text=True)
        if check and proc.returncode != 0:
            raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "nft failed")
        return proc


    def _port_set(ports):
        """Return nft tokens for an anonymous set, e.g. ``{ 1, 2, 3 }``."""
        return ["{", *[f"{port}," for port in ports[:-1]], ports[-1], "}"]


    def _input_rules():
        """Return the rule bodies of the input chain in evaluation order."""
        rules = [["iifname", "lo", "accept"]]
        rules += [["ip", "saddr", cidr, "accept"] for cidr in PRIVATE_V4]
        rules += [["ip6", "saddr", cidr, "accept"] for cidr in PRIVATE_V6]
        for family, allowed in (("ip", "@allowed_v4"), ("ip6", "@allowed_v6")):
            rules.append([family, "saddr", allowed, "tcp", "dport", *_port_set(TCP_PORTS), "accept"])
            rules.append([family, "saddr", allowed, "udp", "dport", *_port_set(UDP_PORTS), "accept"])
        rules.append(["tcp", "dport", *_port_set(TCP_PORTS), "drop"])
        rules.append(["udp", "dport", *_port_set(UDP_PORTS), "drop"])
        return rules


    def _ensure_rules():
        """Create the table, sets and chain if needed and rebuild the chain.

        Object creation stays best-effort (failures are ignored). The flush
        and every `add rule` are sent to nft as ONE command buffer so they are
        committed together: because the chain policy is `accept`, flushing and
        re-adding rule by rule would leave the gated ports open to everyone
        for a moment on each call.

        Raises:
            RuntimeError: the chain rebuild was rejected by nft.
        """
        with _NFT_LOCK:
            _nft(["add", "table", "inet", "wolf_gatekeeper"], check=False)
            _nft(["add", "set", "inet", "wolf_gatekeeper", "allowed_v4", "{", "type", "ipv4_addr;", "flags", "timeout;", "}"], check=False)
            _nft(["add", "set", "inet", "wolf_gatekeeper", "allowed_v6", "{", "type", "ipv6_addr;", "flags", "timeout;", "}"], check=False)
            _nft(
                [
                    "add", "chain", *_CHAIN,
                    "{", "type", "filter", "hook", "input", "priority", "-90;", "policy", "accept;", "}",
                ],
                check=False,
            )
            # ";" separates commands within a single nft invocation.
            batch = ["flush", "chain", *_CHAIN]
            for rule in _input_rules():
                batch += [";", "add", "rule", *_CHAIN, *rule]
            _nft(batch)


    def _validate_ip(value):
        """Parse `value` as an IP address and return ``(text, version)``.

        Raises:
            ValueError: `value` is empty/None or not a valid address.
        """
        ip = ipaddress.ip_address(str(value or "").strip())
        return str(ip), ip.version


    def _set_name(version):
        """Return the nft set name for IP version 4 or 6."""
        return "allowed_v4" if version == 4 else "allowed_v6"


    def _unlock(value, ttl_seconds):
        """Add `value` to the matching allow set with a bounded timeout.

        The TTL defaults to MAX_TTL_SECONDS when falsy and is clamped to
        ``[60, MAX_TTL_SECONDS]``. Any existing element is deleted first so
        the new timeout takes effect.

        Returns:
            ``{"ip": <normalised address>, "ttl_seconds": <applied ttl>}``.

        Raises:
            ValueError: invalid address or non-numeric TTL.
            RuntimeError: nft rejected the new element.
        """
        ip, version = _validate_ip(value)
        ttl = max(60, min(int(ttl_seconds or MAX_TTL_SECONDS), MAX_TTL_SECONDS))
        set_name = _set_name(version)
        # Serialise delete+add so concurrent requests cannot interleave them.
        with _NFT_LOCK:
            _nft(["delete", "element", "inet", "wolf_gatekeeper", set_name, "{", ip, "}"], check=False)
            _nft(["add", "element", "inet", "wolf_gatekeeper", set_name, "{", ip, "timeout", f"{ttl}s", "}"])
        return {"ip": ip, "ttl_seconds": ttl}


    def _revoke(value):
        """Remove `value` from the matching allow set (missing entries are ignored).

        Raises:
            ValueError: invalid address.
        """
        ip, version = _validate_ip(value)
        with _NFT_LOCK:
            _nft(["delete", "element", "inet", "wolf_gatekeeper", _set_name(version), "{", ip, "}"], check=False)
        return {"ip": ip}


    def _entries(set_name, pattern):
        """List the addresses currently in `set_name`.

        `pattern` selects candidate tokens from the `nft list set` output;
        each candidate must also parse as an IP address, so stray tokens that
        happen to match the pattern are discarded. Order of first appearance
        is preserved and duplicates are dropped. Returns an empty list when
        the set cannot be listed.
        """
        proc = _nft(["list", "set", "inet", "wolf_gatekeeper", set_name], check=False)
        if proc.returncode != 0:
            return []
        values = []
        for match in re.finditer(pattern, proc.stdout):
            value = match.group(0)
            try:
                ipaddress.ip_address(value)
            except ValueError:
                continue
            if value not in values:
                values.append(value)
        return values


    def _status():
        """Re-apply the rules and report active unlocks and gated ports."""
        _ensure_rules()
        v4 = _entries("allowed_v4", V4_PATTERN)
        v6 = _entries("allowed_v6", V6_PATTERN)
        return {"success": True, "active_unlocks": [{"ip": ip} for ip in [*v4, *v6]], "tcp_ports": TCP_PORTS, "udp_ports": UDP_PORTS}


    class Handler(BaseHTTPRequestHandler):
        """Request handler exposing /healthz, /status, /unlock and /revoke."""

        def log_message(self, _format, *args):
            """Suppress the default per-request logging."""
            return

        def _payload(self):
            """Return the request body as a dict, or ``{}`` if absent/malformed.

            Raises:
                ValueError: the content-length header is not an integer or
                    exceeds MAX_BODY_BYTES.
            """
            length = int(self.headers.get("content-length") or "0")
            # A negative length would make read() block until the peer closes.
            if length <= 0:
                return {}
            if length > MAX_BODY_BYTES:
                raise ValueError("request body too large")
            try:
                data = json.loads(self.rfile.read(length).decode("utf-8"))
            except Exception:
                # Deliberate best-effort: an unreadable body is treated as empty.
                return {}
            return data if isinstance(data, dict) else {}

        def do_GET(self):
            """Serve /healthz and /status; any failure becomes a 500 response."""
            try:
                if self.path == "/healthz":
                    _ensure_rules()
                    _json(self, 200, {"ok": True})
                elif self.path == "/status":
                    _json(self, 200, _status())
                else:
                    _json(self, 404, {"success": False, "error": "not found"})
            except Exception as exc:
                _json(self, 500, {"success": False, "error": str(exc)})

        def do_POST(self):
            """Serve /unlock and /revoke.

            NOTE(review): every failure, including nft errors, is reported as
            400 to stay compatible with existing clients.
            """
            try:
                payload = self._payload()
                if self.path == "/unlock":
                    _ensure_rules()
                    result = _unlock(payload.get("ip"), payload.get("ttl_seconds"))
                    result.update({"success": True, "actor": payload.get("actor") or "", "target_user": payload.get("target_user") or ""})
                    _json(self, 200, result)
                elif self.path == "/revoke":
                    _ensure_rules()
                    result = _revoke(payload.get("ip"))
                    result.update({"success": True})
                    _json(self, 200, result)
                else:
                    _json(self, 404, {"success": False, "error": "not found"})
            except Exception as exc:
                _json(self, 400, {"success": False, "error": str(exc)})


    if __name__ == "__main__":
        _ensure_rules()
        ThreadingHTTPServer(LISTEN, Handler).serve_forever()