triage: sanitize local model diagnoses

This commit is contained in:
codex 2026-05-20 04:57:58 -03:00
parent d8fea14e51
commit ac91e16f16
2 changed files with 162 additions and 7 deletions

View File

@ -19,8 +19,9 @@ _DIAGNOSIS_SYSTEM_PROMPT = (
"or summarize the evidence as nested input data. Confidence must be low, medium, or "
"high. Next actions must be read-only verification or Flux/IaC changes; never suggest "
"mutating kubectl commands or reading Kubernetes Secret values. If evidence is "
"insufficient, say that plainly in root_cause. Use normal English words with spaces; "
"do not concatenate words."
"insufficient, say that plainly in root_cause. All response values must be written "
"in English using ASCII text only. Never answer in Chinese or any other non-English "
"language. Use normal English words with spaces; do not concatenate words."
)
_DIAGNOSIS_RESPONSE_SCHEMA: dict[str, Any] = {
"type": "object",
@ -132,6 +133,8 @@ def _diagnosis_prompt(bundle: dict[str, Any]) -> str:
"Return only headline, root_cause, blast_radius, confidence, needs_human, next_actions, evidence_refs.",
"Do not include keys named pipelines, quality, bundle, evidence, summary, or unknowns.",
"Prefer concrete Jenkins job names, Flux Kustomizations, pod names, nodes, and metrics when present.",
"Only discuss Jenkins suite jobs listed in bundle.summary.failed_suites; ignore stale unrelated jobs.",
"Write every string in English ASCII only.",
],
"bundle": _model_evidence_payload(bundle),
}
@ -173,14 +176,35 @@ def _diagnosis_from_model(
unknowns = list(bundle.get("unknowns") or []) if isinstance(bundle.get("unknowns"), list) else []
if parse_error:
unknowns.append(parse_error)
blocked_jobs = _out_of_scope_jobs(parsed, summary)
diagnosis = {
"headline": _text_value(parsed.get("headline"), "Testing triage needs review."),
"root_cause": _text_value(parsed.get("root_cause"), "Evidence is insufficient for a confident root cause."),
"blast_radius": _text_value(parsed.get("blast_radius"), _blast_radius_fallback(summary)),
"headline": _safe_text_value(
parsed.get("headline"),
"Testing triage needs review.",
unknowns,
"headline",
blocked_jobs,
),
"root_cause": _safe_text_value(
parsed.get("root_cause"),
"Evidence is insufficient for a confident root cause.",
unknowns,
"root_cause",
blocked_jobs,
),
"blast_radius": _safe_text_value(
parsed.get("blast_radius"),
_blast_radius_fallback(summary),
unknowns,
"blast_radius",
blocked_jobs,
),
"confidence": _confidence(parsed.get("confidence")),
"needs_human": _bool_value(parsed.get("needs_human"), bool(summary.get("problem_count"))),
"next_actions": _text_list(parsed.get("next_actions")) or _default_next_actions(summary),
"evidence_refs": _text_list(parsed.get("evidence_refs")) or _default_evidence_refs(summary),
"next_actions": _safe_text_list(parsed.get("next_actions"), unknowns, "next_actions", blocked_jobs)
or _default_next_actions(summary),
"evidence_refs": _safe_evidence_refs(parsed.get("evidence_refs"), summary, unknowns)
or _default_evidence_refs(summary),
}
return {
"kind": "testing_triage_diagnosis",
@ -239,6 +263,23 @@ def _text_value(value: Any, default: str) -> str:
return default
def _safe_text_value(
    value: Any,
    default: str,
    unknowns: list[Any],
    field: str,
    blocked_jobs: set[str],
) -> str:
    """Return the model's text for ``field``, or ``default`` if it is rejected.

    The candidate is ``_text_value(value, default)``. It is rejected when it
    fails ``_english_ascii`` or when ``_mentions_blocked_job`` reports a hit
    against ``blocked_jobs``. Each rejection appends one marker string to
    ``unknowns`` (mutated in place) naming the field and the reason.

    Args:
        value: Raw value taken from the parsed model response.
        default: Text returned when the candidate is rejected.
        unknowns: List that collects rejection markers; appended to in place.
        field: Field name interpolated into the rejection marker.
        blocked_jobs: Job names the text is not allowed to mention.

    Returns:
        The accepted candidate text, otherwise ``default``.
    """
    candidate = _text_value(value, default)
    # The ASCII check runs first, so a candidate failing both checks is
    # reported only as non-English.
    if not _english_ascii(candidate):
        marker = f"model_{field}_non_english"
    elif _mentions_blocked_job(candidate, blocked_jobs):
        marker = f"model_{field}_out_of_scope"
    else:
        return candidate
    unknowns.append(marker)
    return default
def _text_list(value: Any) -> list[str]:
if isinstance(value, str) and value.strip():
return [value.strip()]
@ -247,6 +288,78 @@ def _text_list(value: Any) -> list[str]:
return [str(item).strip() for item in value if str(item).strip()][:8]
def _safe_text_list(value: Any, unknowns: list[Any], field: str, blocked_jobs: set[str]) -> list[str]:
    """Filter a model-provided list down to the entries that pass sanitization.

    Entries come from ``_text_list(value)``. An entry is dropped when it fails
    ``_english_ascii`` or when ``_mentions_blocked_job`` matches it against
    ``blocked_jobs``. Every dropped entry appends its own marker to
    ``unknowns``, so the same marker can appear more than once.

    Args:
        value: Raw value taken from the parsed model response.
        unknowns: List that collects rejection markers; appended to in place.
        field: Field name interpolated into the rejection markers.
        blocked_jobs: Job names the entries are not allowed to mention.

    Returns:
        The surviving entries in their original order (possibly empty).
    """
    accepted: list[str] = []
    for entry in _text_list(value):
        if not _english_ascii(entry):
            marker = f"model_{field}_non_english"
        elif _mentions_blocked_job(entry, blocked_jobs):
            marker = f"model_{field}_out_of_scope"
        else:
            accepted.append(entry)
            continue
        unknowns.append(marker)
    return accepted
def _safe_evidence_refs(value: Any, summary: dict[str, Any], unknowns: list[Any]) -> list[str]:
    """Keep only the evidence references that are ASCII and in scope.

    References come from ``_text_list(value)``. A reference is dropped when it
    fails ``_english_ascii`` or when ``_evidence_ref_in_scope`` rejects it for
    ``summary``. Each dropped reference appends a marker to ``unknowns``.

    Args:
        value: Raw ``evidence_refs`` value from the parsed model response.
        summary: Bundle summary used to decide which jobs are in scope.
        unknowns: List that collects rejection markers; appended to in place.

    Returns:
        The surviving references in their original order (possibly empty).
    """
    kept: list[str] = []
    for candidate in _text_list(value):
        if not _english_ascii(candidate):
            marker = "model_evidence_refs_non_english"
        elif not _evidence_ref_in_scope(candidate, summary):
            marker = "model_evidence_refs_out_of_scope"
        else:
            kept.append(candidate)
            continue
        unknowns.append(marker)
    return kept
def _english_ascii(text: str) -> bool:
return all(ord(char) < 128 for char in text)
def _mentions_blocked_job(text: str, blocked_jobs: set[str]) -> bool:
lowered = text.lower()
return any(job.lower() in lowered for job in blocked_jobs)
def _out_of_scope_jobs(parsed: dict[str, Any], summary: dict[str, Any]) -> set[str]:
    """Collect job names cited by the model that are not allowed for ``summary``.

    Job names are extracted with ``_job_name_from_ref`` from the model's
    ``evidence_refs`` and compared against ``_allowed_suite_jobs(summary)``.
    References that yield no job name are skipped.

    Args:
        parsed: Parsed model response.
        summary: Bundle summary that defines the allowed jobs.

    Returns:
        The set of cited job names that are not allowed.
    """
    permitted = _allowed_suite_jobs(summary)
    blocked: set[str] = set()
    for ref in _text_list(parsed.get("evidence_refs")):
        job = _job_name_from_ref(ref)
        if job and job not in permitted:
            blocked.add(job)
    return blocked
def _evidence_ref_in_scope(ref: str, summary: dict[str, Any]) -> bool:
    """Return True when ``ref`` may be kept as evidence for ``summary``.

    A reference from which ``_job_name_from_ref`` extracts no job name is
    always in scope. Otherwise the job name must be in
    ``_allowed_suite_jobs(summary)``.
    """
    job = _job_name_from_ref(ref)
    if not job:
        # Nothing job-specific to check for this reference.
        return True
    return job in _allowed_suite_jobs(summary)
def _job_name_from_ref(ref: str) -> str:
marker = "/job/"
if marker not in ref:
return ""
return ref.split(marker, 1)[1].split("/", 1)[0].strip().lower()
def _allowed_suite_jobs(summary: dict[str, Any]) -> set[str]:
failed_suites = summary.get("failed_suites") if isinstance(summary.get("failed_suites"), list) else []
aliases = {
"bstein_home": "bstein-dev-home",
"data_prepper": "data-prepper",
"titan_iac": "titan-iac",
}
allowed = set()
for suite in failed_suites:
name = str(suite).strip().lower()
if not name:
continue
allowed.add(name)
allowed.add(name.replace("_", "-"))
if name in aliases:
allowed.add(aliases[name])
return allowed
def _confidence(value: Any) -> str:
confidence = str(value or "").strip().lower()
return confidence if confidence in {"low", "medium", "high"} else "low"

View File

@ -261,6 +261,48 @@ def test_diagnosis_from_model_coerces_fallback_values(monkeypatch) -> None:
assert diagnosis["unknowns"] == ["existing_unknown", "parse warning"]
def test_diagnosis_from_model_rejects_non_english_and_out_of_scope_jobs(monkeypatch) -> None:
    """Non-ASCII model text and jobs outside ``failed_suites`` must not survive sanitization."""
    monkeypatch.setattr(testing_triage_diagnosis, "settings", SettingsStub(testing_triage_model="triage-model"))
    bundle = {
        "generated_at": "bundle-time",
        "summary": {
            "status": "needs_attention",
            "problem_count": 2,
            "failed_suites": ["titan_iac"],
        },
        "unknowns": [],
    }
    # Only titan_iac is a failed suite, so every mention of arcanagon is out of scope.
    model_output = {
        "headline": "多项目持续集成状态更新",
        "root_cause": "arcanagon is stale and titan_iac failed.",
        "blast_radius": "arcanagon and titan_iac",
        "confidence": "high",
        "needs_human": True,
        "next_actions": [
            "检查 titan_iac logs.",
            "Review arcanagon logs.",
            "Review titan_iac logs.",
        ],
        "evidence_refs": [
            "https://ci.bstein.dev/job/arcanagon/1/consoleText",
            "https://ci.bstein.dev/job/titan-iac/463/consoleText",
        ],
    }

    result = testing_triage_diagnosis._diagnosis_from_model(  # noqa: SLF001
        bundle,
        model_output,
        "raw response",
        None,
    )

    sanitized = result["diagnosis"]
    assert sanitized["headline"] == "Testing triage needs review."
    assert sanitized["root_cause"] == "Evidence is insufficient for a confident root cause."
    assert sanitized["blast_radius"] == "titan_iac"
    assert sanitized["next_actions"] == ["Review titan_iac logs."]
    assert sanitized["evidence_refs"] == ["https://ci.bstein.dev/job/titan-iac/463/consoleText"]

    markers = result["unknowns"]
    assert "model_headline_non_english" in markers
    assert "model_root_cause_out_of_scope" in markers
    assert "model_evidence_refs_out_of_scope" in markers
def test_default_evidence_refs_include_failed_suites() -> None:
refs = testing_triage_diagnosis._default_evidence_refs( # noqa: SLF001
{"status": "needs_attention", "problem_count": 3, "failed_suites": ["a", "b", "c", "d", "e", "f", "g"]}