# atlasbot/tests/test_quality_gate_paths.py
"""Targeted quality-gate coverage for runtime and answerer orchestration."""
from __future__ import annotations
import asyncio
import json
from dataclasses import replace
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import httpx
import pytest
from atlasbot.api.http import Api, AnswerRequest
from atlasbot.config import MatrixBotConfig
from atlasbot.engine.answerer import (
AnswerEngine,
AnswerResult,
AnswerScores,
ClaimItem,
EvidenceItem,
ModePlan,
)
from atlasbot.engine.answerer.common import _mode_plan
from atlasbot.engine.answerer.engine import AnswerEngine as EngineClass
from atlasbot.engine.answerer.workflow import run_answer
from atlasbot.engine.answerer.workflow_post import finalize_answer
from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient, LLMError, parse_json
from atlasbot.main import result_scores
from atlasbot.matrix.bot import MatrixBot, MatrixClient
from atlasbot.queue.nats import QueueManager
from atlasbot.snapshot.builder import SnapshotProvider, build_summary
from testing.fakes import build_test_settings
from tests.test_support_modules import _rich_snapshot
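# Test doubles below keep every external dependency (snapshot, KB, LLM) deterministic,
# so the orchestration paths under test can be driven end to end without a live cluster.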
class StaticSnapshot:
"""Return a fixed snapshot for answer-engine tests."""
def __init__(self, payload: dict[str, Any]) -> None:
self._payload = payload
def get(self) -> dict[str, Any]:
"""Return the stored snapshot payload."""
return self._payload
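# PromptLLM dispatches on substrings of the real prompts; if the engine's prompt wording
# drifts, the AssertionError at the end of chat() pinpoints the unhandled call.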
class PromptLLM:
"""Map prompt fragments to canned responses for workflow tests."""
def __init__(self) -> None:
self.calls: list[tuple[str, str]] = []
async def chat(
self,
messages: list[dict[str, str]],
*,
model: str | None = None,
timeout_sec: float | None = None,
) -> str:
"""Return the scripted response for the latest user prompt."""
del timeout_sec
system = messages[0]["content"]
prompt = messages[-1]["content"]
self.calls.append((model or "", prompt))
if "Given chunk summaries, score relevance" in prompt:
items = []
for line in prompt.splitlines():
if line.startswith("- c"):
chunk_id = line.split()[1].rstrip(":")
score = 95 if "cpu" in line.lower() or "synapse" in line.lower() else 80
items.append({"id": chunk_id, "score": score, "reason": "relevant"})
return json.dumps(items or [{"id": "c0", "score": 90, "reason": "relevant"}])
direct = self._direct_response(prompt)
if direct is not None:
return direct
response = self._lookup_response(system, prompt)
if response is not None:
return response
raise AssertionError(f"Unhandled prompt:\nSYSTEM={system}\nPROMPT={prompt}")
def _direct_response(self, prompt: str) -> str | None:
"""Return direct string responses for a few prompt families."""
if "Answer the sub-question using the context" in prompt:
return "The best runbook path is runbooks/fix.md." if "runbook" in prompt.lower() else "synapse is hottest with cpu 95 on titan-01."
markers = [
("Write a final response to the user", "titan-99 is hottest and the runbook is runbooks/wrong.md."),
("Draft:", "synapse is hottest at cpu 95 on titan-01, and amd64 nodes remain separate from raspberry hardware."),
("Return JSON with fields: issues", '{"issues":["mention the exact runbook"],"missing_data":[],"risky_claims":[]}'),
("command (string), rationale", '{"command":"kubectl top pods -n synapse","rationale":"verify namespace cpu"}'),
("confidence (0-100)", '{"confidence":88,"relevance":91,"satisfaction":86,"hallucination_risk":"low"}'),
]
for marker, response in markers:
if marker in prompt:
if marker == "Draft:" and "If Facts are provided" not in prompt:
continue
return response
return None
def _lookup_response(self, system: str, prompt: str) -> str | None:
"""Return canned responses for prompt markers."""
del system
markers = [
(
"normalized (string), keywords",
'{"normalized":"Which namespace is hottest on raspberry hardware and which runbook should I use?","keywords":["namespace","hottest","cpu","raspberry","runbook"]}',
),
(
"needs_snapshot (bool)",
'{"needs_snapshot":true,"needs_kb":true,"needs_tool":true,"answer_style":"insightful","follow_up":false,"question_type":"open_ended","focus_entity":"namespace","focus_metric":"cpu"}',
),
(
"Generate up to",
'[{"id":"q1","question":"Which namespace is hottest?","priority":5,"kind":"metric"},{"id":"q2","question":"Which runbook applies?","priority":4,"kind":"context"}]',
),
("Choose the run that best aligns", '{"selected_index": 1}'),
("AvailableKeys:", '{"keys":["namespace_cpu_top","namespace_pods","hardware_nodes"]}'),
("Return JSON with field: missing", '{"missing":[]}'),
("Return JSON with fields: prefixes", '{"prefixes":["namespace","hottest"]}'),
("fact_types", '{"fact_types":["namespace_cpu_top","hardware_nodes"]}'),
("Return JSON with field: signals", '{"signals":["cpu","synapse","raspberry"]}'),
(
"Signals:",
'{"lines":["namespace_cpu_top: synapse=95","hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)"]}',
),
(
"Return JSON with field: lines",
'{"lines":["namespace_cpu_top: synapse=95","hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)"]}',
),
(
"CandidateFacts:",
'{"lines":["namespace_cpu_top: synapse=95","hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)"]}',
),
(
"FactCandidates:",
'{"lines":["namespace_cpu_top: synapse=95","hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)"]}',
),
(
"Suggest a safe, read-only command",
'{"command":"kubectl top pods -n synapse","rationale":"verify namespace cpu"}',
),
("Pick the best candidate for accuracy and grounding", '{"best": 1}'),
("Pick the best draft for accuracy", '{"best": 1}'),
("Pick the best runbook path", '{"path":"runbooks/fix.md"}'),
("Check the draft against the context", "synapse is hottest on titan-01, but see runbooks/wrong.md."),
("Answer using the fact", "Latest metrics: namespace_cpu_top: synapse=95."),
("Rewrite the draft to only include claims supported by FactsUsed", "synapse is hottest on titan-01."),
("Check if an open-ended answer includes at least two concrete signals", '{"ok": false, "reason": "needs more detail"}'),
("ok (bool), reason (string)", '{"ok": false, "reason": "needs more detail"}'),
("Rewrite the answer using the critique", "synapse is hottest at cpu 95 on titan-01. Use runbooks/fix.md."),
("Return JSON with field: note", '{"note":"The answer would benefit from per-pod CPU samples."}'),
("Score response quality", '{"confidence":88,"relevance":91,"satisfaction":86,"hallucination_risk":"low"}'),
(
"Return JSON with fields: confidence (0-100), relevance (0-100), satisfaction (0-100), hallucination_risk (low|medium|high).",
'{"confidence":88,"relevance":91,"satisfaction":86,"hallucination_risk":"low"}',
),
(
"claims list",
'{"claims":[{"id":"c1","claim":"synapse is hottest","evidence":[{"path":"hottest.cpu.node","reason":"snapshot"}]}]}',
),
("Select the claims most relevant", '{"claim_ids":["c1"]}'),
("Follow-up:", "titan-99 is still hottest."),
("Rewrite the answer to be concise and directly answer the question", "Latest metrics: namespace_cpu_top: synapse=95."),
("Deduplicate repeated statements", "Latest metrics: namespace_cpu_top: synapse=95."),
("Answer using only the Fact Sheet", "Fact sheet answer: namespace_cpu_top: synapse=95. Use runbooks/fix.md."),
]
for marker, response in markers:
if marker in prompt:
return response
return None
class TimeoutLLM:
"""Raise a timeout as soon as the workflow makes an LLM call."""
async def chat(
self,
messages: list[dict[str, str]],
*,
model: str | None = None,
timeout_sec: float | None = None,
) -> str:
"""Trigger the workflow timeout handling branch."""
del messages, model, timeout_sec
raise TimeoutError("boom")
class LimitLLM(PromptLLM):
"""Reuse prompt handling while allowing the workflow to hit call caps."""
def _settings(tmp_path: Path, **overrides: Any):
"""Build settings with an isolated claim-store path."""
return replace(build_test_settings(), state_db_path=str(tmp_path / "state.db"), **overrides)
def _make_engine(tmp_path: Path, llm: Any, **setting_overrides: Any) -> AnswerEngine:
"""Construct a real engine with static snapshot and KB doubles."""
settings = _settings(tmp_path, **setting_overrides)
snapshot = StaticSnapshot(_rich_snapshot())
kb = KnowledgeBase("")
kb.summary = lambda: "KB summary." # type: ignore[method-assign]
kb.runbook_titles = lambda limit=5: "Relevant runbooks:\n- Fix (runbooks/fix.md)" # type: ignore[method-assign]
kb.runbook_paths = lambda limit=10: ["runbooks/fix.md"] # type: ignore[method-assign]
kb.chunk_lines = lambda max_files=20, max_chars=6000: [ # type: ignore[method-assign]
"runbooks/fix.md",
"namespace_cpu_top: synapse=95",
"hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)",
]
return AnswerEngine(settings, llm, kb, snapshot) # type: ignore[arg-type]
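# The test below reaches into private engine helpers (_synthesize_answer, _score_answer,
# _extract_claims, ...) directly to cover branches the full workflow can short-circuit.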
def test_engine_helper_methods_cover_state_and_followup(tmp_path: Path) -> None:
"""Cover answer-engine helper branches outside the main workflow."""
settings = _settings(tmp_path)
class StockLLM:
async def chat(self, messages, *, model=None, timeout_sec=None):
del messages, model, timeout_sec
return "stock reply"
engine = EngineClass(settings, StockLLM(), KnowledgeBase(""), StaticSnapshot(_rich_snapshot()))
    async def call_llm(
        _system: str,
        _prompt: str,
        *,
        context: str | None = None,
        model: str | None = None,
        tag: str = "",
    ) -> str:
del _system, context, model
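        # Tag-keyed canned responses; an unknown tag fails fast so new call sites get noticed.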
static = {
"draft_select": '{"best": 2}',
"score": '{"confidence":90,"relevance":91,"satisfaction":92,"hallucination_risk":"low"}',
"claim_map": '{"claims":[{"id":"c1","claim":"cpu is high","evidence":[{"path":"hottest.cpu.node","reason":"why"},{"path":"","reason":"skip"}]},"bad"]}',
"select_claims": '{"claim_ids":["c1"]}',
"followup": "titan-99 is hottest. The draft is correct.",
"followup_fix": "titan-01 is hottest.",
"dedup_followup": "The draft is correct. titan-01 is hottest.",
"dedup": "deduped",
}
if tag == "synth":
return "draft one" if "DraftIndex: 1" in _prompt else "draft two"
if tag in static:
return static[tag]
raise AssertionError(tag)
stock = asyncio.run(engine._answer_stock("hello"))
assert stock.reply == "stock reply"
plan = replace(_mode_plan(settings, "smart"), drafts=2, parallelism=2)
synth = asyncio.run(
engine._synthesize_answer(
"Which node is hottest?",
["draft one", "draft two"],
"ctx",
{"question_type": "metric", "answer_style": "direct"},
plan,
call_llm,
)
)
synth_empty = asyncio.run(
engine._synthesize_answer(
"Which node is hottest?",
[],
"ctx",
{"question_type": "metric", "answer_style": "direct"},
replace(plan, drafts=1, parallelism=1),
call_llm,
)
)
assert synth == "draft two"
assert synth_empty == "draft two"
scored = asyncio.run(engine._score_answer("q", "a", plan, call_llm))
assert scored.hallucination_risk == "low"
assert asyncio.run(engine._score_answer("q", "a", replace(plan, use_scores=False), call_llm)).confidence == 60
summary = build_summary(_rich_snapshot())
claims = asyncio.run(engine._extract_claims("q", "a", summary, ["fact"], call_llm))
assert claims and claims[0].evidence[0].path == "hottest.cpu.node"
assert asyncio.run(engine._extract_claims("q", "", summary, [], call_llm)) == []
assert asyncio.run(engine._dedup_reply("one. one. one.", plan, call_llm, "dedup")) == "deduped"
assert asyncio.run(engine._dedup_reply("single answer", plan, call_llm, "dedup")) == "single answer"
engine._store_state("conv-1", claims, summary, _rich_snapshot(), True)
state = engine._get_state("conv-1")
assert state and state.snapshot
assert engine._get_state(None) is None
engine._cleanup_state()
followup = asyncio.run(
engine._answer_followup(
"Which hardware hotspot is there?",
state,
summary,
{"question_type": "diagnostic"},
plan,
call_llm,
)
)
assert "titan-01" in followup
assert asyncio.run(engine._select_claims("what about that?", claims, plan, call_llm)) == ["c1"]
assert asyncio.run(engine._select_claims("what about that?", [], plan, call_llm)) == []
def test_finalize_answer_covers_post_processing_branches(tmp_path: Path) -> None:
"""Exercise evidence-fix, runbook, guard, critic, and gap paths."""
settings = _settings(tmp_path)
plan = replace(_mode_plan(settings, "smart"), use_gap=True, use_critic=True)
summary = build_summary(_rich_snapshot())
summary_lines = [
"namespace_cpu_top: synapse=95",
"hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)",
"runbooks/fix.md",
]
observed: list[tuple[str, str]] = []
    async def call_llm(
        _system: str,
        _prompt: str,
        *,
        context: str | None = None,
        model: str | None = None,
        tag: str = "",
    ) -> str:
del _system, context, model
responses = {
"runbook_select": '{"path":"runbooks/fix.md"}',
"evidence_fix": "titan-99 is hottest and see runbooks/wrong.md.",
"evidence_fix_enforce": "titan-99 is hottest and see runbooks/wrong.md.",
"metric_direct": "no numbers here",
"runbook_enforce": "Non-Raspberry Pi nodes: amd64 (titan-02). Use runbooks/fix.md.",
"evidence_guard": "Non-Raspberry Pi nodes: amd64 (titan-02). Use runbooks/fix.md.",
"focus_fix": "Latest metrics: namespace_cpu_top: synapse=95.",
"insight_guard": '{"ok": false, "reason": "needs more detail"}',
"insight_fix": "Latest metrics: namespace_cpu_top: synapse=95. Use runbooks/fix.md.",
"critic": '{"issues":["too vague"]}',
"revise": "Latest metrics: namespace_cpu_top: synapse=95. Use runbooks/fix.md.",
"gap": '{"note":"The answer would benefit from per-pod CPU samples."}',
}
if tag not in responses:
raise AssertionError(_prompt)
return responses[tag]
class FinalizeEngine:
async def _synthesize_answer(self, *args: Any) -> str:
return "titan-99 is hottest and see runbooks/wrong.md."
async def _dedup_reply(self, reply: str, _plan: ModePlan, _call_llm, tag: str) -> str:
assert tag == "dedup"
return reply
async def _score_answer(self, _question: str, _reply: str, _plan: ModePlan, _call_llm) -> AnswerScores:
return AnswerScores(80, 81, 82, "low")
        async def _extract_claims(
            self, _question: str, _reply: str, _summary: dict[str, Any], _facts_used: list[str], _call_llm
        ) -> list[ClaimItem]:
return [ClaimItem(id="c1", claim="cpu high", evidence=[EvidenceItem(path="hottest.cpu.node", reason="snapshot")])]
reply, scores, claims = asyncio.run(
finalize_answer(
engine=FinalizeEngine(),
call_llm=call_llm,
normalized="Which namespace is hottest on raspberry hardware and which runbook should I use?",
subanswers=["synapse is hottest"],
context="ctx",
classify={"question_type": "open_ended", "answer_style": "direct"},
plan=plan,
summary=summary,
summary_lines=summary_lines,
metric_facts=["namespace_cpu_top: synapse=95"],
key_facts=["namespace_cpu_top: synapse=95"],
facts_used=["hardware_nodes: rpi5=(titan-01) | amd64=(titan-02)"],
allowed_nodes=["titan-01", "titan-02"],
allowed_namespaces=["synapse"],
runbook_paths=["runbooks/fix.md"],
lowered_question="which namespace is hottest on raspberry hardware and which runbook should i use?",
force_metric=True,
keyword_tokens=["namespace", "cpu", "raspberry"],
question_tokens=["namespace", "cpu", "raspberry"],
snapshot_context="ClusterSnapshot:\nnamespace_cpu_top: synapse=95",
observer=lambda stage, note: observed.append((stage, note)),
mode="smart",
metric_keys=["namespace_cpu_top"],
)
)
assert "runbooks/fix.md" in reply
assert "synapse=95" in reply
assert scores.confidence == 80
assert claims and claims[0].id == "c1"
assert ("evidence_fix", "repairing missing evidence") in observed
assert ("critic", "reviewing") in observed
assert ("gap", "checking gaps") in observed
def test_run_answer_deep_workflow_persists_state(tmp_path: Path) -> None:
"""Drive the full smart workflow through retrieval, synthesis, and post-processing."""
engine = _make_engine(tmp_path, PromptLLM())
observed: list[tuple[str, str]] = []
result = asyncio.run(
run_answer(
engine,
"Run limitless Which namespace is hottest on raspberry hardware and which runbook should I use?",
mode="smart",
history=[{"q": "before", "a": "earlier"}],
observer=lambda stage, note: observed.append((stage, note)),
conversation_id="room-1",
snapshot_pin=True,
)
)
assert "runbooks/fix.md" in result.reply
assert result.meta["tool_hint"]["command"] == "kubectl top pods -n synapse"
state = engine._get_state("room-1")
assert state and state.claims and state.snapshot
stages = {stage for stage, _note in observed}
assert {"normalize", "route", "retrieve", "tool", "subanswers", "synthesize"} <= stages
def test_run_answer_followup_and_limits(tmp_path: Path) -> None:
"""Cover follow-up routing, reasoning limit, and timeout fallbacks."""
class FollowupLLM(PromptLLM):
def _lookup_response(self, system: str, prompt: str) -> str | None:
if "normalized (string), keywords" in prompt:
return '{"normalized":"What about that?","keywords":["that"]}'
if "needs_snapshot (bool)" in prompt:
return '{"needs_snapshot":true,"needs_kb":false,"needs_tool":false,"answer_style":"direct","follow_up":false,"question_type":"open_ended","focus_entity":"unknown","focus_metric":"unknown"}'
if "Select the claims most relevant" in prompt:
return '{"claim_ids":["c1"]}'
if "Follow-up:" in prompt:
return "titan-99 is still hottest."
return super()._lookup_response(system, prompt)
engine = _make_engine(tmp_path, FollowupLLM())
summary = build_summary(_rich_snapshot())
engine._store_state(
"conv-1",
[ClaimItem(id="c1", claim="synapse is hottest", evidence=[EvidenceItem(path="hottest.cpu.node", reason="snapshot", value_at_claim="titan-01")])],
summary,
_rich_snapshot(),
True,
)
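    # The pre-seeded state makes this a follow-up: the engine should correct the stale
    # "titan-99" follow-up draft against the pinned snapshot evidence (titan-01).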
followup = asyncio.run(
run_answer(
engine,
"Run limitless What about that?",
mode="smart",
conversation_id="conv-1",
snapshot_pin=True,
)
)
assert "titan-01" in followup.reply
limit_engine = _make_engine(
tmp_path / "limit",
LimitLLM(),
fast_llm_calls_max=1,
llm_limit_multiplier=1.0,
)
limited = asyncio.run(run_answer(limit_engine, "tell me about cpu and runbooks", mode="custom"))
assert "reasoning limit" in limited.reply
assert limited.meta["llm_limit_hit"] is True
timeout_engine = _make_engine(
tmp_path / "timeout",
TimeoutLLM(),
smart_time_budget_sec=0.1,
ollama_timeout_sec=0.1,
)
timed_out = asyncio.run(run_answer(timeout_engine, "Run limitless tell me about cpu and runbooks", mode="smart"))
assert "time budget" in timed_out.reply.lower()
assert timed_out.meta["time_budget_hit"] is True
def test_api_matrix_queue_main_and_store_edge_paths(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Exercise remaining API, Matrix, queue, main, and store branches."""
settings = _settings(
tmp_path,
internal_token="secret",
queue_enabled=True,
matrix_bots=(MatrixBotConfig("bot", "pw", ("atlas",), "quick"),),
)
async def handler(
question: str,
mode: str,
history: list[dict[str, str]] | None,
conversation_id: str | None,
snapshot_pin: bool | None,
) -> AnswerResult:
del history, conversation_id, snapshot_pin
return AnswerResult(question + ":" + mode, AnswerScores(1, 2, 3, "low"), {"mode": mode})
api = Api(settings, handler)
from fastapi.testclient import TestClient
client = TestClient(api.app)
assert client.post("/v1/answer", headers={"X-Internal-Token": "secret"}, json={}).status_code == 400
assert client.post("/v1/answer", headers={"X-Internal-Token": "secret"}, json={"content": "hi"}).json()["reply"] == "hi:quick"
assert client.post("/v1/answer", headers={"X-Internal-Token": "secret"}, json={"question": " "}).status_code == 400
assert AnswerRequest(message=" hello ").message == " hello "
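    # Scripted httpx stand-ins for the Matrix HTTP surface: login yields a token,
    # the room-directory lookup 404s, and sync returns a single canned timeline.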
class FakeResp:
def __init__(self, payload: dict[str, Any], *, status_code: int = 200) -> None:
self._payload = payload
self.status_code = status_code
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise httpx.HTTPStatusError("bad", request=httpx.Request("GET", "http://x"), response=httpx.Response(self.status_code))
def json(self) -> dict[str, Any]:
return self._payload
class MatrixAsyncClient:
async def __aenter__(self) -> "MatrixAsyncClient":
return self
async def __aexit__(self, *exc: object) -> None:
return None
async def post(self, url: str, json: dict[str, Any] | None = None, headers: dict[str, str] | None = None) -> FakeResp:
del json, headers
if "login" in url:
return FakeResp({"access_token": "tok"})
return FakeResp({})
async def get(self, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None) -> FakeResp:
del headers, params
if "directory/room" in url:
return FakeResp({}, status_code=404)
return FakeResp({"next_batch": "n1", "rooms": {"join": {}}})
monkeypatch.setattr("atlasbot.matrix.bot.httpx.AsyncClient", lambda timeout=None: MatrixAsyncClient())
matrix_client = MatrixClient(settings, settings.matrix_bots[0])
assert asyncio.run(matrix_client.login()) == "tok"
assert asyncio.run(matrix_client.resolve_room("tok")) == ""
bot = MatrixBot(settings, settings.matrix_bots[0], SimpleNamespace(answer=None), handler)
class BotClient:
def __init__(self) -> None:
self.sent: list[str] = []
self.sync_calls = 0
async def login(self) -> str:
return "tok"
async def resolve_room(self, token: str) -> str:
del token
return "!room"
async def join_room(self, token: str, room_id: str) -> None:
del token, room_id
async def send_message(self, token: str, room_id: str, text: str) -> None:
del token, room_id
self.sent.append(text)
async def sync(self, token: str, since: str | None) -> dict[str, Any]:
del token, since
self.sync_calls += 1
if self.sync_calls == 1:
return {
"next_batch": "n1",
"rooms": {
"join": {
"!room": {
"timeline": {
"events": [
{"type": "m.room.member", "sender": "user"},
{"type": "m.room.message", "sender": "bot", "content": {"body": "ignore"}},
{"type": "m.room.message", "sender": "user", "content": {"body": "atlas quick hi"}},
]
}
}
}
},
}
raise RuntimeError("stop")
bot._client = BotClient()
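    # Run one sync cycle, then cancel; the second scripted sync raises to stop the loop.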
async def run_bot_once() -> None:
task = asyncio.create_task(bot.run())
await asyncio.sleep(0.01)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
asyncio.run(run_bot_once())
assert any("Thinking" in msg for msg in bot._client.sent)
    timeout_bot = MatrixBot(
        replace(settings, thinking_interval_sec=0.001, quick_time_budget_sec=0.01),
        settings.matrix_bots[0],
        SimpleNamespace(answer=None),
        None,
    )
timeout_bot._client = SimpleNamespace(
sent=[],
send_message=lambda token, room_id, text: asyncio.sleep(0, result=timeout_bot._client.sent.append(text)),
)
async def sleepy_handler(question: str, mode: str, history, conversation_id, observer):
del question, mode, history, conversation_id, observer
await asyncio.sleep(1.2)
return AnswerResult("late", AnswerScores(1, 2, 3, "low"), {})
timeout_bot._answer_handler = sleepy_handler
asyncio.run(timeout_bot._answer_with_heartbeat("tok", "!room", "q", "quick"))
assert any("time budget" in msg for msg in timeout_bot._client.sent)
    error_bot = MatrixBot(
        replace(settings, thinking_interval_sec=0.001),
        settings.matrix_bots[0],
        SimpleNamespace(answer=None),
        None,
    )
error_bot._client = SimpleNamespace(
sent=[],
send_message=lambda token, room_id, text: asyncio.sleep(0, result=error_bot._client.sent.append(text)),
)
async def failing_handler(question: str, mode: str, history, conversation_id, observer):
del question, mode, history, conversation_id, observer
raise RuntimeError("boom")
error_bot._answer_handler = failing_handler
asyncio.run(error_bot._answer_with_heartbeat("tok", "!room", "q", "smart"))
assert any("internal error" in msg for msg in error_bot._client.sent)
class DirectQueue:
async def __call__(self, payload: dict[str, Any]) -> dict[str, Any]:
return {"reply": payload["question"]}
direct_qm = QueueManager(replace(settings, queue_enabled=False), DirectQueue())
assert asyncio.run(direct_qm.submit({"question": "direct"})) == {"reply": "direct"}
class FakeSub:
async def next_msg(self, timeout: float) -> Any:
del timeout
return SimpleNamespace(data=json.dumps({"reply": "queued"}).encode())
async def unsubscribe(self) -> None:
return None
class FakeMsg:
def __init__(self, raw: bytes, reply: str = "reply") -> None:
self.data = raw
self.reply = reply
self.acked = False
async def ack(self) -> None:
self.acked = True
published: list[tuple[str, bytes]] = []
class ExistingStreamJS:
async def stream_info(self, stream: str) -> None:
assert stream == settings.nats_stream
async def publish(self, subject: str, data: bytes) -> None:
published.append((subject, data))
async def pull_subscribe(self, subject: str, durable: str):
del subject, durable
class Pull:
def __init__(self) -> None:
self.calls = 0
async def fetch(self, count: int, timeout: float) -> list[FakeMsg]:
del count, timeout
self.calls += 1
if self.calls == 1:
raise RuntimeError("retry")
raise asyncio.CancelledError
return Pull()
class FakeNats:
def __init__(self) -> None:
self.drained = False
async def connect(self, url: str) -> None:
assert url == settings.nats_url
def jetstream(self) -> ExistingStreamJS:
return ExistingStreamJS()
def new_inbox(self) -> str:
return "inbox"
async def subscribe(self, reply: str) -> FakeSub:
assert reply == "inbox"
return FakeSub()
async def publish(self, reply: str, data: bytes) -> None:
published.append((reply, data))
async def drain(self) -> None:
self.drained = True
monkeypatch.setattr("atlasbot.queue.nats.NATS", FakeNats)
queue = QueueManager(settings, DirectQueue())
asyncio.run(queue.start())
assert asyncio.run(queue.submit({"question": "queued", "mode": "smart"})) == {"reply": "queued"}
invalid_msg = FakeMsg(b"not-json")
asyncio.run(queue._handle_message(invalid_msg))
assert invalid_msg.acked is True
handled_msg = FakeMsg(json.dumps({"payload": {"question": "x"}, "reply": "reply"}).encode())
asyncio.run(queue._handle_message(handled_msg))
assert handled_msg.acked is True
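    # QueueManager needs a handler at construction; the throwing lambda is a placeholder
    # that failing_handler replaces before any message is processed.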
failing_queue = QueueManager(settings, lambda payload: (_ for _ in ()).throw(RuntimeError("boom")))
failing_queue._nc = FakeNats()
failing_queue._js = ExistingStreamJS()
failure_msg = FakeMsg(json.dumps({"payload": {"question": "x"}}).encode())
async def failing_handler(payload: dict[str, Any]) -> dict[str, Any]:
del payload
raise RuntimeError("boom")
failing_queue._handler = failing_handler
asyncio.run(failing_queue._handle_message(failure_msg))
assert failure_msg.acked is True
asyncio.run(queue.stop())
assert result_scores({"scores": {"confidence": "9", "relevance": "8", "satisfaction": "7", "hallucination_risk": "low"}}).confidence == 9
assert result_scores({"scores": "bad"}).confidence == 60
def test_kb_llm_snapshot_and_json_edge_paths(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Cover remaining KB, LLM, snapshot, and JSON parsing branches."""
base = tmp_path / "kb"
catalog = base / "catalog"
catalog.mkdir(parents=True)
(catalog / "atlas.json").write_text(json.dumps({"cluster": "atlas", "sources": ["bad"]}), encoding="utf-8")
(catalog / "runbooks.json").write_text(json.dumps([{"title": "Fix", "path": "runbooks/fix.md"}, {"title": "No path"}]), encoding="utf-8")
(base / "docs.md").write_text("x" * 120, encoding="utf-8")
kb = KnowledgeBase(str(base))
assert kb.runbook_titles(limit=1).count("runbooks/fix.md") == 1
assert kb.chunk_lines(max_files=1, max_chars=60)
assert kb._extend_with_limit([], ["abcdef"], 3) is False
empty_kb = KnowledgeBase("")
assert empty_kb.chunk_lines() == []
settings = _settings(tmp_path, ollama_url="http://example/api/chat", ollama_api_key="secret", ollama_retries=0, ollama_fallback_model="")
client = LLMClient(settings)
assert client._endpoint() == "http://example/api/chat"
assert client._headers["x-api-key"] == "secret"
assert parse_json("```{\"ok\": true}```") == {"ok": True}
assert parse_json("not-json", fallback={"fallback": True}) == {"fallback": True}
class FakeResponse:
def __init__(self, status_code: int, payload: Any) -> None:
self.status_code = status_code
self._payload = payload
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise httpx.HTTPStatusError("bad", request=httpx.Request("POST", "http://example"), response=httpx.Response(self.status_code))
def json(self) -> Any:
return self._payload
responses = iter([FakeResponse(200, {"response": "plain"}), FakeResponse(200, {"reply": "fallback"}), FakeResponse(200, {"message": {}})])
class FakeAsyncClient:
def __init__(self, timeout: float | None = None) -> None:
self.timeout = timeout
async def __aenter__(self) -> "FakeAsyncClient":
return self
async def __aexit__(self, *exc: object) -> None:
return None
async def post(self, _url: str, *, json: dict[str, Any], headers: dict[str, str]) -> FakeResponse:
del _url, json, headers
item = next(responses)
if isinstance(item, Exception):
raise item
return item
monkeypatch.setattr(httpx, "AsyncClient", FakeAsyncClient)
assert asyncio.run(client.chat([{"role": "user", "content": "a"}], timeout_sec=1.0)) == "plain"
assert asyncio.run(client.chat([{"role": "user", "content": "b"}], timeout_sec=1.0)) == "fallback"
with pytest.raises(LLMError, match="empty response"):
asyncio.run(client.chat([{"role": "user", "content": "c"}], timeout_sec=1.0))
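    # With ollama_retries=1, two consecutive connect errors exhaust the retry budget
    # and chat() surfaces an LLMError.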
error_settings = replace(settings, ollama_retries=1)
error_client = LLMClient(error_settings)
error_responses = iter([httpx.ConnectError("nope"), httpx.ConnectError("still nope")])
class ErrorAsyncClient(FakeAsyncClient):
async def post(self, _url: str, *, json: dict[str, Any], headers: dict[str, str]) -> FakeResponse:
del _url, json, headers
raise next(error_responses)
monkeypatch.setattr(httpx, "AsyncClient", ErrorAsyncClient)
with pytest.raises(LLMError):
asyncio.run(error_client.chat([{"role": "user", "content": "d"}], timeout_sec=1.0))
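    # SnapshotProvider: the first get() hits the faked HTTP endpoint; the second is served
    # from cache, since monotonic time advances only one second (inside the cache TTL).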
provider = SnapshotProvider(replace(settings, ariadne_state_url="http://snapshot", ariadne_state_token="tok"))
class SnapshotResp:
def raise_for_status(self) -> None:
return None
def json(self) -> dict[str, Any]:
return {"snapshot_id": "snap-1"}
monkeypatch.setattr("atlasbot.snapshot.builder.httpx.get", lambda url, headers, timeout: SnapshotResp())
assert provider.get() == {"snapshot_id": "snap-1"}
provider._cache = {"snapshot_id": "cached"}
provider._cache_ts = 10_000.0
monkeypatch.setattr("atlasbot.snapshot.builder.time.monotonic", lambda: 10_001.0)
assert provider.get() == {"snapshot_id": "cached"}