from __future__ import annotations import json import os import time from pathlib import Path from typing import Any from urllib.error import URLError from urllib.parse import urlencode from urllib.request import urlopen from flask import Flask, jsonify, request, send_from_directory from flask_cors import CORS import httpx app = Flask(__name__, static_folder="../frontend/dist", static_url_path="") CORS(app, resources={r"/api/*": {"origins": "*"}}) MONERO_GET_INFO_URL = os.getenv("MONERO_GET_INFO_URL", "http://monerod.crypto.svc.cluster.local:18081/get_info") VM_BASE_URL = os.getenv( "VM_BASE_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428", ).rstrip("/") VM_QUERY_TIMEOUT_SEC = float(os.getenv("VM_QUERY_TIMEOUT_SEC", "2")) HTTP_CHECK_TIMEOUT_SEC = float(os.getenv("HTTP_CHECK_TIMEOUT_SEC", "2")) LAB_STATUS_CACHE_SEC = float(os.getenv("LAB_STATUS_CACHE_SEC", "30")) GRAFANA_HEALTH_URL = os.getenv("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health") OCEANUS_NODE_EXPORTER_URL = os.getenv("OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics") AI_CHAT_API = os.getenv("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/") AI_CHAT_MODEL = os.getenv("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0") AI_CHAT_SYSTEM_PROMPT = os.getenv( "AI_CHAT_SYSTEM_PROMPT", "You are the Titan Lab assistant for bstein.dev. 
Be concise and helpful.", ) AI_CHAT_TIMEOUT_SEC = float(os.getenv("AI_CHAT_TIMEOUT_SEC", "20")) AI_NODE_NAME = os.getenv("AI_CHAT_NODE_NAME") or os.getenv("AI_NODE_NAME") or "ai-cluster" AI_GPU_DESC = os.getenv("AI_CHAT_GPU_DESC") or "local GPU (dynamic)" AI_PUBLIC_ENDPOINT = os.getenv("AI_PUBLIC_CHAT_ENDPOINT", "https://chat.ai.bstein.dev/api/ai/chat") AI_K8S_LABEL = os.getenv("AI_K8S_LABEL", "app=ollama") AI_K8S_NAMESPACE = os.getenv("AI_K8S_NAMESPACE", "ai") AI_MODEL_ANNOTATION = os.getenv("AI_MODEL_ANNOTATION", "ai.bstein.dev/model") AI_GPU_ANNOTATION = os.getenv("AI_GPU_ANNOTATION", "ai.bstein.dev/gpu") _LAB_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "value": None} @app.route("/api/healthz") def healthz() -> Any: return jsonify({"ok": True}) @app.route("/api/monero/get_info") def monero_get_info() -> Any: try: with urlopen(MONERO_GET_INFO_URL, timeout=2) as resp: payload = json.loads(resp.read().decode("utf-8")) return jsonify(payload) except (URLError, TimeoutError, ValueError) as exc: return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503 def _vm_query(expr: str) -> float | None: url = f"{VM_BASE_URL}/api/v1/query?{urlencode({'query': expr})}" with urlopen(url, timeout=VM_QUERY_TIMEOUT_SEC) as resp: payload = json.loads(resp.read().decode("utf-8")) if payload.get("status") != "success": return None result = (payload.get("data") or {}).get("result") or [] if not result: return None values: list[float] = [] for item in result: try: values.append(float(item["value"][1])) except (KeyError, IndexError, TypeError, ValueError): continue if not values: return None return max(values) def _http_ok(url: str, expect_substring: str | None = None) -> bool: try: with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp: if getattr(resp, "status", 200) != 200: return False if expect_substring: chunk = resp.read(4096).decode("utf-8", errors="ignore") return expect_substring in chunk return True except (URLError, TimeoutError, ValueError): return False 
@app.route("/api/lab/status")
def lab_status() -> Any:
    """Report reachability of the "atlas" and "oceanus" lab targets.

    VictoriaMetrics is queried first; direct HTTP probes (Grafana health for
    atlas, node-exporter for oceanus) are used as fallbacks. A truthy result
    is cached in ``_LAB_STATUS_CACHE`` for ``LAB_STATUS_CACHE_SEC`` seconds.

    Returns:
        JSON with ``connected``, per-target ``up``/``known``/``source`` and
        the ``checked_at`` epoch second.
    """
    now = time.time()
    cached = _LAB_STATUS_CACHE.get("value")
    if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC):
        return jsonify(cached)

    connected = False
    atlas_up = atlas_known = False
    atlas_source = "unknown"
    oceanus_up = oceanus_known = False
    oceanus_source = "unknown"

    try:
        atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})')
        oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})')
    # OSError covers URLError/timeouts and raw socket errors raised while the
    # response is being read, so a flaky metrics backend cannot cause a 500.
    except (OSError, ValueError) as exc:
        app.logger.debug("VictoriaMetrics query failed; using direct probes: %s", exc)
    else:
        connected = True
        atlas_known = atlas_value is not None
        atlas_up = bool(atlas_value and atlas_value > 0.5)
        atlas_source = "victoria-metrics"
        oceanus_known = oceanus_value is not None
        oceanus_up = bool(oceanus_value and oceanus_value > 0.5)
        oceanus_source = "victoria-metrics"

    # NOTE(review): atlas falls back only when its state is unknown, while
    # oceanus falls back whenever it is not up - confirm the asymmetry is intended.
    if not atlas_known and _http_ok(GRAFANA_HEALTH_URL):
        connected = True
        atlas_known = True
        atlas_up = True
        atlas_source = "grafana-health"

    if not oceanus_up and _http_ok(
        OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"
    ):
        connected = True
        oceanus_known = True
        oceanus_up = True
        oceanus_source = "node-exporter"

    payload = {
        "connected": connected,
        "atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source},
        "oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source},
        "checked_at": int(now),
    }
    _LAB_STATUS_CACHE["ts"] = now
    _LAB_STATUS_CACHE["value"] = payload
    return jsonify(payload)


