from __future__ import annotations import json import os import time from pathlib import Path from typing import Any from urllib.error import URLError from urllib.parse import urlencode from urllib.request import urlopen from flask import Flask, jsonify, request, send_from_directory from flask_cors import CORS import httpx app = Flask(__name__, static_folder="../frontend/dist", static_url_path="") CORS(app, resources={r"/api/*": {"origins": "*"}}) MONERO_GET_INFO_URL = os.getenv("MONERO_GET_INFO_URL", "http://monerod.crypto.svc.cluster.local:18081/get_info") VM_BASE_URL = os.getenv( "VM_BASE_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428", ).rstrip("/") VM_QUERY_TIMEOUT_SEC = float(os.getenv("VM_QUERY_TIMEOUT_SEC", "2")) HTTP_CHECK_TIMEOUT_SEC = float(os.getenv("HTTP_CHECK_TIMEOUT_SEC", "2")) LAB_STATUS_CACHE_SEC = float(os.getenv("LAB_STATUS_CACHE_SEC", "30")) GRAFANA_HEALTH_URL = os.getenv("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health") OCEANUS_NODE_EXPORTER_URL = os.getenv("OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics") AI_CHAT_API = os.getenv("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/") AI_CHAT_MODEL = os.getenv("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0") AI_CHAT_SYSTEM_PROMPT = os.getenv( "AI_CHAT_SYSTEM_PROMPT", "You are the Titan Lab assistant for bstein.dev. Be concise and helpful.", ) AI_CHAT_TIMEOUT_SEC = float(os.getenv("AI_CHAT_TIMEOUT_SEC", "20")) _LAB_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "value": None} @app.route("/api/healthz") def healthz() -> Any: return jsonify({"ok": True}) @app.route("/api/monero/get_info") def monero_get_info() -> Any: try: with urlopen(MONERO_GET_INFO_URL, timeout=2) as resp: payload = json.loads(resp.read().decode("utf-8")) return jsonify(payload) except (URLError, TimeoutError, ValueError) as exc: return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503 def _vm_query(expr: str) -> float | None: url = f"{VM_BASE_URL}/api/v1/query?{urlencode({'query': expr})}" with urlopen(url, timeout=VM_QUERY_TIMEOUT_SEC) as resp: payload = json.loads(resp.read().decode("utf-8")) if payload.get("status") != "success": return None result = (payload.get("data") or {}).get("result") or [] if not result: return None values: list[float] = [] for item in result: try: values.append(float(item["value"][1])) except (KeyError, IndexError, TypeError, ValueError): continue if not values: return None return max(values) def _http_ok(url: str, expect_substring: str | None = None) -> bool: try: with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp: if getattr(resp, "status", 200) != 200: return False if expect_substring: chunk = resp.read(4096).decode("utf-8", errors="ignore") return expect_substring in chunk return True except (URLError, TimeoutError, ValueError): return False @app.route("/api/lab/status") def lab_status() -> Any: now = time.time() cached = _LAB_STATUS_CACHE.get("value") if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC): return jsonify(cached) connected = False atlas_up = False atlas_known = False atlas_source = "unknown" oceanus_up = False oceanus_known = False oceanus_source = "unknown" try: atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})') oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})') connected = True atlas_known = atlas_value is not None atlas_up = bool(atlas_value and atlas_value > 0.5) atlas_source = "victoria-metrics" oceanus_known = oceanus_value is not None oceanus_up = bool(oceanus_value and oceanus_value > 0.5) oceanus_source = "victoria-metrics" except (URLError, TimeoutError, ValueError): atlas_value = None oceanus_value = None if not atlas_known: if _http_ok(GRAFANA_HEALTH_URL): connected = True atlas_known = True atlas_up = True atlas_source = "grafana-health" if not oceanus_up: if _http_ok(OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"): connected = True oceanus_known = True oceanus_up = True oceanus_source = "node-exporter" payload = { "connected": connected, "atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source}, "oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source}, "checked_at": int(now), } _LAB_STATUS_CACHE["ts"] = now _LAB_STATUS_CACHE["value"] = payload return jsonify(payload) @app.route("/api/ai/chat", methods=["POST"]) def ai_chat() -> Any: payload = request.get_json(silent=True) or {} user_message = (payload.get("message") or "").strip() history = payload.get("history") or [] if not user_message: return jsonify({"error": "message required"}), 400 messages: list[dict[str, str]] = [] if AI_CHAT_SYSTEM_PROMPT: messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT}) for item in history: role = item.get("role") content = (item.get("content") or "").strip() if role in ("user", "assistant") and content: messages.append({"role": role, "content": content}) messages.append({"role": "user", "content": user_message}) body = {"model": AI_CHAT_MODEL, "messages": messages, "stream": False} started = time.time() try: with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client: resp = client.post(f"{AI_CHAT_API}/api/chat", json=body) resp.raise_for_status() data = resp.json() reply = (data.get("message") or {}).get("content") or "" elapsed_ms = int((time.time() - started) * 1000) return jsonify({"reply": reply, "latency_ms": elapsed_ms}) except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc: return jsonify({"error": str(exc)}), 502 @app.route("/", defaults={"path": ""}) @app.route("/") def serve_frontend(path: str) -> Any: dist_path = Path(app.static_folder) index_path = dist_path / "index.html" if dist_path.exists() and index_path.exists(): target = dist_path / path if path and target.exists(): return send_from_directory(app.static_folder, path) return send_from_directory(app.static_folder, "index.html") return jsonify( { "message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.", "available_endpoints": ["/api/healthz", "/api/monero/get_info"], } ) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)