diff --git a/atlasbot/engine/answerer/common.py b/atlasbot/engine/answerer/common.py
index 1a9e55d..ce09fde 100644
--- a/atlasbot/engine/answerer/common.py
+++ b/atlasbot/engine/answerer/common.py
@@ -38,17 +38,7 @@ def _strip_followup_meta(reply: str) -> str:
     return cleaned


-def _build_meta(
-    mode: str,
-    call_count: int,
-    call_cap: int,
-    limit_hit: bool,
-    time_budget_hit: bool,
-    time_budget_sec: float,
-    classify: dict[str, Any],
-    tool_hint: dict[str, Any] | None,
-    started: float,
-) -> dict[str, Any]:
+def _build_meta(mode: str, call_count: int, call_cap: int, limit_hit: bool, time_budget_hit: bool, time_budget_sec: float, classify: dict[str, Any], tool_hint: dict[str, Any] | None, started: float) -> dict[str, Any]:
     return {
         "mode": mode,
         "llm_calls": call_count,
@@ -214,13 +204,7 @@ def _build_chunk_groups(chunks: list[dict[str, Any]], group_size: int) -> list[l
     return groups


-async def _score_chunks(
-    call_llm: Callable[..., Any],
-    chunks: list[dict[str, Any]],
-    question: str,
-    sub_questions: list[str],
-    plan: ModePlan,
-) -> dict[str, float]:
+async def _score_chunks(call_llm: Callable[..., Any], chunks: list[dict[str, Any]], question: str, sub_questions: list[str], plan: ModePlan) -> dict[str, float]:
     scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks}
     if not chunks:
         return scores
@@ -238,11 +222,7 @@ async def _score_chunks(
     return await _score_groups_parallel(call_llm, groups, ctx)


-async def _score_groups_serial(
-    call_llm: Callable[..., Any],
-    groups: list[list[dict[str, Any]]],
-    ctx: ScoreContext,
-) -> dict[str, float]:
+async def _score_groups_serial(call_llm: Callable[..., Any], groups: list[list[dict[str, Any]]], ctx: ScoreContext) -> dict[str, float]:
     scores: dict[str, float] = {}
     for grp in groups:
         runs = [await _score_chunk_group(call_llm, grp, ctx.question, ctx.sub_questions) for _ in range(ctx.retries)]
@@ -254,11 +234,7 @@ async def _score_groups_serial(
     return scores


-async def _score_groups_parallel(
-    call_llm: Callable[..., Any],
-    groups: list[list[dict[str, Any]]],
-    ctx: ScoreContext,
-) -> dict[str, float]:
+async def _score_groups_parallel(call_llm: Callable[..., Any], groups: list[list[dict[str, Any]]], ctx: ScoreContext) -> dict[str, float]:
     coros: list[Awaitable[tuple[int, dict[str, float]]]] = []
     for idx, grp in enumerate(groups):
         for _ in range(ctx.retries):
@@ -278,12 +254,7 @@ async def _score_groups_parallel(
     return scores


-async def _score_chunk_group(
-    call_llm: Callable[..., Any],
-    group: list[dict[str, Any]],
-    question: str,
-    sub_questions: list[str],
-) -> dict[str, float]:
+async def _score_chunk_group(call_llm: Callable[..., Any], group: list[dict[str, Any]], question: str, sub_questions: list[str]) -> dict[str, float]:
     prompt = (
         prompts.CHUNK_SCORE_PROMPT
         + "\nQuestion: "
@@ -310,13 +281,7 @@ async def _score_chunk_group(
     return scored


-async def _score_chunk_group_run(
-    call_llm: Callable[..., Any],
-    idx: int,
-    group: list[dict[str, Any]],
-    question: str,
-    sub_questions: list[str],
-) -> tuple[int, dict[str, float]]:
+async def _score_chunk_group_run(call_llm: Callable[..., Any], idx: int, group: list[dict[str, Any]], question: str, sub_questions: list[str]) -> tuple[int, dict[str, float]]:
     return idx, await _score_chunk_group(call_llm, group, question, sub_questions)


@@ -332,12 +297,7 @@ def _merge_score_runs(runs: list[dict[str, float]]) -> dict[str, float]:
     return {key: totals[key] / counts[key] for key in totals}


-async def _select_best_score_run(
-    call_llm: Callable[..., Any],
-    group: list[dict[str, Any]],
-    runs: list[dict[str, float]],
-    ctx: ScoreContext,
-) -> dict[str, float]:
+async def _select_best_score_run(call_llm: Callable[..., Any], group: list[dict[str, Any]], runs: list[dict[str, float]], ctx: ScoreContext) -> dict[str, float]:
     if not runs:
         return {}
     prompt = (
@@ -364,11 +324,7 @@ async def _select_best_score_run(
     return runs[idx]


-def _keyword_hits(
-    ranked: list[dict[str, Any]],
-    head: dict[str, Any],
-    keywords: list[str] | None,
-) -> list[dict[str, Any]]:
+def _keyword_hits(ranked: list[dict[str, Any]], head: dict[str, Any], keywords: list[str] | None) -> list[dict[str, Any]]:
     if not keywords:
         return []
     lowered = [kw.lower() for kw in keywords if isinstance(kw, str) and kw.strip()]
@@ -384,13 +340,7 @@ def _keyword_hits(
     return hits


-def _select_chunks(
-    chunks: list[dict[str, Any]],
-    scores: dict[str, float],
-    plan: ModePlan,
-    keywords: list[str] | None = None,
-    must_ids: list[str] | None = None,
-) -> list[dict[str, Any]]:
+def _select_chunks(chunks: list[dict[str, Any]], scores: dict[str, float], plan: ModePlan, keywords: list[str] | None = None, must_ids: list[str] | None = None) -> list[dict[str, Any]]:
     if not chunks:
         return []
     ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True)
@@ -403,12 +353,7 @@ def _select_chunks(
     return selected


-def _append_must_chunks(
-    chunks: list[dict[str, Any]],
-    selected: list[dict[str, Any]],
-    must_ids: list[str] | None,
-    limit: int,
-) -> bool:
+def _append_must_chunks(chunks: list[dict[str, Any]], selected: list[dict[str, Any]], must_ids: list[str] | None, limit: int) -> bool:
     if not must_ids:
         return False
     id_map = {item["id"]: item for item in chunks}
@@ -421,12 +366,7 @@ def _append_must_chunks(
     return False


-def _append_keyword_chunks(
-    ranked: list[dict[str, Any]],
-    selected: list[dict[str, Any]],
-    keywords: list[str] | None,
-    limit: int,
-) -> bool:
+def _append_keyword_chunks(ranked: list[dict[str, Any]], selected: list[dict[str, Any]], keywords: list[str] | None, limit: int) -> bool:
     if not ranked:
         return False
     head = ranked[0]
@@ -438,11 +378,7 @@ def _append_keyword_chunks(
     return False


-def _append_ranked_chunks(
-    ranked: list[dict[str, Any]],
-    selected: list[dict[str, Any]],
-    limit: int,
-) -> None:
+def _append_ranked_chunks(ranked: list[dict[str, Any]], selected: list[dict[str, Any]], limit: int) -> None:
     for item in ranked:
         if len(selected) >= limit:
             break
diff --git a/atlasbot/engine/answerer/engine.py b/atlasbot/engine/answerer/engine.py
index bce37af..d01b925 100644
--- a/atlasbot/engine/answerer/engine.py
+++ b/atlasbot/engine/answerer/engine.py
@@ -31,29 +31,14 @@ class AnswerEngine:
     post-processing helpers stay split across smaller modules.
""" - def __init__( - self, - settings: Settings, - llm: LLMClient, - kb: KnowledgeBase, - snapshot: SnapshotProvider, - ) -> None: + def __init__(self, settings: Settings, llm: LLMClient, kb: KnowledgeBase, snapshot: SnapshotProvider) -> None: self._settings = settings self._llm = llm self._kb = kb self._snapshot = snapshot self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec) - async def answer( - self, - question: str, - *, - mode: str, - history: list[dict[str, str]] | None = None, - observer: Callable[[str, str], None] | None = None, - conversation_id: str | None = None, - snapshot_pin: bool | None = None, - ) -> AnswerResult: + async def answer(self, question: str, *, mode: str, history: list[dict[str, str]] | None = None, observer: Callable[[str, str], None] | None = None, conversation_id: str | None = None, snapshot_pin: bool | None = None) -> AnswerResult: """Answer a question by delegating to the staged workflow.""" return await run_answer( @@ -71,15 +56,7 @@ class AnswerEngine: reply = await self._llm.chat(messages, model=self._settings.ollama_model) return AnswerResult(reply, _default_scores(), {"mode": "stock"}) - async def _synthesize_answer( - self, - question: str, - subanswers: list[str], - context: str, - classify: dict[str, Any], - plan: ModePlan, - call_llm: Callable[..., Any], - ) -> str: + async def _synthesize_answer(self, question: str, subanswers: list[str], context: str, classify: dict[str, Any], plan: ModePlan, call_llm: Callable[..., Any]) -> str: style_hint = _style_hint(classify) if not subanswers: prompt = ( @@ -148,13 +125,7 @@ class AnswerEngine: return drafts[idx] return drafts[0] - async def _score_answer( - self, - question: str, - reply: str, - plan: ModePlan, - call_llm: Callable[..., Any], - ) -> AnswerScores: + async def _score_answer(self, question: str, reply: str, plan: ModePlan, call_llm: Callable[..., Any]) -> AnswerScores: if not plan.use_scores: return _default_scores() prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply @@ -162,14 +133,7 @@ class AnswerEngine: data = _parse_json_block(raw, fallback={}) return _scores_from_json(data) - async def _extract_claims( - self, - question: str, - reply: str, - summary: dict[str, Any], - facts_used: list[str], - call_llm: Callable[..., Any], - ) -> list[ClaimItem]: + async def _extract_claims(self, question: str, reply: str, summary: dict[str, Any], facts_used: list[str], call_llm: Callable[..., Any]) -> list[ClaimItem]: if not reply or not summary: return [] summary_json = _json_excerpt(summary) @@ -208,27 +172,13 @@ class AnswerEngine: claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items)) return claims - async def _dedup_reply( - self, - reply: str, - plan: ModePlan, - call_llm: Callable[..., Any], - tag: str, - ) -> str: + async def _dedup_reply(self, reply: str, plan: ModePlan, call_llm: Callable[..., Any], tag: str) -> str: if not _needs_dedup(reply): return reply dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag) - async def _answer_followup( # noqa: C901 - self, - question: str, - state: ConversationState, - summary: dict[str, Any], - classify: dict[str, Any], # noqa: ARG002 - plan: ModePlan, - call_llm: Callable[..., Any], - ) -> str: + async def _answer_followup(self, question: str, state: ConversationState, summary: dict[str, Any], classify: dict[str, Any], plan: ModePlan, call_llm: Callable[..., Any]) -> str: # 
noqa: C901, ARG002 claim_ids = await self._select_claims(question, state.claims, plan, call_llm) selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2] evidence_lines = [] @@ -284,13 +234,7 @@ class AnswerEngine: reply = _strip_followup_meta(reply) return reply - async def _select_claims( - self, - question: str, - claims: list[ClaimItem], - plan: ModePlan, - call_llm: Callable[..., Any], - ) -> list[str]: + async def _select_claims(self, question: str, claims: list[ClaimItem], plan: ModePlan, call_llm: Callable[..., Any]) -> list[str]: if not claims: return [] claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims] @@ -308,14 +252,7 @@ class AnswerEngine: state_payload = self._store.get(conversation_id) return _state_from_payload(state_payload) if state_payload else None - def _store_state( - self, - conversation_id: str, - claims: list[ClaimItem], - summary: dict[str, Any], - snapshot: dict[str, Any] | None, - pin_snapshot: bool, - ) -> None: + def _store_state(self, conversation_id: str, claims: list[ClaimItem], summary: dict[str, Any], snapshot: dict[str, Any] | None, pin_snapshot: bool) -> None: snapshot_id = _snapshot_id(summary) pinned_snapshot = snapshot if pin_snapshot else None payload = { diff --git a/atlasbot/engine/answerer/factsheet.py b/atlasbot/engine/answerer/factsheet.py index 7980700..089b955 100644 --- a/atlasbot/engine/answerer/factsheet.py +++ b/atlasbot/engine/answerer/factsheet.py @@ -76,13 +76,7 @@ def _is_plain_math_question(question: str) -> bool: ) -def _quick_fact_sheet_lines( # noqa: C901 - question: str, - summary_lines: list[str], - kb_lines: list[str], - *, - limit: int, -) -> list[str]: +def _quick_fact_sheet_lines(question: str, summary_lines: list[str], kb_lines: list[str], *, limit: int) -> list[str]: # noqa: C901 tokens = { token for token in re.findall(r"[a-z0-9][a-z0-9_-]{2,}", question.lower()) diff --git a/atlasbot/engine/answerer/post.py b/atlasbot/engine/answerer/post.py index 0a25c59..a3d90f7 100644 --- a/atlasbot/engine/answerer/post.py +++ b/atlasbot/engine/answerer/post.py @@ -59,10 +59,7 @@ def _needs_evidence_guard(reply: str, facts: list[str]) -> bool: return any(term in lower_reply for term in arch_terms) and not any(term in fact_text for term in arch_terms) -async def _contradiction_decision( - ctx: ContradictionContext, - attempts: int = 1, -) -> dict[str, Any]: +async def _contradiction_decision(ctx: ContradictionContext, attempts: int = 1) -> dict[str, Any]: best = {"use_facts": True, "confidence": 50} facts_block = "\n".join(ctx.facts[:12]) for idx in range(max(1, attempts)): @@ -256,12 +253,7 @@ def _expand_tokens(tokens: list[str]) -> list[str]: return expanded -def _ensure_token_coverage( - lines: list[str], - tokens: list[str], - summary_lines: list[str], - max_add: int = 4, -) -> list[str]: +def _ensure_token_coverage(lines: list[str], tokens: list[str], summary_lines: list[str], max_add: int = 4) -> list[str]: if not lines or not tokens or not summary_lines: return lines hay = " ".join(lines).lower() diff --git a/atlasbot/engine/answerer/post_ext.py b/atlasbot/engine/answerer/post_ext.py index c2b93ba..65f23d0 100644 --- a/atlasbot/engine/answerer/post_ext.py +++ b/atlasbot/engine/answerer/post_ext.py @@ -74,12 +74,7 @@ def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> boo return any(marker in reply.lower() for marker in extra_markers) -def _extract_keywords( - raw_question: str, - normalized: str, - sub_questions: list[str], - 
keywords: list[Any] | None, -) -> list[str]: +def _extract_keywords(raw_question: str, normalized: str, sub_questions: list[str], keywords: list[Any] | None) -> list[str]: stopwords = { "the", "and", @@ -211,13 +206,10 @@ def _resolve_path(data: Any, path: str) -> Any | None: else: return None if index is not None: - try: - idx = int(index) - if isinstance(cursor, list) and 0 <= idx < len(cursor): - cursor = cursor[idx] - else: - return None - except ValueError: + idx = int(index) + if isinstance(cursor, list) and 0 <= idx < len(cursor): + cursor = cursor[idx] + else: return None return cursor diff --git a/atlasbot/engine/answerer/retrieval.py b/atlasbot/engine/answerer/retrieval.py index 6ca81e3..1f01911 100644 --- a/atlasbot/engine/answerer/retrieval.py +++ b/atlasbot/engine/answerer/retrieval.py @@ -125,9 +125,9 @@ def _metric_ctx_values(ctx: dict[str, Any]) -> tuple[list[str], str, list[str], if not isinstance(summary_lines, list): return [], "", [], [], set() question = ctx.get("question") if isinstance(ctx, dict) else "" - sub_questions = ctx.get("sub_questions") if isinstance(ctx, dict) else [] - keywords = ctx.get("keywords") if isinstance(ctx, dict) else [] - keyword_tokens = ctx.get("keyword_tokens") if isinstance(ctx, dict) else [] + sub_questions = ctx.get("sub_questions") if isinstance(ctx.get("sub_questions"), list) else [] + keywords = ctx.get("keywords") if isinstance(ctx.get("keywords"), list) else [] + keyword_tokens = ctx.get("keyword_tokens") if isinstance(ctx.get("keyword_tokens"), list) else [] token_set = {str(token).lower() for token in keyword_tokens if token} token_set |= {token.lower() for token in _extract_keywords(str(question), str(question), sub_questions=sub_questions, keywords=keywords)} token_set = _token_variants(token_set) diff --git a/atlasbot/engine/answerer/retrieval_ext.py b/atlasbot/engine/answerer/retrieval_ext.py index 705ab08..2b02639 100644 --- a/atlasbot/engine/answerer/retrieval_ext.py +++ b/atlasbot/engine/answerer/retrieval_ext.py @@ -32,13 +32,7 @@ def _metric_key_tokens(summary_lines: list[str]) -> set[str]: return tokens -async def _select_best_candidate( - call_llm: Callable[..., Any], - question: str, - candidates: list[str], - plan: ModePlan, - tag: str, -) -> int: +async def _select_best_candidate(call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, tag: str) -> int: if len(candidates) <= 1: return 0 prompt = ( @@ -82,13 +76,7 @@ def _collect_fact_candidates(selected: list[dict[str, Any]], limit: int) -> list return _dedupe_lines(lines, limit=limit) -async def _select_best_list( - call_llm: Callable[..., Any], - question: str, - candidates: list[list[str]], - plan: ModePlan, - tag: str, -) -> list[str]: +async def _select_best_list(call_llm: Callable[..., Any], question: str, candidates: list[list[str]], plan: ModePlan, tag: str) -> list[str]: if not candidates: return [] if len(candidates) == 1: @@ -106,12 +94,7 @@ async def _select_best_list( return chosen -async def _extract_fact_types( - call_llm: Callable[..., Any], - question: str, - keywords: list[str], - plan: ModePlan, -) -> list[str]: +async def _extract_fact_types(call_llm: Callable[..., Any], question: str, keywords: list[str], plan: ModePlan) -> list[str]: prompt = prompts.FACT_TYPES_PROMPT + "\nQuestion: " + question if keywords: prompt += "\nKeywords: " + ", ".join(keywords) @@ -130,12 +113,7 @@ async def _extract_fact_types( return chosen[:10] -async def _derive_signals( - call_llm: Callable[..., Any], - question: str, - fact_types: 
list[str], - plan: ModePlan, -) -> list[str]: +async def _derive_signals(call_llm: Callable[..., Any], question: str, fact_types: list[str], plan: ModePlan) -> list[str]: if not fact_types: return [] prompt = prompts.SIGNAL_PROMPT.format(question=question, fact_types="; ".join(fact_types)) @@ -154,13 +132,7 @@ async def _derive_signals( return chosen[:12] -async def _scan_chunk_for_signals( - call_llm: Callable[..., Any], - question: str, - signals: list[str], - chunk_lines: list[str], - plan: ModePlan, -) -> list[str]: +async def _scan_chunk_for_signals(call_llm: Callable[..., Any], question: str, signals: list[str], chunk_lines: list[str], plan: ModePlan) -> list[str]: if not signals or not chunk_lines: return [] prompt = prompts.CHUNK_SCAN_PROMPT.format( @@ -183,13 +155,7 @@ async def _scan_chunk_for_signals( return chosen[:15] -async def _prune_metric_candidates( - call_llm: Callable[..., Any], - question: str, - candidates: list[str], - plan: ModePlan, - attempts: int, -) -> list[str]: +async def _prune_metric_candidates(call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, attempts: int) -> list[str]: if not candidates: return [] prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=6) @@ -208,13 +174,7 @@ async def _prune_metric_candidates( return chosen[:6] -async def _select_fact_lines( - call_llm: Callable[..., Any], - question: str, - candidates: list[str], - plan: ModePlan, - max_lines: int, -) -> list[str]: +async def _select_fact_lines(call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, max_lines: int) -> list[str]: if not candidates: return [] prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=max_lines) diff --git a/atlasbot/engine/answerer/workflow.py b/atlasbot/engine/answerer/workflow.py index c6573a5..9e43aa0 100644 --- a/atlasbot/engine/answerer/workflow.py +++ b/atlasbot/engine/answerer/workflow.py @@ -23,16 +23,7 @@ from .retrieval_ext import * from .spine import * from .workflow_post import finalize_answer -async def run_answer( # noqa: C901 - engine: Any, - question: str, - *, - mode: str, - history: list[dict[str, str]] | None = None, - observer: Callable[[str, str], None] | None = None, - conversation_id: str | None = None, - snapshot_pin: bool | None = None, -) -> AnswerResult: +async def run_answer(engine: Any, question: str, *, mode: str, history: list[dict[str, str]] | None = None, observer: Callable[[str, str], None] | None = None, conversation_id: str | None = None, snapshot_pin: bool | None = None) -> AnswerResult: # noqa: C901 """Answer a question using the staged reasoning pipeline.""" settings = engine._settings @@ -69,14 +60,7 @@ async def run_answer( # noqa: C901 "evidence_fix", } - async def call_llm( - system: str, - prompt: str, - *, - context: str | None = None, - model: str | None = None, - tag: str = "", - ) -> str: + async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str: nonlocal call_count, limit_hit, time_budget_hit if not limitless and call_count >= call_cap: limit_hit = True diff --git a/atlasbot/engine/answerer/workflow_post.py b/atlasbot/engine/answerer/workflow_post.py index 82d9f9d..81190bb 100644 --- a/atlasbot/engine/answerer/workflow_post.py +++ b/atlasbot/engine/answerer/workflow_post.py @@ -15,32 +15,7 @@ from .retrieval import * from .spine import * -async def finalize_answer( # noqa: C901 - *, - 
engine: Any, - call_llm: Callable[..., Any], - normalized: str, - subanswers: list[str], - context: str, - classify: dict[str, Any], - plan: ModePlan, - summary: dict[str, Any], - summary_lines: list[str], - metric_facts: list[str], - key_facts: list[str], - facts_used: list[str], - allowed_nodes: list[str], - allowed_namespaces: list[str], - runbook_paths: list[str], - lowered_question: str, - force_metric: bool, - keyword_tokens: list[str], - question_tokens: list[str], - snapshot_context: str, - observer: Callable[[str, str], None] | None, - mode: str, - metric_keys: list[str] | None = None, -) -> tuple[str, AnswerScores, list[ClaimItem]]: +async def finalize_answer(*, engine: Any, call_llm: Callable[..., Any], normalized: str, subanswers: list[str], context: str, classify: dict[str, Any], plan: ModePlan, summary: dict[str, Any], summary_lines: list[str], metric_facts: list[str], key_facts: list[str], facts_used: list[str], allowed_nodes: list[str], allowed_namespaces: list[str], runbook_paths: list[str], lowered_question: str, force_metric: bool, keyword_tokens: list[str], question_tokens: list[str], snapshot_context: str, observer: Callable[[str, str], None] | None, mode: str, metric_keys: list[str] | None = None) -> tuple[str, AnswerScores, list[ClaimItem]]: # noqa: C901 """Synthesize and post-process the final answer.""" reply = await engine._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm) diff --git a/atlasbot/main.py b/atlasbot/main.py index 0fe16ca..990155f 100644 --- a/atlasbot/main.py +++ b/atlasbot/main.py @@ -49,14 +49,7 @@ async def main() -> None: queue = QueueManager(settings, handler) await queue.start() - async def answer_handler( - question: str, - mode: str, - history=None, - conversation_id=None, - snapshot_pin: bool | None = None, - observer=None, - ) -> AnswerResult: + async def answer_handler(question: str, mode: str, history=None, conversation_id=None, snapshot_pin: bool | None = None, observer=None) -> AnswerResult: if settings.queue_enabled: payload = await queue.submit( { diff --git a/atlasbot/matrix/bot.py b/atlasbot/matrix/bot.py index 9c01197..5a0778a 100644 --- a/atlasbot/matrix/bot.py +++ b/atlasbot/matrix/bot.py @@ -89,17 +89,7 @@ class MatrixClient: class MatrixBot: """Drive Matrix conversation handling and heartbeat replies.""" - def __init__( - self, - settings: Settings, - bot: MatrixBotConfig, - engine: AnswerEngine, - answer_handler: Callable[ - [str, str, list[dict[str, str]] | None, str | None, Callable[[str, str], None] | None], - Awaitable[AnswerResult], - ] - | None = None, - ) -> None: + def __init__(self, settings: Settings, bot: MatrixBotConfig, engine: AnswerEngine, answer_handler: Callable[[str, str, list[dict[str, str]] | None, str | None, Callable[[str, str], None] | None], Awaitable[AnswerResult]] | None = None) -> None: self._settings = settings self._bot = bot self._engine = engine diff --git a/atlasbot/snapshot/builder/core_a.py b/atlasbot/snapshot/builder/core_a.py index d64956c..9a48a9c 100644 --- a/atlasbot/snapshot/builder/core_a.py +++ b/atlasbot/snapshot/builder/core_a.py @@ -150,11 +150,7 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) -> ) -def _merge_cluster_fields( - summary: dict[str, Any], - cluster_summary: dict[str, Any], - field_types: dict[str, type], -) -> None: +def _merge_cluster_fields(summary: dict[str, Any], cluster_summary: dict[str, Any], field_types: dict[str, type]) -> None: for key, expected in field_types.items(): value = 
cluster_summary.get(key) if isinstance(value, expected): @@ -249,10 +245,7 @@ def _build_hardware_by_node(nodes_detail: list[dict[str, Any]]) -> dict[str, Any return {"hardware_by_node": mapping} if mapping else {} -def _build_hardware_usage( # noqa: C901 - metrics: dict[str, Any], - hardware_by_node: dict[str, Any] | None, -) -> dict[str, Any]: +def _build_hardware_usage(metrics: dict[str, Any], hardware_by_node: dict[str, Any] | None) -> dict[str, Any]: # noqa: C901 if not isinstance(hardware_by_node, dict) or not hardware_by_node: return {} node_load = metrics.get("node_load") if isinstance(metrics.get("node_load"), list) else [] diff --git a/atlasbot/snapshot/builder/format_a.py b/atlasbot/snapshot/builder/format_a.py index 50ae540..36425fa 100644 --- a/atlasbot/snapshot/builder/format_a.py +++ b/atlasbot/snapshot/builder/format_a.py @@ -470,7 +470,9 @@ def _append_pvc_usage(lines: list[str], summary: dict[str, Any]) -> None: return parts = [] for entry in pvc_usage: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} namespace = metric.get("namespace") pvc = metric.get("persistentvolumeclaim") value = entry.get("value") @@ -478,8 +480,6 @@ def _append_pvc_usage(lines: list[str], summary: dict[str, Any]) -> None: parts.append(f"{namespace}/{pvc}={_format_float(value)}%") if parts: lines.append("pvc_usage_top: " + "; ".join(parts)) - - def _append_root_disk_headroom(lines: list[str], summary: dict[str, Any]) -> None: headroom = summary.get("root_disk_low_headroom") if not isinstance(headroom, list) or not headroom: diff --git a/atlasbot/snapshot/builder/format_b.py b/atlasbot/snapshot/builder/format_b.py index 0eef293..8756d82 100644 --- a/atlasbot/snapshot/builder/format_b.py +++ b/atlasbot/snapshot/builder/format_b.py @@ -6,6 +6,25 @@ from .core_a import _VALUE_PAIR_LEN from .format_a import * +def _append_namespace_metric_series( + lines: list[str], + label: str, + entries: list[Any], + formatter: Any, +) -> None: + parts = [] + for entry in entries: + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} + namespace = metric.get("namespace") + value = entry.get("value") + if namespace: + parts.append(f"{namespace}={formatter(value)}") + if parts: + lines.append(f"{label}: " + "; ".join(parts)) + + def _append_longhorn(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901 longhorn = summary.get("longhorn") if isinstance(summary.get("longhorn"), dict) else {} if not longhorn: @@ -52,78 +71,24 @@ def _append_namespace_usage(lines: list[str], summary: dict[str, Any]) -> None: metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {} cpu_top = metrics.get("namespace_cpu_top") if isinstance(metrics.get("namespace_cpu_top"), list) else [] mem_top = metrics.get("namespace_mem_top") if isinstance(metrics.get("namespace_mem_top"), list) else [] - if cpu_top: - parts = [] - for entry in cpu_top: - metric = entry.get("metric") if isinstance(entry, dict) else {} - namespace = metric.get("namespace") - value = entry.get("value") - if namespace: - parts.append(f"{namespace}={_format_float(value)}") - if parts: - lines.append("namespace_cpu_top: " + "; ".join(parts)) - if mem_top: - parts = [] - for entry in mem_top: - metric = entry.get("metric") if isinstance(entry, dict) else {} - namespace = metric.get("namespace") - value = entry.get("value") - if 
namespace: - parts.append(f"{namespace}={_format_bytes(value)}") - if parts: - lines.append("namespace_mem_top: " + "; ".join(parts)) + _append_namespace_metric_series(lines, "namespace_cpu_top", cpu_top, _format_float) + _append_namespace_metric_series(lines, "namespace_mem_top", mem_top, _format_bytes) def _append_namespace_requests(lines: list[str], summary: dict[str, Any]) -> None: metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {} cpu_req = metrics.get("namespace_cpu_requests_top") if isinstance(metrics.get("namespace_cpu_requests_top"), list) else [] mem_req = metrics.get("namespace_mem_requests_top") if isinstance(metrics.get("namespace_mem_requests_top"), list) else [] - if cpu_req: - parts = [] - for entry in cpu_req: - metric = entry.get("metric") if isinstance(entry, dict) else {} - namespace = metric.get("namespace") - value = entry.get("value") - if namespace: - parts.append(f"{namespace}={_format_float(value)}") - if parts: - lines.append("namespace_cpu_requests_top: " + "; ".join(parts)) - if mem_req: - parts = [] - for entry in mem_req: - metric = entry.get("metric") if isinstance(entry, dict) else {} - namespace = metric.get("namespace") - value = entry.get("value") - if namespace: - parts.append(f"{namespace}={_format_bytes(value)}") - if parts: - lines.append("namespace_mem_requests_top: " + "; ".join(parts)) + _append_namespace_metric_series(lines, "namespace_cpu_requests_top", cpu_req, _format_float) + _append_namespace_metric_series(lines, "namespace_mem_requests_top", mem_req, _format_bytes) def _append_namespace_io_net(lines: list[str], summary: dict[str, Any]) -> None: metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {} net_top = metrics.get("namespace_net_top") if isinstance(metrics.get("namespace_net_top"), list) else [] io_top = metrics.get("namespace_io_top") if isinstance(metrics.get("namespace_io_top"), list) else [] - if net_top: - parts = [] - for entry in net_top: - metric = entry.get("metric") if isinstance(entry, dict) else {} - namespace = metric.get("namespace") - value = entry.get("value") - if namespace: - parts.append(f"{namespace}={_format_rate_bytes(value)}") - if parts: - lines.append("namespace_net_top: " + "; ".join(parts)) - if io_top: - parts = [] - for entry in io_top: - metric = entry.get("metric") if isinstance(entry, dict) else {} - namespace = metric.get("namespace") - value = entry.get("value") - if namespace: - parts.append(f"{namespace}={_format_rate_bytes(value)}") - if parts: - lines.append("namespace_io_top: " + "; ".join(parts)) + _append_namespace_metric_series(lines, "namespace_net_top", net_top, _format_rate_bytes) + _append_namespace_metric_series(lines, "namespace_io_top", io_top, _format_rate_bytes) def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901 @@ -143,7 +108,9 @@ def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None: # noq if cpu_top: parts = [] for entry in cpu_top: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} namespace = metric.get("namespace") pod = metric.get("pod") value = entry.get("value") @@ -154,7 +121,9 @@ def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None: # noq if cpu_top_node: parts = [] for entry in cpu_top_node: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + 
continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} namespace = metric.get("namespace") pod = metric.get("pod") node = metric.get("node") @@ -166,7 +135,9 @@ def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None: # noq if mem_top: parts = [] for entry in mem_top: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} namespace = metric.get("namespace") pod = metric.get("pod") value = entry.get("value") @@ -177,7 +148,9 @@ def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None: # noq if mem_top_node: parts = [] for entry in mem_top_node: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} namespace = metric.get("namespace") pod = metric.get("pod") node = metric.get("node") @@ -230,7 +203,9 @@ def _append_job_failures(lines: list[str], summary: dict[str, Any]) -> None: return parts = [] for entry in failures: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} namespace = metric.get("namespace") job_name = metric.get("job_name") or metric.get("job") value = entry.get("value") @@ -323,7 +298,9 @@ def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None: if isinstance(by_db, list) and by_db: parts = [] for entry in by_db: - metric = entry.get("metric") if isinstance(entry, dict) else {} + if not isinstance(entry, dict): + continue + metric = entry.get("metric") if isinstance(entry.get("metric"), dict) else {} value = entry.get("value") if isinstance(value, list) and len(value) >= _VALUE_PAIR_LEN: value = value[1] diff --git a/atlasbot/snapshot/builder/format_c.py b/atlasbot/snapshot/builder/format_c.py index bf08000..33c20f4 100644 --- a/atlasbot/snapshot/builder/format_c.py +++ b/atlasbot/snapshot/builder/format_c.py @@ -2,6 +2,7 @@ from __future__ import annotations from typing import Any +from .core_a import PVC_USAGE_CRITICAL from .format_b import * def _append_signals(lines: list[str], summary: dict[str, Any]) -> None: signals = summary.get("signals") if isinstance(summary.get("signals"), list) else [] diff --git a/scripts/check_coverage.py b/scripts/check_coverage.py index 74a3427..727057c 100755 --- a/scripts/check_coverage.py +++ b/scripts/check_coverage.py @@ -5,35 +5,20 @@ from __future__ import annotations import argparse import json -from datetime import date from pathlib import Path -def main() -> int: # noqa: C901 +def main() -> int: """Check each production file against a minimum coverage percentage.""" parser = argparse.ArgumentParser() parser.add_argument("coverage_json") parser.add_argument("--root", default="atlasbot") parser.add_argument("--threshold", type=float, default=95.0) - parser.add_argument("--exceptions-file", default="testing/coverage_exceptions.json") args = parser.parse_args() data = json.loads(Path(args.coverage_json).read_text(encoding="utf-8")) files = data.get("files") if isinstance(data, dict) else {} - exceptions_path = Path(args.exceptions_file) - per_file_thresholds: dict[str, float] = {} - if exceptions_path.exists(): - payload = json.loads(exceptions_path.read_text(encoding="utf-8")) - expires_on = str(payload.get("expires_on") or "").strip() - if expires_on 
and date.today() > date.fromisoformat(expires_on): - print(f"coverage exceptions expired on {expires_on}: {exceptions_path}") - return 1 - overrides = payload.get("per_file_thresholds") - if isinstance(overrides, dict): - for path, threshold in overrides.items(): - if isinstance(path, str) and isinstance(threshold, (int, float)): - per_file_thresholds[path] = float(threshold) violations: list[tuple[float, str]] = [] for path, payload in sorted(files.items()): if not path.startswith(f"{args.root}/"): @@ -42,9 +27,8 @@ def main() -> int: # noqa: C901 percent = summary.get("percent_covered") if isinstance(summary, dict) else None if not isinstance(percent, (int, float)): continue - threshold = per_file_thresholds.get(path, float(args.threshold)) - if float(percent) < threshold: - violations.append((float(percent), f"{path} (min {threshold:.2f}%)")) + if float(percent) < args.threshold: + violations.append((float(percent), path)) if violations: for percent, path in sorted(violations): @@ -55,3 +39,4 @@ def main() -> int: # noqa: C901 if __name__ == "__main__": raise SystemExit(main()) + diff --git a/tests/test_split_helper_coverage.py b/tests/test_split_helper_coverage.py new file mode 100644 index 0000000..cb4dd9a --- /dev/null +++ b/tests/test_split_helper_coverage.py @@ -0,0 +1,1749 @@ +"""Targeted coverage tests for Atlasbot's split helper modules.""" + +from __future__ import annotations + +import asyncio +import json +import runpy +from dataclasses import replace +from pathlib import Path +from types import SimpleNamespace +from typing import Any + +import httpx +import pytest + +from atlasbot.config import MatrixBotConfig +from atlasbot.engine.answerer import common as answer_common +from atlasbot.engine.answerer import engine as answer_engine +from atlasbot.engine.answerer import factsheet as answer_factsheet +from atlasbot.engine.answerer import post as answer_post +from atlasbot.engine.answerer import post_ext as answer_post_ext +from atlasbot.engine.answerer import retrieval as answer_retrieval +from atlasbot.engine.answerer import retrieval_ext as answer_retrieval_ext +from atlasbot.engine.answerer import spine as answer_spine +from atlasbot.engine.answerer import workflow as answer_workflow +from atlasbot.engine.answerer import workflow_post as answer_workflow_post +from atlasbot.engine.answerer._base import ( + AnswerResult, + AnswerScores, + ClaimItem, + ContradictionContext, + EvidenceItem, + InsightGuardInput, +) +from atlasbot.knowledge.loader import KnowledgeBase +from atlasbot.llm.client import LLMClient, LLMError +from atlasbot.main import _build_engine, result_scores +from atlasbot.matrix.bot import MatrixBot, MatrixClient, _extract_mode, _mode_timeout_sec +from atlasbot.snapshot.builder import SnapshotProvider, core_a, format_a, format_b, format_c +from testing.fakes import build_test_settings + + +class ScriptedCall: + """Return canned async responses keyed by tag.""" + + def __init__(self, responses: dict[str, Any]) -> None: + self._responses = { + key: list(value) if isinstance(value, list) else value for key, value in responses.items() + } + self.calls: list[str] = [] + + async def __call__( + self, + _system: str, + _prompt: str, + *, + context: str | None = None, + model: str | None = None, + tag: str = "", + ) -> str: + del context, model + self.calls.append(tag) + value = self._responses.get(tag, "{}") + if isinstance(value, list): + if not value: + return "{}" + item = value.pop(0) + return str(item) + return str(value) + + +def 
test_knowledge_base_private_paths(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Cover runbook, catalog, and file-scanning edge branches.""" + + base = tmp_path / "kb" + catalog = base / "catalog" + catalog.mkdir(parents=True) + (catalog / "atlas.json").write_text(json.dumps({"cluster": "atlas", "sources": []}), encoding="utf-8") + (catalog / "runbooks.json").write_text( + json.dumps([{"title": "Good", "path": "runbooks/good.md"}, {"title": "MissingPath"}, "bad-entry"]), + encoding="utf-8", + ) + (base / "notes.md").write_text("alpha\nbeta", encoding="utf-8") + (base / "empty.txt").write_text("", encoding="utf-8") + (base / "bad.md").write_text("boom", encoding="utf-8") + + kb = KnowledgeBase(str(base)) + assert kb.runbook_titles(limit=1) == "Relevant runbooks:\n- Good (runbooks/good.md)" + assert kb.runbook_paths(limit=5) == ["runbooks/good.md"] + assert kb.chunk_lines(max_files=1, max_chars=120) + + lines: list[str] = [] + kb._runbooks = [{"title": "Good", "path": "runbooks/good.md"}, "bad-entry"] # type: ignore[assignment] + kb._append_runbooks(lines) + assert "KB: runbooks.json" in lines + assert "- Good (runbooks/good.md)" in lines + + monkeypatch.setattr("atlasbot.knowledge.loader.json.dumps", lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("nope"))) + before = list(lines) + kb._append_catalog(lines, max_chars=999) + assert lines == before + + original_read_text = Path.read_text + + def fake_read_text(self: Path, *args: Any, **kwargs: Any) -> str: + if self.name == "bad.md": + raise OSError("blocked") + return original_read_text(self, *args, **kwargs) + + monkeypatch.setattr(Path, "read_text", fake_read_text) + file_lines: list[str] = [] + kb._append_files(file_lines, max_files=1, max_chars=120) + assert any(line.startswith("KB File: notes.md") for line in file_lines) + + empty = KnowledgeBase("") + assert empty.chunk_lines() == [] + + +def test_knowledge_base_limit_and_break_paths(tmp_path: Path) -> None: + """Cover size-guard exits that only trigger near prompt limits.""" + + base = tmp_path / "kb" + catalog = base / "catalog" + catalog.mkdir(parents=True) + (catalog / "atlas.json").write_text(json.dumps({"cluster": "atlas"}), encoding="utf-8") + (base / "notes.md").write_text("alpha", encoding="utf-8") + + kb = KnowledgeBase(str(base)) + assert any(line.startswith("KB File: notes.md") for line in kb.chunk_lines(max_files=2, max_chars=500)) + + no_atlas_lines = ["seed"] + kb._atlas = None + kb._append_catalog(no_atlas_lines, max_chars=500) + assert no_atlas_lines == ["seed"] + + over_limit_lines = ["x" * 25] + kb._atlas = {"cluster": "atlas"} + kb._append_catalog(over_limit_lines, max_chars=10) + assert over_limit_lines == ["x" * 25] + + runbook_lines = ["seed"] + kb._runbooks = [] + kb._append_runbooks(runbook_lines) + assert runbook_lines == ["seed"] + + limit_lines = ["x" * 50] + kb._append_files(limit_lines, max_files=2, max_chars=20) + assert limit_lines == ["x" * 50] + + capped_lines = ["seed"] * 51 + kb._append_files(capped_lines, max_files=1, max_chars=1_000) + assert capped_lines == ["seed"] * 51 + + extend_lines: list[str] = [] + kb._append_files(extend_lines, max_files=5, max_chars=18) + assert extend_lines == ["KB File: notes.md"] + + +def test_llm_client_timeout_fallback_and_parse(monkeypatch: pytest.MonkeyPatch) -> None: + """Exercise timeout, fallback-model, and empty-response branches.""" + + settings = replace(build_test_settings(), ollama_url="http://ollama/api/chat", ollama_api_key="secret") + client = LLMClient(settings) + assert 
client._endpoint() == "http://ollama/api/chat" + assert client._headers["x-api-key"] == "secret" + + with pytest.raises(LLMError, match="timeout"): + asyncio.run(client.chat([{"role": "user", "content": "hi"}], timeout_sec=0.0)) + + class FakeResponse: + def __init__(self, status_code: int, payload: dict[str, Any]): + self.status_code = status_code + self._payload = payload + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise httpx.HTTPStatusError( + "bad", + request=httpx.Request("POST", "http://ollama"), + response=httpx.Response(self.status_code), + ) + + def json(self) -> dict[str, Any]: + return self._payload + + calls: list[str] = [] + + class FallbackClient: + def __init__(self, timeout: float | None = None) -> None: + self.timeout = timeout + + async def __aenter__(self) -> "FallbackClient": + return self + + async def __aexit__(self, *exc: object) -> None: + return None + + async def post(self, _url: str, *, json: dict[str, Any], headers: dict[str, str]) -> FakeResponse: + calls.append(json["model"]) + assert headers["Content-Type"] == "application/json" + if json["model"] == "base": + return FakeResponse(404, {}) + return FakeResponse(200, {"message": {"content": "ok"}}) + + monkeypatch.setattr(httpx, "AsyncClient", FallbackClient) + fallback_client = LLMClient(replace(settings, ollama_model="base", ollama_fallback_model="backup", ollama_retries=1)) + assert asyncio.run(fallback_client.chat([{"role": "user", "content": "hello"}])) == "ok" + assert calls == ["base", "backup"] + + class EmptyClient(FallbackClient): + async def post(self, _url: str, *, json: dict[str, Any], headers: dict[str, str]) -> FakeResponse: + del json, headers + return FakeResponse(200, {}) + + monkeypatch.setattr(httpx, "AsyncClient", EmptyClient) + with pytest.raises(LLMError, match="empty response"): + asyncio.run(LLMClient(settings).chat([{"role": "user", "content": "hello"}])) + + +def test_llm_client_deadline_and_exhausted_fallback(monkeypatch: pytest.MonkeyPatch) -> None: + """Cover the timeout-after-error and retry-exhausted fallback edges.""" + + settings = replace(build_test_settings(), ollama_url="http://ollama") + + class TimeoutClient: + def __init__(self, timeout: float | None = None) -> None: + self.timeout = timeout + + async def __aenter__(self) -> "TimeoutClient": + return self + + async def __aexit__(self, *exc: object) -> None: + return None + + async def post(self, _url: str, *, json: dict[str, Any], headers: dict[str, str]) -> str: + del _url, json, headers + raise RuntimeError("boom") + + moments = iter((100.0, 100.0, 100.2)) + with monkeypatch.context() as local_patch: + local_patch.setattr(httpx, "AsyncClient", TimeoutClient) + local_patch.setattr("atlasbot.llm.client.time", SimpleNamespace(monotonic=lambda: next(moments))) + with pytest.raises(LLMError, match="timeout"): + asyncio.run(LLMClient(replace(settings, ollama_retries=0)).chat([{"role": "user", "content": "late"}], timeout_sec=0.1)) + + class FallbackResponse: + status_code = 404 + + def raise_for_status(self) -> None: + return None + + def json(self) -> dict[str, str]: + return {} + + class FallbackOnlyClient(TimeoutClient): + async def post(self, _url: str, *, json: dict[str, Any], headers: dict[str, str]) -> FallbackResponse: + del _url, json, headers + return FallbackResponse() + + monkeypatch.setattr(httpx, "AsyncClient", FallbackOnlyClient) + with pytest.raises(LLMError, match="ollama retries exhausted"): + asyncio.run( + LLMClient(replace(settings, ollama_model="base", 
ollama_fallback_model="backup", ollama_retries=0)).chat( + [{"role": "user", "content": "fallback"}], + timeout_sec=1.0, + ) + ) + + +def test_result_scores_and_build_engine(tmp_path: Path) -> None: + """Cover score coercion fallbacks and engine construction.""" + + settings = replace(build_test_settings(), kb_dir="", state_db_path=str(tmp_path / "state.db")) + engine = _build_engine(settings) + assert isinstance(engine, answer_engine.AnswerEngine) + + good = result_scores({"scores": {"confidence": 91, "relevance": "88", "satisfaction": 77.1, "hallucination_risk": "low"}}) + assert good.confidence == 91 + assert result_scores({"scores": {"confidence": "broken"}}).confidence == 60 + assert result_scores("bad-payload").hallucination_risk == "medium" # type: ignore[arg-type] + + +def test_main_module_script_entrypoint(monkeypatch: pytest.MonkeyPatch) -> None: + """Cover the `python -m atlasbot.main` entrypoint without booting services.""" + + class StopMain(RuntimeError): + """Stop the module after the entrypoint invokes asyncio.run.""" + + def fake_run(coro: Any) -> None: + coro.close() + raise StopMain("stop") + + monkeypatch.setattr(asyncio, "run", fake_run) + with pytest.raises(StopMain, match="stop"): + runpy.run_module("atlasbot.main", run_name="__main__") + + +def test_matrix_client_and_bot_error_paths(monkeypatch: pytest.MonkeyPatch) -> None: + """Cover Matrix error handling, ignored events, and mode extraction branches.""" + + settings = replace(build_test_settings(), matrix_base="http://matrix", auth_base="http://auth", room_alias="#atlas:example") + bot_cfg = MatrixBotConfig("atlasbot", "pw", ("atlas", "atlas-smart"), "quick") + + class ErrorClient: + def __init__(self, timeout: float | None = None) -> None: + self.timeout = timeout + + async def __aenter__(self) -> "ErrorClient": + return self + + async def __aexit__(self, *exc: object) -> None: + return None + + async def post(self, *_args: Any, **_kwargs: Any) -> SimpleNamespace: + return SimpleNamespace(status_code=200, raise_for_status=lambda: None, json=lambda: {"access_token": "tok"}) + + async def get(self, url: str, **_kwargs: Any) -> SimpleNamespace: + if "directory/room" in url: + raise httpx.HTTPError("no room") + return SimpleNamespace(raise_for_status=lambda: None, json=lambda: {"next_batch": "n2"}) + + monkeypatch.setattr("atlasbot.matrix.bot.httpx.AsyncClient", ErrorClient) + client = MatrixClient(settings, bot_cfg) + assert asyncio.run(client.resolve_room("tok")) == "" + assert asyncio.run(client.sync("tok", "batch-1"))["next_batch"] == "n2" + + mode, cleaned = _extract_mode("Atlas-smart hello", ("atlas",), "") + assert mode == "smart" + assert cleaned == "-smart hello" + assert _mode_timeout_sec(settings, "genius") == settings.genius_time_budget_sec + + class FakeMatrixClient: + def __init__(self) -> None: + self.sent: list[str] = [] + self.login_calls = 0 + self.sync_calls = 0 + + async def login(self) -> str: + self.login_calls += 1 + raise RuntimeError("boot failed") + + async def resolve_room(self, token: str) -> str: + del token + return "" + + async def join_room(self, token: str, room_id: str) -> None: + del token, room_id + + async def send_message(self, token: str, room_id: str, text: str) -> None: + del token, room_id + self.sent.append(text) + + async def sync(self, token: str, since: str | None) -> dict[str, Any]: + del token, since + self.sync_calls += 1 + raise RuntimeError("sync failed") + + sleeps = {"count": 0} + + async def fake_sleep(_seconds: float) -> None: + sleeps["count"] += 1 + raise 
asyncio.CancelledError + + monkeypatch.setattr("atlasbot.matrix.bot.asyncio.sleep", fake_sleep) + bot = MatrixBot(settings, bot_cfg, SimpleNamespace(answer=None), None) + bot._client = FakeMatrixClient() + with pytest.raises(asyncio.CancelledError): + asyncio.run(bot.run()) + with pytest.raises(asyncio.CancelledError): + asyncio.run(bot._sync_loop("tok")) + assert sleeps["count"] >= 2 + + class SendOnlyClient(FakeMatrixClient): + async def login(self) -> str: + return "tok" + + async def handler(question: str, mode: str, history: list[dict[str, str]] | None, conversation_id: str | None, observer): + del history, conversation_id + if observer: + observer("phase", "working") + return AnswerResult(reply=f"{mode}:{question}", scores=AnswerScores(1, 2, 3, "low"), meta={}) + + bot2 = MatrixBot(replace(settings, thinking_interval_sec=0.001), bot_cfg, SimpleNamespace(answer=None), handler) + bot2._client = SendOnlyClient() + payload = { + "rooms": { + "join": { + "!room": { + "timeline": { + "events": [ + "junk", + {"type": "m.presence", "sender": "user", "content": {}}, + {"type": "m.room.message", "sender": "atlasbot", "content": {"body": "ignore self"}}, + {"type": "m.room.message", "sender": "user", "content": {"body": "atlas what is up?"}}, + ] + } + } + } + } + } + asyncio.run(bot2._handle_sync("tok", payload)) + assert any("Thinking" in item for item in bot2._client.sent) + + +def test_matrix_bot_timeout_variants() -> None: + """Cover smart and genius timeout messages separately.""" + + settings = build_test_settings() + bot_cfg = MatrixBotConfig("atlasbot", "pw", ("atlas", "atlas-smart"), "quick") + + async def sleepy_handler(question: str, mode: str, history, conversation_id, observer): + del question, mode, history, conversation_id, observer + await asyncio.sleep(1.2) + return AnswerResult("late", AnswerScores(1, 2, 3, "low"), {}) + + smart_bot = MatrixBot(replace(settings, thinking_interval_sec=0.001, smart_time_budget_sec=0.01), bot_cfg, SimpleNamespace(answer=None), sleepy_handler) + smart_bot._client = SimpleNamespace( + sent=[], + send_message=lambda token, room_id, text: asyncio.sleep(0, result=smart_bot._client.sent.append(text)), + ) + asyncio.run(smart_bot._answer_with_heartbeat("tok", "!room", "q", "smart")) + assert any("atlas-genius" in msg for msg in smart_bot._client.sent) + + genius_bot = MatrixBot(replace(settings, thinking_interval_sec=0.001, genius_time_budget_sec=0.01), bot_cfg, SimpleNamespace(answer=None), sleepy_handler) + genius_bot._client = SimpleNamespace( + sent=[], + send_message=lambda token, room_id, text: asyncio.sleep(0, result=genius_bot._client.sent.append(text)), + ) + asyncio.run(genius_bot._answer_with_heartbeat("tok", "!room", "q", "genius")) + assert any("ran out of time" in msg for msg in genius_bot._client.sent) + + +def test_answer_common_helper_paths() -> None: + """Cover common chunk-selection and scoring helpers.""" + + settings = replace(build_test_settings(), debug_pipeline=True) + meta = answer_common._build_meta("smart", 2, 5, True, False, 45.0, {"question_type": "metric"}, {"tool": "facts"}, started=0.0) + assert meta["llm_limit_hit"] is True + assert answer_common._llm_call_limit(settings, "smart") == settings.smart_llm_calls_max + assert answer_common._mode_time_budget(settings, "genius") == settings.genius_time_budget_sec + assert answer_common._select_subquestions([{"question": "A", "priority": "nope"}, {"question": "B", "priority": 3}], "fallback", 2) == ["B", "A"] + assert answer_common._chunk_lines(["a", "b", "c"], 2)[0]["summary"] 
== "a | b" + assert answer_common._raw_snapshot_chunks({"ok": 1, "bad": {1, 2}}) + assert answer_common._build_chunk_groups([{"id": "c1", "summary": "s1"}, {"id": "c2", "summary": "s2"}], 1) == [[{"id": "c1", "summary": "s1"}], [{"id": "c2", "summary": "s2"}]] + assert answer_common._merge_score_runs([{"a": 2.0}, {"a": 4.0, "b": 6.0}]) == {"a": 3.0, "b": 6.0} + + chunks = [{"id": "c1", "text": "atlas cpu 90", "summary": "cpu"}, {"id": "c2", "text": "storage okay", "summary": "storage"}] + ranked = answer_common._select_chunks(chunks, {"c1": 1.0, "c2": 0.5}, answer_common._mode_plan(settings, "smart"), ["storage"], ["c2"]) + assert ranked[0]["id"] == "c1" + assert any(item["id"] == "c2" for item in ranked) + assert answer_common._format_runbooks(["runbooks/fix.md"]).startswith("Relevant runbooks:") + + scripted = ScriptedCall( + { + "chunk_score": '[{"id":"c1","score":1},{"id":"c2","score":"2"}]', + "chunk_select": '{"selected_index": 5}', + } + ) + plan = replace(answer_common._mode_plan(settings, "smart"), score_retries=2, parallelism=2, chunk_group=1) + scores = asyncio.run(answer_common._score_chunks(scripted, chunks, "What is hot?", ["cpu?"], plan)) + assert scores["c1"] >= 0.0 + assert scores["c2"] >= 0.0 + best = asyncio.run( + answer_common._select_best_score_run( + scripted, + [{"id": "c1", "summary": "cpu"}], + [{"c1": 2.0}, {"c1": 8.0}], + answer_common.ScoreContext("q", ["sq"], 2, 2, True, "fast"), + ) + ) + assert best == {"c1": 2.0} + + +def test_answer_common_edge_branches() -> None: + """Cover low-frequency common helper branches and fallbacks.""" + + settings = build_test_settings() + plan = answer_common._mode_plan(settings, "smart") + + assert answer_common._strip_followup_meta("") == "" + assert answer_common._strip_followup_meta("Based on the context, Atlas is warm.") == "Atlas is warm." 
+    assert answer_common._raw_snapshot_chunks(None) == []
+    assert asyncio.run(answer_common._score_chunks(ScriptedCall({}), [], "q", [], plan)) == {}
+
+    bad_scores = ScriptedCall({"chunk_score": '[{"id":"c1","score":"oops"},{"score":2},"bad"]'})
+    ctx = answer_common.ScoreContext("q", ["sub"], 1, 1, False, "fast")
+    assert asyncio.run(answer_common._score_groups_serial(bad_scores, [[{"id": "c1", "summary": "one"}]], ctx)) == {"c1": 0.0}
+
+    parallel_scores = ScriptedCall({"chunk_score": ['[{"id":"c1","score":1}]', '[{"id":"c2","score":2}]']})
+    parallel = asyncio.run(
+        answer_common._score_groups_parallel(
+            parallel_scores,
+            [[{"id": "c1", "summary": "one"}], [{"id": "c2", "summary": "two"}]],
+            answer_common.ScoreContext("q", ["sub"], 1, 2, False, "fast"),
+        )
+    )
+    assert parallel == {"c1": 1.0, "c2": 2.0}
+
+    selector = ScriptedCall({"chunk_select": ['{"selected_index":"bad"}', '{"selected_index":99}']})
+    runs = [{"c1": 1.0}, {"c1": 9.0}]
+    assert asyncio.run(answer_common._select_best_score_run(selector, [{"id": "c1", "summary": "one"}], runs, ctx)) == {"c1": 1.0}
+    assert asyncio.run(answer_common._select_best_score_run(selector, [{"id": "c1", "summary": "one"}], runs, ctx)) == {"c1": 1.0}
+
+    chunks = [{"id": "c1", "text": "cpu: 95"}, {"id": "c2", "text": "ram: 20"}]
+    assert answer_common._keyword_hits(chunks, chunks[0], ["", " "]) == []
+    assert answer_common._select_chunks([], {}, plan) == []
+    selected = [chunks[0]]
+    assert answer_common._append_must_chunks(chunks, selected, None, 2) is False
+    assert answer_common._append_keyword_chunks([], [], ["cpu"], 2) is False
+    answer_common._append_ranked_chunks(chunks, selected, 2)
+    assert selected == chunks
+
+
+def test_answer_post_and_post_ext_helpers() -> None:
+    """Cover metric-formatting, entity filtering, and payload helpers."""
+
+    assert answer_post._merge_fact_lines(["a", "b"], ["b", "c"]) == ["a", "b", "c"]
+    assert answer_post._strip_unknown_entities("Node titan-99 is hot. Namespace foo is full. Safe.", ["titan-99"], ["foo"]) == "Safe."
+    assert answer_post._strip_unknown_entities("", ["x"], ["y"]) == ""
+    assert answer_post._needs_evidence_guard("titan-99 has pressure", ["nodes_total: 2"]) is True
+    assert answer_post._filter_lines_by_keywords(["cpu: 95", "ram: 20"], ["cpu"], 2) == ["cpu: 95"]
+    assert answer_post._select_metric_line(["nodes_total: 22", "cpu: 95"], "How many nodes?", {"nodes"}) == "nodes_total: 22"
+    assert answer_post._format_direct_metric_line("nodes: total=22, ready=21") == "Atlas has 22 total nodes (ready=21)."
+    assert answer_post._format_direct_metric_line("nodes_total=22") == "Atlas has 22 total nodes."
+    assert answer_post._global_facts(["nodes_total: 2", "cluster_name: atlas", "other: x"])
+    assert answer_post._has_keyword_overlap(["cpu: 95"], ["CPU"]) is True
+    assert answer_post._merge_tokens(["cpu"], ["ram"], ["cpu"]) == ["cpu", "ram"]
+    assert "atlas" in answer_post._extract_question_tokens("How is Atlas CPU load?")
+    assert "atlas" in answer_post._expand_tokens(["Atlas CPU"])
+    assert answer_post._ensure_token_coverage(["cpu: 95"], ["cpu", "ram"], ["ram: 20"]) == ["ram: 20", "cpu: 95"]
+    assert answer_post._best_keyword_line(["cpu:95", "ram:20"], ["ram"]) == "ram:20"
+    assert answer_post._line_starting_with(["cpu:95"], "cpu:") == "cpu:95"
+    assert answer_post._non_rpi_nodes({"hardware_by_node": {"titan-01": "rpi5", "titan-02": "amd64"}}) == {"amd64": ["titan-02"]}
+    assert answer_post._format_hardware_groups({"amd64": ["titan-02"]}, "Non-Raspberry Pi nodes").startswith("Non-Raspberry Pi nodes:")
+    assert "Lexicon" in answer_post._lexicon_context({"lexicon": {"terms": [{"term": "Atlas", "meaning": "cluster"}], "aliases": {"pi": "rpi"}}})
+    assert answer_post._parse_json_list("prefix [{\"id\": 1}, \"bad\"] suffix") == [{"id": 1}]
+    assert answer_post._scores_from_json({"confidence": "80"}).confidence == 80
+    assert answer_post._default_scores().confidence == 60
+    assert answer_post._style_hint({"answer_style": "insightful"}) == "insightful"
+    assert answer_post._needs_evidence_fix("No data available", {"needs_snapshot": True, "question_type": "metric"}) is True
+    assert answer_post._should_use_insight_guard({"answer_style": "insightful"}) is True
+
+    guard_ok = ScriptedCall({"insight_guard": '{"ok": true}'})
+    text = asyncio.run(
+        answer_post._apply_insight_guard(
+            InsightGuardInput("q", "reply", {"answer_style": "insightful"}, "ctx", answer_common._mode_plan(build_test_settings(), "smart"), guard_ok, ["cpu: 95"])
+        )
+    )
+    assert text == "reply"
+    guard_fix = ScriptedCall({"insight_guard": '{"ok": false}', "insight_fix": "tightened"})
+    assert (
+        asyncio.run(
+            answer_post._apply_insight_guard(
+                InsightGuardInput("q", "reply", {"answer_style": "insightful"}, "ctx", answer_common._mode_plan(build_test_settings(), "smart"), guard_fix, ["cpu: 95"])
+            )
+        )
+        == "tightened"
+    )
+
+    assert answer_post_ext._reply_matches_metric_facts("cpu 95", ["cpu: 95"], {"cpu"}) is True
+    assert answer_post_ext._reply_matches_metric_facts("no numbers", ["cpu: 95"], None) is False
+    assert answer_post_ext._needs_dedup("A. A. B.") is True
+    assert answer_post_ext._needs_dedup("Alpha. Alpha. Beta.") is True
+    assert answer_post_ext._needs_focus_fix("How many nodes?", "Based on the context, there are maybe some nodes. For more details...", {"question_type": "metric"}) is True
+    assert "atlas" in answer_post_ext._extract_keywords("What is Atlas now?", "Atlas", ["How many nodes?"], ["cpu"])
+    assert answer_post_ext._allowed_nodes({"hardware_by_node": {"titan-01": "rpi5"}}) == ["titan-01"]
+    assert answer_post_ext._allowed_namespaces({"namespace_pods": [{"namespace": "synapse"}, "bad"]}) == ["synapse"]
+    assert answer_post_ext._find_unknown_nodes("titan-01 titan-99", ["titan-01"]) == ["titan-99"]
+    assert answer_post_ext._find_unknown_namespaces("namespace synapse namespace drift", ["synapse"]) == ["drift"]
+    assert answer_post_ext._needs_runbook_fix("See runbooks/nope.md", ["runbooks/yes.md"]) is True
+    assert answer_post_ext._needs_runbook_reference("where is the runbook?", ["runbooks/yes.md"], "") is True
+    assert answer_post_ext._best_runbook_match("runbooks/fixx.md", ["runbooks/fix.md"]) == "runbooks/fix.md"
+    assert answer_post_ext._resolve_path({"nodes": [{"name": "titan-01"}]}, "nodes[0].name") == "titan-01"
+    assert answer_post_ext._resolve_path({}, "line: cpu:95") == "cpu:95"
+    assert answer_post_ext._snapshot_id({"snapshot_id": "snap-1"}) == "snap-1"
+    payload = answer_post_ext._claims_to_payload([ClaimItem("c1", "claim", [EvidenceItem("nodes[0]", "why", value_at_claim="old")])])
+    state = answer_post_ext._state_from_payload({"updated_at": 1.5, "claims": payload, "snapshot_id": "snap-1", "snapshot": {"nodes": 1}})
+    assert state and state.snapshot_id == "snap-1"
+
+
+def test_answer_post_edge_branches() -> None:
+    """Cover low-frequency formatting and fallback branches in post helpers."""
+
+    plan = answer_common._mode_plan(build_test_settings(), "smart")
+
+    assert answer_post._strip_unknown_entities(" ", ["titan-01"], ["synapse"]) == " "
+    assert answer_post._needs_evidence_guard("Atlas runs on amd64 nodes.", ["nodes_total: 2"]) is True
+    assert answer_post._needs_evidence_guard("Atlas shows memorypressure.", ["nodes_total: 2"]) is True
+
+    contradiction = asyncio.run(
+        answer_post._contradiction_decision(
+            ContradictionContext(ScriptedCall({"contradiction": '{"confidence":"bad","use_facts": false}'}), "q", "r", ["fact"], plan)
+        )
+    )
+    assert contradiction == {"use_facts": False, "confidence": 50}
+
+    assert answer_post._filter_lines_by_keywords([], ["cpu"], 2) == []
+    assert answer_post._filter_lines_by_keywords(["cpu: 95"], [], 2) == ["cpu: 95"]
+    assert answer_post._rank_metric_lines(["cpu high"], set(), 2) == []
+    assert answer_post._select_metric_line([], "How many CPUs?", {"cpu"}) is None
+    assert answer_post._select_metric_line(["disk healthy"], "How many CPUs?", {"cpu"}) is None
+    assert answer_post._format_direct_metric_line("") == ""
+    assert answer_post._format_direct_metric_line("nodes:") == "nodes:"
+    assert answer_post._format_equals_metric("garbage") is None
+    assert answer_post._format_equals_metric("cpu=, ram=20") == "ram is 20."
+    assert answer_post._format_equals_metric("cpu=95, ram=20") == "cpu is 95; ram is 20."
+    assert answer_post._format_nodes_value("ready=2") is None
+    assert answer_post._format_nodes_value("total=3") == "Atlas has 3 total nodes."
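+    # The degenerate inputs below should all fall through to their neutral defaults.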
+    assert answer_post._global_facts([]) == []
+    assert answer_post._has_keyword_overlap(["cpu: 95"], []) is False
+    assert answer_post._has_keyword_overlap(["cpu: 95"], ["a"]) is False
+    assert answer_post._has_keyword_overlap(["cpu: 95"], ["ram"]) is False
+    assert answer_post._merge_tokens(["cpu", ""], ["ram"], ["cpu", "disk"]) == ["cpu", "ram", "disk"]
+    assert answer_post._expand_tokens([1, "a", "cpu-load"]) == ["cpu-load"]  # type: ignore[list-item]
+    assert answer_post._ensure_token_coverage([], ["cpu"], ["cpu: 95"]) == []
+    assert answer_post._ensure_token_coverage(["cpu: 95"], ["ram"], ["cpu: 95"]) == ["cpu: 95"]
+    assert answer_post._ensure_token_coverage(["cpu: 95"], ["cpu"], ["cpu: 95"]) == ["cpu: 95"]
+    assert answer_post._best_keyword_line(["cpu: 95"], []) is None
+    assert answer_post._best_keyword_line(["cpu: 95"], ["a"]) is None
+    assert answer_post._best_keyword_line(["disk: ok"], ["cpu"]) is None
+    assert answer_post._line_starting_with([], "cpu:") is None
+    assert answer_post._line_starting_with(["ram: 20"], "cpu:") is None
+    assert answer_post._non_rpi_nodes({"hardware_by_node": ["bad"]}) == {}  # type: ignore[arg-type]
+    assert answer_post._non_rpi_nodes({"hardware_by_node": {"titan-01": "rpi5", "titan-02": 2}}) == {}  # type: ignore[arg-type]
+    assert answer_post._format_hardware_groups({}, "Non-Raspberry Pi nodes") == ""
+    assert answer_post._lexicon_context([]) == ""  # type: ignore[arg-type]
+    assert "alias pi -> rpi" in answer_post._lexicon_context({"lexicon": {"terms": ["bad"], "aliases": {"pi": "rpi"}}})
+    assert answer_post._lexicon_context({"lexicon": {"terms": [{"term": "", "meaning": ""}], "aliases": {"": ""}}}) == ""
+    assert answer_post._parse_json_block("not-json", fallback={"fallback": True}) == {"fallback": True}
+    assert answer_post._parse_json_list("not-a-list") == []
+    assert answer_post._coerce_int("nan", 7) == 7
+    assert answer_post._needs_evidence_fix("", {"needs_snapshot": True}) is False
+    assert (
+        asyncio.run(
+            answer_post._apply_insight_guard(
+                InsightGuardInput("q", "", {"answer_style": "insightful"}, "ctx", plan, ScriptedCall({}), [])
+            )
+        )
+        == ""
+    )
+
+
+def test_post_ext_and_retrieval_ext_edge_branches() -> None:
+    """Cover remaining branchy helpers in post_ext and retrieval_ext."""
+
+    plan = answer_common._mode_plan(build_test_settings(), "smart")
+
+    assert answer_post_ext._reply_matches_metric_facts("Atlas is fine.", [], None) is True
+    assert answer_post_ext._reply_matches_metric_facts("cpu high", ["cpu: hot"], {"cpu"}) is False
+    assert answer_post_ext._needs_dedup("") is False
+    assert answer_post_ext._needs_dedup("One sentence only.") is False
+    assert answer_post_ext._needs_focus_fix("What is Atlas?", "Short answer.", {"question_type": "open_ended"}) is False
+    assert answer_post_ext._needs_focus_fix("How many pods?", "No data available.", {"question_type": "metric"}) is True
+    keywords = answer_post_ext._extract_keywords("the atlas", "show cpu", ["where now"], [1, "cpu"])  # type: ignore[list-item]
+    assert "cpu" in keywords
+    assert "the" not in keywords
+    assert answer_post_ext._allowed_nodes({"hardware_by_node": None}) == []
+    assert answer_post_ext._allowed_namespaces({"namespace_pods": ["bad", {"namespace": ""}]}) == []
+    assert answer_post_ext._find_unknown_nodes("", ["titan-01"]) == []
+    assert answer_post_ext._find_unknown_nodes("plain text", ["titan-01"]) == []
+    assert answer_post_ext._find_unknown_namespaces("", ["synapse"]) == []
+    assert answer_post_ext._needs_runbook_fix("", ["runbooks/fix.md"]) is False
+    assert answer_post_ext._needs_runbook_fix("No runbook here.", ["runbooks/fix.md"]) is False
+    assert answer_post_ext._needs_runbook_reference("hello there", ["runbooks/fix.md"], "reply") is False
+    assert answer_post_ext._needs_runbook_reference("", ["runbooks/fix.md"], "reply") is False
+    assert answer_post_ext._needs_runbook_reference("where is the runbook", ["runbooks/fix.md"], "") is True
+    assert answer_post_ext._needs_runbook_reference("where is the runbook", ["runbooks/fix.md"], "Use runbooks/fix.md") is False
+    assert answer_post_ext._best_runbook_match("zzz", ["runbooks/fix.md"]) is None
+    assert answer_post_ext._resolve_path({"nodes": [1]}, "nodes..name") is None
+    assert answer_post_ext._resolve_path({"nodes": [1]}, "nodes[99]") is None
+    assert answer_post_ext._resolve_path({"nodes": {"bad": 1}}, "nodes[0]") is None
+    assert answer_post_ext._snapshot_id({}) is None
+    invalid_state = answer_post_ext._state_from_payload({"claims": ["bad", {"id": "", "claim": "x", "evidence": [{"path": ""}]}]})
+    assert invalid_state is not None
+    assert invalid_state.claims == []
+
+    assert answer_retrieval_ext._parse_json_block("plain", fallback={"fallback": True}) == {"fallback": True}
+    assert asyncio.run(answer_retrieval_ext._select_best_candidate(ScriptedCall({}), "q", ["only"], plan, "pick")) == 0
+    assert asyncio.run(answer_retrieval_ext._select_best_candidate(ScriptedCall({"pick": '{"best":"bad"}'}), "q", ["one", "two"], plan, "pick")) == 0
+    assert asyncio.run(answer_retrieval_ext._select_best_list(ScriptedCall({}), "q", [], plan, "pick")) == []
+    assert asyncio.run(answer_retrieval_ext._select_best_list(ScriptedCall({}), "q", [["cpu"]], plan, "pick")) == ["cpu"]
+    merged = asyncio.run(
+        answer_retrieval_ext._select_best_list(ScriptedCall({"pick": '{"best": 1}'}), "q", [[], ["cpu"], ["ram"]], plan, "pick")
+    )
+    assert merged == ["cpu", "ram"]
+    assert asyncio.run(answer_retrieval_ext._extract_fact_types(ScriptedCall({"fact_types": '{}'}), "q", [], plan)) == []
+    assert asyncio.run(answer_retrieval_ext._derive_signals(ScriptedCall({}), "q", [], plan)) == []
+    assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(ScriptedCall({}), "q", [], ["cpu: 95"], plan)) == []
+    assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(ScriptedCall({"chunk_scan": '{}'}), "q", ["cpu"], ["cpu: 95"], plan)) == []
+    assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(ScriptedCall({}), "q", [], plan, 1)) == []
+    assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(ScriptedCall({"fact_prune": '{}'}), "q", ["cpu: 95"], plan, 1)) == []
+    assert asyncio.run(answer_retrieval_ext._select_fact_lines(ScriptedCall({}), "q", [], plan, 1)) == []
+    assert asyncio.run(answer_retrieval_ext._select_fact_lines(ScriptedCall({"fact_select": '{}'}), "q", ["cpu: 95"], plan, 1)) == []
+
+
+def test_retrieval_ext_helpers() -> None:
+    """Cover retrieval helper parsing and selection branches."""
+
+    assert answer_retrieval_ext._parse_json_block("prefix {\"ok\": true} suffix", fallback={}) == {"ok": True}
+    assert "cpu" in answer_retrieval_ext._metric_key_tokens(["cpu_load: 95", "bad-line"])
+    assert answer_retrieval_ext._dedupe_lines(["a", "a", "lexicon_x", "units: bad", "b"], limit=2) == ["a", "b"]
+    assert answer_retrieval_ext._collect_fact_candidates([{"text": "a\nb"}, {"text": None}], limit=3) == ["a", "b"]
+
+    scripted = ScriptedCall(
+        {
+            "pick": '{"best": 2}',
+            "fact_types": ['{"fact_types": ["cpu", "ram"]}', '{"fact_types": ["cpu"]}'],
+            "fact_types_select": '{"best": 1}',
+            "signals": ['{"signals": ["cpu", "thermal"]}'],
+            "signals_select": '{"best": 1}',
+            "chunk_scan": ['{"lines": ["cpu: 95"]}'],
+            "chunk_scan_select": '{"best": 1}',
+            "fact_prune": ['{"lines": ["cpu: 95"]}'],
+            "fact_prune_select": '{"best": 1}',
+            "fact_select": ['{"lines": ["cpu: 95", "ram: 20"]}'],
+            "fact_select_best": '{"best": 1}',
+        }
+    )
+    plan = answer_common._mode_plan(build_test_settings(), "smart")
+    idx = asyncio.run(answer_retrieval_ext._select_best_candidate(scripted, "q", ["one", "two"], plan, "pick"))
+    assert idx == 1
+    assert asyncio.run(answer_retrieval_ext._select_best_list(scripted, "q", [[], ["cpu"]], plan, "pick")) == ["cpu"]
+    assert asyncio.run(answer_retrieval_ext._extract_fact_types(scripted, "q", ["cpu"], plan)) == ["cpu", "ram"]
+    assert asyncio.run(answer_retrieval_ext._derive_signals(scripted, "q", ["cpu"], plan)) == ["cpu", "thermal"]
+    assert asyncio.run(answer_retrieval_ext._scan_chunk_for_signals(scripted, "q", ["cpu"], ["cpu: 95"], plan)) == ["cpu: 95"]
+    assert asyncio.run(answer_retrieval_ext._prune_metric_candidates(scripted, "q", ["cpu: 95"], plan, 2)) == ["cpu: 95"]
+    assert asyncio.run(answer_retrieval_ext._select_fact_lines(scripted, "q", ["cpu: 95", "ram: 20"], plan, 2)) == ["cpu: 95", "ram: 20"]
+
+
+def test_answer_engine_helper_methods(tmp_path: Path) -> None:
+    """Exercise direct engine helpers that the top-level flow rarely hits."""
+
+    settings = replace(build_test_settings(), state_db_path=str(tmp_path / "state.db"))
+    llm = SimpleNamespace(chat=lambda messages, model=None: asyncio.sleep(0, result="stock"))  # type: ignore[call-arg]
+    engine = answer_engine.AnswerEngine(
+        settings,
+        llm,  # type: ignore[arg-type]
+        KnowledgeBase(""),
+        SimpleNamespace(),  # type: ignore[arg-type]
+    )
+    stock = asyncio.run(engine._answer_stock("What is Atlas?"))
+    assert stock.reply == "stock"
+
+    scripted = ScriptedCall(
+        {
+            "synth": ["draft-one", "draft-two", "single-draft"],
+            "draft_select": '{"best": 2}',
+            "score": '{"confidence": 90, "relevance": 80, "satisfaction": 70, "hallucination_risk": "low"}',
+            "claim_map": '{"claims":[{"id":"c1","claim":"Atlas is busy","evidence":[{"path":"nodes[0].name","reason":"hot"}]}]}',
+            "dedup": "deduped",
+        }
+    )
+    plan = replace(answer_common._mode_plan(settings, "smart"), drafts=2, parallelism=2, use_scores=True)
+    assert asyncio.run(engine._synthesize_answer("q", ["a", "b"], "ctx", {"question_type": "metric"}, plan, scripted)) == "draft-two"
+    assert asyncio.run(engine._synthesize_answer("q", [], "ctx", {"question_type": "metric"}, plan, scripted)) == "single-draft"
+    scores = asyncio.run(engine._score_answer("q", "reply", plan, scripted))
+    assert scores.confidence == 90
+    claims = asyncio.run(engine._extract_claims("q", "reply", {"nodes": [{"name": "titan-01"}]}, ["nodes[0].name: titan-01"], scripted))
+    assert claims and claims[0].id == "c1"
+    assert asyncio.run(engine._dedup_reply("Alpha. Alpha. Beta.", plan, scripted, "dedup")) == "deduped"
+    assert asyncio.run(engine._dedup_reply("Alpha only.", plan, scripted, "dedup")) == "Alpha only."
+
+    contradiction = asyncio.run(
+        answer_post._contradiction_decision(
+            ContradictionContext(scripted, "q", "reply", ["cpu:95"], plan),
+            attempts=2,
+        )
+    )
+    assert contradiction["confidence"] == 50
+
+
+def test_answer_engine_edge_fallbacks(tmp_path: Path) -> None:
+    """Cover engine fallbacks that only show up on malformed helper output."""
+
+    settings = replace(build_test_settings(), state_db_path=str(tmp_path / "state.db"))
+    engine = answer_engine.AnswerEngine(
+        settings,
+        SimpleNamespace(chat=lambda *_args, **_kwargs: asyncio.sleep(0, result="unused")),  # type: ignore[arg-type]
+        KnowledgeBase(""),
+        SimpleNamespace(),  # type: ignore[arg-type]
+    )
+    plan = replace(answer_common._mode_plan(settings, "smart"), drafts=2, parallelism=1, use_scores=False)
+
+    bad_select = ScriptedCall({"synth": ["draft-one", "draft-two"], "draft_select": '{"best": 99}'})
+    assert asyncio.run(engine._synthesize_answer("q", ["a", "b"], "ctx", {"question_type": "metric"}, plan, bad_select)) == "draft-one"
+    assert asyncio.run(engine._score_answer("q", "reply", plan, bad_select)).confidence == 60
+    assert asyncio.run(engine._extract_claims("q", "", {"nodes": []}, [], bad_select)) == []
+
+    malformed_claims = ScriptedCall(
+        {
+            "claim_map": '{"claims":[{"id":"c1","claim":"hot","evidence":["bad",{"path":"","reason":"nope"}]},{"claim":"","evidence":[{"path":"nodes[0].name","reason":"why"}]}]}',
+            "select_claims": '{"claim_ids":"bad"}',
+        }
+    )
+    claims = asyncio.run(engine._extract_claims("q", "reply", {"nodes": [{"name": "titan-01"}]}, [], malformed_claims))
+    assert claims == []
+    assert asyncio.run(engine._select_claims("q", [ClaimItem("c1", "claim", [EvidenceItem("nodes[0].name", "why")])], plan, malformed_claims)) == []
+
+
+def test_factsheet_edge_paths() -> None:
+    """Cover low-frequency factsheet selection and heuristic branches."""
+
+    assert answer_factsheet._is_plain_math_question("") is False
+
+    fact_lines = answer_factsheet._quick_fact_sheet_lines(
+        "where is the titan runbook",
+        [""],
+        [
+            "",
+            "x" * 300,
+            "KB File: notes.md",
+            "runbook alpha",
+            "runbook beta",
+            "titan-01 runs hot",
+            "amd64 nodes are available",
+            "runbook gamma",
+        ],
+        limit=6,
+    )
+    assert "runbook alpha" in fact_lines
+    assert "titan-01 runs hot" in fact_lines
+    assert all(not line.startswith("KB File:") for line in fact_lines)
+
+    assert (
+        answer_factsheet._quick_fact_sheet_heuristic_answer(
+            "which nodes are not ready?",
+            ["noise first", "nodes_total:2,ready:1,not_ready:1"],
+        )
+        == "The latest snapshot shows 1 not-ready nodes (1 ready out of 2 total)."
+    )
+    assert answer_factsheet._quick_fact_sheet_heuristic_answer("how many ready nodes?", ["noise first"]) == ""
+
+
+def test_snapshot_builder_core_a_edge_paths(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Cover cached snapshot fallback and summary-builder edge branches."""
+
+    settings = replace(build_test_settings(), ariadne_state_url="http://snapshot")
+    provider = SnapshotProvider(settings)
+    provider._cache = {"cached": True}
+
+    def broken_get(*_args: Any, **_kwargs: Any) -> Any:
+        raise httpx.HTTPError("boom")
+
+    monkeypatch.setattr("atlasbot.snapshot.builder.httpx.get", broken_get)
+    assert provider.get() == {"cached": True}
+
+    assert core_a._node_usage_top([{}, {"node": "titan-01", "value": "bad"}, {"node": "titan-02", "value": 3}]) == {
+        "node": "titan-02",
+        "value": 3.0,
+    }
+
+    merged: dict[str, Any] = {}
+    core_a._merge_cluster_fields(merged, {"signals": [], "profiles": "bad"}, {"signals": list, "profiles": dict})
+    assert merged == {"signals": []}
+
+    assert core_a._build_nodes({}) == {}
+    assert core_a._build_hardware([]) == {}
+    assert core_a._build_hardware([{}, {"name": "titan-01", "hardware": "rpi5"}]) == {"hardware": {"rpi5": ["titan-01"]}}
+    assert core_a._build_hardware_by_node([{}, {"name": "titan-01", "hardware": "rpi5"}]) == {"hardware_by_node": {"titan-01": "rpi5"}}
+    assert core_a._build_hardware_usage({}, {"titan-01": "rpi5"}) == {}
+    assert core_a._build_hardware_usage({"node_load": []}, {"titan-01": "rpi5"}) == {}
+
+    usage = core_a._build_hardware_usage(
+        {"node_load": [{}, {"node": "", "cpu": 1}, {"node": "titan-01", "load_index": 2, "cpu": 50}]},
+        {"titan-01": "rpi5"},
+    )
+    assert usage["hardware_usage_avg"][0]["cpu"] == 50
+
+    assert core_a._build_node_facts([]) == {}
+    facts = core_a._build_node_facts(
+        [{}, {"is_worker": True, "roles": ["db", "", 1], "arch": "amd64", "os": "linux", "kubelet": "v1", "kernel": "k", "container_runtime": "c"}]
+    )
+    assert facts["node_role_counts"]["worker"] == 1
+
+    assert core_a._build_node_taints([{}, {"name": ""}, {"name": "titan-01", "taints": ["bad", {"key": "dedicated", "effect": "NoSchedule"}]}]) == {
+        "node_taints": {"dedicated:NoSchedule": ["titan-01"]}
+    }
+
+    headroom = core_a._build_root_disk_headroom(
+        {"node_usage": {"disk": [{}, {"node": "titan-01", "value": "bad"}, {"node": "titan-02", "value": 80}]}}
+    )
+    assert headroom["root_disk_low_headroom"][0]["node"] == "titan-02"
+
+    assert core_a._build_capacity({}) == {}
+    assert core_a._build_workload_health({"workloads_health": {"deployments": {}, "statefulsets": {}, "daemonsets": []}}) == {}
+    assert core_a._build_postgres({}) == {}
+
+
+def test_snapshot_builder_format_c_edge_paths() -> None:
+    """Cover summary text formatter branches that only trigger on sparse data."""
+
+    lines: list[str] = []
+    format_c._append_signals(lines, {})
+    format_c._append_profiles(lines, {})
+    format_c._append_cluster_watchlist(lines, {})
+    assert lines == []
+
+    format_c._append_signals(
+        lines,
+        {
+            "signals": [
+                "bad",
+                {"scope": "node", "target": "titan-01", "metric": "cpu", "current": 95, "delta_pct": 10, "severity": "warn"},
+            ]
+        },
+    )
+    format_c._append_profiles(
+        lines,
+        {
+            "profiles": {
+                "nodes": ["bad", {"node": "titan-01", "load_index": 0.9, "cpu": 95, "ram": 70, "pods_total": 5, "hardware": "rpi5"}],
+                "namespaces": ["bad", {"namespace": "synapse", "pods_total": 4, "cpu_usage": 80, "mem_usage": 70, "primary_node": "titan-01"}],
+                "workloads": ["bad", {"namespace": "synapse", "workload": "app", "pods_total": 2, "pods_running": 2, "primary_node": "titan-01"}],
+            }
+        },
+    )
+    format_c._append_units_windows(lines, {"metrics": {}})
+    format_c._append_node_load_summary(
+        lines,
+        {
+            "hardware_by_node": {"titan-01": "rpi5"},
+            "node_load_summary": {
+                "top": ["bad", {"node": "titan-01", "load_index": 1.5, "cpu": 90, "ram": 80, "io": 1024, "net": 2048, "pods_total": 7}],
+                "outliers": ["bad", {"node": ""}, {"node": "titan-02"}],
+            },
+        },
+    )
+    format_c._append_hardware_usage(
+        lines,
+        {
+            "hardware_usage_avg": [
+                "bad",
+                {"hardware": "", "cpu": 1},
+                {"hardware": "rpi5", "load_index": 1.5, "cpu": 90, "ram": 80, "io": 1024, "net": 2048},
+                {"hardware": "amd64", "load_index": 2.5, "cpu": 95, "ram": 70, "io": 4096, "net": 8192},
+            ]
+        },
+    )
+    format_c._append_cluster_watchlist(lines, {"cluster_watchlist": ["not_ready_nodes=1"]})
+    format_c._append_baseline_deltas(
+        lines,
+        {
+            "baseline_deltas": {
+                "nodes": {"cpu": ["bad", {"node": "titan-01", "delta": 10, "severity": "warn"}]},
+                "namespaces": {"cpu": [{"namespace": "synapse", "delta": 12}]},
+            }
+        },
+    )
+    format_c._append_pod_issue_summary(
+        lines,
+        {
+            "pod_issue_summary": {
+                "waiting_reasons_top": ["bad", {"reason": "ImagePullBackOff", "count": 2}],
+                "phase_reasons_top": [{"reason": "CrashLoopBackOff", "count": 1}],
+                "namespace_issue_top": {"cpu": ["bad", {"namespace": "synapse", "value": 95}, {"namespace": "", "value": 1}]},
+            }
+        },
+    )
+
+    watchlist = format_c._build_cluster_watchlist(
+        {
+            "nodes_summary": {"not_ready": 1},
+            "pressure_nodes": {"names": ["titan-02"]},
+            "pod_issues": {"pending_over_15m": 2},
+            "workloads_health": {"deployments": {"not_ready": 1}, "statefulsets": {"not_ready": 0}, "daemonsets": {"not_ready": 1}},
+            "flux": {"not_ready": 1},
+            "pvc_usage_top": [{"value": 95}],
+        }
+    )
+    assert "cluster_watchlist" in watchlist
+
+    assert format_c._capacity_ratio_parts(["bad", {"namespace": "synapse", "cpu_usage_ratio": 1.2, "cpu_usage": 2, "cpu_requests": 1}], "cpu_usage_ratio", "cpu_usage", "cpu_requests") == [
+        "synapse=1.2 (usage=2 req=1)"
+    ]
+    assert format_c._capacity_headroom_parts(["bad", {"namespace": "synapse", "headroom": 12.5}]) == ["synapse=12.5"]
+
+    cap_lines: list[str] = []
+    format_c._append_namespace_capacity_summary(
+        cap_lines,
+        {
+            "namespace_capacity_summary": {
+                "cpu_ratio_top": [{"namespace": "synapse", "cpu_usage_ratio": 1.2, "cpu_usage": 2, "cpu_requests": 1}],
+                "mem_ratio_top": [{"namespace": "synapse", "mem_usage_ratio": 1.1, "mem_usage": 3, "mem_requests": 2}],
+                "cpu_headroom_low": [{"namespace": "synapse", "headroom": 12.5}],
+                "mem_headroom_low": [{"namespace": "synapse", "headroom": 8.5}],
+                "cpu_overcommitted": 1,
+                "mem_overcommitted": 0,
+                "cpu_overcommitted_names": ["synapse", ""],
+                "mem_overcommitted_names": ["synapse"],
+            }
+        },
+    )
+    assert any(line.startswith("namespace_cpu_ratio_top:") for line in cap_lines)
+
+    format_c._append_workloads_by_namespace(
+        lines,
+        {
+            "workloads": [
+                "bad",
+                {"namespace": "", "workload": "skip"},
+                {"namespace": "synapse", "workload": "app", "pods_total": 2, "primary_node": "titan-01"},
+                {"namespace": "synapse", "workload": "db", "pods_total": 1},
+            ]
+        },
+    )
+    format_c._append_lexicon(
+        lines,
+        {"lexicon": {"terms": ["bad", {"term": "Atlas", "meaning": "cluster"}], "aliases": {"pi": "rpi", "": ""}}},
+    )
+    format_c._append_cross_stats(
+        lines,
+        {
+            "cross_stats": {
+                "node_metric_top": ["bad", {"metric": "cpu", "node": "titan-01", "value": 95, "cpu": 95, "ram": 80, "net": 12, "io": 9, "pods_total": 5}],
+                "namespace_metric_top": ["bad", {"metric": "cpu", "namespace": "synapse", "value": 95, "cpu_ratio": 1.2, "mem_ratio": 1.1, "pods_total": 4}],
+                "pvc_top": ["bad", {"namespace": "synapse", "pvc": "data", "used_percent": 90}],
+            }
+        },
+    )
+
+    assert any(line.startswith("signals:") for line in lines)
+    assert any(line.startswith("units: cpu_pct") for line in lines)
+    assert any(line.startswith("hardware_usage_top:") for line in lines)
+    assert any(line.startswith("namespace_issue_top_cpu:") for line in lines)
+    assert any(line.startswith("lexicon_term: Atlas") for line in lines)
+    assert any(line.startswith("cross_pvc_usage: synapse/data") for line in lines)
+
+
+def test_snapshot_builder_format_b_edge_paths() -> None:
+    """Cover sparse-data and fallback branches in the mid-level snapshot formatters."""
+
+    lines: list[str] = []
+    format_b._append_longhorn(lines, {})
+    format_b._append_namespace_usage(lines, {})
+    format_b._append_job_failures(lines, {})
+    format_b._append_jobs(lines, {})
+    format_b._append_postgres(lines, {})
+    format_b._append_hottest(lines, {})
+    format_b._append_workloads(lines, {})
+    format_b._append_topology(lines, {})
+    format_b._append_flux(lines, {})
+    assert lines == []
+
+    format_b._append_namespace_metric_series(lines, "namespace_cpu_top", ["bad"], format_b._format_float)
+    assert lines == []
+
+    format_b._append_longhorn(
+        lines,
+        {
+            "longhorn": {
+                "total": 3,
+                "unhealthy_count": 1,
+                "by_state": {"attached": 2},
+                "by_robustness": {"healthy": 2},
+                "unhealthy": ["bad", {"name": "vol-1", "state": "detached", "robustness": "degraded"}],
+            }
+        },
+    )
+    format_b._append_longhorn(
+        lines,
+        {
+            "longhorn": {
+                "total": 4,
+                "attached_count": 2,
+                "detached_count": 1,
+                "degraded_count": 1,
+            }
+        },
+    )
+    format_b._append_namespace_usage(
+        lines,
+        {
+            "metrics": {
+                "namespace_cpu_top": ["bad", {"metric": {"namespace": "synapse"}, "value": 95}],
+                "namespace_mem_top": [{"metric": {"namespace": "synapse"}, "value": 1024}],
+            }
+        },
+    )
+    format_b._append_namespace_requests(
+        lines,
+        {
+            "metrics": {
+                "namespace_cpu_requests_top": [{"metric": {"namespace": "synapse"}, "value": 2}],
+                "namespace_mem_requests_top": [{"metric": {"namespace": "synapse"}, "value": 2048}],
+            }
+        },
+    )
+    format_b._append_namespace_io_net(
+        lines,
+        {
+            "metrics": {
+                "namespace_net_top": [{"metric": {"namespace": "synapse"}, "value": 2048}],
+                "namespace_io_top": [{"metric": {"namespace": "synapse"}, "value": 1024}],
+            }
+        },
+    )
+    format_b._append_pod_usage(
+        lines,
+        {
+            "metrics": {
+                "pod_cpu_top": ["bad", {"metric": {"namespace": "synapse", "pod": "app"}, "value": 95}],
+                "pod_cpu_top_node": ["bad", {"metric": {"namespace": "synapse", "pod": "app", "node": "titan-01"}, "value": 90}],
+                "pod_mem_top": ["bad", {"metric": {"namespace": "synapse", "pod": "app"}, "value": 1024}],
+                "pod_mem_top_node": ["bad", {"metric": {"namespace": "synapse", "pod": "app", "node": "titan-01"}, "value": 2048}],
+            }
+        },
+    )
+    format_b._append_restarts(lines, {"metrics": {}})
+    format_b._append_restarts(
+        lines,
+        {
+            "metrics": {
+                "top_restarts_1h": ["bad", {"metric": {"namespace": "synapse", "pod": "app"}, "value": [0, 3]}],
+                "restart_namespace_top": [{"metric": {"namespace": "synapse"}, "value": 3}],
+            }
+        },
+    )
+    format_b._append_job_failures(
+        lines,
+        {"metrics": {"job_failures_24h": ["bad", {"metric": {"namespace": "batch", "job_name": "cleanup"}, "value": 2}]}}
+    )
+    format_b._append_jobs(
+        lines,
+        {
+            "jobs": {
+                "totals": {"total": 4, "active": 1, "failed": 1, "succeeded": 2},
+                "failing": ["bad", {"namespace": "batch", "job": "cleanup", "failed": 2, "age_hours": 1.5}],
+                "active_oldest": ["bad", {"namespace": "batch", "job": "sync", "age_hours": 2.5}],
+            }
+        },
+    )
+    format_b._append_postgres(
+        lines,
+        {
+            "postgres": {
+                "used": 4,
+                "max": 20,
+                "hottest_db": "atlas",
+                "by_db": ["bad", {"metric": {"datname": "atlas"}, "value": [0, 4]}],
+            }
+        },
+    )
+    format_b._append_hottest(
+        lines,
+        {
+            "hardware_by_node": {"titan-01": "rpi5"},
+            "hottest": {"cpu": {"node": "titan-01", "value": 95}, "net": {"node": "titan-01", "value": 2048}, "bad": "skip"},
+        },
+    )
+    format_b._append_hottest(lines, {"hottest": {"ram": {"node": "titan-02", "value": 88}}})
+    format_b._append_workloads(
+        lines,
+        {"workloads": ["bad", {"namespace": "synapse", "workload": "app", "pods_total": 3, "primary_node": "titan-01"}]},
+    )
+    format_b._append_workloads(lines, {"workloads": ["bad"]})
+    format_b._append_topology(
+        lines,
+        {
+            "topology": {
+                "nodes": ["bad", {"node": "titan-01", "workloads_top": [("app", 3), ("db", 1)]}],
+                "workloads": ["bad", {"namespace": "synapse", "workload": "app", "nodes_top": [("titan-01", 3)]}],
+            }
+        },
+    )
+    format_b._append_flux(
+        lines,
+        {
+            "flux": {
+                "not_ready": 1,
+                "items": ["bad", {"name": "kustomize", "namespace": "flux-system", "reason": "stalled", "suspended": True}],
+            }
+        },
+    )
+
+    assert any(line.startswith("longhorn: total=3") for line in lines)
+    assert any(line.startswith("namespace_cpu_top:") for line in lines)
+    assert "restarts_1h_top: none" in lines
+    assert any(line.startswith("restarts_1h_top: synapse/app=3") for line in lines)
+    assert any(line.startswith("jobs_failing_top:") for line in lines)
+    assert any(line.startswith("postgres_connections_by_db: atlas=4") for line in lines)
+    assert any(line.startswith("flux_not_ready_items: flux-system/kustomize") for line in lines)
+    assert format_b._format_jobs_totals({}) == ""
+    assert format_b._format_jobs_failing({}) == ""
+    assert format_b._format_jobs_active_oldest({}) == ""
+
+
+def test_snapshot_builder_format_a_edge_paths() -> None:
+    """Cover sparse-data, fallback, and invalid-entry branches in base snapshot formatters."""
+
+    assert format_a._format_float("bad") == "bad"
+    assert format_a._format_rate_bytes("bad") == "bad"
+    assert format_a._format_rate_bytes(12).endswith("B/s")
+    assert format_a._format_bytes("bad") == "bad"
+    assert format_a._format_kv_map({}) == ""
+    assert format_a._format_names([]) == ""
+    assert format_a._format_pod_issue_counts({}) == ""
+    assert format_a._format_pod_issue_top({"items": ["bad", {"namespace": "", "pod": "api"}]}) == ""
+    assert format_a._format_pod_pending_oldest({"pending_oldest": ["bad", {"namespace": "synapse", "pod": "api"}]}) == ""
+    assert format_a._format_pod_waiting_reasons({}) == ""
+    assert format_a._format_pod_pending_over_15m({"pending_over_15m": "bad"}) == ""
+
+    lines: list[str] = []
+    format_a._append_nodes(lines, {"nodes": {"total": 2, "ready": 1, "not_ready": None}})
+    format_a._append_hardware(lines, {"hardware": {"rpi5": ["titan-01", ""], "skip": "bad"}})
+    format_a._append_hardware_groups(lines, {"hardware": {"rpi5": ["titan-01"], "skip": "bad"}})
+    format_a._append_node_ages(lines, {"node_ages": ["bad", {"name": "titan-01", "age_hours": "oops"}, {"name": "titan-02", "age_hours": 2.5}]})
+    format_a._append_node_taints(lines, {"node_taints": {"gpu": ["titan-22"], "skip": "bad"}})
+    format_a._append_node_facts(lines, {"node_arch_counts": {}, "node_os_counts": {"linux": 2}})
+    format_a._append_pressure(lines, {"pressure_nodes": {"disk": ["titan-10", ""], "memory": []}})
+    format_a._append_pods(lines, {"pods": {"running": 3, "pending": 1, "failed": 0, "succeeded": 2}})
+    format_a._append_capacity(lines, {"capacity": {"cpu": "bad", "allocatable_cpu": 3, "mem_bytes": 512, "allocatable_mem_bytes": 2048, "pods": 10}})
+    format_a._append_namespace_pods(lines, {"namespace_pods": [{"namespace": "", "pods_total": 1}, {"namespace": "synapse", "pods_total": 3, "pods_running": 2}]})
+    format_a._append_namespace_nodes(lines, {"namespace_nodes": [{"namespace": "synapse", "pods_total": 3, "primary_node": "titan-01"}, {"namespace": "", "pods_total": 1}]})
+    format_a._append_node_pods(lines, {"node_pods": ["bad", {"node": "titan-01", "pods_total": "oops"}, {"node": "titan-02", "pods_total": 4, "namespaces_top": [("synapse", 3)]}]})
+    format_a._append_pod_issues(
+        lines,
+        {
+            "pod_issues": {
+                "counts": {"Failed": 1, "Pending": 2},
+                "items": ["bad", {"namespace": "", "pod": "skip"}, {"namespace": "synapse", "pod": "api", "phase": "Pending", "restarts": 1}],
+                "pending_oldest": ["bad", {"namespace": "synapse", "pod": "api", "age_hours": 1.5, "reason": "ImagePullBackOff"}],
+                "waiting_reasons": {"CrashLoopBackOff": 3},
+                "pending_over_15m": "bad",
+            }
+        },
+    )
+    format_a._append_workload_health(
+        lines,
+        {"workloads_health": {"deployments": {"not_ready": 1}, "statefulsets": {"not_ready": 0}, "daemonsets": {"not_ready": 2}}},
+    )
+    format_a._append_node_usage_stats(lines, {"metrics": {"node_usage_stats": {"cpu": {"avg": 91}, "net": {"avg": 2048}, "disk": {}}}})
+    format_a._append_events(lines, {"events": {"warnings_total": 2, "warnings_by_reason": {"BackOff": 2, "Failed": 1}}})
+    format_a._append_events(lines, {"events": {"warnings_total": 0, "warnings_by_reason": {}}})
+    format_a._append_pvc_usage(lines, {"pvc_usage_top": ["bad", {"metric": {"namespace": "synapse", "persistentvolumeclaim": "data"}, "value": 88}]})
+    format_a._append_root_disk_headroom(lines, {"root_disk_low_headroom": ["bad", {"node": "titan-01", "headroom_pct": 12.5}]})
+
+    assert any(line.startswith("nodes: total=2, ready=1, not_ready=0") for line in lines)
+    assert any(line.startswith("hardware: rpi5=2") for line in lines)
+    assert any(line.startswith("node_age_top: titan-02=2.5h") for line in lines)
+    assert any(line.startswith("node_taints: gpu=1") for line in lines)
+    assert any(line.startswith("node_os: linux=2") for line in lines)
+    assert any(line.startswith("node_pressure: disk=2") for line in lines)
+    assert any(line.startswith("namespace_nodes_top: synapse=3") for line in lines)
+    assert any(line.startswith("node_pods_top: titan-02=4") for line in lines)
+    assert any(line.startswith("pod_issues: Failed=1; Pending=2") for line in lines)
+    assert any(line.startswith("pods_pending_oldest: synapse/api=1.5h") for line in lines)
+    assert any(line.startswith("workloads_not_ready: deployments=1") for line in lines)
+    assert any(line.startswith("node_usage_avg: cpu=91") for line in lines)
+    assert "warnings: total=0" in lines
+    assert any(line.startswith("pvc_usage_top: synapse/data=88") for line in lines)
+    assert any(line.startswith("root_disk_low_headroom: titan-01=12.5%") for line in lines)
+
+
+def test_retrieval_helper_edge_paths() -> None:
+    """Cover fallback-heavy retrieval helpers and metric-selection branches."""
+
+    assert answer_retrieval._metric_ctx_values({}) == ([], "", [], [], set())
+    assert answer_retrieval._extract_metric_keys(["no colon", "bad key: value", "nodes_total: 2", "nodes_total: 3"]) == ["nodes_total"]
+    assert answer_retrieval._token_variants(set()) == set()
+    assert "policy" in answer_retrieval._token_variants({"policies"})
+    assert answer_retrieval._parse_key_list('[1, "nodes_total", "nodes_total"]', ["nodes_total"], 1) == ["nodes_total"]
+    assert answer_retrieval._chunk_ids_for_keys([{"id": "c1", "text": "nodes_total: 2"}], []) == []
+    assert answer_retrieval._chunk_ids_for_keys([{"id": "c1", "text": ""}, {"id": "c2", "text": "nodes_total: 2"}], ["nodes_total"]) == ["c2"]
+    assert answer_retrieval._filter_metric_keys([], {"cpu"}) == []
+    assert answer_retrieval._filter_metric_keys(["nodes_total"], {"ram"}) == []
+    assert not answer_retrieval._metric_key_overlap([], {"cpu"})
+    assert not answer_retrieval._metric_key_overlap(["nodes_total"], {"ram"})
+    assert answer_retrieval._lines_for_metric_keys([], ["nodes_total"]) == []
+    assert answer_retrieval._lines_for_metric_keys(["nodes_total: 2", "namespace_cpu_top: synapse=95"], ["nodes_total", "namespace_cpu_top"], max_lines=1) == ["nodes_total: 2"]
+    assert answer_retrieval._merge_metric_keys(["a"], ["a", "b"], 1) == ["a"]
+    assert answer_retrieval._merge_fact_lines(["a", "a"], ["a", "b"]) == ["a", "b"]
+    assert answer_retrieval._expand_hottest_line("") == []
+    assert answer_retrieval._expand_hottest_line("other: cpu=x") == []
+    assert answer_retrieval._expand_hottest_line("hottest: badpart") == []
+    assert answer_retrieval._expand_hottest_line("hottest: cpu=titan-01 [rpi5] (95%)") == ["hottest_cpu_node: titan-01 [rpi5] (95%)"]
+    assert answer_retrieval._expand_hottest_line("hottest: ram=titan-02 (80%)") == ["hottest_ram_node: titan-02 (80%)"]
+    assert answer_retrieval._has_token("disk i/o busy", "io")
+    assert not answer_retrieval._has_token("", "cpu")
+    assert answer_retrieval._hotspot_evidence({"hottest": {}}) == []
+
+    hotspot_lines = answer_retrieval._hotspot_evidence(
+        {
+            "hottest": {"cpu": {"node": "titan-01", "value": 95}, "skip": "bad"},
+            "hardware_by_node": {"titan-01": "rpi5"},
+            "node_pods_top": ["bad", {"node": "titan-01", "namespaces_top": [("synapse", 3), ("db", 1)]}, {"node": "titan-02", "namespaces_top": ["bad"]}],
+        }
+    )
+    assert any(line.startswith("hotspot.cpu: node=titan-01 class=rpi5 value=95.00") for line in hotspot_lines)
+    assert answer_retrieval._hotspot_evidence({"hottest": {"ram": {"value": 50}}, "node_pods_top": []}) == []
+
+    plan = answer_common._mode_plan(build_test_settings(), "smart")
+    chunks = [{"id": "c1", "text": "namespace_cpu_top: synapse=95\nnodes_total: 2"}]
+    ctx = {
+        "summary_lines": ["namespace_cpu_top: synapse=95", "nodes_total: 2"],
+        "question": "which namespace has the highest cpu",
+        "sub_questions": ["which namespace"],
+        "keywords": ["namespace", "cpu"],
+        "keyword_tokens": ["namespace", "cpu"],
+    }
+
+    scripted = ScriptedCall({"metric_keys": "{}", "metric_keys_validate": '{"missing":["namespace_cpu_top"]}'})
+    selected, chunk_ids = asyncio.run(answer_retrieval._select_metric_chunks(scripted, ctx, chunks, plan))
+    assert selected == ["namespace_cpu_top"]
+    assert chunk_ids == ["c1"]
+
+    no_overlap = ScriptedCall({"metric_keys": '{"keys":["nodes_total"]}', "metric_keys_validate": '{"missing":[]}'})
+    selected, _ = asyncio.run(answer_retrieval._select_metric_chunks(no_overlap, ctx, chunks, plan))
+    assert selected == ["namespace_cpu_top"]
+
+    no_keys = ScriptedCall({"metric_keys": "{}"})
+    assert asyncio.run(answer_retrieval._select_metric_chunks(no_keys, {"summary_lines": ["bad key: value"], "question": "cpu", "sub_questions": [], "keywords": [], "keyword_tokens": []}, chunks, plan)) == ([], [])
+    assert asyncio.run(answer_retrieval._select_metric_chunks(no_keys, {"summary_lines": ["nodes_total: 2"], "question": "mystery", "sub_questions": [], "keywords": [], "keyword_tokens": []}, chunks, plan)) == ([], [])
+    assert asyncio.run(answer_retrieval._select_metric_chunks(scripted, {"summary_lines": [], "question": "cpu"}, chunks, plan)) == ([], [])
+    assert asyncio.run(answer_retrieval._validate_metric_keys(scripted, {"question": "cpu", "sub_questions": [], "selected": []}, [], plan)) == []
+    assert asyncio.run(answer_retrieval._gather_limited([], 2)) == []
+
+
+def test_spine_helper_edge_paths(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Cover fallback and summary-derived spine branches."""
+
+    assert answer_spine._join_context(["alpha", "", "beta"]) == "alpha\nbeta"
+    assert answer_spine._format_metric_value(True) == "true"
+    assert answer_spine._format_metric_value(2) == "2"
+    assert answer_spine._format_metric_value(2.5) == "2.5"
+    assert answer_spine._format_metric_value(object()).startswith("<")
+
+
+def test_finalize_answer_post_processing_branches() -> None:
+    """Drive the expensive post-processing branches with a deterministic engine double."""
+
+    plan = replace(answer_common._mode_plan(build_test_settings(), "smart"), use_critic=False, use_gap=False)
+    observed: list[tuple[str, str]] = []
+    scripted = ScriptedCall(
+        {
+            "runbook_select": ['{"path":"runbooks/fix.md"}', "{}"],
+            "evidence_fix": "namespace ghost on titan-99 uses runbooks/fix-md.",
+            "evidence_fix_enforce": "namespace ghost on titan-99 uses runbooks/fix-md.",
+            "metric_direct": "no digits here",
+            "runbook_enforce": [
+                "Non-Raspberry Pi nodes: amd64 (titan-02). See runbooks/fix-md.",
+                "amd64 stays separate. This does not provide the exact value.",
+            ],
+            "contradiction": '{"use_facts": true, "confidence": 90}',
+            "evidence_guard": "This does not provide the exact value.",
+            "focus_fix": "No exact value provided.",
+        }
+    )
+
+    class FinalizeEngine:
+        async def _synthesize_answer(self, *args: Any) -> str:
+            return "namespace ghost on titan-99 uses runbooks/fix-md."
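+
+        # The remaining engine hooks are deliberately inert so the scripted
+        # post-processing chain inside finalize_answer decides the final reply.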
+        async def _dedup_reply(self, reply: str, _plan: Any, _call_llm: Any, tag: str) -> str:
+            assert tag == "dedup"
+            return reply
+
+        async def _score_answer(self, _question: str, _reply: str, _plan: Any, _call_llm: Any) -> AnswerScores:
+            return AnswerScores(70, 71, 72, "medium")
+
+        async def _extract_claims(self, _question: str, _reply: str, _summary: dict[str, Any], _facts_used: list[str], _call_llm: Any) -> list[ClaimItem]:
+            return []
+
+    reply, scores, claims = asyncio.run(
+        answer_workflow_post.finalize_answer(
+            engine=FinalizeEngine(),
+            call_llm=scripted,
+            normalized="Which nodes are not raspberry and which runbook should I use?",
+            subanswers=["draft"],
+            context="ctx",
+            classify={"question_type": "metric", "needs_snapshot": True, "answer_style": "direct"},
+            plan=plan,
+            summary={"hardware_by_node": {"titan-01": "rpi5", "titan-02": "amd64"}},
+            summary_lines=["hardware_nodes: rpi5=(titan-01); amd64=(titan-02)", "namespace_cpu_top: synapse=95", "nodes_total: 2"],
+            metric_facts=["nodes_total: 2"],
+            key_facts=["namespace_cpu_top: synapse=95"],
+            facts_used=["namespace_cpu_top: synapse=95"],
+            allowed_nodes=["titan-01", "titan-02"],
+            allowed_namespaces=["synapse"],
+            runbook_paths=["runbooks/fix.md"],
+            lowered_question="which nodes are not raspberry and which runbook should i use?",
+            force_metric=True,
+            keyword_tokens=["namespace"],
+            question_tokens=["namespace", "raspberry", "runbook"],
+            snapshot_context="ClusterSnapshot:\nnamespace_cpu_top: synapse=95",
+            observer=lambda stage, note: observed.append((stage, note)),
+            mode="smart",
+            metric_keys=["nodes_total"],
+        )
+    )
+
+    assert reply == "Latest metrics: nodes_total: 2."
+    assert scores.confidence == 70
+    assert claims == []
+    stages = [stage for stage, _note in observed]
+    assert "evidence_fix" in stages
+    assert "runbook_enforce" in stages
+    assert "evidence_guard" in stages
+    assert "focus_fix" in stages
+
+
+def test_run_answer_empty_stock_and_budget_paths(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Cover early returns and the pre-call time-budget failure path."""
+
+    settings = build_test_settings()
+
+    class EmptySnapshot:
+        def get(self) -> dict[str, Any]:
+            return {}
+
+    class EmptyKb:
+        def summary(self) -> str:
+            return ""
+
+        def runbook_titles(self, limit: int = 6) -> str:
+            del limit
+            return ""
+
+        def runbook_paths(self, limit: int = 10) -> list[str]:
+            del limit
+            return []
+
+    class MinimalEngine:
+        def __init__(self) -> None:
+            self._settings = settings
+            self._snapshot = EmptySnapshot()
+            self._kb = EmptyKb()
+            self._llm = SimpleNamespace(chat=lambda *args, **kwargs: None)
+
+        async def _answer_stock(self, question: str) -> AnswerResult:
+            return AnswerResult(f"stock:{question}", AnswerScores(1, 1, 1, "low"), {"mode": "stock"})
+
+        def _get_state(self, conversation_id: str | None) -> None:
+            del conversation_id
+            return None
+
+    engine = MinimalEngine()
+    empty = asyncio.run(answer_workflow.run_answer(engine, " ", mode="custom"))
+    assert "need a question" in empty.reply
+    stock = asyncio.run(answer_workflow.run_answer(engine, "hello", mode="stock"))
+    assert stock.reply == "stock:hello"
+
+    budget_engine = MinimalEngine()
+    budget_engine._settings = replace(settings, quick_time_budget_sec=0.1)
+    moments = iter([100.0, 101.0, 101.0])
+    monkeypatch.setattr(answer_workflow, "time", SimpleNamespace(monotonic=lambda: next(moments)))
+    timed_out = asyncio.run(answer_workflow.run_answer(budget_engine, "cluster status", mode="custom"))
+    assert "ran out of time" in timed_out.reply
+    assert timed_out.meta["time_budget_hit"] is True
+
+
+def test_run_answer_custom_orchestration_edges(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Exercise run_answer retrieval, tool, subanswer, debug, and persistence branches."""
+
+    settings = replace(build_test_settings(), debug_pipeline=True)
+    summary = {
+        "nodes": {"total": 2, "ready": 1, "not_ready": 1},
+        "hardware_by_node": {"titan-01": "rpi5"},
+        "namespace_pods": [{"namespace": "synapse", "pods_total": 3}],
+    }
+    summary_lines = ["nodes_total: 2", "namespace_cpu_top: synapse=95", "pvc_usage_top: data=88"]
+
+    class FakeSnapshot:
+        def get(self) -> dict[str, Any]:
+            return {"snapshot": True}
+
+    class FakeKb:
+        def summary(self) -> str:
+            return "KB summary."
+
+        def runbook_titles(self, limit: int = 6) -> str:
+            del limit
+            return "Relevant runbooks:\n- Fix (runbooks/fix.md)"
+
+        def runbook_paths(self, limit: int = 10) -> list[str]:
+            del limit
+            return ["runbooks/fix.md"]
+
+        def chunk_lines(self, max_files: int = 4, max_chars: int = 800) -> list[str]:
+            del max_files, max_chars
+            return ["KB File: ops.md", "namespace_cpu_top: synapse=95"]
+
+    class PromptLLM:
+        async def chat(self, messages: list[dict[str, str]], *, model: str | None = None, timeout_sec: float | None = None) -> str:
+            del model, timeout_sec
+            prompt = messages[-1]["content"]
+            if "normalized (string), keywords" in prompt:
+                return json.dumps(
+                    {
+                        "normalized": "How many namespace pods running postgres connections pvc storage ready baseline cpu?",
+                        "keywords": ["namespace", "pods", "postgres", "pvc", "ready", "baseline", "cpu"],
+                    }
+                )
+            if "needs_snapshot (bool)" in prompt:
+                return '{"needs_snapshot":true,"needs_kb":true,"needs_tool":true,"answer_style":"direct","follow_up":false,"question_type":"open_ended","focus_entity":"unknown","focus_metric":"unknown"}'
+            if "Generate up to" in prompt:
+                return '[{"question":"Which namespace pods are running?","priority":2},{"question":"What postgres connections are ready?","priority":1}]'
+            if "command" in prompt and "rationale" in prompt:
+                return '{"command":"kubectl top pods -n synapse","rationale":"check cpu"}'
+            if "Answer the sub-question using the context" in prompt:
+                return "subanswer"
+            return "{}"
+
+    class WorkflowEngine:
+        def __init__(self) -> None:
+            self._settings = settings
+            self._snapshot = FakeSnapshot()
+            self._kb = FakeKb()
+            self._llm = PromptLLM()
+            self.stored = False
+
+        def _get_state(self, conversation_id: str | None) -> None:
+            del conversation_id
+            return None
+
+        def _store_state(self, conversation_id: str, claims: list[ClaimItem], summary_arg: dict[str, Any], snapshot: dict[str, Any], pin_snapshot: bool) -> None:
+            assert conversation_id == "conv"
+            assert claims and summary_arg and snapshot and pin_snapshot
+            self.stored = True
+
+    plan = replace(
+        answer_common._mode_plan(settings, "custom"),
+        use_raw_snapshot=True,
+        use_deep_retrieval=True,
+        use_tool=True,
+        parallelism=1,
+        subanswer_retries=1,
+    )
+
+    async def fake_select_metric_chunks(*_args: Any, **_kwargs: Any) -> tuple[list[str], list[str]]:
+        return ["namespace_cpu_top"], ["c0"]
+
+    async def fake_score_chunks(*_args: Any, **_kwargs: Any) -> list[dict[str, Any]]:
+        return [{"id": "c0", "score": 99, "reason": "match"}]
+
+    async def fake_select_fact_lines(*_args: Any, **_kwargs: Any) -> list[str]:
+        return ["namespace_cpu_top: synapse=95"]
+
+    async def fake_extract_fact_types(*_args: Any, **_kwargs: Any) -> list[str]:
+        return ["cpu"]
+
+    async def fake_derive_signals(*_args: Any, **_kwargs: Any) -> list[str]:
+        return ["cpu"]
+
+    async def fake_scan_chunk_for_signals(*_args: Any, **_kwargs: Any) -> list[str]:
+        return ["namespace_cpu_top: synapse=95"]
+
+    async def fake_prune_metric_candidates(*_args: Any, **_kwargs: Any) -> list[str]:
+        return ["namespace_cpu_top: synapse=95"]
+
+    async def fake_finalize_answer(**_kwargs: Any) -> tuple[str, AnswerScores, list[ClaimItem]]:
+        return (
+            "final answer",
+            AnswerScores(90, 91, 92, "low"),
+            [ClaimItem(id="c1", claim="synapse high", evidence=[EvidenceItem(path="namespace_cpu_top", reason="test")])],
+        )
+
+    monkeypatch.setattr(answer_workflow, "_mode_plan", lambda _settings, _mode: plan)
+    monkeypatch.setattr(answer_workflow, "build_summary", lambda _snapshot: summary)
+    monkeypatch.setattr(answer_workflow, "_summary_lines", lambda _snapshot: summary_lines)
+    monkeypatch.setattr(answer_workflow, "_raw_snapshot_chunks", lambda _snapshot: [{"id": "raw", "text": "raw_fact: 1", "summary": "raw"}])
+    monkeypatch.setattr(answer_workflow, "_spine_from_summary", lambda _summary: {})
+    monkeypatch.setattr(answer_workflow, "route_intent", lambda _question: SimpleNamespace(kind="nodes_count"))
+    monkeypatch.setattr(answer_workflow, "_select_metric_chunks", fake_select_metric_chunks)
+    monkeypatch.setattr(answer_workflow, "_score_chunks", fake_score_chunks)
+    monkeypatch.setattr(answer_workflow, "_select_chunks", lambda _chunks, _scored, _plan, _tokens, _must: [{"id": "c0", "text": "namespace_cpu_top: synapse=95", "summary": "namespace cpu"}])
+    monkeypatch.setattr(answer_workflow, "_collect_fact_candidates", lambda _selected, limit: ["namespace_cpu_top: synapse=95"])
+    monkeypatch.setattr(answer_workflow, "_select_fact_lines", fake_select_fact_lines)
+    monkeypatch.setattr(answer_workflow, "_extract_fact_types", fake_extract_fact_types)
+    monkeypatch.setattr(answer_workflow, "_derive_signals", fake_derive_signals)
+    monkeypatch.setattr(answer_workflow, "_scan_chunk_for_signals", fake_scan_chunk_for_signals)
+    monkeypatch.setattr(answer_workflow, "_prune_metric_candidates", fake_prune_metric_candidates)
+    monkeypatch.setattr(answer_workflow, "finalize_answer", fake_finalize_answer)
+
+    engine = WorkflowEngine()
+    observed: list[tuple[str, str]] = []
+    result = asyncio.run(
+        answer_workflow.run_answer(
+            engine,
+            "Run limitless cluster status",
+            mode="custom",
+            observer=lambda stage, note: observed.append((stage, note)),
+            conversation_id="conv",
+            snapshot_pin=True,
+        )
+    )
+
+    assert result.reply == "final answer"
+    assert result.meta["tool_hint"] == {"command": "kubectl top pods -n synapse", "rationale": "check cpu"}
+    assert engine.stored is True
+    stages = [stage for stage, _note in observed]
+    assert {"normalize", "route", "decompose", "retrieve", "tool", "subanswers", "synthesize"} <= set(stages)
+
+
+def test_run_answer_factsheet_and_spine_shortcuts(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Cover fact-sheet observer paths, falsey KB handling, and fast spine returns."""
+
+    settings = build_test_settings()
+
+    class FactSnapshot:
+        def get(self) -> dict[str, Any]:
+            return {"snapshot": True}
+
+    class FactKb:
+        def __init__(self, enabled: bool = True) -> None:
+            self.enabled = enabled
+
+        def __bool__(self) -> bool:
+            return self.enabled
+
+        def summary(self) -> str:
+            return "KB summary."
+        def runbook_titles(self, limit: int = 6) -> str:
+            del limit
+            return ""
+
+        def runbook_paths(self, limit: int = 10) -> list[str]:
+            del limit
+            return []
+
+        def chunk_lines(self, max_files: int = 4, max_chars: int = 800) -> list[str]:
+            del max_files, max_chars
+            return ["runbook: atlas"]
+
+    class FactLLM:
+        async def chat(self, messages: list[dict[str, str]], *, model: str | None = None, timeout_sec: float | None = None) -> str:
+            del messages, model, timeout_sec
+            return "fact sheet reply"
+
+    class FactEngine:
+        def __init__(self, kb: FactKb) -> None:
+            self._settings = settings
+            self._snapshot = FactSnapshot()
+            self._kb = kb
+            self._llm = FactLLM()
+
+        def _get_state(self, conversation_id: str | None) -> None:
+            del conversation_id
+            return None
+
+    monkeypatch.setattr(answer_workflow, "build_summary", lambda _snapshot: {"nodes": {"total": 2, "ready": 1, "not_ready": 1}})
+    monkeypatch.setattr(answer_workflow, "_summary_lines", lambda _snapshot: ["nodes_total:2,ready=1,not_ready=1", "namespace_cpu_top: synapse=95"])
+
+    observed: list[tuple[str, str]] = []
+    heuristic = asyncio.run(
+        answer_workflow.run_answer(
+            FactEngine(FactKb(True)),
+            "How many ready nodes are there?",
+            mode="smart",
+            observer=lambda stage, note: observed.append((stage, note)),
+        )
+    )
+    assert "1 ready nodes out of 2 total" in heuristic.reply
+
+    fact_reply = asyncio.run(
+        answer_workflow.run_answer(
+            FactEngine(FactKb(False)),
+            "Give cluster health",
+            mode="smart",
+            observer=lambda stage, note: observed.append((stage, note)),
+        )
+    )
+    assert fact_reply.reply == "fact sheet reply"
+    assert ("factsheet", "building fact sheet") in observed
+    assert ("quick", "answering from fact sheet") in observed
+
+    class SpineLLM:
+        async def chat(self, messages: list[dict[str, str]], *, model: str | None = None, timeout_sec: float | None = None) -> str:
+            del model, timeout_sec
+            prompt = messages[-1]["content"]
+            if "normalized (string), keywords" in prompt:
+                return '{"normalized":"How many nodes?","keywords":["nodes"]}'
+            if "needs_snapshot (bool)" in prompt:
+                return '{"needs_snapshot":true,"needs_kb":false,"needs_tool":false,"answer_style":"direct","follow_up":false,"question_type":"open_ended","focus_entity":"unknown","focus_metric":"unknown"}'
+            return "{}"
+
+    spine_engine = FactEngine(FactKb(True))
+    spine_engine._llm = SpineLLM()
+    monkeypatch.setattr(answer_workflow, "_spine_from_summary", lambda _summary: {})
+    monkeypatch.setattr(answer_workflow, "route_intent", lambda _question: SimpleNamespace(kind="nodes_count"))
+    spine_reply = asyncio.run(answer_workflow.run_answer(spine_engine, "Run limitless how many nodes?", mode="fast"))
+    assert spine_reply.reply == "nodes_total:2,ready=1,not_ready=1"