# AI chat API routes: Atlasbot-backed chat with a stock-model fallback.

from __future__ import annotations
import json
import threading
import time
from pathlib import Path
from typing import Any
from flask import jsonify, request
import httpx
from .. import settings
def register(app) -> None:
    """Attach the AI chat routes to *app* and start the keep-warm loop."""

    @app.route("/api/chat", methods=["POST"])
    @app.route("/api/ai/chat", methods=["POST"])
    def ai_chat() -> Any:
        """Answer one chat message; 400 when the message is missing/blank."""
        body = request.get_json(silent=True) or {}
        text = (body.get("message") or "").strip()
        if not text:
            return jsonify({"error": "message required"}), 400
        past = body.get("history") or []
        profile = (body.get("profile") or body.get("mode") or "atlas-quick").strip().lower()
        conv_raw = body.get("conversation_id")
        conv_id = conv_raw if isinstance(conv_raw, str) else ""

        t0 = time.time()
        if profile in {"stock", "stock-ai", "stock_ai"}:
            source = "stock"
            answer = _stock_answer(text, past)
        else:
            mode = "smart" if profile in {"atlas-smart", "smart"} else "quick"
            source = f"atlas-{mode}"
            answer = _atlasbot_answer(text, mode, conv_id)
        latency_ms = int((time.time() - t0) * 1000)

        # Empty answer means the backend failed or declined; reply with a
        # friendly busy message instead of an error status.
        if not answer:
            answer = "Atlasbot is busy. Please try again in a moment."
        return jsonify({"reply": answer, "latency_ms": latency_ms, "source": source})

    @app.route("/api/chat/info", methods=["GET"])
    @app.route("/api/ai/info", methods=["GET"])
    def ai_info() -> Any:
        """Report node/GPU/model metadata for the requested profile."""
        profile = (request.args.get("profile") or "atlas-quick").strip().lower()
        return jsonify(_discover_ai_meta(profile))

    _start_keep_warm()
def _atlasbot_answer(message: str, mode: str, conversation_id: str) -> str:
    """Query the internal Atlasbot endpoint.

    Returns the stripped answer text, or "" when the endpoint is not
    configured, responds non-200, or the request/JSON decode fails.
    """
    endpoint = settings.AI_ATLASBOT_ENDPOINT
    if not endpoint:
        return ""

    headers: dict[str, str] = {}
    token = settings.AI_ATLASBOT_TOKEN
    if token:
        headers["X-Internal-Token"] = token

    body: dict[str, str] = {"prompt": message, "mode": mode}
    if conversation_id:
        body["conversation_id"] = conversation_id

    try:
        with httpx.Client(timeout=settings.AI_ATLASBOT_TIMEOUT_SEC) as client:
            resp = client.post(endpoint, json=body, headers=headers)
        if resp.status_code != 200:
            return ""
        return (resp.json().get("answer") or "").strip()
    except (httpx.RequestError, ValueError):
        # Best-effort: transport or decode failure degrades to "no answer".
        return ""
def _stock_answer(message: str, history: list[dict[str, Any]]) -> str:
    """Ask the stock chat API (Ollama-style) for a reply.

    Returns the stripped reply text, or "" on any transport, HTTP-status,
    or JSON-decode failure so the caller can fall back gracefully.
    """
    body = {
        "model": settings.AI_CHAT_MODEL,
        "messages": _build_messages(message, history),
        "stream": False,
    }
    try:
        with httpx.Client(timeout=settings.AI_CHAT_TIMEOUT_SEC) as client:
            resp = client.post(f"{settings.AI_CHAT_API}/api/chat", json=body)
            resp.raise_for_status()
            data = resp.json()
    # BUG FIX: raise_for_status() raises httpx.HTTPStatusError, which is NOT
    # a RequestError subclass — catching httpx.HTTPError covers both the
    # transport errors and non-2xx statuses, preserving the "" fallback.
    except (httpx.HTTPError, ValueError):
        return ""
    # Chat-style response: {"message": {"content": ...}}.
    message_data = data.get("message") if isinstance(data, dict) else None
    if isinstance(message_data, dict) and message_data.get("content"):
        return str(message_data["content"]).strip()
    # Generate-style response: {"response": ...}.
    if isinstance(data, dict) and data.get("response"):
        return str(data["response"]).strip()
    return ""
