"""Coverage-oriented tests for support modules and render helpers.""" from __future__ import annotations import asyncio import importlib import json import logging as pylogging from dataclasses import replace from pathlib import Path from types import SimpleNamespace from typing import Any import httpx import pytest from fastapi.testclient import TestClient from nats.js.errors import NotFoundError import atlasbot import atlasbot.api import atlasbot.engine import atlasbot.engine.answerer import atlasbot.knowledge import atlasbot.llm import atlasbot.matrix import atlasbot.queue import atlasbot.snapshot from atlasbot.api.http import Api, AnswerRequest, _extract_question from atlasbot.config import ( MatrixBotConfig, Settings, _env_bool, _env_float, _env_int, _load_matrix_bots, load_settings, ) import atlasbot.engine.answerer.common as answer_common import atlasbot.engine.answerer.factsheet as answer_factsheet import atlasbot.engine.answerer.post as answer_post import atlasbot.engine.answerer.post_ext as answer_post_ext import atlasbot.engine.answerer.retrieval as answer_retrieval import atlasbot.engine.answerer.retrieval_ext as answer_retrieval_ext import atlasbot.engine.answerer.spine as answer_spine from atlasbot.engine.answerer import AnswerResult, AnswerScores from atlasbot.engine.answerer._base import ClaimItem, ContradictionContext, EvidenceItem, InsightGuardInput, ScoreContext from atlasbot.engine.intent_router import route_intent from atlasbot.knowledge.loader import KnowledgeBase from atlasbot.llm.client import LLMClient, build_messages, parse_json from atlasbot.logging import JsonFormatter, configure_logging from atlasbot.main import result_scores from atlasbot.matrix.bot import MatrixBot, MatrixClient, _extract_mode, _mode_timeout_sec from atlasbot.queue.nats import QueueManager from atlasbot.snapshot.builder import core_a, core_b, format_a, format_b, format_c, summary_text from atlasbot.state.store import ClaimStore, _safe_json from testing.fakes import 
build_test_settings def _rich_snapshot() -> dict[str, Any]: return { "collected_at": "2026-04-10T12:00:00Z", "snapshot_version": "v1", "summary": { "signals": [ {"scope": "node", "target": "titan-01", "metric": "cpu", "current": 90, "delta_pct": 15, "severity": "warn"} ], "profiles": { "nodes": [ {"node": "titan-01", "load_index": 0.9, "cpu": 90, "ram": 80, "pods_total": 10, "hardware": "rpi5"} ], "namespaces": [ {"namespace": "synapse", "pods_total": 5, "cpu_usage": 40, "mem_usage": 50, "primary_node": "titan-01"} ], "workloads": [ {"namespace": "synapse", "workload": "matrix", "pods_total": 3, "pods_running": 3, "primary_node": "titan-01"} ], }, "inventory": {"workers": {"total": 2, "ready": 1}}, "topology": { "nodes": [{"name": "titan-01", "role": "worker"}], "workloads": [{"name": "matrix", "node": "titan-01"}], "namespaces": [{"name": "synapse", "pods": 5}], }, "lexicon": { "terms": [{"term": "atlas", "meaning": "Atlas cluster"}], "aliases": {"atlasbot": "atlas"}, }, "cross_stats": { "node_metric_top": [ { "metric": "cpu", "node": "titan-01", "value": 90, "cpu": 90, "ram": 80, "net": 2.5, "io": 1.5, "pods_total": 10, } ], "namespace_metric_top": [ { "metric": "cpu", "namespace": "synapse", "value": 40, "cpu": 40, "ram": 50, "net": 1.5, "io": 1.0, "pods_total": 5, } ], "pvc_top": [{"metric": "usage", "namespace": "synapse", "pvc": "data", "value": 95}], }, "baseline_deltas": { "nodes": { "cpu": [{"node": "titan-01", "delta": 10, "severity": "warn"}], "ram": [{"node": "titan-01", "delta": 5}], }, "namespaces": { "pods": [{"namespace": "synapse", "delta": 8, "severity": "high"}], }, }, "pod_issue_summary": { "waiting_reasons_top": [{"reason": "ImagePullBackOff", "count": 3}], "phase_reasons_top": [{"reason": "Pending", "count": 2}], "namespace_issue_top": {"waiting": [{"namespace": "synapse", "value": 2}]}, }, "trend_requests": {}, "pod_waiting_trends": {}, "pod_terminated_trends": {}, }, "nodes_summary": { "total": 2, "ready": 1, "not_ready": 1, 
"not_ready_names": ["titan-02"], "by_arch": {"rpi5": 1, "amd64": 1}, "by_role": {"worker": 2}, "workers": {"total": 2, "ready": 1}, "pressure_nodes": {"names": ["titan-02"]}, }, "nodes_detail": [ { "name": "titan-01", "hardware": "rpi5", "arch": "arm64", "os": "linux", "kubelet": "1.30", "kernel": "6.8", "container_runtime": "containerd", "is_worker": True, "roles": ["worker"], "age_hours": 12, "taints": [{"key": "dedicated", "effect": "NoSchedule"}], }, { "name": "titan-02", "hardware": "amd64", "arch": "amd64", "os": "linux", "kubelet": "1.30", "kernel": "6.8", "container_runtime": "containerd", "is_worker": True, "roles": ["worker"], "age_hours": 24, "taints": [{"key": "pressure", "effect": "NoExecute"}], }, ], "metrics": { "node_load": [ {"node": "titan-01", "load_index": 0.9, "cpu": 90, "ram": 80, "net": 100, "io": 50}, {"node": "titan-02", "load_index": 0.4, "cpu": 30, "ram": 20, "net": 10, "io": 5}, ], "pods_running": 12, "pods_pending": 1, "pods_failed": 2, "pods_succeeded": 3, "capacity_cpu": 8, "allocatable_cpu": 7, "capacity_mem_bytes": 8 * 1024 * 1024 * 1024, "allocatable_mem_bytes": 6 * 1024 * 1024 * 1024, "capacity_pods": 110, "allocatable_pods": 100, "namespace_cpu_top": [{"metric": {"namespace": "synapse"}, "value": 95}], "namespace_mem_top": [{"metric": {"namespace": "synapse"}, "value": 1024 * 1024}], "namespace_cpu_requests_top": [{"metric": {"namespace": "synapse"}, "value": 50}], "namespace_mem_requests_top": [{"metric": {"namespace": "synapse"}, "value": 2 * 1024 * 1024}], "namespace_net_top": [{"metric": {"namespace": "synapse"}, "value": 1024}], "namespace_io_top": [{"metric": {"namespace": "synapse"}, "value": 2048}], "pod_cpu_top": [{"metric": {"namespace": "synapse", "pod": "matrix"}, "value": 3.3}], "pod_cpu_top_node": [{"metric": {"namespace": "synapse", "pod": "matrix", "node": "titan-01"}, "value": 3.3}], "pod_mem_top": [{"metric": {"namespace": "synapse", "pod": "matrix"}, "value": 4096}], "pod_mem_top_node": [{"metric": 
{"namespace": "synapse", "pod": "matrix", "node": "titan-01"}, "value": 4096}], "top_restarts_1h": [{"metric": {"namespace": "synapse", "pod": "matrix"}, "value": [0, 4]}], "restart_namespace_top": [{"metric": {"namespace": "synapse"}, "value": 4}], "job_failures_24h": [{"metric": {"namespace": "synapse", "job_name": "backup"}, "value": 2}], "node_pods_top": [{"node": "titan-01", "pods_total": 5, "namespaces": [{"name": "synapse", "count": 3}]}], "postgres_connections": {"used": 5, "max": 10, "hottest_db": {"label": "synapse", "value": 3}}, "node_usage": { "cpu": [{"node": "titan-01", "value": 90}], "ram": [{"node": "titan-02", "value": 70}], "net": [{"node": "titan-02", "value": 2}], "io": [{"node": "titan-01", "value": 0.5}], "disk": [{"node": "titan-01", "value": 80}], }, "node_load_summary": { "top": [{"node": "titan-01", "load_index": 0.9, "cpu": 90, "ram": 80, "io": 1.5, "net": 2.5, "pods_total": 10}], "outliers": [{"node": "titan-02"}], }, "hardware_usage_avg": [ {"hardware": "rpi5", "load_index": 0.9, "cpu": 90, "ram": 80, "io": 1.5, "net": 2.5}, ], "namespace_capacity_summary": { "cpu_ratio_top": [ {"namespace": "synapse", "cpu_usage_ratio": 0.8, "cpu_usage": 40, "cpu_requests": 50} ], "mem_ratio_top": [ {"namespace": "synapse", "mem_usage_ratio": 0.7, "mem_usage": 70, "mem_requests": 100} ], "cpu_headroom_low": [{"namespace": "synapse", "headroom": 0.2}], "mem_headroom_low": [{"namespace": "synapse", "headroom": 0.3}], "cpu_overcommitted": 1, "mem_overcommitted": 0, "cpu_overcommitted_names": ["synapse"], "mem_overcommitted_names": [], }, "namespace_capacity": [{"namespace": "synapse", "cpu": 1, "mem": 2}], "units": {"cpu_pct": "%", "ram_pct": "%", "net": "bytes/s"}, "windows": {"rates": "5m", "restarts": "1h"}, }, "namespace_pods": [{"namespace": "synapse", "pods_total": 5, "pods_running": 4}], "namespace_nodes": [{"namespace": "synapse", "pods_total": 5, "primary_node": "titan-01"}], "node_pods": [{"node": "titan-01", "pods_total": 5, "namespaces": 
[{"name": "synapse", "count": 3}]}], "pod_issues": { "counts": {"Failed": 2, "Pending": 1, "Unknown": 0}, "top": [{"namespace": "synapse", "pod": "matrix", "phase": "Pending", "age_hours": 2}], "pending_oldest": [{"namespace": "synapse", "pod": "matrix", "age_hours": 2}], "waiting_reasons_top": [{"reason": "ImagePullBackOff", "count": 3}], "pending_over_15m": 1, "waiting_reasons": {"ImagePullBackOff": 3}, }, "workloads_health": { "deployments": {"ready": 2, "not_ready": 1, "desired": 3}, "statefulsets": {"ready": 1, "not_ready": 0, "desired": 1}, "daemonsets": {"ready": 1, "not_ready": 0, "desired": 1}, }, "events": { "warnings_top_reason": {"ImagePullBackOff": 3}, "warnings_latest": [{"reason": "FailedScheduling", "count": 2}], "warnings_total": 5, }, "jobs": { "totals": {"total": 4, "active": 1, "failed": 1, "succeeded": 2}, "failing": [{"namespace": "synapse", "job_name": "backup", "failed": 1}], "active_oldest": [{"namespace": "synapse", "job_name": "backup", "age_minutes": 30}], }, "postgres": { "used": 5, "max": 10, "hottest_db": {"label": "synapse", "value": 3}, "by_db": [{"label": "synapse", "value": 3}], }, "hottest": { "cpu": {"node": "titan-01", "value": 90}, "ram": {"node": "titan-02", "value": 70}, "net": {"node": "titan-02", "value": 2}, "io": {"node": "titan-01", "value": 0.5}, "disk": {"node": "titan-01", "value": 80}, }, "pvc_usage_top": [{"namespace": "synapse", "pvc": "data", "value": 95}], "root_disk_low_headroom": [{"node": "titan-01", "headroom_pct": 20, "used_pct": 80}], "longhorn": { "total": 2, "attached_count": 1, "detached_count": 1, "degraded_count": 0, "by_state": {"attached": 1, "detached": 1}, "by_robustness": {"healthy": 1, "degraded": 1}, "unhealthy": [{"name": "vol1", "state": "detached", "robustness": "degraded"}], }, "workloads": [{"namespace": "synapse", "name": "matrix", "pods_total": 3, "pods_running": 3}], "flux": { "ready": 1, "not_ready": 1, "items": [{"kind": "HelmRelease", "name": "matrix", "status": "Ready"}], }, } def 
test_package_imports() -> None: """Import package shims so their `__init__` modules stay covered.""" importlib.import_module("atlasbot") importlib.import_module("atlasbot.api") importlib.import_module("atlasbot.engine") importlib.import_module("atlasbot.engine.answerer") importlib.import_module("atlasbot.knowledge") importlib.import_module("atlasbot.llm") importlib.import_module("atlasbot.matrix") importlib.import_module("atlasbot.queue") importlib.import_module("atlasbot.snapshot") assert atlasbot.snapshot.__name__ == "atlasbot.snapshot" def test_config_helpers_and_load_settings(monkeypatch: pytest.MonkeyPatch) -> None: """Exercise config parsing branches and matrix bot loading.""" monkeypatch.setenv("BOOL_ONE", "yes") monkeypatch.setenv("INT_BAD", "nope") monkeypatch.setenv("FLOAT_BAD", "nope") assert _env_bool("BOOL_ONE") assert _env_int("INT_BAD", "7") == 7 assert _env_float("FLOAT_BAD", "2.5") == 2.5 monkeypatch.setenv("BOT_USER_QUICK", "quick") monkeypatch.setenv("BOT_PASS_QUICK", "pw") monkeypatch.setenv("BOT_USER_SMART", "smart") monkeypatch.setenv("BOT_PASS_SMART", "pw") settings = load_settings() assert settings.matrix_bots[0].mode == "quick" assert settings.matrix_bots[1].mode == "smart" monkeypatch.delenv("BOT_USER_QUICK", raising=False) monkeypatch.delenv("BOT_PASS_QUICK", raising=False) monkeypatch.delenv("BOT_USER_SMART", raising=False) monkeypatch.delenv("BOT_PASS_SMART", raising=False) monkeypatch.setenv("BOT_USER", "atlasbot") monkeypatch.setenv("BOT_PASS", "legacy") legacy = _load_matrix_bots(("atlasbot",)) assert legacy and legacy[0].mode == "" def test_knowledge_base_helpers(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: """Read KB data, titles, paths, and prompt chunks from a temp catalog.""" base = tmp_path / "kb" catalog = base / "catalog" catalog.mkdir(parents=True) (catalog / "atlas.json").write_text( json.dumps({"cluster": "titan", "sources": [{"name": "docs"}], "extra": True}), encoding="utf-8", ) (catalog / 
"runbooks.json").write_text(json.dumps([{"title": "Fix", "path": "runbooks/fix.md"}]), encoding="utf-8") (base / "notes.md").write_text("hello atlas", encoding="utf-8") kb = KnowledgeBase(str(base)) assert "Cluster: titan." in kb.summary() assert "Relevant runbooks" in kb.runbook_titles(limit=1) assert kb.runbook_paths() == ["runbooks/fix.md"] assert kb.chunk_lines(max_files=1, max_chars=200) bad = base / "bad" bad.mkdir() (bad / "catalog").mkdir() (bad / "catalog" / "atlas.json").write_text("{broken", encoding="utf-8") broken = KnowledgeBase(str(bad)) with caplog.at_level(pylogging.WARNING): assert broken.summary() == "" def test_llm_client_helpers_and_fallback(monkeypatch: pytest.MonkeyPatch) -> None: """Exercise message building, JSON parsing, and fallback model logic.""" settings = replace( build_test_settings(), ollama_url="http://example", ollama_model="base", ollama_fallback_model="fallback", ollama_retries=1, ) client = LLMClient(settings) assert client._endpoint().endswith("/api/chat") assert build_messages("sys", "prompt", context="ctx")[1]["content"].startswith("Context") assert parse_json("{\"ok\": true}", fallback={}) == {"ok": True} class FakeResponse: def __init__(self, status_code: int, payload: dict[str, Any]): self.status_code = status_code self._payload = payload def raise_for_status(self) -> None: if self.status_code >= 400: raise httpx.HTTPStatusError("bad", request=httpx.Request("POST", "http://example"), response=httpx.Response(self.status_code)) def json(self) -> dict[str, Any]: return self._payload class FakeAsyncClient: def __init__(self, timeout: float | None = None): self.timeout = timeout async def __aenter__(self) -> FakeAsyncClient: return self async def __aexit__(self, *exc: object) -> None: return None async def post( self, _url: str, *, json: dict[str, Any], headers: dict[str, str], ) -> FakeResponse: model = json["model"] assert headers["Content-Type"] == "application/json" if model == "base": return FakeResponse(404, {}) return 
FakeResponse(200, {"message": {"content": "hello"}}) monkeypatch.setattr(httpx, "AsyncClient", FakeAsyncClient) reply = asyncio.run(client.chat([{"role": "user", "content": "hi"}], model=None, timeout_sec=1.0)) assert reply == "hello" def test_logging_formatter_and_configure() -> None: """Format a structured record and install JSON logging on the root logger.""" formatter = JsonFormatter() record = pylogging.LogRecord("atlasbot", pylogging.INFO, __file__, 1, "hello %s", ("world",), None) record.extra = {"mode": "quick"} payload = json.loads(formatter.format(record)) assert payload["message"] == "hello world" assert payload["mode"] == "quick" configure_logging("debug") root = pylogging.getLogger() assert root.handlers and isinstance(root.handlers[0].formatter, JsonFormatter) def test_state_store_roundtrip_and_cleanup(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: """Persist, read, and expire a claim payload.""" path = tmp_path / "state.db" store = ClaimStore(str(path), 60) store.set( "conv", { "snapshot_id": "snap-1", "claims": [{"id": "c1"}], "snapshot": {"nodes": 1}, }, ) payload = store.get("conv") assert payload and payload["snapshot_id"] == "snap-1" assert payload["claims"] == [{"id": "c1"}] assert _safe_json("{broken", []) == [] monkeypatch.setattr("atlasbot.state.store.time.monotonic", lambda: 1_000_000.0) store.cleanup() assert store.get("conv") is None @pytest.mark.parametrize( ("question", "kind"), [ ("How many nodes are ready?", "nodes_ready"), ("How many cluster nodes do we have?", "nodes_count"), ("Which nodes are not rpi?", "nodes_non_rpi"), ("What hardware mix do we have?", "hardware_mix"), ("What is the hottest cpu?", "hottest_cpu"), ("What is the hottest ram?", "hottest_ram"), ("How many postgres connections?", "postgres_connections"), ("Which postgres db is hottest?", "postgres_hottest"), ("Which namespace has most pods?", "namespace_most_pods"), ("Is there pressure on the nodes?", "pressure_summary"), ], ) def 
test_intent_router_patterns(question: str, kind: str) -> None: """Route the main cluster intents into deterministic matches.""" match = route_intent(question) assert match and match.kind == kind def test_api_routes_and_auth() -> None: """Exercise the HTTP wrapper, token check, and question extraction.""" settings = replace(build_test_settings(), internal_token="secret") async def handler( question: str, mode: str, _history: list[dict[str, str]] | None, _conversation_id: str | None, _snapshot_pin: bool | None, ) -> AnswerResult: return AnswerResult( reply=f"{question}:{mode}", scores=AnswerScores(confidence=1, relevance=2, satisfaction=3, hallucination_risk="low"), meta={"mode": mode}, ) api = Api(settings, handler) client = TestClient(api.app) assert client.get("/healthz").json() == {"ok": True} assert client.post("/v1/answer", json={"question": "hi"}).status_code == 401 assert _extract_question(AnswerRequest(prompt=" hello ")).strip() == "hello" response = client.post( "/v1/answer", headers={"X-Internal-Token": "secret"}, json={"prompt": "hello", "mode": "SMART", "conversation_id": "conv-1", "snapshot_pin": True}, ) assert response.status_code == 200 assert response.json()["reply"] == "hello:smart" def test_main_and_queue_and_matrix(monkeypatch: pytest.MonkeyPatch) -> None: """Run the bootstrap path and queueing branch without external services.""" from atlasbot import main as main_mod settings = replace( build_test_settings(), queue_enabled=True, matrix_bots=(MatrixBotConfig("bot", "pw", ("bot",), "quick"),), ) class FakeQueue: def __init__(self, settings: Settings, handler): self.settings = settings self.handler = handler self.started = False async def start(self) -> None: self.started = True async def submit(self, _payload: dict[str, Any]) -> dict[str, Any]: return { "reply": "queued", "scores": {"confidence": 7, "relevance": 8, "satisfaction": 9, "hallucination_risk": "low"}, } class FakeMatrixBot: def __init__(self, _settings: Settings, _bot: MatrixBotConfig, 
_engine: Any, answer_handler): self.answer_handler = answer_handler async def run(self) -> None: result = await self.answer_handler("what is atlas?", "quick", [], "room-1", None) assert result.reply == "queued" class FakeServer: def __init__(self, config: Any): self.config = config async def serve(self) -> None: return None monkeypatch.setattr(main_mod, "load_settings", lambda: settings) monkeypatch.setattr(main_mod, "configure_logging", lambda _level: None) monkeypatch.setattr(main_mod, "QueueManager", FakeQueue) monkeypatch.setattr(main_mod, "MatrixBot", FakeMatrixBot) monkeypatch.setattr(main_mod.uvicorn, "Server", FakeServer) asyncio.run(main_mod.main()) scores = result_scores({"scores": {"confidence": 10, "relevance": 20, "satisfaction": 30, "hallucination_risk": "low"}}) assert scores.confidence == 10 def test_matrix_and_queue_and_snapshot_helpers(monkeypatch: pytest.MonkeyPatch) -> None: """Drive the Matrix client, queue manager, and snapshot renderers.""" settings = replace(build_test_settings(), matrix_bots=()) bot_cfg = MatrixBotConfig("bot", "pw", ("bot",), "quick") class FakeResp: def __init__(self, payload: dict[str, Any], status_code: int = 200): self._payload = payload self.status_code = status_code def raise_for_status(self) -> None: if self.status_code >= 400: raise httpx.HTTPError("bad") def json(self) -> dict[str, Any]: return self._payload class FakeAsyncClient: def __init__(self, timeout: float | None = None): self.timeout = timeout async def __aenter__(self) -> "FakeAsyncClient": return self async def __aexit__(self, *exc: object) -> None: return None async def post(self, url: str, json: dict[str, Any] | None = None, headers: dict[str, str] | None = None) -> FakeResp: if "login" in url: return FakeResp({"access_token": "tok"}) return FakeResp({}) async def get(self, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None) -> FakeResp: if "directory/room" in url: return FakeResp({"room_id": "!room"}) return 
FakeResp({"next_batch": "n1", "rooms": {"join": {}}}) monkeypatch.setattr("atlasbot.matrix.bot.httpx.AsyncClient", FakeAsyncClient) client = MatrixClient(settings, bot_cfg) token = asyncio.run(client.login()) assert token == "tok" assert asyncio.run(client.resolve_room(token)) == "!room" asyncio.run(client.join_room(token, "!room")) asyncio.run(client.send_message(token, "!room", "hello")) assert asyncio.run(client.sync(token, None))["next_batch"] == "n1" mode, cleaned = _extract_mode("atlas-smart hello", ("atlas",), "") assert mode == "smart" assert cleaned == "-smart hello" assert _mode_timeout_sec(settings, "smart") == settings.smart_time_budget_sec class FakeSub: async def next_msg(self, timeout: float) -> Any: return SimpleNamespace(data=json.dumps({"reply": "ok"}).encode(), reply="reply") async def unsubscribe(self) -> None: return None class FakeMsg: def __init__(self) -> None: self.data = json.dumps({"payload": {"question": "q"}}).encode() self.reply = "reply" self.acked = False async def ack(self) -> None: self.acked = True class FakeJS: def __init__(self) -> None: self.streams = [] async def stream_info(self, stream: str) -> None: raise NotFoundError async def add_stream(self, **kwargs: Any) -> None: self.streams.append(kwargs) async def publish(self, subject: str, data: bytes) -> None: self.streams.append({"subject": subject, "data": data}) async def pull_subscribe(self, subject: str, durable: str) -> Any: class Pull: async def fetch(self, count: int, timeout: float) -> list[FakeMsg]: raise RuntimeError("stop") return Pull() class FakeNATS: def __init__(self) -> None: self.published = [] async def connect(self, url: str) -> None: return None def jetstream(self) -> FakeJS: return FakeJS() def new_inbox(self) -> str: return "inbox" async def subscribe(self, reply: str) -> FakeSub: return FakeSub() async def publish(self, reply: str, data: bytes) -> None: self.published.append((reply, data)) async def drain(self) -> None: return None 
monkeypatch.setattr("atlasbot.queue.nats.NATS", FakeNATS) queue_settings = replace(settings, queue_enabled=True, nats_stream="atlasbot", nats_subject="atlasbot.requests") qm = QueueManager(queue_settings, lambda payload: asyncio.sleep(0, result={"reply": "x"})) asyncio.run(QueueManager(replace(queue_settings, queue_enabled=False), lambda payload: asyncio.sleep(0, result=payload)).start()) asyncio.run(qm.start()) assert asyncio.run(qm.submit({"mode": "quick"})) == {"reply": "ok"} assert asyncio.run(qm.submit({"mode": "genius"})) == {"reply": "ok"} class LoopPull: def __init__(self) -> None: self.calls = 0 async def fetch(self, count: int, timeout: float) -> list[FakeMsg]: del count, timeout self.calls += 1 if self.calls == 1: raise RuntimeError("retry") if self.calls == 2: return [FakeMsg()] raise asyncio.CancelledError class LoopJS: async def pull_subscribe(self, subject: str, durable: str) -> LoopPull: del subject, durable return LoopPull() qm._js = LoopJS() with pytest.raises(asyncio.CancelledError): asyncio.run(qm._worker_loop()) asyncio.run(qm.stop()) snapshot = _rich_snapshot() summary = core_a.build_summary(snapshot) assert summary["nodes"]["total"] == 2 text = summary_text(snapshot) assert "atlas_cluster:" in text assert "hardware_usage_avg:" in text assert "signals:" in text assert "node_profiles:" in text assert "flux:" in text or "flux" in text lines: list[str] = [] format_a._append_nodes(lines, summary) format_a._append_hardware(lines, summary) format_a._append_hardware_groups(lines, summary) format_a._append_node_ages(lines, summary) format_a._append_node_taints(lines, summary) format_a._append_node_facts(lines, summary) format_a._append_pressure(lines, summary) format_a._append_pods(lines, summary) format_a._append_capacity(lines, summary) format_a._append_namespace_pods(lines, summary) format_a._append_namespace_nodes(lines, summary) format_a._append_node_pods(lines, summary) format_a._append_pod_issues(lines, summary) 
format_a._append_workload_health(lines, summary) format_a._append_node_usage_stats(lines, summary) format_a._append_events(lines, summary) format_a._append_pvc_usage(lines, summary) format_a._append_root_disk_headroom(lines, summary) format_b._append_longhorn(lines, summary) format_b._append_namespace_usage(lines, summary) format_b._append_namespace_requests(lines, summary) format_b._append_namespace_io_net(lines, summary) format_b._append_pod_usage(lines, summary) format_b._append_restarts(lines, summary) format_b._append_job_failures(lines, summary) format_b._append_jobs(lines, summary) format_b._append_postgres(lines, summary) format_b._append_hottest(lines, summary) format_b._append_workloads(lines, summary) format_b._append_topology(lines, summary) format_b._append_flux(lines, summary) format_c._append_signals(lines, summary) format_c._append_profiles(lines, summary) format_c._append_units_windows(lines, summary) format_c._append_node_load_summary(lines, summary) format_c._append_hardware_usage(lines, summary) format_c._append_cluster_watchlist(lines, summary) format_c._append_baseline_deltas(lines, summary) format_c._append_pod_issue_summary(lines, summary) format_c._append_workloads_by_namespace(lines, summary) format_c._append_lexicon(lines, summary) format_c._append_cross_stats(lines, summary) assert any(line.startswith("nodes:") for line in lines) assert any(line.startswith("longhorn:") for line in lines) assert any(line.startswith("signals:") for line in lines) core_b_summary = core_b._build_hottest(snapshot["metrics"]) assert core_b_summary["hottest"]["cpu"]["node"] == "titan-01" def test_matrix_bot_sync_and_heartbeat() -> None: """Drive the Matrix bot heartbeat and sync handlers with a fake client.""" settings = replace(build_test_settings(), thinking_interval_sec=0.001) bot_cfg = MatrixBotConfig("bot", "pw", ("atlas",), "quick") class FakeClient: def __init__(self) -> None: self.sent: list[str] = [] async def login(self) -> str: return "tok" async def 
resolve_room(self, token: str) -> str: return "!room" async def join_room(self, token: str, room_id: str) -> None: return None async def send_message(self, token: str, room_id: str, text: str) -> None: self.sent.append(text) async def sync(self, token: str, since: str | None) -> dict[str, Any]: return { "next_batch": "n1", "rooms": { "join": { "!room": { "timeline": { "events": [ {"type": "m.room.message", "sender": "user", "content": {"body": "atlas quick what is atlas?"}}, {"type": "m.room.message", "sender": "bot", "content": {"body": "ignored"}}, ] } } } }, } async def answer_handler(question: str, mode: str, history, conversation_id, observer): if observer: observer("stage", "working") return AnswerResult( reply="Atlas has 22 nodes", scores=AnswerScores(confidence=1, relevance=2, satisfaction=3, hallucination_risk="low"), meta={"mode": mode}, ) bot = MatrixBot(settings, bot_cfg, SimpleNamespace(answer=lambda *args, **kwargs: None), answer_handler) bot._client = FakeClient() asyncio.run(bot._answer_with_heartbeat("tok", "!room", "What is Atlas?", "quick")) payload = { "rooms": { "join": { "!room": { "timeline": { "events": [ {"type": "m.room.message", "sender": "user", "content": {"body": "atlas smart hello"}} ] } } } } } asyncio.run(bot._handle_sync("tok", payload)) assert bot._client.sent def test_answerer_helper_coverage_smoke() -> None: """Exercise the split answerer helpers with representative inputs.""" settings = build_test_settings() plan = answer_common._mode_plan(settings, "smart") fast_plan = replace(plan, parallelism=2, score_retries=2, chunk_group=1, chunk_top=2, max_subquestions=2) snapshot = _rich_snapshot() summary = core_a.build_summary(snapshot) summary_lines = answer_spine._summary_lines(snapshot) rich_lines = [ "nodes_total: 2", "nodes_ready: 1", "cluster_name: atlas", "pods_total: 3", "cpu: 90", "ram: 80", "runbooks/fix.md", ] class ScriptedLLM: async def __call__( self, _system: str, _prompt: str, *, context: str | None = None, model: str 
| None = None, tag: str = "", ) -> str: responses = { "chunk_score": '[{"id":"c1","score":1},{"id":"c2","score":2}]', "chunk_select": '{"selected_index": 1}', "metric_keys": '{"keys":["nodes_total","pods_total"]}', "metric_keys_validate": '{"missing":["pods_total"]}', "fact_types": '{"fact_types":["nodes_total","pods_total"]}', "fact_types_select": '{"best": 1}', "signals": '{"signals":["cpu","ram"]}', "signals_select": '{"best": 1}', "chunk_scan": '{"lines":["cpu: 90"]}', "chunk_scan_select": '{"best": 1}', "fact_prune": '{"lines":["cpu: 90"]}', "fact_prune_select": '{"best": 1}', "fact_select": '{"lines":["cpu: 90"]}', "fact_select_best": '{"best": 1}', "contradiction": '{"use_facts": false, "confidence": 99}', "insight_guard": '{"ok": false}', "insight_fix": "fixed insight", } return responses.get(tag, "{}") scripted_llm = ScriptedLLM() chunks = [ {"id": "c1", "text": "nodes_total: 2\npods_total: 3", "summary": "nodes"}, {"id": "c2", "text": "cpu: 90\nram: 80", "summary": "cpu"}, ] groups = answer_common._build_chunk_groups(chunks, 1) scores = asyncio.run(answer_common._score_chunks(scripted_llm, chunks, "How many nodes?", ["nodes"], fast_plan)) serial_ctx = ScoreContext(question="How many nodes?", sub_questions=["nodes"], retries=2, parallelism=1, select_best=True, fast_model="fast") serial_scores = asyncio.run(answer_common._score_groups_serial(scripted_llm, groups, serial_ctx)) parallel_ctx = ScoreContext(question="How many nodes?", sub_questions=["nodes"], retries=2, parallelism=2, select_best=True, fast_model="fast") parallel_scores = asyncio.run(answer_common._score_groups_parallel(scripted_llm, groups, parallel_ctx)) best_run = asyncio.run(answer_common._select_best_score_run(scripted_llm, groups[0], [{"c1": 1.0}, {"c1": 2.0}], serial_ctx)) selected = answer_common._select_chunks(chunks, {"c1": 0.2, "c2": 0.9}, replace(fast_plan, chunk_top=2), ["cpu"], ["c2"]) assert scores and serial_scores and parallel_scores and best_run and selected assert 
answer_common._strip_followup_meta("The draft is correct. Atlas is healthy.") == "Atlas is healthy." assert answer_common._llm_call_limit(settings, "smart") == settings.smart_llm_calls_max assert answer_common._mode_time_budget(settings, "quick") == settings.quick_time_budget_sec assert answer_common._select_subquestions([], "fallback", 2) == ["fallback"] assert answer_common._chunk_lines(["a", "b", "c"], 2) assert answer_common._raw_snapshot_chunks(snapshot) assert answer_common._format_runbooks(["runbooks/fix.md"]) assert answer_common._keyword_hits([{"text": "cpu usage"}], {"text": "cpu usage"}, ["cpu"]) assert answer_factsheet._factsheet_kb_chars("quick", 10) assert answer_factsheet._factsheet_line_limit("smart") >= 1 assert answer_factsheet._factsheet_instruction("quick") assert answer_factsheet._factsheet_model("genius", fast_plan) == fast_plan.model assert answer_factsheet._is_plain_math_question("2+2") assert answer_factsheet._quick_fact_sheet_lines("How many nodes?", rich_lines, ["kb"], limit=4) assert answer_factsheet._quick_fact_sheet_text(["nodes_total: 2"]) assert answer_factsheet._quick_fact_sheet_heuristic_answer("How many ready nodes?", ["nodes_total:2,ready:1,not_ready:0"]) assert answer_factsheet._json_excerpt(summary) assert answer_post._strip_unknown_entities("node titan-99 is hot. Atlas is healthy.", ["titan-99"], []) == "Atlas is healthy." 
assert answer_post._needs_evidence_guard("node titan-99 is hot.", ["node titan-01"]) is True contradiction = asyncio.run( answer_post._contradiction_decision( ContradictionContext(scripted_llm, "why", "draft", ["fact"], fast_plan), attempts=2, ) ) assert contradiction["confidence"] == 99 assert answer_post._format_direct_metric_line("nodes_total: 2") assert answer_post._global_facts(["nodes_total: 2", "other: 1"]) assert answer_post._has_keyword_overlap(["cpu usage"], ["cpu"]) assert answer_post._merge_tokens(["a"], ["b"], ["c"]) == ["a", "b", "c"] assert answer_post._extract_question_tokens("How many nodes?") assert answer_post._expand_tokens(["nodes_total"]) assert answer_post._ensure_token_coverage(["nodes_total: 2"], ["pods"], ["pods_total: 3"], max_add=1) assert answer_post._best_keyword_line(["cpu: 90"], ["cpu"]) == "cpu: 90" assert answer_post._line_starting_with(["cpu: 90"], "cpu") assert answer_post._non_rpi_nodes({"hardware_by_node": {"titan-01": "rpi5", "titan-02": "amd64"}}) == {"amd64": ["titan-02"]} assert answer_post._format_hardware_groups({"amd64": ["titan-02"]}, "Nodes") assert answer_post._lexicon_context({"lexicon": {"terms": [{"term": "atlas", "meaning": "cluster"}], "aliases": {"bot": "atlas"}}}) assert answer_post._parse_json_block("{\"ok\": true}", fallback={}) == {"ok": True} assert answer_post._parse_json_list("[{\"ok\": true}]") == [{"ok": True}] assert answer_post._scores_from_json({"confidence": "1", "relevance": 2, "satisfaction": 3, "hallucination_risk": "low"}).confidence == 1 assert answer_post._coerce_int("4", 1) == 4 assert answer_post._default_scores().hallucination_risk == "medium" assert answer_post._style_hint({"answer_style": "insightful"}) == "insightful" assert answer_post._needs_evidence_fix("we don't know", {"needs_snapshot": True}) is True assert answer_post._should_use_insight_guard({"answer_style": "insightful"}) insight_inputs = InsightGuardInput( question="why", reply="Insightful reply", classify={"answer_style": 
"insightful", "question_type": "open_ended"}, context="", plan=fast_plan, call_llm=scripted_llm, facts=["fact"], ) assert asyncio.run(answer_post._apply_insight_guard(insight_inputs)) assert answer_post_ext._reply_matches_metric_facts("nodes_total: 2", ["nodes_total: 2"]) assert answer_post_ext._needs_dedup("one. one. one.") answer_post_ext._needs_focus_fix("how many nodes", "For more details. Additional context.", {"question_type": "metric"}) assert answer_post_ext._extract_keywords("How many nodes?", "How many nodes?", ["pods"], ["nodes"]) assert answer_post_ext._allowed_nodes(summary) assert answer_post_ext._allowed_namespaces(summary) assert answer_post_ext._find_unknown_nodes("node titan-99", ["titan-01"]) == ["titan-99"] assert answer_post_ext._find_unknown_namespaces("namespace rogue", ["synapse"]) == ["rogue"] assert answer_post_ext._needs_runbook_fix("see runbooks/bad.md", ["runbooks/fix.md"]) assert answer_post_ext._needs_runbook_reference("where is the runbook", ["runbooks/fix.md"], "") assert answer_post_ext._best_runbook_match("runbooks/fx.md", ["runbooks/fix.md"]) assert answer_post_ext._resolve_path({"a": [{"b": 3}]}, "a[0].b") == 3 assert answer_post_ext._snapshot_id({"snapshot_id": "snap-1"}) == "snap-1" assert answer_post_ext._claims_to_payload([ClaimItem(id="c1", claim="atlas", evidence=[EvidenceItem(path="a.b", reason="r", value_at_claim=1)])]) assert answer_post_ext._state_from_payload({"updated_at": 1.0, "claims": [{"id": "c1", "claim": "atlas", "evidence": [{"path": "a.b", "reason": "r"}]}]}) assert answer_retrieval._metric_ctx_values({"summary_lines": summary_lines, "question": "cpu", "sub_questions": ["pods"], "keywords": ["cpu"], "keyword_tokens": ["cpu"]}) assert answer_retrieval._extract_metric_keys(rich_lines) assert answer_retrieval._token_variants({"nodes"}) assert answer_retrieval._parse_key_list("{\"keys\":[\"nodes_total\"]}", ["nodes_total"], 1) == ["nodes_total"] assert answer_retrieval._chunk_ids_for_keys([{"id": "c1", "text": 
"nodes_total: 2"}], ["nodes_total"]) == ["c1"] assert answer_retrieval._filter_metric_keys(["nodes_total"], {"nodes"}) assert answer_retrieval._metric_key_overlap(["nodes_total"], {"nodes"}) assert answer_retrieval._lines_for_metric_keys(rich_lines, ["nodes_total"]) assert answer_retrieval._merge_metric_keys(["nodes_total"], ["pods_total"], 3) assert answer_retrieval._merge_fact_lines(["a"], ["b"]) assert answer_retrieval._expand_hottest_line("hottest: cpu=titan-01 (90)") answer_retrieval._has_token("hottest_cpu: titan-01=90", "cpu") answer_retrieval._hotspot_evidence(snapshot) assert asyncio.run(answer_retrieval._select_metric_chunks(scripted_llm, {"summary_lines": summary_lines, "question": "cpu", "sub_questions": ["pods"], "keywords": ["cpu"], "keyword_tokens": ["cpu"]}, chunks, fast_plan)) asyncio.run(answer_retrieval._validate_metric_keys(scripted_llm, {"question": "cpu", "sub_questions": ["pods"], "selected": ["nodes_total"]}, ["nodes_total"], fast_plan)) assert asyncio.run(answer_retrieval._gather_limited([asyncio.sleep(0, result=1), asyncio.sleep(0, result=2)], 1)) assert answer_retrieval_ext._metric_key_tokens(summary_lines) asyncio.run(answer_retrieval_ext._select_best_candidate(scripted_llm, "question", ["a", "b"], fast_plan, "chunk_select")) assert answer_retrieval_ext._dedupe_lines(["x", "x", "y"]) assert answer_retrieval_ext._collect_fact_candidates(chunks, 4) assert asyncio.run(answer_retrieval_ext._select_best_list(scripted_llm, "question", [["a"], ["b"]], fast_plan, "chunk_select")) assert asyncio.run(answer_retrieval_ext._extract_fact_types(scripted_llm, "question", ["cpu"], fast_plan)) assert asyncio.run(answer_retrieval_ext._derive_signals(scripted_llm, "question", ["cpu"], fast_plan)) assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(scripted_llm, "question", ["cpu"], ["cpu: 90"], fast_plan)) assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(scripted_llm, "question", ["cpu: 90"], fast_plan, 1)) assert 
asyncio.run(answer_retrieval_ext._select_fact_lines(scripted_llm, "question", ["cpu: 90"], fast_plan, 1)) assert answer_spine._join_context(["a", "", "b"]) == "a\nb" assert answer_spine._format_history([{"q": "q", "a": "a"}]) assert answer_spine._summary_lines(snapshot) assert answer_spine._line_starting_with(rich_lines, "nodes_total") assert answer_spine._spine_lines(rich_lines) spine_map: dict[str, str] = {} answer_spine._spine_nodes(rich_lines, spine_map) answer_spine._spine_hardware(rich_lines, spine_map) answer_spine._spine_hottest(rich_lines, spine_map) answer_spine._spine_postgres(rich_lines, spine_map) answer_spine._spine_namespaces(rich_lines, spine_map) answer_spine._spine_pressure(rich_lines, spine_map) assert answer_spine._parse_group_line("hardware: rpi5=(titan-01)") assert answer_spine._parse_hottest("hottest: cpu=titan-01 (90)", "cpu") assert answer_spine._spine_answer(route_intent("How many nodes?"), "nodes_total: 2") assert answer_spine._spine_nodes_answer("nodes_total: 2") assert answer_spine._spine_non_rpi_answer("amd64 (titan-02)") assert answer_spine._spine_hardware_answer("hardware: amd64=1") assert answer_spine._spine_hottest_answer("hottest_cpu", "hottest: cpu=titan-01 (90)") assert answer_spine._spine_postgres_answer("postgres_connections: used=5") assert answer_spine._spine_namespace_answer("namespace_most_pods: synapse=5") assert answer_spine._spine_pressure_answer("pressure_nodes: titan-02") assert answer_spine._spine_from_summary(summary) assert answer_spine._spine_from_counts(summary) assert answer_spine._spine_from_hardware(summary) assert answer_spine._spine_from_hottest(summary) assert answer_spine._spine_from_postgres(summary) assert answer_spine._spine_from_namespace_pods(summary) assert answer_spine._spine_from_pressure(summary) assert answer_spine._spine_fallback(route_intent("How many nodes?"), rich_lines)


