triage: split local diagnosis module

This commit is contained in:
codex 2026-05-20 03:24:27 -03:00
parent 9e6dd16cb0
commit 594f106146
4 changed files with 412 additions and 354 deletions

View File

@ -11,12 +11,17 @@ from ..db.storage import Storage
from ..settings import settings
from ..utils.logging import get_logger
from .cluster_state import collect_cluster_state
from .testing_triage_diagnosis import (
TRIAGE_DIAGNOSIS_EVENT_TYPE,
diagnose_testing_triage,
latest_testing_triage_diagnosis, # noqa: F401 - re-exported for app routes.
model_diagnosis_enabled,
)
logger = get_logger(__name__)
TRIAGE_EVENT_TYPE = "testing_triage_bundle"
TRIAGE_DIAGNOSIS_EVENT_TYPE = "testing_triage_diagnosis"
_SUCCESS_STATUS = "ok|passed|success|not_applicable|skipped|na|n/a"
_JENKINS_TREE = (
"jobs[name,url,color,lastBuild[number,result,timestamp,duration,url],"
@ -26,15 +31,6 @@ _JENKINS_TREE = (
_MAX_JENKINS_LOG_LINES = 80
_MAX_JENKINS_LOG_CHARS = 12000
_MAX_EVIDENCE_ITEMS = 12
_MAX_MODEL_EVIDENCE_CHARS = 24000
_MAX_MODEL_OUTPUT_CHARS = 12000
_DIAGNOSIS_SYSTEM_PROMPT = (
"You are Ariadne's local testing triage model. Use only the supplied JSON evidence. "
"Return JSON only with keys: headline, root_cause, blast_radius, confidence, "
"needs_human, next_actions, evidence_refs. Confidence must be low, medium, or high. "
"Next actions must be read-only verification or Flux/IaC changes; never suggest "
"mutating kubectl commands or reading Kubernetes Secret values."
)
@dataclass(frozen=True)
@ -56,12 +52,6 @@ def latest_testing_triage_bundle(storage: Storage) -> dict[str, Any] | None:
return _latest_event_payload(storage, TRIAGE_EVENT_TYPE)
def latest_testing_triage_diagnosis(storage: Storage) -> dict[str, Any] | None:
"""Return the most recent stored local-model testing diagnosis, if present."""
return _latest_event_payload(storage, TRIAGE_DIAGNOSIS_EVENT_TYPE)
def _latest_event_payload(storage: Storage, event_type: str) -> dict[str, Any] | None:
rows = storage.list_events(limit=1, event_type=event_type)
if not rows:
@ -83,7 +73,7 @@ def run_testing_triage(storage: Storage) -> TestingTriageSummary:
bundle = collect_testing_triage(storage)
storage.record_event(TRIAGE_EVENT_TYPE, bundle)
if _model_diagnosis_enabled():
if model_diagnosis_enabled():
diagnosis = diagnose_testing_triage(bundle)
storage.record_event(TRIAGE_DIAGNOSIS_EVENT_TYPE, diagnosis)
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
@ -162,222 +152,6 @@ def collect_testing_triage(storage: Storage | None = None) -> dict[str, Any]:
return bundle
def diagnose_testing_triage(bundle: dict[str, Any]) -> dict[str, Any]:
"""Ask the configured local model to summarize a deterministic triage bundle."""
base_url = _model_url()
if not base_url:
return _diagnosis_unavailable(bundle, "model_url_not_configured")
try:
with httpx.Client(timeout=_model_timeout()) as client:
response = client.post(
f"{base_url}/api/generate",
json={
"model": _model_name(),
"system": _DIAGNOSIS_SYSTEM_PROMPT,
"prompt": _diagnosis_prompt(bundle),
"stream": False,
"format": "json",
"options": {
"temperature": 0.1,
"top_p": 0.9,
},
},
)
response.raise_for_status()
payload = response.json()
except Exception as exc:
return _diagnosis_unavailable(bundle, f"model_request_failed: {exc}")
raw = str(payload.get("response") or "")
parsed, parse_error = _parse_model_response(raw)
return _diagnosis_from_model(bundle, parsed, raw, parse_error)
def _model_diagnosis_enabled() -> bool:
return bool(_model_url())
def _model_url() -> str:
return str(getattr(settings, "testing_triage_model_url", "") or "").strip().rstrip("/")
def _model_name() -> str:
return str(getattr(settings, "testing_triage_model", "qwen2.5:7b-instruct-q4_0") or "").strip()
def _model_timeout() -> float:
return float(getattr(settings, "testing_triage_model_timeout_sec", 180.0) or 180.0)
def _diagnosis_prompt(bundle: dict[str, Any]) -> str:
payload = {
"task": "Summarize testing and cluster evidence for tonight's debugging work.",
"required_output_schema": {
"headline": "one sentence",
"root_cause": "most likely cause, or say evidence is insufficient",
"blast_radius": "affected suites, namespaces, pods, nodes, or services",
"confidence": "low|medium|high",
"needs_human": True,
"next_actions": ["short, concrete actions"],
"evidence_refs": ["specific evidence keys or values used"],
},
"bundle": _model_evidence_payload(bundle),
}
evidence = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str)
if len(evidence) > _MAX_MODEL_EVIDENCE_CHARS:
evidence = evidence[:_MAX_MODEL_EVIDENCE_CHARS] + "\n[truncated]"
return evidence
def _model_evidence_payload(bundle: dict[str, Any]) -> dict[str, Any]:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
evidence = bundle.get("evidence") if isinstance(bundle.get("evidence"), dict) else {}
return {
"kind": bundle.get("kind"),
"generated_at": bundle.get("generated_at"),
"summary": summary,
"evidence": evidence,
"unknowns": bundle.get("unknowns") if isinstance(bundle.get("unknowns"), list) else [],
}
def _parse_model_response(raw: str) -> tuple[dict[str, Any], str | None]:
if not raw.strip():
return {}, "empty_model_response"
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
return {}, f"model_json_parse_failed: {exc}"
return (parsed if isinstance(parsed, dict) else {}, None)
def _diagnosis_from_model(
bundle: dict[str, Any],
parsed: dict[str, Any],
raw: str,
parse_error: str | None,
) -> dict[str, Any]:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
unknowns = list(bundle.get("unknowns") or []) if isinstance(bundle.get("unknowns"), list) else []
if parse_error:
unknowns.append(parse_error)
diagnosis = {
"headline": _text_value(parsed.get("headline"), "Testing triage needs review."),
"root_cause": _text_value(parsed.get("root_cause"), "Evidence is insufficient for a confident root cause."),
"blast_radius": _text_value(parsed.get("blast_radius"), _blast_radius_fallback(summary)),
"confidence": _confidence(parsed.get("confidence")),
"needs_human": _bool_value(parsed.get("needs_human"), bool(summary.get("problem_count"))),
"next_actions": _text_list(parsed.get("next_actions")) or _default_next_actions(summary),
"evidence_refs": _text_list(parsed.get("evidence_refs")) or _default_evidence_refs(summary),
}
return {
"kind": "testing_triage_diagnosis",
"generated_at": datetime.now(timezone.utc).isoformat(),
"evidence_generated_at": bundle.get("generated_at") or "",
"evidence_summary": summary,
"status": "needs_attention" if diagnosis["needs_human"] or summary.get("problem_count") else "ok",
"model": _model_name(),
"source": "local_ollama",
"diagnosis": diagnosis,
"openclaw": {
"ariadne_latest_url": "/api/internal/testing/triage/diagnosis/latest",
"ariadne_run_url": "/api/internal/testing/triage/diagnosis/run",
"evidence_url": "/api/internal/testing/triage/latest",
},
"unknowns": unknowns,
"raw_model": raw[:_MAX_MODEL_OUTPUT_CHARS],
}
def _diagnosis_unavailable(bundle: dict[str, Any], reason: str) -> dict[str, Any]:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
return {
"kind": "testing_triage_diagnosis",
"generated_at": datetime.now(timezone.utc).isoformat(),
"evidence_generated_at": bundle.get("generated_at") or "",
"evidence_summary": summary,
"status": "unavailable",
"model": _model_name(),
"source": "local_ollama",
"diagnosis": {
"headline": "Local model diagnosis is unavailable.",
"root_cause": reason,
"blast_radius": _blast_radius_fallback(summary),
"confidence": "low",
"needs_human": True,
"next_actions": [
"Use the stored evidence bundle for manual triage.",
"Verify the Ariadne testing triage model URL and OpenClaw Ollama service health.",
],
"evidence_refs": _default_evidence_refs(summary),
},
"openclaw": {
"ariadne_latest_url": "/api/internal/testing/triage/diagnosis/latest",
"ariadne_run_url": "/api/internal/testing/triage/diagnosis/run",
"evidence_url": "/api/internal/testing/triage/latest",
},
"unknowns": [reason],
"raw_model": "",
}
def _text_value(value: Any, default: str) -> str:
if isinstance(value, str) and value.strip():
return value.strip()
return default
def _text_list(value: Any) -> list[str]:
if isinstance(value, str) and value.strip():
return [value.strip()]
if not isinstance(value, list):
return []
return [str(item).strip() for item in value if str(item).strip()][:8]
def _confidence(value: Any) -> str:
confidence = str(value or "").strip().lower()
return confidence if confidence in {"low", "medium", "high"} else "low"
def _bool_value(value: Any, default: bool) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "yes", "1"}:
return True
if lowered in {"false", "no", "0"}:
return False
return default
def _blast_radius_fallback(summary: dict[str, Any]) -> str:
failed_suites = summary.get("failed_suites") if isinstance(summary.get("failed_suites"), list) else []
if failed_suites:
return ", ".join(str(item) for item in failed_suites[:6])
return "No failed suite scope identified in the evidence bundle."
def _default_next_actions(summary: dict[str, Any]) -> list[str]:
if int(summary.get("problem_count") or 0) > 0:
return [
"Review the evidence bundle sections with non-empty problem lists.",
"Check the named Jenkins build logs and Flux Kustomizations before changing manifests.",
]
return ["No action required unless a fresh bundle changes the status."]
def _default_evidence_refs(summary: dict[str, Any]) -> list[str]:
refs = [f"summary.status={summary.get('status')}", f"summary.problem_count={summary.get('problem_count')}"]
failed_suites = summary.get("failed_suites") if isinstance(summary.get("failed_suites"), list) else []
if failed_suites:
refs.append("summary.failed_suites=" + ",".join(str(item) for item in failed_suites[:6]))
return refs
def _latest_cluster_snapshot(storage: Storage | None, errors: list[str]) -> dict[str, Any]:
if storage is not None:
try:

View File

@ -0,0 +1,253 @@
from __future__ import annotations
from datetime import datetime, timezone
import json
from typing import Any
import httpx
from ..db.storage import Storage
from ..settings import settings
TRIAGE_DIAGNOSIS_EVENT_TYPE = "testing_triage_diagnosis"
_MAX_MODEL_EVIDENCE_CHARS = 24000
_MAX_MODEL_OUTPUT_CHARS = 12000
_DIAGNOSIS_SYSTEM_PROMPT = (
"You are Ariadne's local testing triage model. Use only the supplied JSON evidence. "
"Return JSON only with keys: headline, root_cause, blast_radius, confidence, "
"needs_human, next_actions, evidence_refs. Confidence must be low, medium, or high. "
"Next actions must be read-only verification or Flux/IaC changes; never suggest "
"mutating kubectl commands or reading Kubernetes Secret values."
)
def latest_testing_triage_diagnosis(storage: Storage) -> dict[str, Any] | None:
"""Return the most recent stored local-model testing diagnosis, if present."""
rows = storage.list_events(limit=1, event_type=TRIAGE_DIAGNOSIS_EVENT_TYPE)
if not rows:
return None
detail = rows[0].get("detail")
if isinstance(detail, dict):
return detail
if isinstance(detail, str):
try:
payload = json.loads(detail)
except json.JSONDecodeError:
return None
return payload if isinstance(payload, dict) else None
return None
def model_diagnosis_enabled() -> bool:
return bool(_model_url())
def diagnose_testing_triage(bundle: dict[str, Any]) -> dict[str, Any]:
"""Ask the configured local model to summarize a deterministic triage bundle."""
base_url = _model_url()
if not base_url:
return _diagnosis_unavailable(bundle, "model_url_not_configured")
try:
with httpx.Client(timeout=_model_timeout()) as client:
response = client.post(
f"{base_url}/api/generate",
json={
"model": _model_name(),
"system": _DIAGNOSIS_SYSTEM_PROMPT,
"prompt": _diagnosis_prompt(bundle),
"stream": False,
"format": "json",
"options": {"temperature": 0.1, "top_p": 0.9},
},
)
response.raise_for_status()
payload = response.json()
except Exception as exc:
return _diagnosis_unavailable(bundle, f"model_request_failed: {exc}")
raw = str(payload.get("response") or "")
parsed, parse_error = _parse_model_response(raw)
return _diagnosis_from_model(bundle, parsed, raw, parse_error)
def _model_url() -> str:
return str(getattr(settings, "testing_triage_model_url", "") or "").strip().rstrip("/")
def _model_name() -> str:
return str(getattr(settings, "testing_triage_model", "qwen2.5:7b-instruct-q4_0") or "").strip()
def _model_timeout() -> float:
return float(getattr(settings, "testing_triage_model_timeout_sec", 180.0) or 180.0)
def _diagnosis_prompt(bundle: dict[str, Any]) -> str:
payload = {
"task": "Summarize testing and cluster evidence for tonight's debugging work.",
"required_output_schema": {
"headline": "one sentence",
"root_cause": "most likely cause, or say evidence is insufficient",
"blast_radius": "affected suites, namespaces, pods, nodes, or services",
"confidence": "low|medium|high",
"needs_human": True,
"next_actions": ["short, concrete actions"],
"evidence_refs": ["specific evidence keys or values used"],
},
"bundle": _model_evidence_payload(bundle),
}
evidence = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str)
if len(evidence) > _MAX_MODEL_EVIDENCE_CHARS:
evidence = evidence[:_MAX_MODEL_EVIDENCE_CHARS] + "\n[truncated]"
return evidence
def _model_evidence_payload(bundle: dict[str, Any]) -> dict[str, Any]:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
evidence = bundle.get("evidence") if isinstance(bundle.get("evidence"), dict) else {}
return {
"kind": bundle.get("kind"),
"generated_at": bundle.get("generated_at"),
"summary": summary,
"evidence": evidence,
"unknowns": bundle.get("unknowns") if isinstance(bundle.get("unknowns"), list) else [],
}
def _parse_model_response(raw: str) -> tuple[dict[str, Any], str | None]:
if not raw.strip():
return {}, "empty_model_response"
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
return {}, f"model_json_parse_failed: {exc}"
return (parsed if isinstance(parsed, dict) else {}, None)
def _diagnosis_from_model(
bundle: dict[str, Any],
parsed: dict[str, Any],
raw: str,
parse_error: str | None,
) -> dict[str, Any]:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
unknowns = list(bundle.get("unknowns") or []) if isinstance(bundle.get("unknowns"), list) else []
if parse_error:
unknowns.append(parse_error)
diagnosis = {
"headline": _text_value(parsed.get("headline"), "Testing triage needs review."),
"root_cause": _text_value(parsed.get("root_cause"), "Evidence is insufficient for a confident root cause."),
"blast_radius": _text_value(parsed.get("blast_radius"), _blast_radius_fallback(summary)),
"confidence": _confidence(parsed.get("confidence")),
"needs_human": _bool_value(parsed.get("needs_human"), bool(summary.get("problem_count"))),
"next_actions": _text_list(parsed.get("next_actions")) or _default_next_actions(summary),
"evidence_refs": _text_list(parsed.get("evidence_refs")) or _default_evidence_refs(summary),
}
return {
"kind": "testing_triage_diagnosis",
"generated_at": datetime.now(timezone.utc).isoformat(),
"evidence_generated_at": bundle.get("generated_at") or "",
"evidence_summary": summary,
"status": "needs_attention" if diagnosis["needs_human"] or summary.get("problem_count") else "ok",
"model": _model_name(),
"source": "local_ollama",
"diagnosis": diagnosis,
"openclaw": {
"ariadne_latest_url": "/api/internal/testing/triage/diagnosis/latest",
"ariadne_run_url": "/api/internal/testing/triage/diagnosis/run",
"evidence_url": "/api/internal/testing/triage/latest",
},
"unknowns": unknowns,
"raw_model": raw[:_MAX_MODEL_OUTPUT_CHARS],
}
def _diagnosis_unavailable(bundle: dict[str, Any], reason: str) -> dict[str, Any]:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
return {
"kind": "testing_triage_diagnosis",
"generated_at": datetime.now(timezone.utc).isoformat(),
"evidence_generated_at": bundle.get("generated_at") or "",
"evidence_summary": summary,
"status": "unavailable",
"model": _model_name(),
"source": "local_ollama",
"diagnosis": {
"headline": "Local model diagnosis is unavailable.",
"root_cause": reason,
"blast_radius": _blast_radius_fallback(summary),
"confidence": "low",
"needs_human": True,
"next_actions": [
"Use the stored evidence bundle for manual triage.",
"Verify the Ariadne testing triage model URL and OpenClaw Ollama service health.",
],
"evidence_refs": _default_evidence_refs(summary),
},
"openclaw": {
"ariadne_latest_url": "/api/internal/testing/triage/diagnosis/latest",
"ariadne_run_url": "/api/internal/testing/triage/diagnosis/run",
"evidence_url": "/api/internal/testing/triage/latest",
},
"unknowns": [reason],
"raw_model": "",
}
def _text_value(value: Any, default: str) -> str:
if isinstance(value, str) and value.strip():
return value.strip()
return default
def _text_list(value: Any) -> list[str]:
if isinstance(value, str) and value.strip():
return [value.strip()]
if not isinstance(value, list):
return []
return [str(item).strip() for item in value if str(item).strip()][:8]
def _confidence(value: Any) -> str:
confidence = str(value or "").strip().lower()
return confidence if confidence in {"low", "medium", "high"} else "low"
def _bool_value(value: Any, default: bool) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "yes", "1"}:
return True
if lowered in {"false", "no", "0"}:
return False
return default
def _blast_radius_fallback(summary: dict[str, Any]) -> str:
failed_suites = summary.get("failed_suites") if isinstance(summary.get("failed_suites"), list) else []
if failed_suites:
return ", ".join(str(item) for item in failed_suites[:6])
return "No failed suite scope identified in the evidence bundle."
def _default_next_actions(summary: dict[str, Any]) -> list[str]:
if int(summary.get("problem_count") or 0) > 0:
return [
"Review the evidence bundle sections with non-empty problem lists.",
"Check the named Jenkins build logs and Flux Kustomizations before changing manifests.",
]
return ["No action required unless a fresh bundle changes the status."]
def _default_evidence_refs(summary: dict[str, Any]) -> list[str]:
refs = [f"summary.status={summary.get('status')}", f"summary.problem_count={summary.get('problem_count')}"]
failed_suites = summary.get("failed_suites") if isinstance(summary.get("failed_suites"), list) else []
if failed_suites:
refs.append("summary.failed_suites=" + ",".join(str(item) for item in failed_suites[:6]))
return refs