def _build_messages(message: str, history: list[dict[str, Any]]) -> list[dict[str, str]]:
messages = [{"role": "system", "content": settings.AI_CHAT_SYSTEM_PROMPT}]
for entry in history:
role = entry.get("role")
content = entry.get("content")
if role in {"user", "assistant"} and isinstance(content, str) and content.strip():
messages.append({"role": role, "content": content})
messages.append({"role": "user", "content": message})
return messages
def _discover_ai_meta(profile: str) -> dict[str, str]:
    """Build display metadata (node, gpu, model, endpoint) for a chat profile.

    Starts from static values in settings, then — when running in-cluster
    with a mounted service account — best-effort refines node/gpu/model by
    querying the Kubernetes API for the serving pod. Never raises.
    """
    meta = _static_meta(profile)
    _augment_meta_from_k8s(meta)
    return meta


def _static_meta(profile: str) -> dict[str, str]:
    """Profile-dependent defaults taken from settings only."""
    meta = {
        "node": settings.AI_NODE_NAME,
        "gpu": settings.AI_GPU_DESC,
        "model": settings.AI_CHAT_MODEL,
        "endpoint": settings.AI_PUBLIC_ENDPOINT or "/api/chat",
        "profile": profile,
    }
    if profile in {"atlas-smart", "smart"}:
        meta["model"] = settings.AI_ATLASBOT_MODEL_SMART or settings.AI_CHAT_MODEL
        meta["endpoint"] = "/api/ai/chat"
    elif profile in {"atlas-quick", "quick"}:
        meta["model"] = settings.AI_ATLASBOT_MODEL_FAST or settings.AI_CHAT_MODEL
        meta["endpoint"] = "/api/ai/chat"
    elif profile in {"stock", "stock-ai", "stock_ai"}:
        meta["model"] = settings.AI_CHAT_MODEL
        meta["endpoint"] = "/api/ai/chat"
    return meta


def _augment_meta_from_k8s(meta: dict[str, str]) -> None:
    """Overwrite node/gpu/model in *meta* with live pod facts, in place.

    No-op when the service-account files are absent (not in-cluster) or
    when anything in the lookup fails — discovery is strictly best-effort.
    """
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not token_path.exists() or not ca_path.exists() or not ns_path.exists():
        return
    try:
        token = token_path.read_text().strip()
        namespace = settings.AI_K8S_NAMESPACE
        base_url = "https://kubernetes.default.svc"
        pod_url = f"{base_url}/api/v1/namespaces/{namespace}/pods?labelSelector={settings.AI_K8S_LABEL}"
        with httpx.Client(
            verify=str(ca_path),
            timeout=settings.HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            resp = client.get(pod_url)
            resp.raise_for_status()
            data = resp.json()
        items = data.get("items") or []
        # Prefer a Running pod; fall back to whatever was listed.
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if not running:
            return
        pod = running[0]
        meta["node"] = pod.get("spec", {}).get("nodeName") or meta["node"]
        annotations = pod.get("metadata", {}).get("annotations") or {}
        gpu_hint = (
            annotations.get(settings.AI_GPU_ANNOTATION)
            or annotations.get("ai.gpu/description")
            or annotations.get("gpu/description")
        )
        if gpu_hint:
            meta["gpu"] = gpu_hint
        model_hint = annotations.get(settings.AI_MODEL_ANNOTATION)
        if not model_hint:
            # Fall back to the first container's image tag as the model name.
            containers = pod.get("spec", {}).get("containers") or []
            if containers:
                image = containers[0].get("image") or ""
                model_hint = image.split(":")[-1] if ":" in image else image
        if model_hint:
            meta["model"] = model_hint
    except Exception:
        # Best-effort enrichment only — keep the static metadata on failure.
        pass
def _start_keep_warm() -> None:
    """Spawn a daemon thread that periodically pings the stock chat model.

    Disabled when AI_WARM_ENABLED is false or the interval is non-positive.
    """
    if not settings.AI_WARM_ENABLED or settings.AI_WARM_INTERVAL_SEC <= 0:
        return

    def _warm_loop() -> None:
        # Best-effort forever-loop: failures are ignored and retried on the
        # next interval; the ping keeps the model resident in memory.
        while True:
            time.sleep(settings.AI_WARM_INTERVAL_SEC)
            ping = {
                "model": settings.AI_CHAT_MODEL,
                "messages": [{"role": "user", "content": "ping"}],
                "stream": False,
            }
            try:
                with httpx.Client(timeout=min(settings.AI_CHAT_TIMEOUT_SEC, 15)) as client:
                    client.post(f"{settings.AI_CHAT_API}/api/chat", json=ping)
            except Exception:
                pass

    threading.Thread(target=_warm_loop, daemon=True, name="ai-keep-warm").start()