# services/game-stream/wolf-api-proxy-configmap.yaml apiVersion: v1 kind: ConfigMap metadata: name: wolf-api-proxy namespace: game-stream data: wolf_api_proxy.py: | #!/usr/bin/env python3 from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import json import os import socket SOCKET_PATH = os.environ.get("WOLF_SOCKET_PATH", "/run/user/wolf/wolf.sock") LISTEN = ("0.0.0.0", int(os.environ.get("WOLF_API_PROXY_PORT", "8088"))) def _response(handler, status, payload): body = json.dumps(payload).encode("utf-8") handler.send_response(status) handler.send_header("content-type", "application/json") handler.send_header("content-length", str(len(body))) handler.end_headers() handler.wfile.write(body) def _forward(method, path, body): if not path.startswith("/api/v1/"): return 404, {"success": False, "error": "unsupported path"} if method not in {"GET", "POST"}: return 405, {"success": False, "error": "unsupported method"} if not os.path.exists(SOCKET_PATH): return 503, {"success": False, "error": "wolf socket is not ready"} payload = body or b"" request = [ f"{method} {path} HTTP/1.1", "Host: wolf.local", "Connection: close", f"Content-Length: {len(payload)}", ] if payload: request.append("Content-Type: application/json") request_bytes = ("\r\n".join(request) + "\r\n\r\n").encode("utf-8") + payload with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.settimeout(5) sock.connect(SOCKET_PATH) sock.sendall(request_bytes) chunks = [] while True: chunk = sock.recv(65536) if not chunk: break chunks.append(chunk) raw = b"".join(chunks) header_bytes, _, body_bytes = raw.partition(b"\r\n\r\n") status_line = header_bytes.splitlines()[0].decode("utf-8", "replace") if header_bytes else "" try: status = int(status_line.split()[1]) except Exception: status = 502 try: return status, json.loads(body_bytes.decode("utf-8") or "{}") except Exception: return status, {"success": False, "error": body_bytes.decode("utf-8", "replace")} class Handler(BaseHTTPRequestHandler): def log_message(self, _format, *args): return def do_GET(self): status, payload = _forward("GET", self.path, b"") _response(self, status, payload) def do_POST(self): length = int(self.headers.get("content-length") or "0") body = self.rfile.read(length) if length else b"" status, payload = _forward("POST", self.path, body) _response(self, status, payload) if __name__ == "__main__": ThreadingHTTPServer(LISTEN, Handler).serve_forever()