from __future__ import annotations

from datetime import datetime, timezone
import types

from ariadne.services import jenkins_workspace_cleanup as cleanup_module

# API paths the fake ``get_json`` implementations below know how to answer.
_PODS_PATH = "/api/v1/namespaces/jenkins/pods"
_PVCS_PATH = "/api/v1/namespaces/jenkins/persistentvolumeclaims"
_PVS_PATH = "/api/v1/persistentvolumes"
_LONGHORN_VOLUMES_PATH = "/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes"

# Creation timestamp far enough in the past to exceed the 1-hour minimum age
# configured by ``_install_settings``.
_OLD_TIMESTAMP = "2020-01-01T00:00:00Z"


def _install_settings(monkeypatch) -> None:
    """Replace ``cleanup_module.settings`` with the minimal stub both tests use."""
    stub = types.SimpleNamespace(
        jenkins_workspace_namespace="jenkins",
        jenkins_workspace_pvc_prefix="pvc-workspace-",
        jenkins_workspace_cleanup_min_age_hours=1.0,
    )
    monkeypatch.setattr(cleanup_module, "settings", stub)


def _pvc(name: str, created: str, phase: str) -> dict:
    """Build a PersistentVolumeClaim item with the given name, age and phase."""
    return {
        "metadata": {"name": name, "creationTimestamp": created},
        "status": {"phase": phase},
    }


def _released_pv(name: str, created: str, claim_name: str) -> dict:
    """Build a ``Released`` PersistentVolume item bound to a jenkins-namespace claim."""
    return {
        "metadata": {"name": name, "creationTimestamp": created},
        "status": {"phase": "Released"},
        "spec": {"claimRef": {"namespace": "jenkins", "name": claim_name}},
    }


def _longhorn_volume(name: str, created: str, pvc_label: str | None = None) -> dict:
    """Build a Longhorn volume item, optionally labelled with its originating PVC name."""
    metadata: dict = {"name": name, "creationTimestamp": created}
    if pvc_label is not None:
        metadata["labels"] = {"kubernetes.io/created-for/pvc/name": pvc_label}
    return {"metadata": metadata}


def test_cleanup_jenkins_workspace_storage(monkeypatch) -> None:
    """Happy path: the stale PVC, its PV and two Longhorn volumes are deleted.

    The fixture holds three workspace PVCs: ``stale`` (old, Lost), ``active``
    (old, Bound and mounted by a pod) and ``fresh`` (Lost but created just
    now). The assertions expect one PVC, one PV and two Longhorn volumes to
    be deleted with no failures.
    """
    _install_settings(monkeypatch)

    utc_now = datetime.now(timezone.utc)
    fresh = utc_now.isoformat().replace("+00:00", "Z")
    stale = _OLD_TIMESTAMP
    deleted_paths: list[str] = []

    # Each factory builds a fresh payload per call, like a real API round-trip.
    responders = {
        _PODS_PATH: lambda: {
            "items": [
                {
                    "spec": {
                        "volumes": [
                            {"persistentVolumeClaim": {"claimName": "pvc-workspace-active"}},
                        ]
                    }
                }
            ]
        },
        _PVCS_PATH: lambda: {
            "items": [
                _pvc("pvc-workspace-stale", stale, "Lost"),
                _pvc("pvc-workspace-active", stale, "Bound"),
                _pvc("pvc-workspace-fresh", fresh, "Lost"),
            ]
        },
        _PVS_PATH: lambda: {
            "items": [
                _released_pv("pvc-old", stale, "pvc-workspace-stale"),
                _released_pv("pvc-active", stale, "pvc-workspace-active"),
                _released_pv("pvc-fresh", fresh, "pvc-workspace-fresh"),
            ]
        },
        _LONGHORN_VOLUMES_PATH: lambda: {
            "items": [
                _longhorn_volume("pvc-old", stale),
                _longhorn_volume("pvc-orphan", stale, "pvc-workspace-orphan"),
                _longhorn_volume("pvc-orphan-fresh", fresh, "pvc-workspace-fresh"),
            ]
        },
    }

    def fake_get_json(path: str):
        build = responders.get(path)
        if build is None:
            raise AssertionError(f"unexpected path: {path}")
        return build()

    def fake_delete_json(path: str):
        # Record every delete so the test can check which resources were hit.
        deleted_paths.append(path)
        return {"status": "Success"}

    monkeypatch.setattr(cleanup_module, "get_json", fake_get_json)
    monkeypatch.setattr(cleanup_module, "delete_json", fake_delete_json)

    summary = cleanup_module.cleanup_jenkins_workspace_storage()

    assert summary.pvcs_deleted == 1
    assert summary.pvs_deleted == 1
    assert summary.volumes_deleted == 2
    assert summary.failures == 0

    expected_deletes = (
        "/api/v1/namespaces/jenkins/persistentvolumeclaims/pvc-workspace-stale",
        "/api/v1/persistentvolumes/pvc-old",
        "/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes/pvc-old",
        "/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes/pvc-orphan",
    )
    for expected in expected_deletes:
        assert expected in deleted_paths


def test_cleanup_jenkins_workspace_storage_failure(monkeypatch) -> None:
    """A delete that raises is counted as a failure, not as a deleted PVC."""
    _install_settings(monkeypatch)

    # Only the PVC listing has content: one old, Lost claim with no pods,
    # PVs or Longhorn volumes around it.
    listings = {
        _PODS_PATH: lambda: {"items": []},
        _PVCS_PATH: lambda: {"items": [_pvc("pvc-workspace-stale", "2020-01-01T00:00:00Z", "Lost")]},
        _PVS_PATH: lambda: {"items": []},
        _LONGHORN_VOLUMES_PATH: lambda: {"items": []},
    }

    def fake_get_json(path: str):
        build = listings.get(path)
        if build is None:
            raise AssertionError(f"unexpected path: {path}")
        return build()

    def fake_delete_json(_path: str):
        # Every delete attempt blows up.
        raise RuntimeError("boom")

    monkeypatch.setattr(cleanup_module, "get_json", fake_get_json)
    monkeypatch.setattr(cleanup_module, "delete_json", fake_delete_json)

    summary = cleanup_module.cleanup_jenkins_workspace_storage()

    assert summary.failures == 1
    assert summary.pvcs_deleted == 0