"""Flask backend for the bstein.dev portal.

Exposes JSON endpoints under ``/api`` (health, Keycloak-backed auth/account
management, access requests, Monero info, lab status, AI chat) and serves the
built frontend from ``../frontend/dist`` for every other path.
"""
from __future__ import annotations

import json
import os
import re
import secrets
import string
import threading
import time
from functools import wraps
from pathlib import Path
from typing import Any
from urllib.error import URLError
from urllib.parse import quote
from urllib.parse import urlencode
from urllib.request import urlopen

from flask import Flask, g, jsonify, request, send_from_directory
from flask_cors import CORS
import httpx
import jwt
from jwt import PyJWKClient
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__, static_folder="../frontend/dist", static_url_path="")
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
CORS(app, resources={r"/api/*": {"origins": "*"}})


def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default*) is "1", "true" or "yes"."""
    return os.getenv(name, default).lower() in ("1", "true", "yes")


def _env_csv(name: str, default: str) -> list[str]:
    """Return the non-empty, stripped items of a comma-separated env var."""
    return [part.strip() for part in os.getenv(name, default).split(",") if part.strip()]


# --- Upstream endpoints and timeouts -----------------------------------------
MONERO_GET_INFO_URL = os.getenv("MONERO_GET_INFO_URL", "http://monerod.crypto.svc.cluster.local:18081/get_info")
VM_BASE_URL = os.getenv(
    "VM_BASE_URL",
    "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
VM_QUERY_TIMEOUT_SEC = float(os.getenv("VM_QUERY_TIMEOUT_SEC", "2"))
HTTP_CHECK_TIMEOUT_SEC = float(os.getenv("HTTP_CHECK_TIMEOUT_SEC", "2"))
LAB_STATUS_CACHE_SEC = float(os.getenv("LAB_STATUS_CACHE_SEC", "30"))
GRAFANA_HEALTH_URL = os.getenv("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health")
OCEANUS_NODE_EXPORTER_URL = os.getenv("OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics")

# --- AI chat ------------------------------------------------------------------
AI_CHAT_API = os.getenv("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/")
AI_CHAT_MODEL = os.getenv("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
AI_CHAT_SYSTEM_PROMPT = os.getenv(
    "AI_CHAT_SYSTEM_PROMPT",
    "You are the Titan Lab assistant for bstein.dev. Be concise and helpful.",
)
AI_CHAT_TIMEOUT_SEC = float(os.getenv("AI_CHAT_TIMEOUT_SEC", "20"))
AI_NODE_NAME = os.getenv("AI_CHAT_NODE_NAME") or os.getenv("AI_NODE_NAME") or "ai-cluster"
AI_GPU_DESC = os.getenv("AI_CHAT_GPU_DESC") or "local GPU (dynamic)"
AI_PUBLIC_ENDPOINT = os.getenv("AI_PUBLIC_CHAT_ENDPOINT", "https://chat.ai.bstein.dev/api/chat")
AI_K8S_LABEL = os.getenv("AI_K8S_LABEL", "app=ollama")
AI_K8S_NAMESPACE = os.getenv("AI_K8S_NAMESPACE", "ai")
AI_MODEL_ANNOTATION = os.getenv("AI_MODEL_ANNOTATION", "ai.bstein.dev/model")
AI_GPU_ANNOTATION = os.getenv("AI_GPU_ANNOTATION", "ai.bstein.dev/gpu")
AI_WARM_INTERVAL_SEC = float(os.getenv("AI_WARM_INTERVAL_SEC", "300"))
AI_WARM_ENABLED = _env_flag("AI_WARM_ENABLED", "true")

# --- Keycloak -----------------------------------------------------------------
KEYCLOAK_ENABLED = _env_flag("KEYCLOAK_ENABLED", "false")
KEYCLOAK_URL = os.getenv("KEYCLOAK_URL", "https://sso.bstein.dev").rstrip("/")
KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "atlas")
KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "bstein-dev-home")
KEYCLOAK_ISSUER = os.getenv("KEYCLOAK_ISSUER", f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}").rstrip("/")
KEYCLOAK_JWKS_URL = os.getenv("KEYCLOAK_JWKS_URL", f"{KEYCLOAK_ISSUER}/protocol/openid-connect/certs").rstrip("/")

KEYCLOAK_ADMIN_URL = os.getenv("KEYCLOAK_ADMIN_URL", KEYCLOAK_URL).rstrip("/")
KEYCLOAK_ADMIN_CLIENT_ID = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "")
KEYCLOAK_ADMIN_CLIENT_SECRET = os.getenv("KEYCLOAK_ADMIN_CLIENT_SECRET", "")
KEYCLOAK_ADMIN_REALM = os.getenv("KEYCLOAK_ADMIN_REALM", KEYCLOAK_REALM)

# --- Portal access control ----------------------------------------------------
ACCOUNT_ALLOWED_GROUPS = _env_csv("ACCOUNT_ALLOWED_GROUPS", "dev,admin")
PORTAL_ADMIN_USERS = _env_csv("PORTAL_ADMIN_USERS", "bstein")
PORTAL_ADMIN_GROUPS = _env_csv("PORTAL_ADMIN_GROUPS", "admin")

ACCESS_REQUEST_ENABLED = _env_flag("ACCESS_REQUEST_ENABLED", "true")
ACCESS_REQUEST_RATE_LIMIT = int(os.getenv("ACCESS_REQUEST_RATE_LIMIT", "5"))
ACCESS_REQUEST_RATE_WINDOW_SEC = int(os.getenv("ACCESS_REQUEST_RATE_WINDOW_SEC", str(60 * 60)))

# --- Downstream sync hooks ----------------------------------------------------
MAILU_DOMAIN = os.getenv("MAILU_DOMAIN", "bstein.dev")
MAILU_SYNC_URL = os.getenv(
    "MAILU_SYNC_URL",
    "http://mailu-sync-listener.mailu-mailserver.svc.cluster.local:8080/events",
).rstrip("/")
JELLYFIN_SYNC_URL = os.getenv("JELLYFIN_SYNC_URL", "").rstrip("/")

# --- Process-local caches (plain dicts, one copy per worker process) ----------
_KEYCLOAK_JWK_CLIENT: PyJWKClient | None = None
_KEYCLOAK_ADMIN_TOKEN: dict[str, Any] = {"token": "", "expires_at": 0.0}
_KEYCLOAK_GROUP_ID_CACHE: dict[str, str] = {}
_ACCESS_REQUEST_RATE: dict[str, list[float]] = {}
_LAB_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "value": None}


@app.route("/api/healthz")
def healthz() -> Any:
    """Liveness probe; always returns ``{"ok": true}``."""
    return jsonify({"ok": True})


def _normalize_groups(groups: Any) -> list[str]:
    """Return the string entries of *groups* with leading "/" removed.

    Non-list input yields ``[]``; non-string entries and entries that become
    empty after stripping are dropped.
    """
    if not isinstance(groups, list):
        return []
    stripped = (name.lstrip("/") for name in groups if isinstance(name, str))
    return [name for name in stripped if name]


def _extract_bearer_token() -> str | None:
    """Return the token from an ``Authorization: Bearer <token>`` header, else None."""
    header = request.headers.get("Authorization", "")
    if not header:
        return None
    parts = header.split(None, 1)
    if len(parts) != 2:
        return None
    scheme, token = parts[0].lower(), parts[1].strip()
    if scheme != "bearer" or not token:
        return None
    return token


def _get_jwk_client() -> PyJWKClient:
    """Return the lazily created, module-wide JWKS client for Keycloak."""
    global _KEYCLOAK_JWK_CLIENT
    if _KEYCLOAK_JWK_CLIENT is None:
        _KEYCLOAK_JWK_CLIENT = PyJWKClient(KEYCLOAK_JWKS_URL)
    return _KEYCLOAK_JWK_CLIENT


def _verify_keycloak_token(token: str) -> dict[str, Any]:
    """Validate *token* against Keycloak and return its claims.

    Raises:
        ValueError: Keycloak is disabled, or the token was not issued for
            ``KEYCLOAK_CLIENT_ID`` (neither ``azp`` nor ``aud`` matches).
        Exception: whatever PyJWT / the JWKS client raises for an invalid
            signature, issuer or key lookup failure.
    """
    if not KEYCLOAK_ENABLED:
        raise ValueError("keycloak not enabled")

    signing_key = _get_jwk_client().get_signing_key_from_jwt(token).key
    claims = jwt.decode(
        token,
        signing_key,
        algorithms=["RS256"],
        # Audience is checked manually below so that ``azp`` is also accepted.
        options={"verify_aud": False},
        issuer=KEYCLOAK_ISSUER,
    )

    # Ensure this token was minted for our frontend client (defense-in-depth).
    azp = claims.get("azp")
    aud = claims.get("aud")
    aud_list: list[str] = []
    if isinstance(aud, str):
        aud_list = [aud]
    elif isinstance(aud, list):
        aud_list = [a for a in aud if isinstance(a, str)]
    if azp != KEYCLOAK_CLIENT_ID and KEYCLOAK_CLIENT_ID not in aud_list:
        raise ValueError("token not issued for this client")

    return claims


def require_auth(fn):
    """Decorator: require a valid Keycloak bearer token.

    On success the claims, username, email and normalized groups are stored on
    ``flask.g``; otherwise a 401 JSON response is returned without calling *fn*.
    """

    @wraps(fn)
    def wrapper(*args, **kwargs):
        token = _extract_bearer_token()
        if not token:
            return jsonify({"error": "missing bearer token"}), 401
        try:
            claims = _verify_keycloak_token(token)
        except Exception:
            # Any verification failure is reported uniformly as an invalid token.
            return jsonify({"error": "invalid token"}), 401

        g.keycloak_claims = claims
        g.keycloak_username = claims.get("preferred_username") or ""
        g.keycloak_email = claims.get("email") or ""
        g.keycloak_groups = _normalize_groups(claims.get("groups"))
        return fn(*args, **kwargs)

    return wrapper


def _require_portal_admin() -> tuple[bool, Any]:
    """Check that the authenticated user is a portal admin.

    Returns ``(True, None)`` when allowed, else ``(False, response)`` where
    *response* is a ready-to-return (body, status) pair (503 or 403).
    """
    if not KEYCLOAK_ENABLED:
        return False, (jsonify({"error": "keycloak not enabled"}), 503)

    username = getattr(g, "keycloak_username", "") or ""
    groups = set(getattr(g, "keycloak_groups", []) or [])

    if username and username in PORTAL_ADMIN_USERS:
        return True, None
    if PORTAL_ADMIN_GROUPS and groups.intersection(PORTAL_ADMIN_GROUPS):
        return True, None
    return False, (jsonify({"error": "forbidden"}), 403)


@app.route("/api/auth/config", methods=["GET"])
def auth_config() -> Any:
    """Return the Keycloak settings the frontend needs to start a login."""
    if not KEYCLOAK_ENABLED:
        return jsonify({"enabled": False})

    issuer = KEYCLOAK_ISSUER
    public_origin = request.host_url.rstrip("/")
    redirect_uri = quote(f"{public_origin}/", safe="")
    login_url = (
        f"{issuer}/protocol/openid-connect/auth"
        f"?client_id={quote(KEYCLOAK_CLIENT_ID, safe='')}"
        f"&redirect_uri={redirect_uri}"
        f"&response_type=code"
        f"&scope=openid"
    )

    return jsonify(
        {
            "enabled": True,
            "url": KEYCLOAK_URL,
            "realm": KEYCLOAK_REALM,
            "client_id": KEYCLOAK_CLIENT_ID,
            "login_url": login_url,
            "reset_url": login_url,
        }
    )


def _require_account_access() -> tuple[bool, Any]:
    """Check that the authenticated user may use the account endpoints.

    An empty ``ACCOUNT_ALLOWED_GROUPS`` allows every authenticated user.
    Returns ``(True, None)`` or ``(False, (body, status))`` like
    ``_require_portal_admin``.
    """
    if not KEYCLOAK_ENABLED:
        return False, (jsonify({"error": "keycloak not enabled"}), 503)
    if not ACCOUNT_ALLOWED_GROUPS:
        return True, None
    groups = set(getattr(g, "keycloak_groups", []) or [])
    if groups.intersection(ACCOUNT_ALLOWED_GROUPS):
        return True, None
    return False, (jsonify({"error": "forbidden"}), 403)


def _kc_admin_ready() -> bool:
    """Return True when both admin client id and secret are configured."""
    return bool(KEYCLOAK_ADMIN_CLIENT_ID and KEYCLOAK_ADMIN_CLIENT_SECRET)


def _kc_admin_get_token() -> str:
    """Return a Keycloak admin access token, reusing the cached one if fresh.

    The cached token is reused until 30 seconds before its recorded expiry.

    Raises:
        RuntimeError: admin client not configured, or no ``access_token`` in
            the token response.
        httpx.HTTPError: the token request failed.
    """
    if not _kc_admin_ready():
        raise RuntimeError("keycloak admin client not configured")

    now = time.time()
    cached = _KEYCLOAK_ADMIN_TOKEN.get("token")
    expires_at = float(_KEYCLOAK_ADMIN_TOKEN.get("expires_at") or 0.0)
    if cached and now < expires_at - 30:
        return str(cached)

    token_url = f"{KEYCLOAK_ADMIN_URL}/realms/{KEYCLOAK_ADMIN_REALM}/protocol/openid-connect/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": KEYCLOAK_ADMIN_CLIENT_ID,
        "client_secret": KEYCLOAK_ADMIN_CLIENT_SECRET,
    }
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.post(token_url, data=data)
        resp.raise_for_status()
        payload = resp.json()

    token = payload.get("access_token") or ""
    if not token:
        raise RuntimeError("no access_token in response")
    expires_in = int(payload.get("expires_in") or 60)

    _KEYCLOAK_ADMIN_TOKEN["token"] = token
    _KEYCLOAK_ADMIN_TOKEN["expires_at"] = now + expires_in
    return token


def _kc_admin_headers() -> dict[str, str]:
    """Return the Authorization header for Keycloak admin API calls."""
    return {"Authorization": f"Bearer {_kc_admin_get_token()}"}


def _kc_admin_find_user(username: str) -> dict[str, Any] | None:
    """Look up a user by exact username; return its representation or None.

    Raises ``httpx.HTTPError`` (or RuntimeError from token retrieval) on failure.
    """
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users"
    params = {"username": username, "exact": "true"}
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.get(url, params=params, headers=_kc_admin_headers())
        resp.raise_for_status()
        users = resp.json()
    if not isinstance(users, list) or not users:
        return None
    # Keycloak returns a list even with exact=true; use first match.
    user = users[0]
    return user if isinstance(user, dict) else None


def _kc_admin_get_user(user_id: str) -> dict[str, Any]:
    """Fetch the full user representation for *user_id*.

    Raises:
        RuntimeError: the response body is not a JSON object.
        httpx.HTTPError: the request failed.
    """
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users/{quote(user_id, safe='')}"
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.get(url, headers=_kc_admin_headers())
        resp.raise_for_status()
        data = resp.json()
    if not isinstance(data, dict):
        raise RuntimeError("unexpected user payload")
    return data


def _kc_admin_update_user(user_id: str, payload: dict[str, Any]) -> None:
    """PUT *payload* as the new representation of user *user_id*."""
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users/{quote(user_id, safe='')}"
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.put(url, headers={**_kc_admin_headers(), "Content-Type": "application/json"}, json=payload)
        resp.raise_for_status()


def _kc_set_user_attribute(username: str, key: str, value: str) -> None:
    """Set a single-valued attribute on the user named *username*.

    Raises:
        RuntimeError: the user is missing or has no id.
        httpx.HTTPError: a Keycloak request failed.
    """
    user = _kc_admin_find_user(username)
    if not user:
        raise RuntimeError("user not found")
    user_id = user.get("id") or ""
    if not user_id:
        raise RuntimeError("user id missing")

    # Read-modify-write of the full representation, changing only attributes.
    full = _kc_admin_get_user(user_id)
    attrs = full.get("attributes") or {}
    if not isinstance(attrs, dict):
        attrs = {}
    attrs[key] = [value]
    full["attributes"] = attrs
    _kc_admin_update_user(user_id, full)


def _random_password(length: int = 32) -> str:
    """Return a random alphanumeric password of *length* characters."""
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def _best_effort_post(url: str) -> None:
    """POST a timestamp to *url*, ignoring every failure; no-op for empty *url*."""
    if not url:
        return
    try:
        with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
            client.post(url, json={"ts": int(time.time())})
    except Exception:
        # Deliberately best-effort: the caller's result must not depend on it.
        return


@app.route("/api/account/overview", methods=["GET"])
@require_auth
def account_overview() -> Any:
    """Return the caller's identity plus Mailu/Jellyfin account status."""
    ok, resp = _require_account_access()
    if not ok:
        return resp

    username = g.keycloak_username
    mailu_username = f"{username}@{MAILU_DOMAIN}" if username else ""

    mailu_status = "ready"
    jellyfin_status = "ready"
    if not _kc_admin_ready():
        mailu_status = "server not configured"
        jellyfin_status = "server not configured"

    return jsonify(
        {
            "user": {"username": username, "email": g.keycloak_email, "groups": g.keycloak_groups},
            "mailu": {"status": mailu_status, "username": mailu_username},
            "jellyfin": {"status": jellyfin_status, "username": username},
        }
    )