def test_snapshot_builder_coverage_smoke() -> None:
    """Exercise the split snapshot render helpers end to end.

    Builds a summary and a summary text from ``_rich_snapshot()``, then calls
    the ``format_a`` / ``format_b`` / ``format_c`` helpers one by one. Most of
    them receive the shared ``lines`` list; the only checks are that the
    summary and text are truthy and that ``lines`` is non-empty at the end, so
    every other call merely has to complete without raising.
    """
    snapshot = _rich_snapshot()
    summary = core_a.build_summary(snapshot)
    text = summary_text(snapshot)
    assert summary and text
    # Accumulator handed to every ``_append_*`` helper below.
    lines: list[str] = []
    # Return values of the ``_format_*`` helpers are discarded on purpose:
    # these calls are made for execution only.
    format_a._format_float(1.5)
    format_a._format_rate_bytes(2048)
    format_a._format_bytes(2048)
    format_a._format_kv_map({"a": 1, "b": 2})
    format_a._format_names(["b", "a"])
    format_a._append_nodes(lines, summary)
    format_a._append_hardware(lines, summary)
    format_a._append_hardware_groups(lines, summary)
    format_a._append_node_ages(lines, summary)
    format_a._append_node_taints(lines, summary)
    format_a._append_node_facts(lines, summary)
    format_a._append_pressure(lines, summary)
    format_a._append_pods(lines, summary)
    format_a._append_capacity(lines, summary)
    format_a._append_namespace_pods(lines, summary)
    format_a._append_namespace_nodes(lines, summary)
    format_a._append_node_pods(lines, summary)
    format_a._append_pod_issues(lines, summary)
    # Direct indexing: "pod_issues" must exist in the built summary, otherwise
    # these lines fail with KeyError rather than an assertion.
    format_a._format_pod_issue_counts(summary["pod_issues"])
    format_a._format_pod_issue_top(summary["pod_issues"])
    format_a._format_pod_pending_oldest(summary["pod_issues"])
    format_a._format_pod_waiting_reasons(summary["pod_issues"])
    format_a._format_pod_pending_over_15m(summary["pod_issues"])
    format_a._append_workload_health(lines, summary)
    format_a._append_node_usage_stats(lines, summary)
    format_a._append_events(lines, summary)
    format_a._append_pvc_usage(lines, summary)
    format_a._append_root_disk_headroom(lines, summary)
    format_b._append_longhorn(lines, summary)
    format_b._append_namespace_usage(lines, summary)
    format_b._append_namespace_requests(lines, summary)
    format_b._append_namespace_io_net(lines, summary)
    format_b._append_pod_usage(lines, summary)
    format_b._append_restarts(lines, summary)
    format_b._append_job_failures(lines, summary)
    format_b._append_jobs(lines, summary)
    # Same as above for "jobs": the key is indexed, not looked up with .get().
    format_b._format_jobs_totals(summary["jobs"])
    format_b._format_jobs_failing(summary["jobs"])
    format_b._format_jobs_active_oldest(summary["jobs"])
    format_b._append_postgres(lines, summary)
    format_b._append_hottest(lines, summary)
    format_b._append_workloads(lines, summary)
    format_b._append_topology(lines, summary)
    format_b._append_flux(lines, summary)
    format_c._append_signals(lines, summary)
    format_c._append_profiles(lines, summary)
    format_c._append_units_windows(lines, summary)
    format_c._append_node_load_summary(lines, summary)
    format_c._append_hardware_usage(lines, summary)
    format_c._append_cluster_watchlist(lines, summary)
    format_c._append_baseline_deltas(lines, summary)
    format_c._append_pod_issue_summary(lines, summary)
    # Nested indexing into "pod_issue_summary" and "namespace_capacity": these
    # keys (and the nested ones) are required to be present in the summary.
    format_c._reason_line(summary["pod_issue_summary"]["waiting_reasons_top"], "waiting")
    format_c._append_namespace_issue_lines(lines, summary["pod_issue_summary"]["namespace_issue_top"])
    format_c._build_cluster_watchlist(summary)
    format_c._capacity_ratio_parts(summary["namespace_capacity"], "cpu", "cpu", "mem")
    format_c._capacity_headroom_parts(summary["namespace_capacity"])
    format_c._append_namespace_capacity_summary(lines, summary)
    format_c._append_workloads_by_namespace(lines, summary)
    format_c._append_lexicon(lines, summary)
    format_c._append_cross_stats(lines, summary)
    # Only aggregate check: at least one helper appended something.
    assert lines


