from __future__ import annotations import json import threading import time from pathlib import Path from typing import Any from flask import jsonify, request import httpx from .. import settings def register(app) -> None: @app.route("/api/chat", methods=["POST"]) @app.route("/api/ai/chat", methods=["POST"]) def ai_chat() -> Any: payload = request.get_json(silent=True) or {} user_message = (payload.get("message") or "").strip() profile = (payload.get("profile") or payload.get("mode") or "atlas-quick").strip().lower() conversation_id = payload.get("conversation_id") if isinstance(payload.get("conversation_id"), str) else "" if not user_message: return jsonify({"error": "message required"}), 400 started = time.time() mode = "quick" if profile in {"atlas-smart", "smart"}: mode = "smart" elif profile in {"atlas-genius", "genius"}: mode = "genius" reply = _atlasbot_answer(user_message, mode, conversation_id) source = f"atlas-{mode}" if reply: elapsed_ms = int((time.time() - started) * 1000) return jsonify({"reply": reply, "latency_ms": elapsed_ms, "source": source}) elapsed_ms = int((time.time() - started) * 1000) if mode == "quick": budget = max(1, int(round(settings.AI_ATLASBOT_TIMEOUT_QUICK_SEC))) fallback = ( f"Quick mode hit {budget}s response budget before finishing. " "Try atlas-smart for a deeper answer." ) elif mode == "smart": budget = max(1, int(round(settings.AI_ATLASBOT_TIMEOUT_SMART_SEC))) fallback = ( f"Smart mode hit {budget}s response budget before finishing. " "Try atlas-genius or ask a narrower follow-up." ) else: fallback = "Atlas genius mode timed out before it could finish. Please retry with a narrower prompt." return jsonify( { "reply": fallback, "latency_ms": elapsed_ms, "source": source, } ) @app.route("/api/chat/info", methods=["GET"]) @app.route("/api/ai/info", methods=["GET"]) def ai_info() -> Any: profile = (request.args.get("profile") or "atlas-quick").strip().lower() meta = _discover_ai_meta(profile) return jsonify(meta) _start_keep_warm() def _atlasbot_answer(message: str, mode: str, conversation_id: str) -> str: endpoint = settings.AI_ATLASBOT_ENDPOINT if not endpoint: return "" headers: dict[str, str] = {} if settings.AI_ATLASBOT_TOKEN: headers["X-Internal-Token"] = settings.AI_ATLASBOT_TOKEN try: payload = {"prompt": message, "mode": mode} if conversation_id: payload["conversation_id"] = conversation_id with httpx.Client(timeout=_atlasbot_timeout_sec(mode)) as client: resp = client.post(endpoint, json=payload, headers=headers) if resp.status_code != 200: return "" data = resp.json() answer = (data.get("reply") or data.get("answer") or "").strip() return answer except (httpx.RequestError, ValueError): return "" def _atlasbot_timeout_sec(mode: str) -> float: if mode == "genius": return settings.AI_ATLASBOT_TIMEOUT_GENIUS_SEC if mode == "smart": return settings.AI_ATLASBOT_TIMEOUT_SMART_SEC return settings.AI_ATLASBOT_TIMEOUT_QUICK_SEC def _discover_ai_meta(profile: str) -> dict[str, str]: meta = { "node": settings.AI_NODE_NAME, "gpu": settings.AI_GPU_DESC, "model": settings.AI_CHAT_MODEL, "endpoint": settings.AI_PUBLIC_ENDPOINT or "/api/chat", "profile": profile, } if profile in {"atlas-smart", "smart"}: meta["model"] = settings.AI_ATLASBOT_MODEL_SMART or settings.AI_CHAT_MODEL meta["endpoint"] = "/api/ai/chat" elif profile in {"atlas-genius", "genius"}: meta["model"] = settings.AI_ATLASBOT_MODEL_GENIUS or settings.AI_CHAT_MODEL meta["endpoint"] = "/api/ai/chat" elif profile in {"atlas-quick", "quick"}: meta["model"] = settings.AI_ATLASBOT_MODEL_FAST or settings.AI_CHAT_MODEL meta["endpoint"] = "/api/ai/chat" sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount") token_path = sa_path / "token" ca_path = sa_path / "ca.crt" ns_path = sa_path / "namespace" if not token_path.exists() or not ca_path.exists() or not ns_path.exists(): return meta try: token = token_path.read_text().strip() namespace = settings.AI_K8S_NAMESPACE base_url = "https://kubernetes.default.svc" pod_url = f"{base_url}/api/v1/namespaces/{namespace}/pods?labelSelector={settings.AI_K8S_LABEL}" with httpx.Client( verify=str(ca_path), timeout=settings.HTTP_CHECK_TIMEOUT_SEC, headers={"Authorization": f"Bearer {token}"}, ) as client: resp = client.get(pod_url) resp.raise_for_status() data = resp.json() items = data.get("items") or [] running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items if running: pod = running[0] node_name = pod.get("spec", {}).get("nodeName") or meta["node"] meta["node"] = node_name annotations = pod.get("metadata", {}).get("annotations") or {} gpu_hint = ( annotations.get(settings.AI_GPU_ANNOTATION) or annotations.get("ai.gpu/description") or annotations.get("gpu/description") ) if gpu_hint: meta["gpu"] = gpu_hint model_hint = annotations.get(settings.AI_MODEL_ANNOTATION) if not model_hint: containers = pod.get("spec", {}).get("containers") or [] if containers: image = containers[0].get("image") or "" model_hint = image.split(":")[-1] if ":" in image else image if model_hint: meta["model"] = model_hint except Exception: pass return meta def _start_keep_warm() -> None: if not settings.AI_WARM_ENABLED or settings.AI_WARM_INTERVAL_SEC <= 0: return def loop() -> None: while True: time.sleep(settings.AI_WARM_INTERVAL_SEC) try: body = { "model": settings.AI_CHAT_MODEL, "messages": [{"role": "user", "content": "ping"}], "stream": False, } with httpx.Client(timeout=min(settings.AI_CHAT_TIMEOUT_SEC, 15)) as client: client.post(f"{settings.AI_CHAT_API}/api/chat", json=body) except Exception: continue threading.Thread(target=loop, daemon=True, name="ai-keep-warm").start()