def _rate_limit_allow(ip: str) -> bool:
    """Record a request from *ip* and return whether it is within the limit.

    Uses a sliding window of ``ACCESS_REQUEST_RATE_WINDOW_SEC`` seconds held in
    the in-memory ``_ACCESS_REQUEST_RATE`` dict. A limit <= 0 disables limiting.
    """
    if ACCESS_REQUEST_RATE_LIMIT <= 0:
        return True
    now = time.time()
    window_start = now - ACCESS_REQUEST_RATE_WINDOW_SEC
    bucket = _ACCESS_REQUEST_RATE.setdefault(ip, [])
    # Drop timestamps that have fallen out of the window (in place).
    bucket[:] = [t for t in bucket if t >= window_start]
    if len(bucket) >= ACCESS_REQUEST_RATE_LIMIT:
        return False
    bucket.append(now)
    return True


def _kc_admin_create_user(payload: dict[str, Any]) -> str:
    """Create a Keycloak user and return its id taken from the Location header.

    Raises:
        RuntimeError: the response carried no Location header.
        httpx.HTTPError: the request failed.
    """
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users"
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.post(url, headers={**_kc_admin_headers(), "Content-Type": "application/json"}, json=payload)
        resp.raise_for_status()
        location = resp.headers.get("Location") or ""
    if location:
        return location.rstrip("/").split("/")[-1]
    raise RuntimeError("failed to determine created user id")


