atlasbot: satisfy ruff complexity checks

This commit is contained in:
Brad Stein 2026-02-02 23:41:37 -03:00
parent 96109d6c24
commit 8bd6023b61
3 changed files with 38 additions and 22 deletions

View File

@ -17,6 +17,12 @@ from atlasbot.state.store import ClaimStore
log = logging.getLogger(__name__)
FOLLOWUP_SHORT_WORDS = 6
NS_ENTRY_MIN_LEN = 2
DEDUP_MIN_SENTENCES = 3
TOKEN_MIN_LEN = 3
RUNBOOK_SIMILARITY_THRESHOLD = 0.4
class LLMLimitReached(RuntimeError):
pass
@ -91,7 +97,7 @@ class AnswerEngine:
self._snapshot = snapshot
self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)
async def answer(
async def answer( # noqa: C901, PLR0912, PLR0913, PLR0915
self,
question: str,
*,
@ -252,7 +258,7 @@ class AnswerEngine:
if not classify.get("follow_up") and state and state.claims:
follow_terms = ("there", "that", "those", "these", "it", "them", "that one", "this", "former", "latter")
if any(term in lowered_question for term in follow_terms) or len(normalized.split()) <= 6:
if any(term in lowered_question for term in follow_terms) or len(normalized.split()) <= FOLLOWUP_SHORT_WORDS:
classify["follow_up"] = True
if classify.get("follow_up") and state and state.claims:
@ -590,7 +596,7 @@ class AnswerEngine:
reply = await self._llm.chat(messages, model=self._settings.ollama_model)
return AnswerResult(reply, _default_scores(), {"mode": "stock"})
async def _synthesize_answer(
async def _synthesize_answer( # noqa: PLR0913
self,
question: str,
subanswers: list[str],
@ -716,7 +722,7 @@ class AnswerEngine:
dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply
return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag)
async def _answer_followup(
async def _answer_followup( # noqa: C901, PLR0913
self,
question: str,
state: ConversationState,
@ -846,7 +852,7 @@ def _strip_followup_meta(reply: str) -> str:
return cleaned
def _build_meta(
def _build_meta( # noqa: PLR0913
mode: str,
call_count: int,
call_cap: int,
@ -1140,7 +1146,7 @@ def _hotspot_evidence(summary: dict[str, Any]) -> list[str]:
node_class = hardware_by_node.get(node)
ns_parts = []
for entry in ns_map.get(node, [])[:3]:
if isinstance(entry, (list, tuple)) and len(entry) >= 2:
if isinstance(entry, (list, tuple)) and len(entry) >= NS_ENTRY_MIN_LEN:
ns_parts.append(f"{entry[0]}={entry[1]}")
ns_text = ", ".join(ns_parts)
value_text = f"{value:.2f}" if isinstance(value, (int, float)) else str(value)
@ -1375,7 +1381,7 @@ def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namesp
return cleaned or reply
def _lexicon_context(summary: dict[str, Any]) -> str:
def _lexicon_context(summary: dict[str, Any]) -> str: # noqa: C901
if not isinstance(summary, dict):
return ""
lexicon = summary.get("lexicon")
@ -1479,7 +1485,7 @@ def _needs_dedup(reply: str) -> bool:
if not reply:
return False
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()]
if len(sentences) < 3:
if len(sentences) < DEDUP_MIN_SENTENCES:
return False
seen = set()
for sent in sentences:
@ -1533,7 +1539,7 @@ def _extract_keywords(
tokens: list[str] = []
for source in [raw_question, normalized, *sub_questions]:
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
if len(part) < 3 or part in stopwords:
if len(part) < TOKEN_MIN_LEN or part in stopwords:
continue
tokens.append(part)
if keywords:
@ -1619,7 +1625,7 @@ def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
if score > best_score:
best_score = score
best = path
return best if best_score >= 0.4 else None
return best if best_score >= RUNBOOK_SIMILARITY_THRESHOLD else None
def _resolve_path(data: Any, path: str) -> Any | None:

View File

@ -45,7 +45,7 @@ async def main() -> None:
queue = QueueManager(settings, handler)
await queue.start()
async def answer_handler(
async def answer_handler( # noqa: PLR0913
question: str,
mode: str,
history=None,

View File

@ -8,6 +8,8 @@ from atlasbot.config import Settings
log = logging.getLogger(__name__)
PVC_USAGE_CRITICAL = 90
_BYTES_KB = 1024
_BYTES_MB = 1024 * 1024
_BYTES_GB = 1024 * 1024 * 1024
@ -220,7 +222,10 @@ def _build_hardware_by_node(nodes_detail: list[dict[str, Any]]) -> dict[str, Any
return {"hardware_by_node": mapping} if mapping else {}
def _build_hardware_usage(metrics: dict[str, Any], hardware_by_node: dict[str, Any] | None) -> dict[str, Any]:
def _build_hardware_usage( # noqa: C901
metrics: dict[str, Any],
hardware_by_node: dict[str, Any] | None,
) -> dict[str, Any]:
if not isinstance(hardware_by_node, dict) or not hardware_by_node:
return {}
node_load = metrics.get("node_load") if isinstance(metrics.get("node_load"), list) else []
@ -564,7 +569,7 @@ def _format_names(names: list[str]) -> str:
return ", ".join(sorted(names))
def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None:
def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901
nodes = summary.get("nodes") if isinstance(summary.get("nodes"), dict) else {}
if not nodes:
return
@ -781,7 +786,7 @@ def _append_namespace_nodes(lines: list[str], summary: dict[str, Any]) -> None:
lines.append("namespace_nodes_top: " + "; ".join(parts))
def _append_node_pods(lines: list[str], summary: dict[str, Any]) -> None:
def _append_node_pods(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901, PLR0912
node_pods = summary.get("node_pods")
if not isinstance(node_pods, list) or not node_pods:
return
@ -1408,7 +1413,7 @@ def _append_workloads(lines: list[str], summary: dict[str, Any]) -> None:
lines.append("workloads_top: " + "; ".join(parts))
def _append_topology(lines: list[str], summary: dict[str, Any]) -> None:
def _append_topology(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901, PLR0912
topology = summary.get("topology") if isinstance(summary.get("topology"), dict) else {}
if not topology:
return
@ -1497,7 +1502,7 @@ def _append_signals(lines: list[str], summary: dict[str, Any]) -> None:
lines.append(f"- {detail}")
def _append_profiles(lines: list[str], summary: dict[str, Any]) -> None:
def _append_profiles(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901
profiles = summary.get("profiles") if isinstance(summary.get("profiles"), dict) else {}
if not profiles:
return
@ -1584,7 +1589,7 @@ def _append_node_load_summary(lines: list[str], summary: dict[str, Any]) -> None
lines.append("node_load_outliers: " + _format_names(names))
def _append_hardware_usage(lines: list[str], summary: dict[str, Any]) -> None:
def _append_hardware_usage(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901, PLR0912
usage = summary.get("hardware_usage_avg")
if not isinstance(usage, list) or not usage:
return
@ -1661,9 +1666,11 @@ def _build_cluster_watchlist(summary: dict[str, Any]) -> dict[str, Any]:
if flux_not_ready > 0:
items.append(f"flux_not_ready={flux_not_ready}")
pvc_usage = summary.get("pvc_usage_top") if isinstance(summary.get("pvc_usage_top"), list) else []
high_pvc = [entry for entry in pvc_usage if isinstance(entry, dict) and (entry.get("value") or 0) >= 90]
high_pvc = [
entry for entry in pvc_usage if isinstance(entry, dict) and (entry.get("value") or 0) >= PVC_USAGE_CRITICAL
]
if high_pvc:
items.append("pvc_usage>=90%")
items.append(f"pvc_usage>={PVC_USAGE_CRITICAL}%")
return {"cluster_watchlist": items} if items else {}
@ -1695,7 +1702,10 @@ def _capacity_headroom_parts(entries: list[dict[str, Any]]) -> list[str]:
return parts
def _append_namespace_capacity_summary(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901
def _append_namespace_capacity_summary( # noqa: C901, PLR0912
lines: list[str],
summary: dict[str, Any],
) -> None:
cap = summary.get("namespace_capacity_summary")
if not isinstance(cap, dict) or not cap:
return
@ -1784,7 +1794,7 @@ def _append_lexicon(lines: list[str], summary: dict[str, Any]) -> None:
lines.append(f"lexicon_alias: {key} => {value}")
def _append_cross_stats(lines: list[str], summary: dict[str, Any]) -> None:
def _append_cross_stats(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901
cross_stats = summary.get("cross_stats")
if not isinstance(cross_stats, dict):
return
@ -1841,7 +1851,7 @@ def _append_cross_stats(lines: list[str], summary: dict[str, Any]) -> None:
lines.append(f"cross_pvc_usage: {namespace}/{pvc} used={_format_float(used)}")
def summary_text(snapshot: dict[str, Any] | None) -> str:
def summary_text(snapshot: dict[str, Any] | None) -> str: # noqa: PLR0915
summary = build_summary(snapshot)
if not summary:
return ""