test(ariadne): cover cluster state fetcher failures

This commit is contained in:
codex 2026-04-21 02:55:55 -03:00
parent a17654819c
commit 6b6b9677be
2 changed files with 33 additions and 3 deletions

View File

@ -10,6 +10,9 @@ from .cluster_state_pods import *
from .cluster_state_workloads import *
PodFetchResult = tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]
def _get_json(path: str) -> dict[str, Any]:
facade = sys.modules.get("ariadne.services.cluster_state")
getter = getattr(facade, "get_json", _default_get_json) if facade is not None else _default_get_json
@ -40,9 +43,7 @@ def _fetch_flux(errors: list[str]) -> dict[str, Any]:
return {}
def _fetch_pods(
errors: list[str],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
def _fetch_pods(errors: list[str]) -> PodFetchResult:
workloads: list[dict[str, Any]] = []
namespace_pods: list[dict[str, Any]] = []
namespace_nodes: list[dict[str, Any]] = []

View File

@ -0,0 +1,29 @@
from __future__ import annotations
from ariadne.services import cluster_state_fetchers as fetchers
def test_cluster_state_fetchers_record_individual_api_errors(monkeypatch) -> None:
def fail_json(path: str):
raise RuntimeError(f"boom {path}")
monkeypatch.setattr(fetchers, "_get_json", fail_json)
errors: list[str] = []
assert fetchers._fetch_nodes(errors) == ({}, [], {})
assert fetchers._fetch_flux(errors) == {}
assert fetchers._fetch_pods(errors) == ([], [], [], [], {})
assert fetchers._fetch_jobs(errors) == {}
assert fetchers._fetch_longhorn(errors) == {}
assert fetchers._fetch_workload_health(errors) == {}
assert fetchers._fetch_events(errors) == {}
assert [entry.split(":", 1)[0] for entry in errors] == [
"nodes",
"flux",
"pods",
"jobs",
"longhorn",
"workloads_health",
"events",
]