def _build_chat_messages(user_message: str, history: Any) -> list[dict[str, str]]:
    """Assemble the chat message list: system prompt, valid history, new message.

    ``history`` comes straight from the request body, so anything that is not
    a list of dicts with a ``user``/``assistant`` role and non-empty string
    content is skipped rather than allowed to raise.
    """
    messages: list[dict[str, str]] = []
    if AI_CHAT_SYSTEM_PROMPT:
        messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT})

    for item in history if isinstance(history, list) else []:
        if not isinstance(item, dict):
            continue
        role = item.get("role")
        content = item.get("content")
        if not isinstance(content, str):
            continue
        content = content.strip()
        if role in ("user", "assistant") and content:
            messages.append({"role": role, "content": content})

    messages.append({"role": "user", "content": user_message})
    return messages


@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
    """Forward a chat message (plus optional history) to the AI chat backend.

    Request JSON: ``{"message": str, "history": [{"role": ..., "content": ...}]}``.

    Returns:
        ``{"reply", "latency_ms"}`` on success; 400 when ``message`` is
        missing, empty or not a string; 502 when the backend call fails.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body would otherwise crash on .get() below.
        payload = {}

    raw_message = payload.get("message")
    user_message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not user_message:
        return jsonify({"error": "message required"}), 400

    body = {
        "model": AI_CHAT_MODEL,
        "messages": _build_chat_messages(user_message, payload.get("history")),
        "stream": False,
    }

    # Monotonic clock: latency must not be affected by wall-clock adjustments.
    started = time.monotonic()
    try:
        with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client:
            resp = client.post(f"{AI_CHAT_API}/api/chat", json=body)
            resp.raise_for_status()
            data = resp.json()
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
        return jsonify({"error": str(exc)}), 502

    message = data.get("message") if isinstance(data, dict) else None
    reply = (message.get("content") if isinstance(message, dict) else None) or ""
    elapsed_ms = int((time.monotonic() - started) * 1000)
    return jsonify({"reply": reply, "latency_ms": elapsed_ms})


@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
    """Return the node/GPU/model/endpoint metadata for the AI service as JSON."""
    return jsonify(_discover_ai_meta())


def _discover_ai_meta() -> dict[str, str]:
    """
    Best-effort discovery of which node/gpu is hosting the AI service.

    Tries the Kubernetes API using the service account if available; falls
    back to the environment-derived defaults. Any failure while talking to
    the API is logged and the values gathered so far are returned.
    """
    meta = {
        "node": AI_NODE_NAME,
        "gpu": AI_GPU_DESC,
        "model": AI_CHAT_MODEL,
        "endpoint": AI_PUBLIC_ENDPOINT or "/api/ai/chat",
    }

    # Only attempt k8s if we're in-cluster and credentials exist.
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not (token_path.exists() and ca_path.exists() and ns_path.exists()):
        return meta

    try:
        token = token_path.read_text().strip()
        pod_url = f"https://kubernetes.default.svc/api/v1/namespaces/{AI_K8S_NAMESPACE}/pods"
        with httpx.Client(
            verify=str(ca_path),
            timeout=HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            # Pass the selector through `params` so "=", "," and similar
            # characters are URL-encoded instead of pasted into the query string.
            resp = client.get(pod_url, params={"labelSelector": AI_K8S_LABEL})
            resp.raise_for_status()
            data = resp.json()

        items = data.get("items") or []
        # Prefer pods that are Running; otherwise consider every matched pod.
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if running:
            pod = running[0]
            meta["node"] = pod.get("spec", {}).get("nodeName") or meta["node"]

            annotations = pod.get("metadata", {}).get("annotations") or {}
            gpu_hint = (
                annotations.get(AI_GPU_ANNOTATION)
                or annotations.get("ai.gpu/description")
                or annotations.get("gpu/description")
            )
            if gpu_hint:
                meta["gpu"] = gpu_hint

            model_hint = annotations.get(AI_MODEL_ANNOTATION)
            if not model_hint:
                # Try to infer from container image tag.
                containers = pod.get("spec", {}).get("containers") or []
                if containers:
                    image = containers[0].get("image") or ""
                    model_hint = image.split(":")[-1] if ":" in image else image
            if model_hint:
                meta["model"] = model_hint
    except Exception as exc:
        # Deliberately best-effort: keep the fallbacks, but leave a trace so
        # RBAC or connectivity problems are visible in the logs.
        app.logger.warning("AI metadata discovery failed; using fallbacks: %s", exc)
    return meta


@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path: str) -> Any:
    """Serve the built frontend, falling back to ``index.html`` for SPA routes.

    Args:
        path: Request path relative to the site root ("" for "/").

    Returns:
        The matching file from the static folder when one exists, otherwise
        ``index.html``; a JSON hint when the frontend has not been built.
    """
    dist_path = Path(app.static_folder)
    index_path = dist_path / "index.html"

    if dist_path.exists() and index_path.exists():
        # is_file(), not exists(): a directory name must fall through to
        # index.html rather than 404 inside send_from_directory.
        if path and (dist_path / path).is_file():
            return send_from_directory(app.static_folder, path)
        return send_from_directory(app.static_folder, "index.html")

    return jsonify(
        {
            "message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.",
            "available_endpoints": ["/api/healthz", "/api/monero/get_info"],
        }
    )


if __name__ == "__main__":
    # The Werkzeug debugger permits arbitrary code execution, and this server
    # binds to every interface, so debug mode is opt-in via FLASK_DEBUG.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)