commit 1dbc2de39d1222ce39a1312542c9f766b60b8c34 Author: Brad Stein Date: Wed Jan 28 11:46:52 2026 -0300 init: atlasbot service diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f7d8953 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +.git +__pycache__ +.venv +build +tests diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..33b2859 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +.venv/ +.pytest_cache/ +build/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..91f4ed9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +WORKDIR /app +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt + +COPY atlasbot /app/atlasbot + +EXPOSE 8090 +CMD ["python", "-m", "atlasbot.main"] diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..3748382 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,168 @@ +pipeline { + agent { + kubernetes { + defaultContainer 'tester' + yaml """ +apiVersion: v1 +kind: Pod +spec: + nodeSelector: + kubernetes.io/arch: arm64 + node-role.kubernetes.io/worker: "true" + containers: + - name: dind + image: docker:27-dind + securityContext: + privileged: true + env: + - name: DOCKER_TLS_CERTDIR + value: "" + args: + - "--mtu=1400" + - "--host=unix:///var/run/docker.sock" + - "--host=tcp://0.0.0.0:2375" + volumeMounts: + - name: dind-storage + mountPath: /var/lib/docker + - name: workspace-volume + mountPath: /home/jenkins/agent + - name: builder + image: docker:27 + command: + - cat + tty: true + env: + - name: DOCKER_HOST + value: tcp://localhost:2375 + - name: DOCKER_TLS_CERTDIR + value: "" + volumeMounts: + - name: workspace-volume + mountPath: /home/jenkins/agent + - name: docker-config-writable + mountPath: /root/.docker + - name: harbor-config + mountPath: /docker-config + - name: tester + image: python:3.12-slim + command: + - cat + tty: true + volumeMounts: + - name: workspace-volume + mountPath: /home/jenkins/agent + volumes: + - name: docker-config-writable + emptyDir: {} + - name: dind-storage + emptyDir: {} + - name: harbor-config + secret: + secretName: harbor-regcred + items: + - key: .dockerconfigjson + path: config.json + - name: workspace-volume + emptyDir: {} +""" + } + } + environment { + PIP_DISABLE_PIP_VERSION_CHECK = '1' + PYTHONUNBUFFERED = '1' + } + stages { + stage('Checkout') { + steps { + checkout scm + } + } + stage('Unit tests') { + steps { + container('tester') { + sh ''' + set -euo pipefail + python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt + python -m ruff check atlasbot --select C90,PLR + mkdir -p build + python -m slipcover --json --out build/coverage.json --source atlasbot --fail-under 90 -m pytest -q --junitxml build/junit.xml + ''' + } + } + } + stage('Publish test metrics') { + steps { + container('tester') { + sh ''' + set -euo pipefail + python scripts/publish_test_metrics.py + ''' + } + } + } + stage('Prep toolchain') { + steps { + container('builder') { + sh ''' + set -euo pipefail + apk add --no-cache bash git jq + mkdir -p /root/.docker + cp /docker-config/config.json /root/.docker/config.json + ''' + } + } + } + stage('Compute version') { + steps { + container('builder') { + script { + def semver = sh(returnStdout: true, script: 'git describe --tags --exact-match || true').trim() + if (!semver) { + semver = sh(returnStdout: true, script: 'git rev-list --count HEAD').trim() + semver = 
"0.1.0-${semver}" + } + sh "echo SEMVER=${semver} > build.env" + } + } + } + } + stage('Buildx setup') { + steps { + container('builder') { + sh ''' + set -euo pipefail + seq 1 10 | while read _; do + docker info && break || sleep 2 + done + docker buildx create --name bstein-builder --driver docker-container --bootstrap --use + ''' + } + } + } + stage('Build & push image') { + steps { + container('builder') { + sh ''' + set -euo pipefail + VERSION_TAG=$(cut -d= -f2 build.env) + docker buildx build --platform linux/arm64 \ + --tag registry.bstein.dev/bstein/atlasbot:${VERSION_TAG} \ + --tag registry.bstein.dev/bstein/atlasbot:latest \ + --push . + ''' + } + } + } + } + post { + always { + script { + if (fileExists('build.env')) { + def env = readProperties file: 'build.env' + echo "Build complete for ${env.SEMVER}" + } + } + archiveArtifacts artifacts: 'build/*', allowEmptyArchive: true + } + } +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..309b5b4 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# Atlasbot + +Atlasbot is the Atlas/Othrys cluster assistant. It answers questions using: +- Ariadne cluster snapshots. +- Prometheus/VictoriaMetrics telemetry. +- The curated Atlas knowledge base. + +It exposes an HTTP API for the portal and a Matrix bot for Element. + +## Development + +- `python -m venv .venv && . .venv/bin/activate` +- `pip install -r requirements.txt -r requirements-dev.txt` +- `python -m pytest` +- `python -m ruff check atlasbot --select C90,PLR` + +## Runtime + +Start with: + +``` +python -m atlasbot.main +``` + +Configure via environment variables (see `atlasbot/config.py`). diff --git a/atlasbot/__init__.py b/atlasbot/__init__.py new file mode 100644 index 0000000..fa88b80 --- /dev/null +++ b/atlasbot/__init__.py @@ -0,0 +1 @@ +"""Atlasbot package.""" diff --git a/atlasbot/api/__init__.py b/atlasbot/api/__init__.py new file mode 100644 index 0000000..1248ec6 --- /dev/null +++ b/atlasbot/api/__init__.py @@ -0,0 +1 @@ +"""HTTP API.""" diff --git a/atlasbot/api/http.py b/atlasbot/api/http.py new file mode 100644 index 0000000..8586e04 --- /dev/null +++ b/atlasbot/api/http.py @@ -0,0 +1,69 @@ +import logging +from typing import Any + +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI, Header, HTTPException +from pydantic import BaseModel + +from atlasbot.config import Settings +from atlasbot.engine.answerer import AnswerResult + +log = logging.getLogger(__name__) + + +class AnswerRequest(BaseModel): + question: str | None = None + prompt: str | None = None + message: str | None = None + text: str | None = None + content: str | None = None + mode: str | None = None + + +class AnswerResponse(BaseModel): + reply: str + + +class Api: + def __init__(self, settings: Settings, answer_handler: Callable[[str, str], Awaitable[AnswerResult]]) -> None: + self._settings = settings + self._answer_handler = answer_handler + self.app = FastAPI() + self._register_routes() + + def _register_routes(self) -> None: + @self.app.get("/healthz") + async def health() -> dict[str, Any]: + return {"ok": True} + + @self.app.post("/v1/answer", response_model=AnswerResponse) + async def answer( + payload: AnswerRequest, + x_internal_token: str | None = Header(default=None, alias="X-Internal-Token"), + ) -> AnswerResponse: + if self._settings.internal_token and x_internal_token != self._settings.internal_token: + raise HTTPException(status_code=401, detail="unauthorized") + question = _extract_question(payload) + if not question: + raise 
HTTPException(status_code=400, detail="missing question") + mode = (payload.mode or "quick").strip().lower() + result = await self._answer_handler(question, mode) + log.info( + "answer", + extra={ + "extra": { + "mode": mode, + "scores": result.scores.__dict__, + "question": question[:80], + } + }, + ) + return AnswerResponse(reply=result.reply) + + +def _extract_question(payload: AnswerRequest) -> str: + for field in (payload.question, payload.prompt, payload.message, payload.text, payload.content): + if field and field.strip(): + return field.strip() + return "" diff --git a/atlasbot/config.py b/atlasbot/config.py new file mode 100644 index 0000000..3519cb8 --- /dev/null +++ b/atlasbot/config.py @@ -0,0 +1,110 @@ +import os +from dataclasses import dataclass + +def _env_bool(name: str, default: str = "false") -> bool: + value = os.getenv(name, default).strip().lower() + return value in {"1", "true", "yes", "y", "on"} + + +def _env_int(name: str, default: str) -> int: + raw = os.getenv(name, default) + try: + return int(raw) + except (TypeError, ValueError): + return int(default) + + +def _env_float(name: str, default: str) -> float: + raw = os.getenv(name, default) + try: + return float(raw) + except (TypeError, ValueError): + return float(default) + + +@dataclass(frozen=True) +class Settings: + matrix_base: str + auth_base: str + bot_user: str + bot_pass: str + room_alias: str + server_name: str + bot_mentions: tuple[str, ...] + + ollama_url: str + ollama_model: str + ollama_model_fast: str + ollama_model_smart: str + ollama_fallback_model: str + ollama_timeout_sec: float + ollama_retries: int + ollama_api_key: str + + http_port: int + internal_token: str + + kb_dir: str + vm_url: str + ariadne_state_url: str + ariadne_state_token: str + + snapshot_ttl_sec: int + thinking_interval_sec: int + + queue_enabled: bool + nats_url: str + nats_stream: str + nats_subject: str + nats_result_bucket: str + + fast_max_angles: int + smart_max_angles: int + fast_max_candidates: int + smart_max_candidates: int + + + +def load_settings() -> Settings: + bot_mentions = tuple( + [ + item.strip() + for item in os.getenv("BOT_MENTIONS", "atlasbot,atlas-quick,atlas-smart").split(",") + if item.strip() + ] + ) + return Settings( + matrix_base=os.getenv("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008"), + auth_base=os.getenv("AUTH_BASE", "http://matrix-authentication-service:8080"), + bot_user=os.getenv("BOT_USER", ""), + bot_pass=os.getenv("BOT_PASS", ""), + room_alias=os.getenv("ROOM_ALIAS", "#othrys:live.bstein.dev"), + server_name=os.getenv("MATRIX_SERVER_NAME", "live.bstein.dev"), + bot_mentions=bot_mentions, + ollama_url=os.getenv("OLLAMA_URL", "http://ollama.ai.svc.cluster.local:11434"), + ollama_model=os.getenv("OLLAMA_MODEL", "qwen2.5:14b-instruct"), + ollama_model_fast=os.getenv("ATLASBOT_MODEL_FAST", "qwen2.5:14b-instruct"), + ollama_model_smart=os.getenv("ATLASBOT_MODEL_SMART", "qwen2.5:14b-instruct"), + ollama_fallback_model=os.getenv("OLLAMA_FALLBACK_MODEL", ""), + ollama_timeout_sec=_env_float("OLLAMA_TIMEOUT_SEC", "480"), + ollama_retries=_env_int("OLLAMA_RETRIES", "1"), + ollama_api_key=os.getenv("CHAT_API_KEY", ""), + http_port=_env_int("ATLASBOT_HTTP_PORT", "8090"), + internal_token=os.getenv("ATLASBOT_INTERNAL_TOKEN", "") + or os.getenv("CHAT_API_HOMEPAGE", ""), + kb_dir=os.getenv("KB_DIR", ""), + vm_url=os.getenv("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428"), + ariadne_state_url=os.getenv("ARIADNE_STATE_URL", ""), + 
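+        # If set, ARIADNE_STATE_TOKEN is sent as the x-internal-token header when the snapshot provider fetches ARIADNE_STATE_URL.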
ariadne_state_token=os.getenv("ARIADNE_STATE_TOKEN", ""), + snapshot_ttl_sec=_env_int("ATLASBOT_SNAPSHOT_TTL_SEC", "30"), + thinking_interval_sec=_env_int("ATLASBOT_THINKING_INTERVAL_SEC", "30"), + queue_enabled=_env_bool("ATLASBOT_QUEUE_ENABLED", "false"), + nats_url=os.getenv("ATLASBOT_NATS_URL", "nats://nats.nats.svc.cluster.local:4222"), + nats_stream=os.getenv("ATLASBOT_NATS_STREAM", "atlasbot"), + nats_subject=os.getenv("ATLASBOT_NATS_SUBJECT", "atlasbot.requests"), + nats_result_bucket=os.getenv("ATLASBOT_NATS_RESULTS", "atlasbot_results"), + fast_max_angles=_env_int("ATLASBOT_FAST_MAX_ANGLES", "2"), + smart_max_angles=_env_int("ATLASBOT_SMART_MAX_ANGLES", "5"), + fast_max_candidates=_env_int("ATLASBOT_FAST_MAX_CANDIDATES", "2"), + smart_max_candidates=_env_int("ATLASBOT_SMART_MAX_CANDIDATES", "6"), + ) diff --git a/atlasbot/engine/__init__.py b/atlasbot/engine/__init__.py new file mode 100644 index 0000000..4247a0b --- /dev/null +++ b/atlasbot/engine/__init__.py @@ -0,0 +1 @@ +"""Answer engine.""" diff --git a/atlasbot/engine/answerer.py b/atlasbot/engine/answerer.py new file mode 100644 index 0000000..d30c939 --- /dev/null +++ b/atlasbot/engine/answerer.py @@ -0,0 +1,223 @@ +import asyncio +import logging +import re +import time +from dataclasses import dataclass +from typing import Any, Callable + +from atlasbot.config import Settings +from atlasbot.knowledge.loader import KnowledgeBase +from atlasbot.llm.client import LLMClient, build_messages, parse_json +from atlasbot.llm import prompts +from atlasbot.snapshot.builder import SnapshotProvider, summary_text + +log = logging.getLogger(__name__) + + +@dataclass +class AnswerScores: + confidence: int + relevance: int + satisfaction: int + hallucination_risk: str + + +@dataclass +class AnswerResult: + reply: str + scores: AnswerScores + meta: dict[str, Any] + + +class AnswerEngine: + def __init__( + self, + settings: Settings, + llm: LLMClient, + kb: KnowledgeBase, + snapshot: SnapshotProvider, + ) -> None: + self._settings = settings + self._llm = llm + self._kb = kb + self._snapshot = snapshot + + async def answer( + self, + question: str, + *, + mode: str, + observer: Callable[[str, str], None] | None = None, + ) -> AnswerResult: + question = (question or "").strip() + if not question: + return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode}) + if mode == "stock": + return await self._answer_stock(question) + + snapshot = self._snapshot.get() + kb_summary = self._kb.summary() + runbooks = self._kb.runbook_titles(limit=4) + snapshot_ctx = summary_text(snapshot) + base_context = _join_context([ + kb_summary, + runbooks, + f"ClusterSnapshot:{snapshot_ctx}" if snapshot_ctx else "", + ]) + + started = time.monotonic() + if observer: + observer("classify", "classifying intent") + classify = await self._classify(question, base_context) + log.info( + "atlasbot_classify", + extra={"extra": {"mode": mode, "elapsed_sec": round(time.monotonic() - started, 2), "classify": classify}}, + ) + if observer: + observer("angles", "drafting angles") + angles = await self._angles(question, classify, mode) + log.info( + "atlasbot_angles", + extra={"extra": {"mode": mode, "count": len(angles)}}, + ) + if observer: + observer("candidates", "drafting answers") + candidates = await self._candidates(question, angles, base_context, mode) + log.info( + "atlasbot_candidates", + extra={"extra": {"mode": mode, "count": len(candidates)}}, + ) + if observer: + observer("select", "scoring candidates") + best, scores = await 
self._select_best(question, candidates) + log.info( + "atlasbot_selection", + extra={"extra": {"mode": mode, "selected": len(best), "scores": scores.__dict__}}, + ) + if observer: + observer("synthesize", "synthesizing reply") + reply = await self._synthesize(question, best, base_context) + meta = { + "mode": mode, + "angles": angles, + "scores": scores.__dict__, + "classify": classify, + "candidates": len(candidates), + } + return AnswerResult(reply, scores, meta) + + async def _answer_stock(self, question: str) -> AnswerResult: + messages = build_messages(prompts.STOCK_SYSTEM, question) + reply = await self._llm.chat(messages, model=self._settings.ollama_model) + return AnswerResult(reply, _default_scores(), {"mode": "stock"}) + + async def _classify(self, question: str, context: str) -> dict[str, Any]: + prompt = prompts.CLASSIFY_PROMPT + "\nQuestion: " + question + messages = build_messages(prompts.CLUSTER_SYSTEM, prompt, context=context) + raw = await self._llm.chat(messages, model=self._settings.ollama_model_fast) + return _parse_json_block(raw, fallback={"needs_snapshot": True}) + + async def _angles(self, question: str, classify: dict[str, Any], mode: str) -> list[dict[str, Any]]: + max_angles = self._settings.fast_max_angles if mode == "quick" else self._settings.smart_max_angles + prompt = prompts.ANGLE_PROMPT.format(max_angles=max_angles) + "\nQuestion: " + question + messages = build_messages(prompts.CLUSTER_SYSTEM, prompt) + raw = await self._llm.chat(messages, model=self._settings.ollama_model_fast) + angles = _parse_json_list(raw) + if not angles: + return [{"name": "primary", "question": question, "relevance": 100}] + return angles[:max_angles] + + async def _candidates( + self, + question: str, + angles: list[dict[str, Any]], + context: str, + mode: str, + ) -> list[dict[str, Any]]: + limit = self._settings.fast_max_candidates if mode == "quick" else self._settings.smart_max_candidates + selected = angles[:limit] + tasks = [] + for angle in selected: + angle_q = angle.get("question") or question + prompt = prompts.CANDIDATE_PROMPT + "\nQuestion: " + angle_q + messages = build_messages(prompts.CLUSTER_SYSTEM, prompt, context=context) + tasks.append(self._llm.chat(messages, model=self._settings.ollama_model_smart)) + replies = await asyncio.gather(*tasks) + candidates = [] + for angle, reply in zip(selected, replies, strict=False): + candidates.append({"angle": angle, "reply": reply}) + return candidates + + async def _select_best(self, question: str, candidates: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], AnswerScores]: + if not candidates: + return ([], _default_scores()) + scored: list[tuple[dict[str, Any], AnswerScores]] = [] + for entry in candidates: + prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + entry["reply"] + messages = build_messages(prompts.CLUSTER_SYSTEM, prompt) + raw = await self._llm.chat(messages, model=self._settings.ollama_model_fast) + data = _parse_json_block(raw, fallback={}) + scores = _scores_from_json(data) + scored.append((entry, scores)) + scored.sort(key=lambda item: (item[1].relevance, item[1].confidence), reverse=True) + best = [entry for entry, _scores in scored[:3]] + return best, scored[0][1] + + async def _synthesize(self, question: str, best: list[dict[str, Any]], context: str) -> str: + if not best: + return "I do not have enough information to answer that yet." 
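+        # Bullet the selected candidate replies and ask the smart model to merge them, together with the grounded context, into one final answer.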
+ parts = [] + for item in best: + parts.append(f"- {item['reply']}") + prompt = ( + prompts.SYNTHESIZE_PROMPT + + "\nQuestion: " + + question + + "\nCandidate answers:\n" + + "\n".join(parts) + ) + messages = build_messages(prompts.CLUSTER_SYSTEM, prompt, context=context) + reply = await self._llm.chat(messages, model=self._settings.ollama_model_smart) + return reply + + +def _join_context(parts: list[str]) -> str: + text = "\n".join([p for p in parts if p]) + return text.strip() + + +def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]: + raw = text.strip() + match = re.search(r"\{.*\}", raw, flags=re.S) + if match: + return parse_json(match.group(0), fallback=fallback) + return parse_json(raw, fallback=fallback) + + +def _parse_json_list(text: str) -> list[dict[str, Any]]: + raw = text.strip() + match = re.search(r"\[.*\]", raw, flags=re.S) + data = parse_json(match.group(0), fallback={}) if match else parse_json(raw, fallback={}) + if isinstance(data, list): + return [entry for entry in data if isinstance(entry, dict)] + return [] + + +def _scores_from_json(data: dict[str, Any]) -> AnswerScores: + return AnswerScores( + confidence=_coerce_int(data.get("confidence"), 60), + relevance=_coerce_int(data.get("relevance"), 60), + satisfaction=_coerce_int(data.get("satisfaction"), 60), + hallucination_risk=str(data.get("hallucination_risk") or "medium"), + ) + + +def _coerce_int(value: Any, default: int) -> int: + try: + return int(float(value)) + except (TypeError, ValueError): + return default + + +def _default_scores() -> AnswerScores: + return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium") diff --git a/atlasbot/knowledge/__init__.py b/atlasbot/knowledge/__init__.py new file mode 100644 index 0000000..5cd5c9e --- /dev/null +++ b/atlasbot/knowledge/__init__.py @@ -0,0 +1 @@ +"""Knowledge base helpers.""" diff --git a/atlasbot/knowledge/loader.py b/atlasbot/knowledge/loader.py new file mode 100644 index 0000000..8585183 --- /dev/null +++ b/atlasbot/knowledge/loader.py @@ -0,0 +1,60 @@ +import json +import logging +from pathlib import Path +from typing import Any + +log = logging.getLogger(__name__) + + +class KnowledgeBase: + def __init__(self, base_dir: str) -> None: + self._base = Path(base_dir) if base_dir else None + self._atlas: dict[str, Any] = {} + self._runbooks: list[dict[str, Any]] = [] + self._loaded = False + + def load(self) -> None: + if self._loaded or not self._base: + return + self._atlas = self._read_json(self._base / "catalog" / "atlas.json") + self._runbooks = self._read_json(self._base / "catalog" / "runbooks.json") or [] + self._loaded = True + + def _read_json(self, path: Path) -> dict[str, Any] | list[dict[str, Any]]: + if not path.exists(): + return {} + try: + return json.loads(path.read_text()) + except Exception as exc: + log.warning("kb load failed", extra={"extra": {"path": str(path), "error": str(exc)}}) + return {} + + def summary(self) -> str: + self.load() + if not self._atlas: + return "" + cluster = self._atlas.get("cluster") + sources = self._atlas.get("sources") if isinstance(self._atlas.get("sources"), list) else [] + services = [src.get("name") for src in sources if isinstance(src, dict)] + parts: list[str] = [] + if cluster: + parts.append(f"Cluster: {cluster}.") + if services: + parts.append(f"Services indexed: {len(services)}.") + return " ".join(parts) + + def runbook_titles(self, *, limit: int = 5) -> str: + self.load() + if not self._runbooks: + return "" + titles = [] + for 
entry in self._runbooks: + if not isinstance(entry, dict): + continue + title = entry.get("title") + path = entry.get("path") + if title and path: + titles.append(f"- {title} ({path})") + if not titles: + return "" + return "Relevant runbooks:\n" + "\n".join(titles[:limit]) diff --git a/atlasbot/llm/__init__.py b/atlasbot/llm/__init__.py new file mode 100644 index 0000000..48aaa6f --- /dev/null +++ b/atlasbot/llm/__init__.py @@ -0,0 +1 @@ +"""LLM utilities.""" diff --git a/atlasbot/llm/client.py b/atlasbot/llm/client.py new file mode 100644 index 0000000..28c6d19 --- /dev/null +++ b/atlasbot/llm/client.py @@ -0,0 +1,77 @@ +import json +import logging +from typing import Any + +import httpx + +from atlasbot.config import Settings + +log = logging.getLogger(__name__) + +FALLBACK_STATUS_CODE = 404 + + +class LLMError(RuntimeError): + pass + + +class LLMClient: + def __init__(self, settings: Settings) -> None: + self._settings = settings + self._timeout = settings.ollama_timeout_sec + self._headers = {"Content-Type": "application/json"} + if settings.ollama_api_key: + self._headers["x-api-key"] = settings.ollama_api_key + + def _endpoint(self) -> str: + base = self._settings.ollama_url.rstrip("/") + if base.endswith("/api/chat"): + return base + return base + "/api/chat" + + async def chat(self, messages: list[dict[str, str]], *, model: str | None = None) -> str: + payload = { + "model": model or self._settings.ollama_model, + "messages": messages, + "stream": False, + } + for attempt in range(max(1, self._settings.ollama_retries + 1)): + try: + async with httpx.AsyncClient(timeout=self._timeout) as client: + resp = await client.post(self._endpoint(), json=payload, headers=self._headers) + if resp.status_code == FALLBACK_STATUS_CODE and self._settings.ollama_fallback_model: + payload["model"] = self._settings.ollama_fallback_model + continue + resp.raise_for_status() + data = resp.json() + message = data.get("message") if isinstance(data, dict) else None + if isinstance(message, dict): + content = message.get("content") + else: + content = data.get("response") or data.get("reply") or data + if not content: + raise LLMError("empty response") + return str(content) + except Exception as exc: + log.warning("ollama call failed", extra={"extra": {"attempt": attempt + 1, "error": str(exc)}}) + if attempt + 1 >= max(1, self._settings.ollama_retries + 1): + raise LLMError(str(exc)) from exc + raise LLMError("ollama retries exhausted") + + +def build_messages(system: str, prompt: str, *, context: str | None = None) -> list[dict[str, str]]: + messages: list[dict[str, str]] = [{"role": "system", "content": system}] + if context: + messages.append({"role": "user", "content": "Context (grounded):\n" + context}) + messages.append({"role": "user", "content": prompt}) + return messages + + +def parse_json(text: str, *, fallback: dict[str, Any] | None = None) -> dict[str, Any]: + try: + raw = text.strip() + if raw.startswith("`"): + raw = raw.strip("`") + return json.loads(raw) + except Exception: + return fallback or {} diff --git a/atlasbot/llm/prompts.py b/atlasbot/llm/prompts.py new file mode 100644 index 0000000..0b01cf7 --- /dev/null +++ b/atlasbot/llm/prompts.py @@ -0,0 +1,42 @@ +CLUSTER_SYSTEM = ( + "You are Atlas, the Titan Lab assistant for the Atlas/Othrys cluster. " + "Use the provided context as your source of truth. " + "If the question is about Atlas/Othrys, respond in short paragraphs. " + "Avoid commands unless explicitly asked. " + "If information is missing, say so clearly and avoid guessing. 
" +) + +CLASSIFY_PROMPT = ( + "Classify the user question. Return JSON with fields: " + "needs_snapshot (bool), needs_kb (bool), needs_metrics (bool), " + "needs_general (bool), intent (short string), ambiguity (0-1)." +) + +ANGLE_PROMPT = ( + "Generate up to {max_angles} possible angles to answer the question. " + "Return JSON list of objects with: name, question, relevance (0-100)." +) + +CANDIDATE_PROMPT = ( + "Answer this angle using the provided context. " + "Keep it concise, 2-4 sentences. " + "If you infer, say 'Based on the snapshot'." +) + +SCORE_PROMPT = ( + "Score the candidate response. Return JSON with fields: " + "confidence (0-100), relevance (0-100), satisfaction (0-100), " + "hallucination_risk (low|medium|high)." +) + +SYNTHESIZE_PROMPT = ( + "Synthesize a final response from the best candidates. " + "Use a natural, helpful tone with light reasoning. " + "Avoid lists unless the user asked for lists." +) + +STOCK_SYSTEM = ( + "You are Atlas, a helpful assistant. " + "Be concise and truthful. " + "If unsure, say so." +) diff --git a/atlasbot/logging.py b/atlasbot/logging.py new file mode 100644 index 0000000..aff3867 --- /dev/null +++ b/atlasbot/logging.py @@ -0,0 +1,29 @@ +import json +import logging +import sys +from datetime import datetime, timezone + + +class JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "level": record.levelname.lower(), + "logger": record.name, + "message": record.getMessage(), + } + extras = getattr(record, "extra", None) + if isinstance(extras, dict): + payload.update(extras) + if record.exc_info: + payload["exc_info"] = self.formatException(record.exc_info) + return json.dumps(payload, ensure_ascii=True) + + +def configure_logging(level: str = "INFO") -> None: + root = logging.getLogger() + root.setLevel(level.upper()) + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(JsonFormatter()) + root.handlers.clear() + root.addHandler(handler) diff --git a/atlasbot/main.py b/atlasbot/main.py new file mode 100644 index 0000000..c800391 --- /dev/null +++ b/atlasbot/main.py @@ -0,0 +1,73 @@ +import asyncio +import logging + +import uvicorn + +from atlasbot.api.http import Api +from atlasbot.config import load_settings +from atlasbot.engine.answerer import AnswerEngine, AnswerResult, AnswerScores +from atlasbot.knowledge.loader import KnowledgeBase +from atlasbot.llm.client import LLMClient +from atlasbot.logging import configure_logging +from atlasbot.matrix.bot import MatrixBot +from atlasbot.queue.nats import QueueManager +from atlasbot.snapshot.builder import SnapshotProvider + +log = logging.getLogger(__name__) + + +def _build_engine(settings) -> AnswerEngine: + kb = KnowledgeBase(settings.kb_dir) + snapshot = SnapshotProvider(settings) + llm = LLMClient(settings) + return AnswerEngine(settings, llm, kb, snapshot) + + +async def main() -> None: + settings = load_settings() + configure_logging("INFO") + + engine = _build_engine(settings) + + async def handler(payload: dict[str, str]) -> dict[str, object]: + result = await engine.answer(payload.get("question", ""), mode=payload.get("mode", "quick")) + return {"reply": result.reply, "scores": result.scores.__dict__} + + queue = QueueManager(settings, handler) + await queue.start() + + async def answer_handler(question: str, mode: str, observer=None) -> AnswerResult: + if settings.queue_enabled: + payload = await queue.submit({"question": question, "mode": mode}) + reply = 
payload.get("reply", "") if isinstance(payload, dict) else "" + return AnswerResult(reply=reply or "", scores=result_scores(payload), meta={"mode": mode}) + return await engine.answer(question, mode=mode, observer=observer) + + api = Api(settings, answer_handler) + server = uvicorn.Server(uvicorn.Config(api.app, host="0.0.0.0", port=settings.http_port, log_level="info")) + + tasks = [] + if settings.bot_user and settings.bot_pass: + tasks.append(asyncio.create_task(MatrixBot(settings, engine, answer_handler).run())) + + tasks.append(asyncio.create_task(server.serve())) + await asyncio.gather(*tasks) + + +def result_scores(payload: dict[str, object]) -> AnswerScores: + scores = payload.get("scores") if isinstance(payload, dict) else None + if isinstance(scores, dict): + try: + return AnswerScores( + confidence=int(scores.get("confidence", 60)), + relevance=int(scores.get("relevance", 60)), + satisfaction=int(scores.get("satisfaction", 60)), + hallucination_risk=str(scores.get("hallucination_risk", "medium")), + ) + except Exception: + pass + return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/atlasbot/matrix/__init__.py b/atlasbot/matrix/__init__.py new file mode 100644 index 0000000..47204f0 --- /dev/null +++ b/atlasbot/matrix/__init__.py @@ -0,0 +1 @@ +"""Matrix bot.""" diff --git a/atlasbot/matrix/bot.py b/atlasbot/matrix/bot.py new file mode 100644 index 0000000..083c01d --- /dev/null +++ b/atlasbot/matrix/bot.py @@ -0,0 +1,169 @@ +import asyncio +import logging +import time +from typing import Any + +import httpx + +from atlasbot.config import Settings +from collections.abc import Awaitable, Callable + +from atlasbot.engine.answerer import AnswerEngine, AnswerResult + +log = logging.getLogger(__name__) + + +class MatrixClient: + def __init__(self, settings: Settings) -> None: + self._settings = settings + + async def login(self) -> str: + payload = { + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": self._settings.bot_user}, + "password": self._settings.bot_pass, + } + url = f"{self._settings.auth_base}/_matrix/client/v3/login" + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(url, json=payload) + resp.raise_for_status() + data = resp.json() + return data.get("access_token", "") + + async def resolve_room(self, token: str) -> str: + url = f"{self._settings.matrix_base}/_matrix/client/v3/directory/room/{self._settings.room_alias}" + headers = {"Authorization": f"Bearer {token}"} + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + return data.get("room_id", "") + + async def join_room(self, token: str, room_id: str) -> None: + url = f"{self._settings.matrix_base}/_matrix/client/v3/rooms/{room_id}/join" + headers = {"Authorization": f"Bearer {token}"} + async with httpx.AsyncClient(timeout=15.0) as client: + await client.post(url, headers=headers) + + async def send_message(self, token: str, room_id: str, text: str) -> None: + url = f"{self._settings.matrix_base}/_matrix/client/v3/rooms/{room_id}/send/m.room.message" + headers = {"Authorization": f"Bearer {token}"} + payload = {"msgtype": "m.text", "body": text} + async with httpx.AsyncClient(timeout=15.0) as client: + await client.post(url, json=payload, headers=headers) + + async def sync(self, token: str, since: str | None) -> dict[str, Any]: + base = 
f"{self._settings.matrix_base}/_matrix/client/v3/sync" + params = {"timeout": 30000} + if since: + params["since"] = since + headers = {"Authorization": f"Bearer {token}"} + async with httpx.AsyncClient(timeout=40.0) as client: + resp = await client.get(base, headers=headers, params=params) + resp.raise_for_status() + return resp.json() + + +class MatrixBot: + def __init__( + self, + settings: Settings, + engine: AnswerEngine, + answer_handler: Callable[[str, str, Callable[[str, str], None] | None], Awaitable[AnswerResult]] | None = None, + ) -> None: + self._settings = settings + self._engine = engine + self._client = MatrixClient(settings) + self._answer_handler = answer_handler + + async def run(self) -> None: + token = await self._client.login() + room_id = await self._client.resolve_room(token) + if room_id: + await self._client.join_room(token, room_id) + since = None + while True: + try: + payload = await self._client.sync(token, since) + since = payload.get("next_batch") + await self._handle_sync(token, payload) + except Exception as exc: + log.warning("matrix sync failed", extra={"extra": {"error": str(exc)}}) + await asyncio.sleep(5) + + async def _handle_sync(self, token: str, payload: dict[str, Any]) -> None: + rooms = payload.get("rooms") or {} + joins = rooms.get("join") or {} + for room_id, room_data in joins.items(): + events = (room_data.get("timeline") or {}).get("events") or [] + for event in events: + if not isinstance(event, dict): + continue + if event.get("type") != "m.room.message": + continue + content = event.get("content") or {} + body = content.get("body") or "" + sender = event.get("sender") or "" + if sender.endswith(f"/{self._settings.bot_user}") or sender == self._settings.bot_user: + continue + mode, question = _extract_mode(body, self._settings.bot_mentions) + if not question: + continue + await self._client.send_message(token, room_id, "Thinking…") + await self._answer_with_heartbeat(token, room_id, question, mode) + + async def _answer_with_heartbeat(self, token: str, room_id: str, question: str, mode: str) -> None: + latest = {"stage": "", "note": ""} + stop = asyncio.Event() + + def observer(stage: str, note: str) -> None: + latest["stage"] = stage + latest["note"] = note + + async def heartbeat() -> None: + while not stop.is_set(): + await asyncio.sleep(self._settings.thinking_interval_sec) + if stop.is_set(): + break + note = (latest.get("note") or "thinking").strip() + snippet = note[:32] + msg = f"Still thinking ({snippet})…" + await self._client.send_message(token, room_id, msg) + + task = asyncio.create_task(heartbeat()) + started = time.monotonic() + try: + handler = self._answer_handler or (lambda q, m, obs: self._engine.answer(q, mode=m, observer=obs)) + result = await handler(question, mode, observer) + elapsed = time.monotonic() - started + await self._client.send_message(token, room_id, result.reply) + log.info( + "matrix_answer", + extra={ + "extra": { + "mode": mode, + "seconds": round(elapsed, 2), + "scores": result.scores.__dict__, + } + }, + ) + finally: + stop.set() + task.cancel() + + +def _extract_mode(body: str, mentions: tuple[str, ...]) -> tuple[str, str]: + lower = body.lower() + for mention in mentions: + if mention and mention.lower() in lower: + mode = "quick" + if "atlas-smart" in lower or "smart" in lower: + mode = "smart" + if "atlas-quick" in lower or "quick" in lower: + mode = "quick" + cleaned = body + for tag in mentions: + cleaned = cleaned.replace(tag, "") + cleaned = cleaned.replace(tag.capitalize(), "") + return 
mode, cleaned.strip() + return ("", "") diff --git a/atlasbot/queue/__init__.py b/atlasbot/queue/__init__.py new file mode 100644 index 0000000..4f2bd59 --- /dev/null +++ b/atlasbot/queue/__init__.py @@ -0,0 +1 @@ +"""Queue integrations.""" diff --git a/atlasbot/queue/nats.py b/atlasbot/queue/nats.py new file mode 100644 index 0000000..51fe53f --- /dev/null +++ b/atlasbot/queue/nats.py @@ -0,0 +1,88 @@ +import asyncio +import json +import logging +from typing import Any, Awaitable, Callable + +from nats.aio.client import Client as NATS +from nats.js.errors import NotFoundError + +from atlasbot.config import Settings + +log = logging.getLogger(__name__) + + +class QueueManager: + def __init__(self, settings: Settings, handler: Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]) -> None: + self._settings = settings + self._handler = handler + self._nc: NATS | None = None + self._js = None + self._worker_task: asyncio.Task | None = None + + async def start(self) -> None: + if not self._settings.queue_enabled: + return + self._nc = NATS() + await self._nc.connect(self._settings.nats_url) + self._js = self._nc.jetstream() + await self._ensure_stream() + self._worker_task = asyncio.create_task(self._worker_loop()) + + async def stop(self) -> None: + if self._worker_task: + self._worker_task.cancel() + if self._nc: + await self._nc.drain() + + async def submit(self, payload: dict[str, Any]) -> dict[str, Any]: + if not self._settings.queue_enabled: + return await self._handler(payload) + if not self._nc or not self._js: + raise RuntimeError("queue not initialized") + reply = self._nc.new_inbox() + sub = await self._nc.subscribe(reply) + await self._js.publish(self._settings.nats_subject, json.dumps(payload).encode(), reply=reply) + msg = await sub.next_msg(timeout=300) + await sub.unsubscribe() + return json.loads(msg.data.decode()) + + async def _ensure_stream(self) -> None: + assert self._js is not None + try: + await self._js.stream_info(self._settings.nats_stream) + except NotFoundError: + await self._js.add_stream( + name=self._settings.nats_stream, + subjects=[self._settings.nats_subject], + retention="workqueue", + max_msgs=10000, + max_bytes=50 * 1024 * 1024, + ) + + async def _worker_loop(self) -> None: + assert self._js is not None + sub = await self._js.pull_subscribe(self._settings.nats_subject, durable="atlasbot-worker") + while True: + try: + msgs = await sub.fetch(1, timeout=1) + except Exception: + await asyncio.sleep(0.2) + continue + for msg in msgs: + await self._handle_message(msg) + + async def _handle_message(self, msg) -> None: + try: + payload = json.loads(msg.data.decode()) + except Exception: + await msg.ack() + return + reply = msg.reply + try: + result = await self._handler(payload) + if reply and self._nc: + await self._nc.publish(reply, json.dumps(result).encode()) + except Exception as exc: + log.warning("queue handler failed", extra={"extra": {"error": str(exc)}}) + finally: + await msg.ack() diff --git a/atlasbot/snapshot/__init__.py b/atlasbot/snapshot/__init__.py new file mode 100644 index 0000000..0637f91 --- /dev/null +++ b/atlasbot/snapshot/__init__.py @@ -0,0 +1 @@ +"""Snapshot helpers.""" diff --git a/atlasbot/snapshot/builder.py b/atlasbot/snapshot/builder.py new file mode 100644 index 0000000..a103630 --- /dev/null +++ b/atlasbot/snapshot/builder.py @@ -0,0 +1,164 @@ +import json +import logging +import time +from typing import Any + +import httpx + +from atlasbot.config import Settings + +log = logging.getLogger(__name__) + + +class SnapshotProvider: + 
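+    """Fetch Ariadne cluster state over HTTP and cache it for ATLASBOT_SNAPSHOT_TTL_SEC seconds (minimum 5)."""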
def __init__(self, settings: Settings) -> None: + self._settings = settings + self._cache: dict[str, Any] = {} + self._cache_ts = 0.0 + + def _cache_valid(self) -> bool: + return time.monotonic() - self._cache_ts < max(5, self._settings.snapshot_ttl_sec) + + def get(self) -> dict[str, Any] | None: + if self._cache and self._cache_valid(): + return self._cache + if not self._settings.ariadne_state_url: + return self._cache or None + headers = {} + if self._settings.ariadne_state_token: + headers["x-internal-token"] = self._settings.ariadne_state_token + try: + resp = httpx.get(self._settings.ariadne_state_url, headers=headers, timeout=10.0) + resp.raise_for_status() + payload = resp.json() + if isinstance(payload, dict): + self._cache = payload + self._cache_ts = time.monotonic() + return payload + except Exception as exc: + log.warning("snapshot fetch failed", extra={"extra": {"error": str(exc)}}) + return self._cache or None + + +def _node_usage_top(series: list[dict[str, Any]]) -> dict[str, Any] | None: + best = None + for entry in series or []: + if not isinstance(entry, dict): + continue + node = entry.get("node") + value = entry.get("value") + try: + numeric = float(value) + except (TypeError, ValueError): + continue + if best is None or numeric > best["value"]: + best = {"node": node, "value": numeric} + return best + + +def build_summary(snapshot: dict[str, Any] | None) -> dict[str, Any]: + if not snapshot: + return {} + nodes_detail = _nodes_detail(snapshot) + metrics = _metrics(snapshot) + summary: dict[str, Any] = {} + + summary.update(_build_nodes(snapshot)) + summary.update(_build_hardware(nodes_detail)) + summary.update(_build_pods(metrics)) + summary.update(_build_postgres(metrics)) + summary.update(_build_hottest(metrics)) + summary.update(_build_workloads(snapshot)) + summary.update(_build_flux(snapshot)) + return summary + + +def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]: + items = snapshot.get("nodes_detail") + return items if isinstance(items, list) else [] + + +def _metrics(snapshot: dict[str, Any]) -> dict[str, Any]: + metrics = snapshot.get("metrics") + return metrics if isinstance(metrics, dict) else {} + + +def _build_nodes(snapshot: dict[str, Any]) -> dict[str, Any]: + nodes_summary = snapshot.get("nodes_summary") if isinstance(snapshot.get("nodes_summary"), dict) else {} + if not nodes_summary: + return {} + return { + "nodes": { + "total": nodes_summary.get("total"), + "ready": nodes_summary.get("ready"), + "not_ready": nodes_summary.get("not_ready"), + } + } + + +def _build_hardware(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]: + hardware: dict[str, list[str]] = {} + for node in nodes_detail or []: + if not isinstance(node, dict): + continue + name = node.get("name") + hardware_class = node.get("hardware") or "unknown" + if name: + hardware.setdefault(hardware_class, []).append(name) + if not hardware: + return {} + return {"hardware": {key: sorted(value) for key, value in hardware.items()}} + + +def _build_pods(metrics: dict[str, Any]) -> dict[str, Any]: + pods = { + "running": metrics.get("pods_running"), + "pending": metrics.get("pods_pending"), + "failed": metrics.get("pods_failed"), + "succeeded": metrics.get("pods_succeeded"), + } + if not any(value is not None for value in pods.values()): + return {} + return {"pods": pods} + + +def _build_postgres(metrics: dict[str, Any]) -> dict[str, Any]: + postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} + if not postgres: + 
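+        # Snapshot carries no postgres_connections metrics, so omit the postgres section from the summary.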
return {} + return { + "postgres": { + "used": postgres.get("used"), + "max": postgres.get("max"), + "hottest_db": postgres.get("hottest_db"), + } + } + + +def _build_hottest(metrics: dict[str, Any]) -> dict[str, Any]: + node_usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {} + hottest: dict[str, Any] = {} + for key in ("cpu", "ram", "net", "io"): + top = _node_usage_top(node_usage.get(key, [])) + if top: + hottest[key] = top + if not hottest: + return {} + return {"hottest": hottest} + + +def _build_workloads(snapshot: dict[str, Any]) -> dict[str, Any]: + workloads = snapshot.get("workloads") if isinstance(snapshot.get("workloads"), list) else [] + return {"workloads": workloads} + + +def _build_flux(snapshot: dict[str, Any]) -> dict[str, Any]: + flux = snapshot.get("flux") if isinstance(snapshot.get("flux"), dict) else {} + return {"flux": flux} + + +def summary_text(snapshot: dict[str, Any] | None) -> str: + summary = build_summary(snapshot) + if not summary: + return "" + return json.dumps(summary, ensure_ascii=True, separators=(",", ":")) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4dd4f74 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,4 @@ +pytest==8.3.5 +pytest-mock==3.14.0 +slipcover==1.0.17 +ruff==0.14.13 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a89a8fd --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +fastapi==0.115.11 +uvicorn==0.30.6 +httpx==0.27.2 +pydantic==2.12.5 +nats-py==2.7.2 +prometheus-client==0.21.1 +PyYAML==6.0.2 diff --git a/scripts/publish_test_metrics.py b/scripts/publish_test_metrics.py new file mode 100644 index 0000000..d58acd7 --- /dev/null +++ b/scripts/publish_test_metrics.py @@ -0,0 +1,59 @@ +import os +import time +import xml.etree.ElementTree as ET + +import httpx + +VM_URL = os.getenv("VM_PUSH_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428/api/v1/import/prometheus") +JUNIT_PATH = os.getenv("JUNIT_PATH", "build/junit.xml") +COVERAGE_PATH = os.getenv("COVERAGE_PATH", "build/coverage.json") +JOB = os.getenv("TEST_JOB", "atlasbot-tests") + + +def _load_junit(path: str) -> dict[str, float]: + tree = ET.parse(path) + root = tree.getroot() + tests = int(root.attrib.get("tests", "0")) + failures = int(root.attrib.get("failures", "0")) + errors = int(root.attrib.get("errors", "0")) + time_sec = float(root.attrib.get("time", "0")) + return { + "tests": tests, + "failures": failures, + "errors": errors, + "time": time_sec, + } + + +def _load_coverage(path: str) -> float: + try: + import json + + data = json.load(open(path)) + total = data.get("summary", {}).get("percent_covered", 0) + return float(total) / 100.0 + except Exception: + return 0.0 + + +def _format_metric(name: str, value: float) -> str: + return f"{name}{{job=\"{JOB}\"}} {value} {int(time.time())}" + + +def main() -> None: + junit = _load_junit(JUNIT_PATH) + coverage = _load_coverage(COVERAGE_PATH) + lines = [ + _format_metric("atlasbot_tests_total", junit["tests"]), + _format_metric("atlasbot_tests_failed", junit["failures"]), + _format_metric("atlasbot_tests_errors", junit["errors"]), + _format_metric("atlasbot_tests_time_seconds", junit["time"]), + _format_metric("atlasbot_coverage_ratio", coverage), + ] + body = "\n".join(lines) + "\n" + httpx.post(VM_URL, content=body, timeout=10.0) + print("metrics push complete") + + +if __name__ == "__main__": + main() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 
index 0000000..17cdbe1 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,6 @@ +from pathlib import Path +import sys + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_engine.py b/tests/test_engine.py new file mode 100644 index 0000000..e65bf33 --- /dev/null +++ b/tests/test_engine.py @@ -0,0 +1,72 @@ +import asyncio + +from atlasbot.engine.answerer import AnswerEngine +from atlasbot.knowledge.loader import KnowledgeBase +from atlasbot.snapshot.builder import SnapshotProvider +from atlasbot.config import Settings + + +class FakeLLM: + def __init__(self, replies: list[str]) -> None: + self._replies = replies + self.calls: list[str] = [] + + async def chat(self, messages, *, model=None): + self.calls.append(model or "") + return self._replies.pop(0) + + +def _settings() -> Settings: + return Settings( + matrix_base="", + auth_base="", + bot_user="", + bot_pass="", + room_alias="", + server_name="", + bot_mentions=(), + ollama_url="", + ollama_model="base", + ollama_model_fast="fast", + ollama_model_smart="smart", + ollama_fallback_model="", + ollama_timeout_sec=1.0, + ollama_retries=0, + ollama_api_key="", + http_port=8090, + internal_token="", + kb_dir="", + vm_url="", + ariadne_state_url="", + ariadne_state_token="", + snapshot_ttl_sec=30, + thinking_interval_sec=30, + queue_enabled=False, + nats_url="", + nats_stream="", + nats_subject="", + nats_result_bucket="", + fast_max_angles=1, + smart_max_angles=1, + fast_max_candidates=1, + smart_max_candidates=1, + ) + + +def test_engine_answer_basic(): + llm = FakeLLM( + [ + '{"needs_snapshot": true}', + '[{"name":"primary","question":"What is Atlas?","relevance":90}]', + "Based on the snapshot, Atlas has 22 nodes.", + '{"confidence":80,"relevance":90,"satisfaction":85,"hallucination_risk":"low"}', + "Atlas has 22 nodes and is healthy.", + ] + ) + settings = _settings() + kb = KnowledgeBase("") + snapshot = SnapshotProvider(settings) + engine = AnswerEngine(settings, llm, kb, snapshot) + + result = asyncio.run(engine.answer("What is Atlas?", mode="quick")) + assert "Atlas has 22 nodes" in result.reply diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py new file mode 100644 index 0000000..3fbe826 --- /dev/null +++ b/tests/test_snapshot.py @@ -0,0 +1,30 @@ +from atlasbot.snapshot.builder import build_summary + + +def test_build_summary_basic() -> None: + snapshot = { + "nodes_summary": {"total": 2, "ready": 2, "not_ready": 0}, + "nodes_detail": [ + {"name": "titan-01", "hardware": "rpi5"}, + {"name": "titan-02", "hardware": "amd64"}, + ], + "metrics": { + "pods_running": 10, + "pods_pending": 0, + "pods_failed": 0, + "pods_succeeded": 1, + "postgres_connections": {"used": 5, "max": 100, "hottest_db": {"label": "synapse", "value": 3}}, + "node_usage": { + "cpu": [{"node": "titan-01", "value": 30}], + "ram": [{"node": "titan-02", "value": 70}], + "net": [{"node": "titan-02", "value": 1.2}], + "io": [{"node": "titan-01", "value": 0.5}], + }, + }, + } + summary = build_summary(snapshot) + assert summary["nodes"]["total"] == 2 + assert "rpi5" in summary["hardware"] + assert summary["pods"]["running"] == 10 + assert summary["postgres"]["hottest_db"]["label"] == "synapse" + assert summary["hottest"]["ram"]["node"] == "titan-02"
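For reviewers who want to poke at the new HTTP API by hand, the sketch below shows one way to call the `/v1/answer` route defined in `atlasbot/api/http.py`. It is a minimal example and not part of this commit: it assumes the container port is published on `localhost:8090` (the port the Dockerfile exposes) and that `ATLASBOT_INTERNAL_TOKEN` was set to the placeholder value `dev-token`.

```
"""Minimal smoke test for atlasbot's /v1/answer endpoint (assumed reachable on localhost:8090)."""
import httpx

BASE_URL = "http://localhost:8090"  # assumption: container port 8090 published locally
TOKEN = "dev-token"                 # assumption: whatever ATLASBOT_INTERNAL_TOKEN is set to


def ask(question: str, mode: str = "quick") -> str:
    # /healthz requires no auth; /v1/answer checks X-Internal-Token only when a token is configured.
    httpx.get(f"{BASE_URL}/healthz", timeout=5.0).raise_for_status()
    resp = httpx.post(
        f"{BASE_URL}/v1/answer",
        json={"question": question, "mode": mode},  # mode: "quick", "smart", or "stock"
        headers={"X-Internal-Token": TOKEN},
        timeout=600.0,  # answers can take a while; OLLAMA_TIMEOUT_SEC defaults to 480s
    )
    resp.raise_for_status()
    return resp.json()["reply"]


if __name__ == "__main__":
    print(ask("Which nodes are not ready?"))
```

The request body may also use `prompt`, `message`, `text`, or `content` instead of `question`; `_extract_question` takes the first non-empty field, and leaving `ATLASBOT_INTERNAL_TOKEN` empty disables the header check entirely.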