def _kc_admin_delete_user(user_id: str) -> None:
    """Delete the Keycloak user *user_id*."""
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users/{quote(user_id, safe='')}"
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.delete(url, headers=_kc_admin_headers())
        resp.raise_for_status()


def _kc_admin_get_group_id(group_name: str) -> str | None:
    """Return the id of the group named exactly *group_name*, or None.

    Successful lookups are memoized in ``_KEYCLOAK_GROUP_ID_CACHE``.
    """
    cached = _KEYCLOAK_GROUP_ID_CACHE.get(group_name)
    if cached:
        return cached

    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/groups"
    params = {"search": group_name}
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.get(url, params=params, headers=_kc_admin_headers())
        resp.raise_for_status()
        items = resp.json()
    if not isinstance(items, list):
        return None
    for item in items:
        if not isinstance(item, dict):
            continue
        # The search may return several groups; require an exact name match.
        if item.get("name") == group_name and item.get("id"):
            gid = str(item["id"])
            _KEYCLOAK_GROUP_ID_CACHE[group_name] = gid
            return gid
    return None


def _kc_admin_add_user_to_group(user_id: str, group_id: str) -> None:
    """Add user *user_id* to group *group_id*."""
    url = (
        f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}"
        f"/users/{quote(user_id, safe='')}/groups/{quote(group_id, safe='')}"
    )
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.put(url, headers=_kc_admin_headers())
        resp.raise_for_status()