def test_answerer_helper_edge_branches(monkeypatch: pytest.MonkeyPatch) -> None:
    """Cover alternate branches in the split answerer helper modules.

    Feeds empty, malformed, or out-of-range inputs to the private helpers of
    the answerer sub-modules and pins the value each one returns. Model calls
    are replaced by the local async callables ``score_call``,
    ``retrieval_call``, ``retrieval_bad`` and ``post_call``, which return
    canned strings selected by ``tag``.
    """
    settings = replace(build_test_settings(), debug_pipeline=True)
    # Swap the module-level ``log`` object for a stub whose ``info`` records
    # (message, extra) pairs so the debug log call can be asserted on.
    logged: list[tuple[str, dict[str, Any]]] = []
    monkeypatch.setattr(answer_common, "log", SimpleNamespace(info=lambda message, extra: logged.append((message, extra))))
    meta = answer_common._build_meta("custom", 1, 2, True, False, 3.0, {"kind": "x"}, {"cmd": "echo"}, 10.0)
    assert meta["llm_limit_hit"] is True
    answer_common._debug_pipeline_log(settings, "edge", {"ok": True})
    assert logged and logged[0][0] == "atlasbot_debug"
    assert answer_common._mode_plan(settings, "genius").drafts == 2
    assert answer_common._mode_plan(settings, "custom").use_tool is False
    # A None entry and an entry with an empty question: the expected result is
    # just the fallback question.
    assert answer_common._select_subquestions([None, {"question": "", "priority": "x"}], "fallback", 2) == ["fallback"]
    assert answer_common._chunk_lines([], 3) == []
    # The value under "bad" is a set, which the json module cannot serialize.
    assert answer_common._raw_snapshot_chunks({"ok": 1, "bad": {1, 2}})
    assert answer_common._build_chunk_groups([{"id": "c1", "summary": "a"}], 2) == [[{"id": "c1", "summary": "a"}]]

    # Canned replies for the chunk-scoring helpers. The "chunk_score" payload
    # mixes a non-numeric score, an empty id and a non-object item; the
    # "chunk_select" payload carries an index far beyond the candidates used
    # below. Any other tag is treated as a test failure.
    async def score_call(_system: str, _prompt: str, *, model: str | None = None, tag: str = "", **_: Any) -> str:
        if tag == "chunk_score":
            return '[{"id":"c1","score":"bad"},{"id":"","score":5},"bad"]'
        if tag == "chunk_select":
            return '{"selected_index": 99}'
        raise AssertionError(tag)

    groups = [[{"id": "c1", "summary": "a"}]]
    ctx = ScoreContext(question="q", sub_questions=[], retries=1, parallelism=1, select_best=True, fast_model="fast")
    # With the malformed score payload the only surviving entry is c1 at 0.0.
    assert asyncio.run(answer_common._score_chunk_group(score_call, groups[0], "q", [])) == {"c1": 0.0}
    assert asyncio.run(answer_common._score_chunk_group_run(score_call, 0, groups[0], "q", [])) == (0, {"c1": 0.0})
    assert answer_common._merge_score_runs([]) == {}
    # Out-of-range selected_index: the first run is the expected result.
    assert asyncio.run(answer_common._select_best_score_run(score_call, groups[0], [{"c1": 1.0}, {"c1": 2.0}], ctx)) == {"c1": 1.0}
    assert answer_common._keyword_hits([{"text": "cpu"}, {"text": "ram"}], {"text": "cpu"}, None) == []
    assert answer_common._select_chunks([], {}, answer_common._mode_plan(settings, "custom")) == []
    # ``selected`` is shared by the three ``_append_*`` calls that follow.
    selected = [{"id": "c0", "text": "a"}]
    assert answer_common._append_must_chunks([{"id": "c0"}, {"id": "c1"}], selected, ["c1"], 3) is False
    assert answer_common._append_keyword_chunks([{"id": "c0", "text": "cpu"}], selected, ["cpu"], 2) is False
    answer_common._append_ranked_chunks([{"id": "c1"}], selected, 2)
    assert answer_common._format_runbooks([]) == ""

    # Canned replies for the retrieval helpers, looked up by ``tag`` (an
    # unknown tag raises KeyError). Every "*_select" / "*_best" entry answers
    # with index 99, and the list payloads contain a non-string item, an empty
    # string, or a line that is absent from the candidates passed in below.
    async def retrieval_call(_system: str, _prompt: str, *, model: str | None = None, tag: str = "", **_: Any) -> str:
        responses = {
            "fact_types": '{"fact_types":["cpu", 5, "cpu"]}',
            "fact_types_select": '{"best": 99}',
            "signals": '{"signals":["cpu", "", "ram"]}',
            "signals_select": '{"best": 99}',
            "chunk_scan": '{"lines":["cpu: 1", "missing: 2"]}',
            "chunk_scan_select": '{"best": 99}',
            "fact_prune": '{"lines":["cpu: 1", "ram: 2"]}',
            "fact_prune_select": '{"best": 99}',
            "fact_select": '{"lines":["cpu: 1"]}',
            "fact_select_best": '{"best": 99}',
        }
        return responses[tag]

    fast_plan = replace(answer_common._mode_plan(settings, "smart"), metric_retries=2)
    assert answer_retrieval_ext._parse_json_block("plain", fallback={"ok": True}) == {"ok": True}
    assert "nodes" in answer_retrieval_ext._metric_key_tokens(["nodes_total: 2"])
    assert answer_retrieval_ext._metric_key_tokens([123, "invalid", ": empty"]) == set()
    assert asyncio.run(answer_retrieval_ext._select_best_candidate(retrieval_call, "q", ["one"], fast_plan, "fact_types_select")) == 0
    assert answer_retrieval_ext._dedupe_lines(["lexicon_term: a", "units: x", "cpu", "cpu"], limit=1) == ["cpu"]
    assert answer_retrieval_ext._collect_fact_candidates([{"text": "cpu: 1\nram: 2"}, {"bad": True}], 3) == ["cpu: 1", "ram: 2"]
    assert asyncio.run(answer_retrieval_ext._select_best_list(retrieval_call, "q", [[], ["cpu"]], fast_plan, "fact_types_select")) == ["cpu"]
    # Expected value shows the duplicate "cpu" dropped and the integer 5
    # returned as the string "5".
    assert asyncio.run(answer_retrieval_ext._extract_fact_types(retrieval_call, "q", [], fast_plan)) == ["cpu", "5"]

    # Reply in which every field the helpers read holds a string instead of a
    # list. The ``del`` marks all parameters as deliberately unused.
    async def retrieval_bad(_system: str, _prompt: str, *, model: str | None = None, tag: str = "", **_: Any) -> str:
        del _system, _prompt, model, tag
        return '{"signals":"bad","fact_types":"bad","lines":"bad"}'

    # For each helper below: empty input, wrong-typed reply, then a usable
    # reply, with the exact expected result for each case.
    assert asyncio.run(answer_retrieval_ext._extract_fact_types(retrieval_bad, "q", [], fast_plan)) == []
    assert asyncio.run(answer_retrieval_ext._derive_signals(retrieval_call, "q", [], fast_plan)) == []
    assert asyncio.run(answer_retrieval_ext._derive_signals(retrieval_bad, "q", ["cpu"], fast_plan)) == []
    assert asyncio.run(answer_retrieval_ext._derive_signals(retrieval_call, "q", ["cpu"], fast_plan)) == ["cpu", "ram"]
    assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(retrieval_call, "q", [], ["cpu: 1"], fast_plan)) == []
    assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(retrieval_bad, "q", ["cpu"], ["cpu: 1"], fast_plan)) == []
    assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(retrieval_call, "q", ["cpu"], ["cpu: 1", "ram: 2"], fast_plan)) == ["cpu: 1"]
    assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(retrieval_call, "q", [], fast_plan, 2)) == []
    assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(retrieval_bad, "q", ["cpu: 1"], fast_plan, 2)) == []
    assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(retrieval_call, "q", ["cpu: 1", "ram: 2"], fast_plan, 2)) == ["cpu: 1", "ram: 2"]
    assert asyncio.run(answer_retrieval_ext._select_fact_lines(retrieval_call, "q", [], fast_plan, 1)) == []
    assert asyncio.run(answer_retrieval_ext._select_fact_lines(retrieval_bad, "q", ["cpu: 1"], fast_plan, 1)) == []
    assert asyncio.run(answer_retrieval_ext._select_fact_lines(retrieval_call, "q", ["cpu: 1", "ram: 2"], fast_plan, 1)) == ["cpu: 1"]

    # Canned replies for the post-processing helpers; an unexpected tag fails
    # the test.
    async def post_call(_system: str, _prompt: str, *, model: str | None = None, tag: str = "", **_: Any) -> str:
        if tag == "contradiction":
            return '{"use_facts": false, "confidence": 70}'
        if tag == "insight_guard":
            return '{"ok": true}'
        if tag == "insight_fix":
            return "fixed"
        raise AssertionError(tag)

    assert answer_post._strip_unknown_entities("", ["titan-99"], []) == ""
    assert answer_post._strip_unknown_entities("Atlas is healthy.", [], []) == "Atlas is healthy."
    assert answer_post._needs_evidence_guard("", ["fact"]) is False
    assert answer_post._needs_evidence_guard("pressure is high", ["pressure"]) is False
    contradiction = asyncio.run(
        answer_post._contradiction_decision(
            ContradictionContext(post_call, "q", "draft", ["fact"], fast_plan),
            attempts=2,
        )
    )
    # 70 is the confidence carried by post_call's "contradiction" reply.
    assert contradiction["confidence"] == 70
    assert answer_post._format_direct_metric_line("broken line") == "broken line"
    assert answer_post._global_facts([]) == []
    assert answer_post._has_keyword_overlap([], ["cpu"]) is False
    assert answer_post._extract_question_tokens("") == []
    assert answer_post._expand_tokens([]) == []
    assert answer_post._ensure_token_coverage([], ["cpu"], ["cpu: 1"]) == []
    assert answer_post._best_keyword_line(["ram: 1"], ["cpu"]) is None
    assert answer_post._line_starting_with([], "cpu") is None
    assert answer_post._non_rpi_nodes({"hardware_by_node": None}) == {}
    assert answer_post._format_hardware_groups({}, "Nodes") == ""
    assert answer_post._lexicon_context({"lexicon": []}) == ""
    assert answer_post._parse_json_list("nope") == []
    assert answer_post._scores_from_json({}).confidence == 60
    assert answer_post._coerce_int("bad", 5) == 5
    assert answer_post._style_hint({"question_type": "planning"}) == "insightful"
    assert answer_post._needs_evidence_fix("", {"needs_snapshot": True}) is False
    assert answer_post._should_use_insight_guard({"question_type": "planning"}) is True
    insight = InsightGuardInput(
        question="q",
        reply="reply",
        classify={"question_type": "planning"},
        context="ctx",
        plan=fast_plan,
        call_llm=post_call,
        facts=[],
    )
    # post_call answers the guard with {"ok": true}; the reply is expected to
    # come back unchanged.
    assert asyncio.run(answer_post._apply_insight_guard(insight)) == "reply"
    assert answer_post_ext._reply_matches_metric_facts("no numbers", ["cpu: 1"]) is False
    assert answer_post_ext._needs_dedup("short.") is False
    assert answer_post_ext._needs_focus_fix("why", "direct", {"question_type": "open_ended"}) is False
    assert answer_post_ext._extract_keywords("Q", "Q", [], []) == []
    assert answer_post_ext._allowed_nodes({}) == []
    assert answer_post_ext._allowed_namespaces({}) == []
    assert answer_post_ext._find_unknown_nodes("titan-01", ["titan-01"]) == []
    assert answer_post_ext._find_unknown_namespaces("namespace synapse", ["synapse"]) == []
    assert answer_post_ext._needs_runbook_fix("runbooks/fix.md", ["runbooks/fix.md"]) is False
    assert answer_post_ext._needs_runbook_reference("status", ["runbooks/fix.md"], "ok") is False
    assert answer_post_ext._best_runbook_match("x", []) is None
    # Index 1 of an empty list: the path cannot be resolved.
    assert answer_post_ext._resolve_path({"a": []}, "a[1].b") is None
    assert answer_post_ext._snapshot_id({"snapshot": {"id": "x"}}) is None
    assert answer_post_ext._claims_to_payload([]) == []
    assert answer_post_ext._state_from_payload({}) is None
    assert answer_factsheet._factsheet_instruction("smart")
    assert answer_factsheet._factsheet_model("quick", fast_plan) == fast_plan.fast_model
    assert answer_factsheet._is_plain_math_question("2 + 2") is True
    assert answer_factsheet._quick_fact_sheet_lines("where is runbook", ["runbooks/fix.md", "cpu: 1"], [], limit=1)
    assert answer_factsheet._quick_fact_sheet_text([]) == "Fact Sheet:\n- No snapshot facts available."
    assert "prefer rpi5 workers first" in answer_factsheet._quick_fact_sheet_heuristic_answer(
        "what is the node placement last resort",
        ["runbooks/fix.md"],
    )
    assert "1 ready nodes out of 2 total" in answer_factsheet._quick_fact_sheet_heuristic_answer(
        "how many ready nodes are there",
        ["nodes_total:2,ready:1,not_ready:1"],
    )
    assert answer_spine._join_context([]) == ""
    assert answer_spine._format_history([]) == ""
    assert answer_spine._line_starting_with([], "cpu") is None
    assert answer_spine._spine_lines([]) == {}
    # Scratch mapping handed to the ``_spine_*`` helpers; its contents are not
    # asserted, the calls are made for execution only.
    extra_spine: dict[str, str] = {}
    answer_spine._spine_nodes(["nodes: total=2 ready=1 not_ready=1"], extra_spine)
    answer_spine._spine_hardware(["hardware: amd64=1 (titan-02)"], extra_spine)
    answer_spine._spine_hottest(["hottest: cpu=titan-01 [rpi5] (90%)"], extra_spine)
    answer_spine._spine_postgres(["postgres_connections_total: used=5, max=10"], extra_spine)
    answer_spine._spine_namespaces(["namespace_pods_top: synapse=5"], extra_spine)
    answer_spine._spine_pressure(["pressure: nodes=0"], extra_spine)
    assert answer_spine._parse_group_line("invalid") == {}
    assert answer_spine._parse_hottest("broken", "cpu") is None
    assert answer_spine._spine_nodes_answer("nodes: total=2 ready=1 not_ready=1")
    assert answer_spine._spine_pressure_answer("pressure: nodes=0")