View File

@ -91,26 +91,6 @@ def test_run_testing_triage_stores_latest(monkeypatch) -> None:
assert latest["summary"]["status"] == "ok"
def test_run_testing_triage_stores_diagnosis_when_model_enabled(monkeypatch) -> None:
storage = DummyStorage()
bundle = {"summary": {"status": "needs_attention", "problem_count": 1, "failed_suites": ["ariadne"]}}
diagnosis = {"kind": "testing_triage_diagnosis", "status": "needs_attention"}
monkeypatch.setattr(testing_triage, "settings", SettingsStub(testing_triage_model_url="http://ollama"))
monkeypatch.setattr(testing_triage, "collect_testing_triage", lambda _storage: bundle)
monkeypatch.setattr(testing_triage, "diagnose_testing_triage", lambda _bundle: diagnosis)
summary = testing_triage.run_testing_triage(storage)
latest = testing_triage.latest_testing_triage_diagnosis(storage)
assert summary.status == "needs_attention"
assert [event[0] for event in storage.events] == [
testing_triage.TRIAGE_EVENT_TYPE,
testing_triage.TRIAGE_DIAGNOSIS_EVENT_TYPE,
]
assert latest == diagnosis
def test_latest_testing_triage_bundle_handles_json_strings() -> None:
class JsonStorage:
def list_events(self, limit: int = 1, event_type: str | None = None): # type: ignore[no-untyped-def]
@ -142,107 +122,6 @@ def test_latest_testing_triage_bundle_ignores_bad_payloads() -> None:
assert testing_triage.latest_testing_triage_bundle(EmptyStorage()) is None # type: ignore[arg-type]
def test_diagnose_testing_triage_calls_local_ollama(monkeypatch) -> None:
captured = {}
model_response = json.dumps(
{
"headline": "Ariadne has one failing suite.",
"root_cause": "Jenkins reported a failed ariadne run.",
"blast_radius": "ariadne",
"confidence": "medium",
"needs_human": True,
"next_actions": ["Inspect the failed Jenkins build log."],
"evidence_refs": ["summary.failed_suites=ariadne"],
}
)
class FakeResponse:
def raise_for_status(self) -> None:
return None
def json(self): # type: ignore[no-untyped-def]
return {"response": model_response}
class FakeClient:
def __init__(self, *, timeout) -> None: # type: ignore[no-untyped-def]
captured["timeout"] = timeout
def __enter__(self):
return self
def __exit__(self, *args) -> None: # type: ignore[no-untyped-def]
return None
def post(self, url, json=None): # type: ignore[no-untyped-def]
captured["url"] = url
captured["request"] = json
return FakeResponse()
monkeypatch.setattr(
testing_triage,
"settings",
SettingsStub(
testing_triage_model_url="http://ollama/",
testing_triage_model="tiny-model",
testing_triage_model_timeout_sec=3.0,
),
)
monkeypatch.setattr(testing_triage.httpx, "Client", FakeClient)
diagnosis = testing_triage.diagnose_testing_triage(
{
"kind": "testing_triage_bundle",
"generated_at": "now",
"summary": {"status": "needs_attention", "problem_count": 1, "failed_suites": ["ariadne"]},
"evidence": {"jenkins": {"failed_builds": [{"job": "ariadne"}]}},
"unknowns": [],
}
)
assert captured["url"] == "http://ollama/api/generate"
assert captured["timeout"] == 3.0
assert captured["request"]["model"] == "tiny-model"
assert captured["request"]["format"] == "json"
assert diagnosis["kind"] == "testing_triage_diagnosis"
assert diagnosis["status"] == "needs_attention"
assert diagnosis["diagnosis"]["confidence"] == "medium"
assert diagnosis["diagnosis"]["next_actions"] == ["Inspect the failed Jenkins build log."]
def test_diagnose_testing_triage_handles_disabled_and_bad_json(monkeypatch) -> None:
monkeypatch.setattr(testing_triage, "settings", SettingsStub(testing_triage_model_url=""))
disabled = testing_triage.diagnose_testing_triage({"summary": {"status": "ok", "problem_count": 0}})
assert disabled["status"] == "unavailable"
assert disabled["diagnosis"]["root_cause"] == "model_url_not_configured"
class BadResponse:
def raise_for_status(self) -> None:
return None
def json(self): # type: ignore[no-untyped-def]
return {"response": "not json"}
class BadClient:
def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
return None
def __enter__(self):
return self
def __exit__(self, *args) -> None: # type: ignore[no-untyped-def]
return None
def post(self, url, json=None): # type: ignore[no-untyped-def]
return BadResponse()
monkeypatch.setattr(testing_triage, "settings", SettingsStub(testing_triage_model_url="http://ollama"))
monkeypatch.setattr(testing_triage.httpx, "Client", BadClient)
diagnosis = testing_triage.diagnose_testing_triage({"summary": {"status": "ok", "problem_count": 0}})
assert diagnosis["status"] == "ok"
assert "model_json_parse_failed" in diagnosis["unknowns"][0]
def test_latest_cluster_snapshot_falls_back_to_live_collect(monkeypatch) -> None:
class BrokenStorage:
def latest_cluster_state(self): # type: ignore[no-untyped-def]