def _kc_admin_execute_actions_email(user_id: str, actions: list[str]) -> None:
    """Ask Keycloak to email *user_id* a link for the given required actions.

    Must be called inside a request context: the redirect URI is derived from
    ``request.host_url``.
    """
    url = (
        f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}"
        f"/users/{quote(user_id, safe='')}/execute-actions-email"
    )
    params = {"client_id": KEYCLOAK_CLIENT_ID, "redirect_uri": request.host_url.rstrip("/") + "/"}
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.put(
            url,
            params=params,
            headers={**_kc_admin_headers(), "Content-Type": "application/json"},
            json=actions,
        )
        resp.raise_for_status()


def _extract_request_payload() -> tuple[str, str, str]:
    """Return stripped ``(username, email, note)`` from the JSON request body.

    A missing/invalid body, a non-object body, or a non-string field yields an
    empty string for the affected value(s) instead of raising.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    def field(name: str) -> str:
        value = payload.get(name)
        return value.strip() if isinstance(value, str) else ""

    return field("username"), field("email"), field("note")


@app.route("/api/access/request", methods=["POST"])
def request_access() -> Any:
    """Create a disabled Keycloak user that represents a pending access request."""
    if not ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    ip = request.remote_addr or "unknown"
    if not _rate_limit_allow(ip):
        return jsonify({"error": "rate limited"}), 429

    username, email, note = _extract_request_payload()
    if not username or not email:
        return jsonify({"error": "username and email are required"}), 400
    if len(username) < 3 or len(username) > 32:
        return jsonify({"error": "username must be 3-32 characters"}), 400
    if not re.fullmatch(r"[a-zA-Z0-9._-]+", username):
        return jsonify({"error": "username contains invalid characters"}), 400

    try:
        existing = _kc_admin_find_user(username)
    except Exception:
        # Upstream failure must surface as 502, not an unhandled 500.
        app.logger.exception("access request: user lookup failed")
        return jsonify({"error": "failed to submit request"}), 502
    if existing:
        return jsonify({"error": "username already exists"}), 409

    attrs: dict[str, Any] = {
        "access_request_status": ["pending"],
        "access_request_note": [note] if note else [""],
        "access_request_created_at": [str(int(time.time()))],
    }
    user_payload = {
        "username": username,
        "enabled": False,
        "email": email,
        "emailVerified": False,
        "attributes": attrs,
    }

    try:
        user_id = _kc_admin_create_user(user_payload)
    except Exception:
        app.logger.exception("access request: user creation failed")
        return jsonify({"error": "failed to submit request"}), 502

    return jsonify({"ok": True, "id": user_id})


def _first_attr(attrs: dict[str, Any], key: str) -> str:
    """Return the first value of a Keycloak attribute as a string ("" if absent).

    Handles both the list form (``{"k": ["v"]}``) and a plain string value.
    """
    value = attrs.get(key)
    if isinstance(value, str):
        return value
    if isinstance(value, list) and value:
        first = value[0]
        return "" if first is None else str(first)
    return ""


def _kc_admin_list_pending_requests(limit: int = 100) -> list[dict[str, Any]]:
    """Return up to *limit* users whose access request is still pending.

    Raises ``httpx.RequestError`` / RuntimeError on transport or token errors;
    HTTP status errors only cause the next, broader query to be tried.
    """

    def is_pending(user: dict[str, Any]) -> bool:
        # The broadest fallback query does not filter on ``enabled``; an
        # enabled account has already been approved, so it is never pending.
        if user.get("enabled"):
            return False
        attrs = user.get("attributes") or {}
        if not isinstance(attrs, dict):
            return False
        return _first_attr(attrs, "access_request_status") == "pending"

    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users"
    # Prefer server-side search (q=) if supported by this Keycloak version.
    # Always filter client-side for correctness.
    candidates: list[dict[str, Any]] = []
    for params in (
        {"max": str(limit), "enabled": "false", "q": "access_request_status:pending"},
        {"max": str(limit), "enabled": "false"},
        {"max": str(limit)},
    ):
        try:
            with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
                resp = client.get(url, params=params, headers=_kc_admin_headers())
                resp.raise_for_status()
                users = resp.json()
            if not isinstance(users, list):
                continue
            candidates = [u for u in users if isinstance(u, dict)]
            break
        except httpx.HTTPStatusError:
            continue

    pending = [u for u in candidates if is_pending(u)]
    return pending[:limit]


@app.route("/api/admin/access/requests", methods=["GET"])
@require_auth
def admin_list_requests() -> Any:
    """List pending access requests (portal admins only)."""
    ok, resp = _require_portal_admin()
    if not ok:
        return resp
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    try:
        items = _kc_admin_list_pending_requests()
    except Exception:
        app.logger.exception("failed to load pending access requests")
        return jsonify({"error": "failed to load requests"}), 502

    output: list[dict[str, Any]] = []
    for user in items:
        attrs = user.get("attributes") or {}
        if not isinstance(attrs, dict):
            attrs = {}
        output.append(
            {
                "id": user.get("id") or "",
                "username": user.get("username") or "",
                "email": user.get("email") or "",
                "created_at": _first_attr(attrs, "access_request_created_at"),
                "note": _first_attr(attrs, "access_request_note"),
            }
        )
    return jsonify({"requests": output})


@app.route("/api/admin/access/requests/<username>/approve", methods=["POST"])
@require_auth
def admin_approve_request(username: str) -> Any:
    """Enable *username*, then best-effort add to "dev" and send a password email."""
    ok, resp = _require_portal_admin()
    if not ok:
        return resp
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    try:
        user = _kc_admin_find_user(username)
    except Exception:
        app.logger.exception("approve: user lookup failed")
        return jsonify({"error": "failed to look up user"}), 502
    if not user:
        return jsonify({"error": "user not found"}), 404
    user_id = user.get("id") or ""
    if not user_id:
        return jsonify({"error": "user id missing"}), 502

    try:
        full = _kc_admin_get_user(user_id)
        full["enabled"] = True
        _kc_admin_update_user(user_id, full)
    except Exception:
        app.logger.exception("approve: enabling user failed")
        return jsonify({"error": "failed to enable user"}), 502

    # Group membership and the password email are best-effort: the account is
    # already enabled, so failures here are logged but do not fail the request.
    try:
        group_id = _kc_admin_get_group_id("dev")
        if group_id:
            _kc_admin_add_user_to_group(user_id, group_id)
    except Exception:
        app.logger.warning("approve: adding user to group failed", exc_info=True)

    try:
        _kc_admin_execute_actions_email(user_id, ["UPDATE_PASSWORD"])
    except Exception:
        app.logger.warning("approve: sending actions email failed", exc_info=True)

    return jsonify({"ok": True})


@app.route("/api/admin/access/requests/<username>/deny", methods=["POST"])
@require_auth
def admin_deny_request(username: str) -> Any:
    """Delete the user behind a request; an unknown user counts as success."""
    ok, resp = _require_portal_admin()
    if not ok:
        return resp
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    try:
        user = _kc_admin_find_user(username)
    except Exception:
        app.logger.exception("deny: user lookup failed")
        return jsonify({"error": "failed to look up user"}), 502
    if not user:
        return jsonify({"ok": True})
    user_id = user.get("id") or ""
    if not user_id:
        return jsonify({"error": "user id missing"}), 502

    try:
        _kc_admin_delete_user(user_id)
    except Exception:
        app.logger.exception("deny: deleting user failed")
        return jsonify({"error": "failed to delete user"}), 502

    return jsonify({"ok": True})


@app.route("/api/account/mailu/rotate", methods=["POST"])
@require_auth
def account_mailu_rotate() -> Any:
    """Generate a new Mailu app password, store it in Keycloak and return it."""
    ok, resp = _require_account_access()
    if not ok:
        return resp
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    username = g.keycloak_username
    if not username:
        return jsonify({"error": "missing username"}), 400

    password = _random_password()
    try:
        _kc_set_user_attribute(username, "mailu_app_password", password)
    except Exception:
        return jsonify({"error": "failed to update mail password"}), 502

    _best_effort_post(MAILU_SYNC_URL)
    return jsonify({"password": password})


@app.route("/api/account/jellyfin/rotate", methods=["POST"])
@require_auth
def account_jellyfin_rotate() -> Any:
    """Generate a new Jellyfin app password, store it in Keycloak and return it."""
    ok, resp = _require_account_access()
    if not ok:
        return resp
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    username = g.keycloak_username
    if not username:
        return jsonify({"error": "missing username"}), 400

    password = _random_password()
    try:
        _kc_set_user_attribute(username, "jellyfin_app_password", password)
    except Exception:
        return jsonify({"error": "failed to update jellyfin password"}), 502

    _best_effort_post(JELLYFIN_SYNC_URL)
    return jsonify({"password": password})


@app.route("/api/monero/get_info")
def monero_get_info() -> Any:
    """Proxy the monerod ``get_info`` JSON; 503 with the error text on failure."""
    try:
        with urlopen(MONERO_GET_INFO_URL, timeout=2) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        return jsonify(payload)
    except (URLError, TimeoutError, ValueError) as exc:
        return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503


def _vm_query(expr: str) -> float | None:
    """Run an instant query against VictoriaMetrics and return the max sample.

    Returns None when the query did not succeed or produced no parsable value.
    Raises ``URLError`` / ``TimeoutError`` / ``ValueError`` on transport or
    JSON decoding problems.
    """
    url = f"{VM_BASE_URL}/api/v1/query?{urlencode({'query': expr})}"
    with urlopen(url, timeout=VM_QUERY_TIMEOUT_SEC) as resp:
        payload = json.loads(resp.read().decode("utf-8"))

    if not isinstance(payload, dict) or payload.get("status") != "success":
        return None

    result = (payload.get("data") or {}).get("result") or []
    if not result:
        return None

    values: list[float] = []
    for item in result:
        try:
            # Each sample is read as item["value"][1]; malformed ones are skipped.
            values.append(float(item["value"][1]))
        except (KeyError, IndexError, TypeError, ValueError):
            continue

    if not values:
        return None
    return max(values)


def _http_ok(url: str, expect_substring: str | None = None) -> bool:
    """Return True if GET *url* answers 200 (and contains *expect_substring*).

    Only the first 4096 bytes of the body are searched for the substring.
    """
    try:
        with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            if getattr(resp, "status", 200) != 200:
                return False
            if expect_substring:
                chunk = resp.read(4096).decode("utf-8", errors="ignore")
                return expect_substring in chunk
            return True
    except (URLError, TimeoutError, ValueError):
        return False


@app.route("/api/lab/status")
def lab_status() -> Any:
    """Report whether the atlas and oceanus hosts appear to be up.

    VictoriaMetrics is queried first; direct HTTP probes are used as fallbacks.
    The result is cached in-process for ``LAB_STATUS_CACHE_SEC`` seconds.
    """
    now = time.time()
    cached = _LAB_STATUS_CACHE.get("value")
    if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC):
        return jsonify(cached)

    connected = False
    atlas_up = False
    atlas_known = False
    atlas_source = "unknown"
    oceanus_up = False
    oceanus_known = False
    oceanus_source = "unknown"

    try:
        atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})')
        oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})')
        connected = True
        atlas_known = atlas_value is not None
        atlas_up = bool(atlas_value and atlas_value > 0.5)
        atlas_source = "victoria-metrics"
        oceanus_known = oceanus_value is not None
        oceanus_up = bool(oceanus_value and oceanus_value > 0.5)
        oceanus_source = "victoria-metrics"
    except (URLError, TimeoutError, ValueError):
        # Metrics backend unreachable; fall through to the direct probes.
        pass

    if not atlas_known:
        if _http_ok(GRAFANA_HEALTH_URL):
            connected = True
            atlas_known = True
            atlas_up = True
            atlas_source = "grafana-health"

    # Probed whenever metrics do not show oceanus as up (including "known down").
    if not oceanus_up:
        if _http_ok(OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"):
            connected = True
            oceanus_known = True
            oceanus_up = True
            oceanus_source = "node-exporter"

    payload = {
        "connected": connected,
        "atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source},
        "oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source},
        "checked_at": int(now),
    }

    _LAB_STATUS_CACHE["ts"] = now
    _LAB_STATUS_CACHE["value"] = payload
    return jsonify(payload)


@app.route("/api/chat", methods=["POST"])
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
    """Forward a chat message (plus optional history) to the AI backend.

    Expects JSON ``{"message": str, "history": [{"role", "content"}, ...]}``.
    Malformed bodies, non-string messages and malformed history entries are
    tolerated: the former two yield 400, the latter are skipped.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    raw_message = payload.get("message")
    user_message = raw_message.strip() if isinstance(raw_message, str) else ""
    history = payload.get("history")
    if not isinstance(history, list):
        history = []

    if not user_message:
        return jsonify({"error": "message required"}), 400

    messages: list[dict[str, str]] = []
    if AI_CHAT_SYSTEM_PROMPT:
        messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT})

    for item in history:
        if not isinstance(item, dict):
            continue
        role = item.get("role")
        content = item.get("content")
        if not isinstance(content, str):
            continue
        content = content.strip()
        # Only user/assistant turns are forwarded; clients cannot inject "system".
        if role in ("user", "assistant") and content:
            messages.append({"role": role, "content": content})

    messages.append({"role": "user", "content": user_message})

    body = {"model": AI_CHAT_MODEL, "messages": messages, "stream": False}
    started = time.time()

    try:
        with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client:
            resp = client.post(f"{AI_CHAT_API}/api/chat", json=body)
            resp.raise_for_status()
            data = resp.json()
        message = data.get("message") if isinstance(data, dict) else None
        reply = (message.get("content") if isinstance(message, dict) else "") or ""
        elapsed_ms = int((time.time() - started) * 1000)
        return jsonify({"reply": reply, "latency_ms": elapsed_ms})
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
        return jsonify({"error": str(exc)}), 502


