test(ariadne): cover cluster relationship edges

This commit is contained in:
codex 2026-04-21 03:25:30 -03:00
parent 03f9118f21
commit bca3d87743
2 changed files with 66 additions and 32 deletions

View File

@ -38,12 +38,7 @@ def _pvc_top(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
return output return output
def _namespace_context( def _namespace_context(namespace_pods: list[dict[str, Any]], namespace_nodes: list[dict[str, Any]], namespace_capacity: list[dict[str, Any]], namespace_baseline: dict[str, dict[str, dict[str, float]]]) -> list[dict[str, Any]]:
namespace_pods: list[dict[str, Any]],
namespace_nodes: list[dict[str, Any]],
namespace_capacity: list[dict[str, Any]],
namespace_baseline: dict[str, dict[str, dict[str, float]]],
) -> list[dict[str, Any]]:
node_map = {entry.get("namespace"): entry for entry in namespace_nodes if isinstance(entry, dict)} node_map = {entry.get("namespace"): entry for entry in namespace_nodes if isinstance(entry, dict)}
cap_map = {entry.get("namespace"): entry for entry in namespace_capacity if isinstance(entry, dict)} cap_map = {entry.get("namespace"): entry for entry in namespace_capacity if isinstance(entry, dict)}
output: list[dict[str, Any]] = [] output: list[dict[str, Any]] = []
@ -152,11 +147,7 @@ def _node_workload_map(workloads: list[dict[str, Any]]) -> dict[str, dict[str, i
return mapping return mapping
def _node_workloads_top( def _node_workloads_top(workload_map: dict[str, dict[str, int]], limit_nodes: int = _NODE_WORKLOAD_LIMIT, limit_workloads: int = _NODE_WORKLOAD_TOP) -> list[dict[str, Any]]:
workload_map: dict[str, dict[str, int]],
limit_nodes: int = _NODE_WORKLOAD_LIMIT,
limit_workloads: int = _NODE_WORKLOAD_TOP,
) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = [] output: list[dict[str, Any]] = []
for node, workloads in workload_map.items(): for node, workloads in workload_map.items():
if not isinstance(node, str) or not node or not isinstance(workloads, dict): if not isinstance(node, str) or not node or not isinstance(workloads, dict):
@ -240,11 +231,7 @@ def _build_lexicon() -> dict[str, Any]:
return {"terms": terms, "aliases": aliases} return {"terms": terms, "aliases": aliases}
def _top_named_entries( def _top_named_entries(entries: list[dict[str, Any]], name_key: str, limit: int) -> list[dict[str, Any]]:
entries: list[dict[str, Any]],
name_key: str,
limit: int,
) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = [] output: list[dict[str, Any]] = []
for entry in entries or []: for entry in entries or []:
if not isinstance(entry, dict): if not isinstance(entry, dict):
@ -295,10 +282,7 @@ def _cross_node_metric_top(metrics: dict[str, Any], node_context: list[dict[str,
return output return output
def _cross_namespace_metric_top( def _cross_namespace_metric_top(metrics: dict[str, Any], namespace_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
metrics: dict[str, Any],
namespace_context: list[dict[str, Any]],
) -> list[dict[str, Any]]:
top = metrics.get("namespace_top") if isinstance(metrics.get("namespace_top"), dict) else {} top = metrics.get("namespace_top") if isinstance(metrics.get("namespace_top"), dict) else {}
namespace_map = { namespace_map = {
entry.get("namespace"): entry entry.get("namespace"): entry
@ -331,12 +315,7 @@ def _cross_namespace_metric_top(
return output return output
def _build_cross_stats( def _build_cross_stats(metrics: dict[str, Any], node_context: list[dict[str, Any]], namespace_context: list[dict[str, Any]], workloads: list[dict[str, Any]]) -> dict[str, Any]:
metrics: dict[str, Any],
node_context: list[dict[str, Any]],
namespace_context: list[dict[str, Any]],
workloads: list[dict[str, Any]],
) -> dict[str, Any]:
return { return {
"node_metric_top": _cross_node_metric_top(metrics, node_context), "node_metric_top": _cross_node_metric_top(metrics, node_context),
"namespace_metric_top": _cross_namespace_metric_top(metrics, namespace_context), "namespace_metric_top": _cross_namespace_metric_top(metrics, namespace_context),
@ -345,12 +324,7 @@ def _build_cross_stats(
} }
def _node_context( def _node_context(node_details: list[dict[str, Any]], node_load: list[dict[str, Any]], node_baseline: dict[str, dict[str, dict[str, float]]], node_workloads: dict[str, dict[str, int]]) -> list[dict[str, Any]]:
node_details: list[dict[str, Any]],
node_load: list[dict[str, Any]],
node_baseline: dict[str, dict[str, dict[str, float]]],
node_workloads: dict[str, dict[str, int]],
) -> list[dict[str, Any]]:
load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)} load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)}
output: list[dict[str, Any]] = [] output: list[dict[str, Any]] = []
for entry in node_details: for entry in node_details:

View File

@ -62,6 +62,66 @@ def test_relationship_context_and_cross_stats() -> None:
assert relationships._reason_top({"OOMKilled": 2})[0]["reason"] == "OOMKilled" assert relationships._reason_top({"OOMKilled": 2})[0]["reason"] == "OOMKilled"
def test_relationship_edge_filters_and_baseline_helpers(monkeypatch) -> None:
    """Exercise the defensive branches of the private relationship helpers.

    Each call mixes malformed entries (``None``, empty keys, non-dict or
    non-numeric values) with one valid entry and checks that only the valid
    data survives. NOTE(review): the expected values mirror the current
    behavior of the private helpers in ``relationships``; revisit alongside
    that module.
    """
    # Samples whose metric lacks the label key are dropped entirely.
    assert relationships._vector_to_named([None, {"metric": {}, "value": 1}], "node", "node") == []
    named = relationships._vector_to_named(
        [{"metric": {"node": "titan-1"}, "value": 2}, {"metric": {"node": "titan-2"}, "value": 1}],
        "node",
        "node",
    )
    assert named[0]["node"] == "titan-1"

    # PVC samples without namespace/claim metadata are filtered out.
    pvc_rows = relationships._pvc_top(
        [{"metric": {}}, {"metric": {"namespace": "apps", "persistentvolumeclaim": "data"}, "value": 90}]
    )
    assert pvc_rows == [{"namespace": "apps", "pvc": "data", "used_percent": 90}]

    # Namespace context tolerates bad node/capacity/baseline shapes.
    ns_ctx = relationships._namespace_context(
        [None, {"namespace": ""}, {"namespace": "apps", "pods_total": 1}],
        [{"namespace": "apps", "nodes": "bad"}],
        [{"namespace": "apps", "cpu_usage": "bad", "mem_usage": 2.0}],
        "bad",
    )
    assert ns_ctx[0]["namespace"] == "apps"
    assert relationships._namespace_nodes_top([None, ns_ctx[0]], 2)[0]["namespace"] == "apps"

    # Workload -> node mapping: empty node names, non-numeric and zero
    # counts are skipped; workloads without a name are keyed bare.
    workload_fixtures = [
        None,
        {"namespace": "apps", "nodes": {"titan-1": 1}},
        {"namespace": "apps", "workload": "api", "nodes": "bad"},
        {"namespace": "apps", "workload": "api", "nodes": {"": 1, "titan-1": "2", "titan-2": "bad", "titan-3": 0}},
        {"workload": "solo", "nodes": {"titan-2": 1}},
    ]
    assert relationships._node_workload_map(workload_fixtures) == {
        "titan-1": {"apps/api": 2},
        "titan-2": {"solo": 1},
    }
    top_nodes = relationships._node_workloads_top({"": {}, "titan-1": "bad", "titan-2": {"solo": 1}}, limit_nodes=2)
    assert top_nodes[0]["node"] == "titan-2"
    assert relationships._workload_index([{"workload": "api", "pods_total": 1, "nodes": "bad"}])[0]["nodes_top"] == []

    # Event summaries: non-dict input yields an empty summary.
    assert relationships._events_summary("bad") == {}
    event_payload = {"warnings_total": 2, "warnings_by_namespace": {"apps": 2, "db": 1}, "warnings_recent": [1, 2]}
    assert relationships._events_summary(event_payload)["top_namespace"] == {"namespace": "apps", "count": 2}

    # Named-entry ranking coerces numeric strings and defaults bad values to 0.
    expected_named = [
        {"name": "n2", "value": 2.0},
        {"name": "n1", "value": 0.0},
    ]
    assert (
        relationships._top_named_entries(
            [None, {"node": ""}, {"node": "n1", "value": "bad"}, {"node": "n2", "value": "2"}], "node", 3
        )
        == expected_named
    )

    # Stub _top_named_entries so the cross-stat helpers receive a nameless
    # row and discard it. The patch must precede the two calls below.
    monkeypatch.setattr(relationships, "_top_named_entries", lambda *_args: [{"name": ""}])
    assert relationships._cross_node_metric_top({"node_usage": {"cpu": [{}]}}, []) == []
    assert relationships._cross_namespace_metric_top({"namespace_top": {"cpu": [{}]}}, []) == []

    # Node context tolerates bad load/baseline/workload shapes.
    ctx_nodes = relationships._node_context(
        [None, {"name": ""}, {"name": "titan-1", "pressure": ["DiskPressure"]}],
        [{"node": "titan-1", "cpu": "bad", "load_index": 1.0}],
        "bad",
        "bad",
    )
    assert ctx_nodes[0]["node"] == "titan-1"

    # Baseline delta/severity helpers reject non-numeric input and zero averages.
    assert relationships._baseline_delta("bad", {"avg": 1}) is None
    assert relationships._baseline_delta(1, {"avg": 0}) is None
    assert relationships._delta_severity(50) == "warning"
    assert relationships._delta_severity(5) == "info"
    assert relationships._delta_entry_label({"namespace": "apps"}) == ("namespace", "apps")
    assert relationships._delta_top([None, {"namespace": "apps", "baseline_delta": {"cpu": 5}}], "cpu")[0]["namespace"] == "apps"
    assert relationships._reason_top({"": 1, "OOMKilled": "bad", "BackOff": 2}) == [{"reason": "BackOff", "count": 2}]
def test_health_anomaly_signal_profile_and_attention_domains() -> None: def test_health_anomaly_signal_profile_and_attention_domains() -> None:
metrics = { metrics = {
"nodes_total": 2, "nodes_total": 2,