View File

@ -0,0 +1,152 @@
from __future__ import annotations
import json
from ariadne.services import testing_triage
from ariadne.services import testing_triage_diagnosis
class DummyStorage:
def __init__(self) -> None:
self.events: list[tuple[str, dict]] = []
def record_event(self, event_type: str, detail: dict) -> None:
self.events.append((event_type, detail))
def list_events(self, limit: int = 1, event_type: str | None = None): # type: ignore[no-untyped-def]
matching = [
{"detail": detail}
for stored_type, detail in self.events
if event_type is None or stored_type == event_type
]
return matching[-limit:][::-1]
class SettingsStub:
def __init__(self, **overrides) -> None: # type: ignore[no-untyped-def]
self.testing_triage_model_url = ""
self.testing_triage_model = "qwen2.5:7b-instruct-q4_0"
self.testing_triage_model_timeout_sec = 1.0
for key, value in overrides.items():
setattr(self, key, value)
def test_run_testing_triage_stores_diagnosis_when_model_enabled(monkeypatch) -> None:
storage = DummyStorage()
bundle = {"summary": {"status": "needs_attention", "problem_count": 1, "failed_suites": ["ariadne"]}}
diagnosis = {"kind": "testing_triage_diagnosis", "status": "needs_attention"}
monkeypatch.setattr(testing_triage, "model_diagnosis_enabled", lambda: True)
monkeypatch.setattr(testing_triage, "collect_testing_triage", lambda _storage: bundle)
monkeypatch.setattr(testing_triage, "diagnose_testing_triage", lambda _bundle: diagnosis)
summary = testing_triage.run_testing_triage(storage) # type: ignore[arg-type]
latest = testing_triage.latest_testing_triage_diagnosis(storage) # type: ignore[arg-type]
assert summary.status == "needs_attention"
assert [event[0] for event in storage.events] == [
testing_triage.TRIAGE_EVENT_TYPE,
testing_triage.TRIAGE_DIAGNOSIS_EVENT_TYPE,
]
assert latest == diagnosis
def test_diagnose_testing_triage_calls_local_ollama(monkeypatch) -> None:
captured = {}
model_response = json.dumps(
{
"headline": "Ariadne has one failing suite.",
"root_cause": "Jenkins reported a failed ariadne run.",
"blast_radius": "ariadne",
"confidence": "medium",
"needs_human": True,
"next_actions": ["Inspect the failed Jenkins build log."],
"evidence_refs": ["summary.failed_suites=ariadne"],
}
)
class FakeResponse:
def raise_for_status(self) -> None:
return None
def json(self): # type: ignore[no-untyped-def]
return {"response": model_response}
class FakeClient:
def __init__(self, *, timeout) -> None: # type: ignore[no-untyped-def]
captured["timeout"] = timeout
def __enter__(self):
return self
def __exit__(self, *args) -> None: # type: ignore[no-untyped-def]
return None
def post(self, url, json=None): # type: ignore[no-untyped-def]
captured["url"] = url
captured["request"] = json
return FakeResponse()
monkeypatch.setattr(
testing_triage_diagnosis,
"settings",
SettingsStub(
testing_triage_model_url="http://ollama/",
testing_triage_model="tiny-model",
testing_triage_model_timeout_sec=3.0,
),
)
monkeypatch.setattr(testing_triage_diagnosis.httpx, "Client", FakeClient)
diagnosis = testing_triage_diagnosis.diagnose_testing_triage(
{
"kind": "testing_triage_bundle",
"generated_at": "now",
"summary": {"status": "needs_attention", "problem_count": 1, "failed_suites": ["ariadne"]},
"evidence": {"jenkins": {"failed_builds": [{"job": "ariadne"}]}},
"unknowns": [],
}
)
assert captured["url"] == "http://ollama/api/generate"
assert captured["timeout"] == 3.0
assert captured["request"]["model"] == "tiny-model"
assert captured["request"]["format"] == "json"
assert diagnosis["kind"] == "testing_triage_diagnosis"
assert diagnosis["status"] == "needs_attention"
assert diagnosis["diagnosis"]["confidence"] == "medium"
assert diagnosis["diagnosis"]["next_actions"] == ["Inspect the failed Jenkins build log."]
def test_diagnose_testing_triage_handles_disabled_and_bad_json(monkeypatch) -> None:
monkeypatch.setattr(testing_triage_diagnosis, "settings", SettingsStub(testing_triage_model_url=""))
disabled = testing_triage_diagnosis.diagnose_testing_triage({"summary": {"status": "ok", "problem_count": 0}})
assert disabled["status"] == "unavailable"
assert disabled["diagnosis"]["root_cause"] == "model_url_not_configured"
class BadResponse:
def raise_for_status(self) -> None:
return None
def json(self): # type: ignore[no-untyped-def]
return {"response": "not json"}
class BadClient:
def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
return None
def __enter__(self):
return self
def __exit__(self, *args) -> None: # type: ignore[no-untyped-def]
return None
def post(self, url, json=None): # type: ignore[no-untyped-def]
return BadResponse()
monkeypatch.setattr(testing_triage_diagnosis, "settings", SettingsStub(testing_triage_model_url="http://ollama"))
monkeypatch.setattr(testing_triage_diagnosis.httpx, "Client", BadClient)
diagnosis = testing_triage_diagnosis.diagnose_testing_triage({"summary": {"status": "ok", "problem_count": 0}})
assert diagnosis["status"] == "ok"
assert "model_json_parse_failed" in diagnosis["unknowns"][0]