@app.route("/api/chat/info", methods=["GET"])
@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
    """Return metadata (node, gpu, model, endpoint) about the AI service."""
    meta = _discover_ai_meta()
    return jsonify(meta)


def _discover_ai_meta() -> dict[str, str]:
    """
    Best-effort discovery of which node/gpu is hosting the AI service.

    Tries the Kubernetes API using the service account if available; falls back to env.
    """
    meta = {
        "node": AI_NODE_NAME,
        "gpu": AI_GPU_DESC,
        "model": AI_CHAT_MODEL,
        "endpoint": AI_PUBLIC_ENDPOINT or "/api/chat",
    }

    # Only attempt k8s if we're in-cluster and credentials exist.
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not token_path.exists() or not ca_path.exists() or not ns_path.exists():
        return meta

    try:
        token = token_path.read_text().strip()
        namespace = AI_K8S_NAMESPACE
        base_url = "https://kubernetes.default.svc"
        pod_url = f"{base_url}/api/v1/namespaces/{namespace}/pods"

        with httpx.Client(
            verify=str(ca_path),
            timeout=HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            # Passed via params so the selector is URL-encoded correctly.
            resp = client.get(pod_url, params={"labelSelector": AI_K8S_LABEL})
            resp.raise_for_status()
            data = resp.json()

        items = data.get("items") or []
        # Prefer running pods; fall back to whatever matched the selector.
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if running:
            pod = running[0]
            node_name = pod.get("spec", {}).get("nodeName") or meta["node"]
            meta["node"] = node_name

            annotations = pod.get("metadata", {}).get("annotations") or {}
            gpu_hint = (
                annotations.get(AI_GPU_ANNOTATION)
                or annotations.get("ai.gpu/description")
                or annotations.get("gpu/description")
            )
            if gpu_hint:
                meta["gpu"] = gpu_hint

            model_hint = annotations.get(AI_MODEL_ANNOTATION)
            if not model_hint:
                # Try to infer from container image tag.
                containers = pod.get("spec", {}).get("containers") or []
                if containers:
                    image = containers[0].get("image") or ""
                    model_hint = image.split(":")[-1] if ":" in image else image
            if model_hint:
                meta["model"] = model_hint
    except Exception:
        # swallow errors; keep fallbacks
        pass

    return meta


def _keep_warm() -> None:
    """Periodically ping the model to keep it warm."""
    if not AI_WARM_ENABLED or AI_WARM_INTERVAL_SEC <= 0:
        return

    def loop() -> None:
        while True:
            time.sleep(AI_WARM_INTERVAL_SEC)
            try:
                body = {
                    "model": AI_CHAT_MODEL,
                    "messages": [{"role": "user", "content": "ping"}],
                    "stream": False,
                }
                with httpx.Client(timeout=min(AI_CHAT_TIMEOUT_SEC, 15)) as client:
                    client.post(f"{AI_CHAT_API}/api/chat", json=body)
            except Exception:
                # best-effort; ignore failures
                continue

    threading.Thread(target=loop, daemon=True, name="ai-keep-warm").start()


# Start keep-warm loop on import.
_keep_warm()


@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path: str) -> Any:
    """Serve a built frontend asset, or ``index.html`` for client-side routes."""
    dist_path = Path(app.static_folder)
    index_path = dist_path / "index.html"

    if dist_path.exists() and index_path.exists():
        target = dist_path / path
        # Only real files are served directly; directories and unknown paths
        # fall back to the SPA entry point.
        if path and target.is_file():
            return send_from_directory(app.static_folder, path)
        return send_from_directory(app.static_folder, "index.html")

    return jsonify(
        {
            "message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.",
            "available_endpoints": ["/api/healthz", "/api/monero/get_info"],
        }
    )


if __name__ == "__main__":
    # NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger to the
    # network; acceptable only for local development — confirm this entry point
    # is never used in production.
    app.run(host="0.0.0.0", port=5000, debug=True)