def test_runtime_and_snapshot_edge_branches(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """Cover runtime wrappers and sparse snapshot builder branches.

    Runs a subset of the snapshot format helpers against a hand-written
    summary, calls the ``core_a._build_*`` helpers with minimal inputs, then
    exercises ``ClaimStore``, ``KnowledgeBase``, ``SnapshotProvider`` caching,
    ``main_mod.main()`` with every collaborator replaced by a local probe
    class, and finally ``QueueManager.submit`` on a manager that was never
    started.
    """
    # Hand-written summary for the format helpers called right below. Note the
    # string values in numeric-looking fields ("7", "x", "2") and the
    # (name, count) tuples in the ``*_top`` lists.
    sparse_summary = {
        "node_pods": [
            {"node": "titan-01", "pods_total": "7", "namespaces_top": [("synapse", 3), ("vault", 2)]},
            {"node": "titan-02", "pods_total": "x"},
        ],
        "pod_issues": {
            "counts": {"Failed": 1},
            "items": [{"namespace": "synapse", "pod": "matrix", "phase": "Pending", "restarts": 1}],
            "pending_oldest": [{"namespace": "synapse", "pod": "matrix", "age_hours": 2, "reason": "Waiting"}],
            "waiting_reasons": {"ImagePullBackOff": 2},
            "pending_over_15m": "2",
        },
        "workloads_health": {
            "deployments": {"not_ready": 1},
            "statefulsets": {"not_ready": 0},
            "daemonsets": {"not_ready": 1},
        },
        "topology": {
            "nodes": [{"node": "titan-01", "workloads_top": [("matrix", 3)]}],
            "workloads": [{"namespace": "synapse", "workload": "matrix", "nodes_top": [("titan-01", 3)]}],
        },
        "flux": {
            "not_ready": 2,
            "items": [{"namespace": "flux-system", "name": "kustomization", "reason": "waiting", "suspended": True}],
        },
        "namespace_capacity_summary": {
            "cpu_ratio_top": [{"namespace": "synapse", "cpu_usage_ratio": 0.8, "cpu_usage": 4, "cpu_requests": 5}],
            "mem_ratio_top": [{"namespace": "synapse", "mem_usage_ratio": 0.7, "mem_usage": 7, "mem_requests": 10}],
            "cpu_headroom_low": [{"namespace": "synapse", "headroom": 0.2}],
            "mem_headroom_low": [{"namespace": "synapse", "headroom": 0.3}],
            "cpu_overcommitted": 1,
            "mem_overcommitted": 1,
            "cpu_overcommitted_names": ["synapse"],
            "mem_overcommitted_names": ["vault"],
        },
        "workloads": [{"namespace": "synapse", "workload": "matrix", "pods_total": 3, "primary_node": "titan-01"}],
        "lexicon": {"terms": [{"term": "atlas", "meaning": "cluster"}], "aliases": {"bot": "atlas"}},
        "cross_stats": {
            "node_metric_top": [{"metric": "cpu", "node": "titan-01", "value": 90, "cpu": 90, "ram": 80, "net": 1.0, "io": 2.0, "pods_total": 3}],
            "namespace_metric_top": [{"metric": "cpu", "namespace": "synapse", "value": 40, "cpu_ratio": 0.8, "mem_ratio": 0.7, "pods_total": 3}],
            "pvc_top": [{"namespace": "synapse", "pvc": "data", "used_percent": 95}],
        },
        "events": {"warnings_total": 2},
    }
    lines: list[str] = []
    format_a._append_node_pods(lines, sparse_summary)
    format_a._append_pod_issues(lines, sparse_summary)
    format_a._append_workload_health(lines, sparse_summary)
    format_b._append_topology(lines, sparse_summary)
    format_b._append_flux(lines, sparse_summary)
    format_c._append_namespace_capacity_summary(lines, sparse_summary)
    format_c._append_workloads_by_namespace(lines, sparse_summary)
    format_c._append_lexicon(lines, sparse_summary)
    format_c._append_cross_stats(lines, sparse_summary)
    # Spot-check three substrings that must appear somewhere in the output.
    assert any("node_pods_max" in line for line in lines)
    assert any("flux_not_ready_items" in line for line in lines)
    assert any("cross_pvc_usage" in line for line in lines)
    # Each builder gets the smallest input that should still yield a truthy
    # result; the first list also contains a non-dict entry ("bad").
    assert core_a._build_node_ages([{"name": "titan-01", "age_hours": 1}, "bad"])
    assert core_a._build_node_facts([{"name": "titan-01", "is_worker": True, "roles": ["worker"], "arch": "arm64"}])
    assert core_a._build_node_taints([{"name": "titan-01", "taints": [{"key": "dedicated", "effect": "NoSchedule"}]}])
    assert core_a._build_root_disk_headroom({"node_usage": {"disk": [{"node": "titan-01", "value": 80}]}})
    assert core_a._build_longhorn({"longhorn": {"total": 1}})
    assert core_a._build_node_load({"node_load": [{"node": "titan-01"}]})
    assert core_a._build_pods({"pods_running": 1})
    assert core_a._build_capacity({"capacity_cpu": 4})
    assert core_a._build_namespace_pods({"namespace_pods": [{"namespace": "synapse"}]})
    assert core_a._build_namespace_nodes({"namespace_nodes": [{"namespace": "synapse"}]})
    assert core_a._build_node_pods({"node_pods": [{"node": "titan-01"}]})
    assert core_a._build_node_pods_top({"node_pods_top": [{"node": "titan-01"}]})
    assert core_a._build_pod_issues({"pod_issues": {"counts": {}}})
    assert core_a._build_events({"events": {"warnings_total": 1}})
    assert core_a._build_event_summary({"events": {"warnings_top_reason": {"a": 1}, "warnings_latest": [{"reason": "x"}]}})
    assert core_a._build_postgres({"postgres_connections": {"used": 1}})

    settings = replace(build_test_settings(), queue_enabled=False)
    # Empty-string key: get() is expected to return None, and set() is only
    # required not to raise.
    store = ClaimStore(":memory:", 60)
    assert store.get("") is None
    store.set("", {"claims": []})
    assert _safe_json(None, {}) == {}

    # Catalog with one entry that has only a path and one that has only a
    # title.
    kb_dir = tmp_path / "kb"
    (kb_dir / "catalog").mkdir(parents=True)
    (kb_dir / "catalog" / "runbooks.json").write_text(json.dumps([{"path": "runbooks/fix.md"}, {"title": "Missing path"}]), encoding="utf-8")
    kb = KnowledgeBase(str(kb_dir))
    assert kb.runbook_titles() == ""
    assert kb.runbook_paths(limit=1) == ["runbooks/fix.md"]

    from atlasbot.snapshot.builder import SnapshotProvider

    # Seed the private cache fields by hand and pin the clock at 100.0; with an
    # empty ariadne_state_url the provider is expected to hand back the cached
    # dict.
    provider = SnapshotProvider(replace(settings, ariadne_state_url="", snapshot_ttl_sec=1))
    provider._cache = {"cached": True}
    provider._cache_ts = 1.0
    # NOTE(review): the dotted target goes through builder's ``time``
    # attribute; if that is the stdlib module, this replaces time.monotonic
    # process-wide until monkeypatch teardown, including for the asyncio.run
    # calls below. Confirm that is intended.
    monkeypatch.setattr("atlasbot.snapshot.builder.time.monotonic", lambda: 100.0)
    assert provider.get() == {"cached": True}

    from atlasbot import main as main_mod

    # Filled in by the probe constructors so the callbacks that main() wires
    # up can be invoked directly afterwards.
    captured: dict[str, Any] = {}

    # Stand-in for QueueManager: records the handler, start() is a no-op.
    class QueueProbe:
        def __init__(self, _settings: Settings, handler):
            captured["handler"] = handler

        async def start(self) -> None:
            return None

        async def submit(self, payload: dict[str, Any]) -> dict[str, Any]:
            return {"reply": payload.get("question", ""), "scores": {}}

    # Stand-in for Api: records the answer handler and exposes a bare ``app``.
    class ApiProbe:
        def __init__(self, _settings: Settings, answer_handler):
            captured["answer_handler"] = answer_handler
            self.app = SimpleNamespace()

    # Stand-in for uvicorn.Server: serve() returns immediately.
    class ServerProbe:
        def __init__(self, config: Any):
            self.config = config

        async def serve(self) -> None:
            return None

    # Stand-in engine: echoes its arguments into the reply and returns fixed
    # scores.
    class EngineProbe:
        async def answer(
            self,
            question: str,
            *,
            mode: str,
            history: list[dict[str, str]] | None = None,
            observer: Any = None,
            conversation_id: str | None = None,
            snapshot_pin: bool | None = None,
        ) -> AnswerResult:
            return AnswerResult(
                reply=f"{question}:{mode}:{bool(history)}:{conversation_id}:{snapshot_pin}:{observer is not None}",
                scores=AnswerScores(confidence=91, relevance=92, satisfaction=93, hallucination_risk="low"),
                meta={},
            )

    monkeypatch.setattr(main_mod, "load_settings", lambda: replace(settings, matrix_bots=()))
    monkeypatch.setattr(main_mod, "configure_logging", lambda _level: None)
    monkeypatch.setattr(main_mod, "_build_engine", lambda _settings: EngineProbe())
    monkeypatch.setattr(main_mod, "QueueManager", QueueProbe)
    monkeypatch.setattr(main_mod, "Api", ApiProbe)
    monkeypatch.setattr(main_mod.uvicorn, "Server", ServerProbe)
    asyncio.run(main_mod.main())
    # The payload carries deliberately mistyped history / conversation_id /
    # snapshot_pin values; the handler must still produce a reply.
    handled = asyncio.run(captured["handler"]({"question": "hello", "mode": "smart", "history": "bad", "conversation_id": 7, "snapshot_pin": "bad"}))
    assert handled["reply"]
    answered = asyncio.run(captured["answer_handler"]("hello", "quick", None, None, None, None))
    assert answered.reply
    # Non-numeric confidence: 60 is the expected resulting value.
    assert result_scores({"scores": {"confidence": "bad"}}).confidence == 60
    # The manager is constructed but never started, so submit() must refuse.
    qm = QueueManager(replace(settings, queue_enabled=True), lambda payload: asyncio.sleep(0, result=payload))
    with pytest.raises(RuntimeError, match="queue not initialized"):
        asyncio.run(qm.submit({"question": "x"}))
    assert _mode_timeout_sec(settings, "genius") == settings.genius_time_budget_sec
    assert _extract_mode("atlas hello", ("atlas",), "quick") == ("quick", "hello")