ariadne/tests/unit/services/test_jenkins_workspace_candidates_edges.py

171 lines
8.1 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
import types
from ariadne.services import jenkins_workspace_candidates as candidates
OLD_TS = "2020-01-01T00:00:00Z"
def _settings(**overrides):
values = {
"jenkins_workspace_namespace": "jenkins",
"jenkins_workspace_pvc_prefix": "pvc-workspace-",
"jenkins_workspace_cleanup_min_age_hours": 1.0,
}
values.update(overrides)
return types.SimpleNamespace(**values)
def _payload(items):
return {"items": items}
def test_timestamp_and_age_helpers_handle_invalid_metadata() -> None:
settings = _settings()
assert candidates._parse_timestamp("not-a-date") is None
assert candidates._created_at({"creationTimestamp": ""}) is None
assert candidates._created_at({"creationTimestamp": 123}) is None
assert candidates._is_old_enough(settings, {}) is False
assert candidates._is_deleting({"deletionTimestamp": " "}) is False
assert candidates._is_deleting({"deletionTimestamp": OLD_TS}) is True
def test_active_workspace_claims_ignores_malformed_pods() -> None:
settings = _settings()
def get_json(_path: str):
return _payload(
[
None,
{
"metadata": {"annotations": {"jenkins.io/workspace-pvc": "pvc-workspace-annotated"}},
"spec": {
"volumes": [
None,
{"emptyDir": {}},
{"persistentVolumeClaim": None},
{"persistentVolumeClaim": {"claimName": "not-workspace"}},
{"persistentVolumeClaim": {"claimName": "pvc-workspace-mounted"}},
]
},
},
]
)
assert candidates._active_workspace_claims(settings, get_json) == {
"pvc-workspace-annotated",
"pvc-workspace-mounted",
}
def test_workspace_pv_candidates_filter_every_skip_reason() -> None:
settings = _settings()
fresh_ts = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def get_json(_path: str):
return _payload(
[
None,
{"metadata": {"name": "pv-wrong-ns", "creationTimestamp": OLD_TS}, "status": {"phase": "Released"}, "spec": {"claimRef": {"namespace": "other", "name": "pvc-workspace-wrong"}}},
{"metadata": {"name": "pv-not-workspace", "creationTimestamp": OLD_TS}, "status": {"phase": "Released"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "data"}}},
{"metadata": {"name": "pv-active", "creationTimestamp": OLD_TS}, "status": {"phase": "Released"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "pvc-workspace-active"}}},
{"metadata": {"name": "pv-deleting", "creationTimestamp": OLD_TS, "deletionTimestamp": OLD_TS}, "status": {"phase": "Released"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "pvc-workspace-deleting"}}},
{"metadata": {"name": "pv-bound", "creationTimestamp": OLD_TS}, "status": {"phase": "Bound"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "pvc-workspace-bound"}}},
{"metadata": {"name": "pv-fresh", "creationTimestamp": fresh_ts}, "status": {"phase": "Released"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "pvc-workspace-fresh"}}},
{"metadata": {"creationTimestamp": OLD_TS}, "status": {"phase": "Released"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "pvc-workspace-no-name"}}},
{"metadata": {"name": "pv-stale", "creationTimestamp": OLD_TS}, "status": {"phase": "Failed"}, "spec": {"claimRef": {"namespace": "jenkins", "name": "pvc-workspace-stale"}}},
]
)
found, all_names = candidates._workspace_pv_candidates(settings, get_json, {"pvc-workspace-active"})
assert {item.name for item in found} == {"pv-stale"}
assert {"pv-wrong-ns", "pv-not-workspace", "pv-active", "pv-stale"} <= all_names
def test_workspace_pvc_candidates_filter_edges() -> None:
fresh_ts = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def normal_payload(_path: str):
return _payload(
[
None,
{"metadata": {"name": "data", "creationTimestamp": OLD_TS}, "status": {"phase": "Lost"}},
{"metadata": {"name": "pvc-workspace-deleting", "creationTimestamp": OLD_TS, "deletionTimestamp": OLD_TS}, "status": {"phase": "Lost"}},
{"metadata": {"name": "pvc-workspace-active", "creationTimestamp": OLD_TS}, "status": {"phase": "Lost"}},
{"metadata": {"name": "pvc-workspace-bound", "creationTimestamp": OLD_TS}, "status": {"phase": "Bound"}},
{"metadata": {"name": "pvc-workspace-fresh", "creationTimestamp": fresh_ts}, "status": {"phase": "Lost"}},
{"metadata": {"name": "pvc-workspace-stale", "creationTimestamp": OLD_TS}, "status": {"phase": "Lost"}},
]
)
found = candidates._workspace_pvc_candidates(_settings(), normal_payload, {"pvc-workspace-active"})
assert [item.name for item in found] == ["pvc-workspace-stale"]
def empty_name_payload(_path: str):
return _payload([{"metadata": {"name": "", "creationTimestamp": OLD_TS}, "status": {"phase": "Lost"}}])
assert candidates._workspace_pvc_candidates(_settings(jenkins_workspace_pvc_prefix=""), empty_name_payload, []) == []
def test_longhorn_binding_and_deletion_policy_edges() -> None:
settings = _settings()
binding = candidates._workspace_binding_from_longhorn(
{"labels": {"kubernetes.io/created-for/pvc/name": "pvc-workspace-label"}},
{"kubernetesStatus": {"namespace": "jenkins", "pvName": "pv-label"}},
)
assert binding.pvc_name == "pvc-workspace-label"
assert binding.pvc_namespace == "jenkins"
assert candidates._should_delete_longhorn_volume(settings, "vol", binding, set(), {"vol"}) is True
assert candidates._should_delete_longhorn_volume(settings, "vol", candidates._LonghornBinding("data", "jenkins", ""), set(), set()) is False
assert candidates._should_delete_longhorn_volume(settings, "vol", candidates._LonghornBinding("pvc-workspace-a", "jenkins", "pv-a"), {"pv-a"}, set()) is False
assert candidates._should_delete_longhorn_volume(settings, "vol", candidates._LonghornBinding("pvc-workspace-a", "", ""), set(), set()) is True
def test_workspace_longhorn_candidates_filter_edges() -> None:
settings = _settings()
fresh_ts = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def volume(name: str, **overrides):
item = {
"metadata": {
"name": name,
"creationTimestamp": OLD_TS,
"labels": {
"kubernetes.io/created-for/pvc/name": "pvc-workspace-orphan",
"kubernetes.io/created-for/pvc/namespace": "jenkins",
},
},
"status": {"state": "detached", "isAttached": False, "robustness": "unknown"},
"spec": {"frontend": "blockdev"},
}
for key, value in overrides.items():
item[key].update(value)
return item
def get_json(_path: str):
return _payload(
[
None,
{"metadata": {"creationTimestamp": OLD_TS}},
volume("vol-active-pv", status={"kubernetesStatus": {"pvName": "pv-present"}}),
volume("vol-deleting", metadata={"deletionTimestamp": OLD_TS}),
volume("vol-fresh", metadata={"creationTimestamp": fresh_ts}),
volume("vol-attached-state", status={"state": "attached"}),
volume("vol-attached-flag", status={"isAttached": True}),
volume("vol-healthy", status={"robustness": "healthy"}),
volume("vol-frontend", spec={"frontend": "iscsi"}),
volume("vol-stale"),
]
)
found = candidates._workspace_longhorn_candidates(settings, get_json, {"pv-present"}, set())
assert [item.name for item in found] == ["vol-stale"]
assert found[0].kind == "longhorn_volume"