"""Tests for ``ariadne.services.testing_triage`` using an in-memory storage fake."""

from __future__ import annotations

from ariadne.services import testing_triage


class DummyStorage:
    """In-memory stand-in for the storage object passed to the triage service.

    It serves one canned cluster-state snapshot and keeps recorded events in a
    plain list so the tests can inspect them directly.
    """

    def __init__(self) -> None:
        # (event_type, detail) pairs, oldest first.
        self.events: list[tuple[str, dict]] = []

    def latest_cluster_state(self):  # type: ignore[no-untyped-def]
        """Return a fixed snapshot with three ready nodes and one pending pod."""
        return {
            "collected_at": "2026-05-20T00:00:00+00:00",
            "summary": {
                "health_bullets": ["Pods pending: 1"],
                "attention_ranked": [{"kind": "pod_pending"}],
            },
            "nodes_summary": {"total": 3, "ready": 3, "not_ready": 0},
            "flux": {"items": [{"namespace": "flux-system", "name": "monitoring"}]},
            "pod_issues": {
                "items": [{"namespace": "jenkins", "pod": "agent-1", "phase": "Pending"}],
                "pending_oldest": [{"namespace": "jenkins", "pod": "agent-1"}],
            },
            "jobs": {"failing": [], "active_oldest": []},
            "events": {"warnings_recent": []},
        }

    def record_event(self, event_type: str, detail: dict) -> None:
        """Append ``(event_type, detail)`` to the in-memory event log."""
        self.events.append((event_type, detail))

    def list_events(  # type: ignore[no-untyped-def]
        self, limit: int = 1, event_type: str | None = None
    ):
        """Return up to ``limit`` recorded events, newest first.

        Args:
            limit: Maximum number of events to return. A non-positive value
                yields an empty list.
            event_type: When given, only events recorded under this type are
                considered; ``None`` matches every event.

        Returns:
            A list of ``{"detail": ...}`` dicts, most recently recorded first.
        """
        if limit <= 0:
            # Without this guard ``matching[-0:]`` would return *every* event,
            # and a negative limit would slice from the front instead.
            return []
        matching = [
            {"detail": detail}
            for stored_type, detail in self.events
            if event_type is None or stored_type == event_type
        ]
        # Keep the last ``limit`` entries, then flip so the newest comes first.
        return matching[-limit:][::-1]


def test_collect_testing_triage_builds_bundle(monkeypatch) -> None:
    """With stubbed lookups the bundle needs attention and names the failed suite."""
    storage = DummyStorage()

    def fake_vm_items(query, errors):  # type: ignore[no-untyped-def]
        # Only the quality-gate run counter query gets a sample; every other
        # query sees an empty result.
        if "platform_quality_gate_runs_total" in query:
            return [{"labels": {"suite": "ariadne"}, "value": 1.0}]
        return []

    monkeypatch.setattr(testing_triage, "_vm_items", fake_vm_items)
    monkeypatch.setattr(testing_triage, "_jenkins_signals", lambda errors: {"failed_builds": []})

    bundle = testing_triage.collect_testing_triage(storage)

    assert bundle["kind"] == "testing_triage_bundle"
    assert bundle["summary"]["status"] == "needs_attention"
    assert bundle["summary"]["failed_suites"] == ["ariadne"]
    assert "Testing Triage Evidence" in bundle["markdown"]
    assert bundle["openclaw"]["ariadne_latest_url"].endswith("/api/internal/testing/triage/latest")


def test_run_testing_triage_stores_latest(monkeypatch) -> None:
    """Running triage records an event that can be read back as the latest bundle."""
    storage = DummyStorage()
    # Replace collection with a canned bundle so only the store/read-back path
    # of ``run_testing_triage`` is exercised.
    monkeypatch.setattr(
        testing_triage,
        "collect_testing_triage",
        lambda _storage: {
            "summary": {"status": "ok", "problem_count": 0, "failed_suites": []},
        },
    )

    summary = testing_triage.run_testing_triage(storage)
    latest = testing_triage.latest_testing_triage_bundle(storage)

    assert summary.status == "ok"
    assert storage.events[0][0] == testing_triage.TRIAGE_EVENT_TYPE
    assert latest["summary"]["status"] == "ok"