From 7d9b649a4332d263e2580dee12fb731d2f83aa5e Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 21 Apr 2026 01:31:06 -0300 Subject: [PATCH] refactor(ariadne): split jenkins workspace candidates --- .../services/jenkins_workspace_candidates.py | 283 ++++++++++++++++++ ariadne/services/jenkins_workspace_cleanup.py | 282 +---------------- ci/loc_hygiene_waivers.tsv | 1 - 3 files changed, 296 insertions(+), 270 deletions(-) create mode 100644 ariadne/services/jenkins_workspace_candidates.py diff --git a/ariadne/services/jenkins_workspace_candidates.py b/ariadne/services/jenkins_workspace_candidates.py new file mode 100644 index 0000000..a2448ab --- /dev/null +++ b/ariadne/services/jenkins_workspace_candidates.py @@ -0,0 +1,283 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any + + +@dataclass(frozen=True) +class _CleanupCandidate: + name: str + kind: str + path: str + created_at: datetime | None + related_pvc: str | None = None + pv_name: str | None = None + + +@dataclass(frozen=True) +class _LonghornBinding: + pvc_name: Any + pvc_namespace: Any + referenced_pv_name: Any + + +def _parse_timestamp(raw: str) -> datetime | None: + """Parse Kubernetes RFC3339 timestamps into timezone-aware datetimes.""" + + normalized = raw.replace("Z", "+00:00") + try: + return datetime.fromisoformat(normalized) + except ValueError: + return None + + +def _created_at(metadata: dict[str, Any]) -> datetime | None: + raw = metadata.get("creationTimestamp") + if not isinstance(raw, str) or not raw: + return None + return _parse_timestamp(raw) + + +def _is_old_enough(settings_obj: Any, metadata: dict[str, Any]) -> bool: + """Return true when an object age exceeds the configured cleanup threshold.""" + + created_at = _created_at(metadata) + if created_at is None: + return False + min_age = timedelta(hours=settings_obj.jenkins_workspace_cleanup_min_age_hours) + return datetime.now(timezone.utc) - created_at >= min_age + + +def _is_deleting(metadata: dict[str, Any]) -> bool: + deletion_ts = metadata.get("deletionTimestamp") + return isinstance(deletion_ts, str) and bool(deletion_ts.strip()) + + +def _is_workspace_name(settings_obj: Any, name: Any) -> bool: + return isinstance(name, str) and name.startswith(settings_obj.jenkins_workspace_pvc_prefix) + + +def _active_workspace_claims(settings_obj: Any, get_json_func: Callable[[str], dict[str, Any]]) -> set[str]: + """Collect currently referenced Jenkins workspace PVC names from pods.""" + + namespace = settings_obj.jenkins_workspace_namespace + payload = get_json_func(f"/api/v1/namespaces/{namespace}/pods") + items = payload.get("items") if isinstance(payload.get("items"), list) else [] + active: set[str] = set() + for pod in items: + if not isinstance(pod, dict): + continue + metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} + annotations = metadata.get("annotations") if isinstance(metadata.get("annotations"), dict) else {} + spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} + volumes = spec.get("volumes") if isinstance(spec.get("volumes"), list) else [] + for volume in volumes: + if not isinstance(volume, dict): + continue + claim = volume.get("persistentVolumeClaim") + if not isinstance(claim, dict): + continue + claim_name = claim.get("claimName") + if _is_workspace_name(settings_obj, claim_name): + active.add(claim_name) + claim_name = annotations.get("jenkins.io/workspace-pvc") + if _is_workspace_name(settings_obj, claim_name): + active.add(claim_name) + return active + + +def _workspace_pv_candidates( + settings_obj: Any, + get_json_func: Callable[[str], dict[str, Any]], + active_claims: set[str], +) -> tuple[list[_CleanupCandidate], set[str]]: + """Find releasable Jenkins workspace PVs and keep a set of all PV names.""" + + namespace = settings_obj.jenkins_workspace_namespace + payload = get_json_func("/api/v1/persistentvolumes") + items = payload.get("items") if isinstance(payload.get("items"), list) else [] + candidates: list[_CleanupCandidate] = [] + all_pv_names: set[str] = set() + + for pv in items: + if not isinstance(pv, dict): + continue + metadata = pv.get("metadata") if isinstance(pv.get("metadata"), dict) else {} + status = pv.get("status") if isinstance(pv.get("status"), dict) else {} + spec = pv.get("spec") if isinstance(pv.get("spec"), dict) else {} + name = metadata.get("name") + if isinstance(name, str) and name: + all_pv_names.add(name) + + claim_ref = spec.get("claimRef") if isinstance(spec.get("claimRef"), dict) else {} + claim_namespace = claim_ref.get("namespace") + claim_name = claim_ref.get("name") + phase = status.get("phase") + if claim_namespace != namespace: + continue + if not _is_workspace_name(settings_obj, claim_name): + continue + if _is_deleting(metadata): + continue + if claim_name in active_claims: + continue + if phase not in {"Released", "Failed"}: + continue + if not _is_old_enough(settings_obj, metadata): + continue + if not isinstance(name, str) or not name: + continue + candidates.append( + _CleanupCandidate( + name=name, + kind="pv", + path=f"/api/v1/persistentvolumes/{name}", + created_at=_created_at(metadata), + related_pvc=claim_name if isinstance(claim_name, str) else None, + ) + ) + return candidates, all_pv_names + + +def _workspace_pvc_candidates( + settings_obj: Any, + get_json_func: Callable[[str], dict[str, Any]], + active_claims: set[str], +) -> list[_CleanupCandidate]: + """Find stale Jenkins workspace PVCs that are not actively referenced.""" + + namespace = settings_obj.jenkins_workspace_namespace + payload = get_json_func(f"/api/v1/namespaces/{namespace}/persistentvolumeclaims") + items = payload.get("items") if isinstance(payload.get("items"), list) else [] + candidates: list[_CleanupCandidate] = [] + + for pvc in items: + if not isinstance(pvc, dict): + continue + metadata = pvc.get("metadata") if isinstance(pvc.get("metadata"), dict) else {} + status = pvc.get("status") if isinstance(pvc.get("status"), dict) else {} + claim_name = metadata.get("name") + phase = status.get("phase") + if not _is_workspace_name(settings_obj, claim_name): + continue + if _is_deleting(metadata): + continue + if claim_name in active_claims: + continue + if phase == "Bound": + continue + if not _is_old_enough(settings_obj, metadata): + continue + if not isinstance(claim_name, str) or not claim_name: + continue + candidates.append( + _CleanupCandidate( + name=claim_name, + kind="pvc", + path=f"/api/v1/namespaces/{namespace}/persistentvolumeclaims/{claim_name}", + created_at=_created_at(metadata), + ) + ) + return candidates + + +def _workspace_binding_from_longhorn( + metadata: dict[str, Any], + status: dict[str, Any], +) -> _LonghornBinding: + labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {} + kubernetes_status = status.get("kubernetesStatus") if isinstance(status.get("kubernetesStatus"), dict) else {} + pvc_name = labels.get("kubernetes.io/created-for/pvc/name") + if not isinstance(pvc_name, str) or not pvc_name: + pvc_name = kubernetes_status.get("pvcName") + pvc_namespace = labels.get("kubernetes.io/created-for/pvc/namespace") + if not isinstance(pvc_namespace, str) or not pvc_namespace: + pvc_namespace = kubernetes_status.get("namespace") + referenced_pv_name = kubernetes_status.get("pvName") + return _LonghornBinding( + pvc_name=pvc_name, + pvc_namespace=pvc_namespace, + referenced_pv_name=referenced_pv_name, + ) + + +def _should_delete_longhorn_volume( + settings_obj: Any, + name: str, + binding: _LonghornBinding, + all_pv_names: set[str], + removed_pv_names: set[str], +) -> bool: + if name in removed_pv_names or binding.referenced_pv_name in removed_pv_names: + return True + if not _is_workspace_name(settings_obj, binding.pvc_name): + return False + if ( + isinstance(binding.referenced_pv_name, str) + and binding.referenced_pv_name in all_pv_names + ) or name in all_pv_names: + return False + return ( + binding.pvc_namespace in {None, ""} + or binding.pvc_namespace == settings_obj.jenkins_workspace_namespace + ) + + +def _workspace_longhorn_candidates( + settings_obj: Any, + get_json_func: Callable[[str], dict[str, Any]], + all_pv_names: set[str], + removed_pv_names: set[str], +) -> list[_CleanupCandidate]: + namespace = "longhorn-system" + payload = get_json_func("/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes") + items = payload.get("items") if isinstance(payload.get("items"), list) else [] + candidates: list[_CleanupCandidate] = [] + + for volume in items: + if not isinstance(volume, dict): + continue + metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {} + status = volume.get("status") if isinstance(volume.get("status"), dict) else {} + spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {} + name = metadata.get("name") + if not isinstance(name, str) or not name: + continue + + binding = _workspace_binding_from_longhorn(metadata, status) + robust_state = status.get("robustness") + state = status.get("state") + attached = status.get("isAttached") + frontend = spec.get("frontend") + if not _should_delete_longhorn_volume( + settings_obj, + name, + binding, + all_pv_names, + removed_pv_names, + ): + continue + if _is_deleting(metadata): + continue + if not _is_old_enough(settings_obj, metadata): + continue + if state not in {None, "detached", "faulted", "unknown"}: + continue + if attached is True: + continue + if robust_state not in {None, "unknown", "faulted", "degraded"}: + continue + if frontend not in {None, "", "blockdev"}: + continue + candidates.append( + _CleanupCandidate( + name=name, + kind="longhorn_volume", + path=f"/apis/longhorn.io/v1beta2/namespaces/{namespace}/volumes/{name}", + created_at=_created_at(metadata), + pv_name=name, + ) + ) + return candidates diff --git a/ariadne/services/jenkins_workspace_cleanup.py b/ariadne/services/jenkins_workspace_cleanup.py index bae7daa..c396b17 100644 --- a/ariadne/services/jenkins_workspace_cleanup.py +++ b/ariadne/services/jenkins_workspace_cleanup.py @@ -1,14 +1,20 @@ from __future__ import annotations from dataclasses import dataclass -from datetime import datetime, timedelta, timezone -from typing import Any +from datetime import datetime, timezone from prometheus_client import Counter, Gauge from ..k8s.client import delete_json, get_json from ..settings import settings from ..utils.logging import get_logger +from .jenkins_workspace_candidates import ( + _CleanupCandidate, + _active_workspace_claims, + _workspace_longhorn_candidates, + _workspace_pv_candidates, + _workspace_pvc_candidates, +) logger = get_logger(__name__) @@ -83,268 +89,6 @@ class JenkinsWorkspaceCleanupSummary: return self.pvs_deleted + self.pvcs_deleted + self.volumes_deleted -@dataclass(frozen=True) -class _CleanupCandidate: - name: str - kind: str - path: str - created_at: datetime | None - related_pvc: str | None = None - pv_name: str | None = None - - -@dataclass(frozen=True) -class _LonghornBinding: - pvc_name: Any - pvc_namespace: Any - referenced_pv_name: Any - - -def _parse_timestamp(raw: str) -> datetime | None: - """Parse Kubernetes RFC3339 timestamps into timezone-aware datetimes.""" - - normalized = raw.replace("Z", "+00:00") - try: - return datetime.fromisoformat(normalized) - except ValueError: - return None - - -def _created_at(metadata: dict[str, Any]) -> datetime | None: - raw = metadata.get("creationTimestamp") - if not isinstance(raw, str) or not raw: - return None - return _parse_timestamp(raw) - - -def _is_old_enough(metadata: dict[str, Any]) -> bool: - """Return true when an object age exceeds the configured cleanup threshold.""" - - created_at = _created_at(metadata) - if created_at is None: - return False - min_age = timedelta(hours=settings.jenkins_workspace_cleanup_min_age_hours) - return datetime.now(timezone.utc) - created_at >= min_age - - -def _is_deleting(metadata: dict[str, Any]) -> bool: - deletion_ts = metadata.get("deletionTimestamp") - return isinstance(deletion_ts, str) and bool(deletion_ts.strip()) - - -def _is_workspace_name(name: Any) -> bool: - return isinstance(name, str) and name.startswith(settings.jenkins_workspace_pvc_prefix) - - -def _active_workspace_claims() -> set[str]: - """Collect currently referenced Jenkins workspace PVC names from pods.""" - - namespace = settings.jenkins_workspace_namespace - payload = get_json(f"/api/v1/namespaces/{namespace}/pods") - items = payload.get("items") if isinstance(payload.get("items"), list) else [] - active: set[str] = set() - for pod in items: - if not isinstance(pod, dict): - continue - metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} - annotations = metadata.get("annotations") if isinstance(metadata.get("annotations"), dict) else {} - spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} - volumes = spec.get("volumes") if isinstance(spec.get("volumes"), list) else [] - for volume in volumes: - if not isinstance(volume, dict): - continue - claim = volume.get("persistentVolumeClaim") - if not isinstance(claim, dict): - continue - claim_name = claim.get("claimName") - if _is_workspace_name(claim_name): - active.add(claim_name) - claim_name = annotations.get("jenkins.io/workspace-pvc") - if _is_workspace_name(claim_name): - active.add(claim_name) - return active - - -def _workspace_pv_candidates(active_claims: set[str]) -> tuple[list[_CleanupCandidate], set[str]]: - """Find releasable Jenkins workspace PVs and keep a set of all PV names.""" - - namespace = settings.jenkins_workspace_namespace - payload = get_json("/api/v1/persistentvolumes") - items = payload.get("items") if isinstance(payload.get("items"), list) else [] - candidates: list[_CleanupCandidate] = [] - all_pv_names: set[str] = set() - - for pv in items: - if not isinstance(pv, dict): - continue - metadata = pv.get("metadata") if isinstance(pv.get("metadata"), dict) else {} - status = pv.get("status") if isinstance(pv.get("status"), dict) else {} - spec = pv.get("spec") if isinstance(pv.get("spec"), dict) else {} - name = metadata.get("name") - if isinstance(name, str) and name: - all_pv_names.add(name) - - claim_ref = spec.get("claimRef") if isinstance(spec.get("claimRef"), dict) else {} - claim_namespace = claim_ref.get("namespace") - claim_name = claim_ref.get("name") - phase = status.get("phase") - if claim_namespace != namespace: - continue - if not _is_workspace_name(claim_name): - continue - if _is_deleting(metadata): - continue - if claim_name in active_claims: - continue - if phase not in {"Released", "Failed"}: - continue - if not _is_old_enough(metadata): - continue - if not isinstance(name, str) or not name: - continue - candidates.append( - _CleanupCandidate( - name=name, - kind="pv", - path=f"/api/v1/persistentvolumes/{name}", - created_at=_created_at(metadata), - related_pvc=claim_name if isinstance(claim_name, str) else None, - ) - ) - return candidates, all_pv_names - - -def _workspace_pvc_candidates(active_claims: set[str]) -> list[_CleanupCandidate]: - """Find stale Jenkins workspace PVCs that are not actively referenced.""" - - namespace = settings.jenkins_workspace_namespace - payload = get_json(f"/api/v1/namespaces/{namespace}/persistentvolumeclaims") - items = payload.get("items") if isinstance(payload.get("items"), list) else [] - candidates: list[_CleanupCandidate] = [] - - for pvc in items: - if not isinstance(pvc, dict): - continue - metadata = pvc.get("metadata") if isinstance(pvc.get("metadata"), dict) else {} - status = pvc.get("status") if isinstance(pvc.get("status"), dict) else {} - claim_name = metadata.get("name") - phase = status.get("phase") - if not _is_workspace_name(claim_name): - continue - if _is_deleting(metadata): - continue - if claim_name in active_claims: - continue - if phase == "Bound": - continue - if not _is_old_enough(metadata): - continue - if not isinstance(claim_name, str) or not claim_name: - continue - candidates.append( - _CleanupCandidate( - name=claim_name, - kind="pvc", - path=f"/api/v1/namespaces/{namespace}/persistentvolumeclaims/{claim_name}", - created_at=_created_at(metadata), - ) - ) - return candidates - - -def _workspace_binding_from_longhorn( - metadata: dict[str, Any], - status: dict[str, Any], -) -> _LonghornBinding: - labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {} - kubernetes_status = status.get("kubernetesStatus") if isinstance(status.get("kubernetesStatus"), dict) else {} - pvc_name = labels.get("kubernetes.io/created-for/pvc/name") - if not isinstance(pvc_name, str) or not pvc_name: - pvc_name = kubernetes_status.get("pvcName") - pvc_namespace = labels.get("kubernetes.io/created-for/pvc/namespace") - if not isinstance(pvc_namespace, str) or not pvc_namespace: - pvc_namespace = kubernetes_status.get("namespace") - referenced_pv_name = kubernetes_status.get("pvName") - return _LonghornBinding( - pvc_name=pvc_name, - pvc_namespace=pvc_namespace, - referenced_pv_name=referenced_pv_name, - ) - - -def _should_delete_longhorn_volume( - name: str, - binding: _LonghornBinding, - all_pv_names: set[str], - removed_pv_names: set[str], -) -> bool: - if name in removed_pv_names or binding.referenced_pv_name in removed_pv_names: - return True - if not _is_workspace_name(binding.pvc_name): - return False - if ( - isinstance(binding.referenced_pv_name, str) - and binding.referenced_pv_name in all_pv_names - ) or name in all_pv_names: - return False - return ( - binding.pvc_namespace in {None, ""} - or binding.pvc_namespace == settings.jenkins_workspace_namespace - ) - - -def _workspace_longhorn_candidates(all_pv_names: set[str], removed_pv_names: set[str]) -> list[_CleanupCandidate]: - namespace = "longhorn-system" - payload = get_json("/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes") - items = payload.get("items") if isinstance(payload.get("items"), list) else [] - candidates: list[_CleanupCandidate] = [] - - for volume in items: - if not isinstance(volume, dict): - continue - metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {} - status = volume.get("status") if isinstance(volume.get("status"), dict) else {} - spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {} - name = metadata.get("name") - if not isinstance(name, str) or not name: - continue - - binding = _workspace_binding_from_longhorn(metadata, status) - robust_state = status.get("robustness") - state = status.get("state") - attached = status.get("isAttached") - frontend = spec.get("frontend") - if not _should_delete_longhorn_volume( - name, - binding, - all_pv_names, - removed_pv_names, - ): - continue - if _is_deleting(metadata): - continue - if not _is_old_enough(metadata): - continue - if state not in {None, "detached", "faulted", "unknown"}: - continue - if attached is True: - continue - if robust_state not in {None, "unknown", "faulted", "degraded"}: - continue - if frontend not in {None, "", "blockdev"}: - continue - candidates.append( - _CleanupCandidate( - name=name, - kind="longhorn_volume", - path=f"/apis/longhorn.io/v1beta2/namespaces/{namespace}/volumes/{name}", - created_at=_created_at(metadata), - pv_name=name, - ) - ) - return candidates - - def _validate_cleanup_settings() -> tuple[str, str, bool, int]: namespace = settings.jenkins_workspace_namespace prefix = settings.jenkins_workspace_pvc_prefix.strip() @@ -444,7 +188,7 @@ def _dry_run_summary( all_pv_names: set[str], ) -> JenkinsWorkspaceCleanupSummary: simulated_removed = _planned_removed_pv_names_dry_run(stale_pvcs, stale_pvs, max_deletions) - stale_volumes = _workspace_longhorn_candidates(all_pv_names, simulated_removed) + stale_volumes = _workspace_longhorn_candidates(settings, get_json, all_pv_names, simulated_removed) _record_guard_cap( max_deletions=max_deletions, stale_pvcs=stale_pvcs, @@ -501,7 +245,7 @@ def _delete_run_summary( failure_field="pv", removed_pv_names=removed_pv_names, ) - stale_volumes = _workspace_longhorn_candidates(all_pv_names, removed_pv_names) + stale_volumes = _workspace_longhorn_candidates(settings, get_json, all_pv_names, removed_pv_names) _record_guard_cap( max_deletions=max_deletions, stale_pvcs=stale_pvcs, @@ -584,9 +328,9 @@ def cleanup_jenkins_workspace_storage() -> JenkinsWorkspaceCleanupSummary: ) try: namespace, _prefix, dry_run, max_deletions = _validate_cleanup_settings() - active_claims = _active_workspace_claims() - stale_pvs, all_pv_names = _workspace_pv_candidates(active_claims) - stale_pvcs = _workspace_pvc_candidates(active_claims) + active_claims = _active_workspace_claims(settings, get_json) + stale_pvs, all_pv_names = _workspace_pv_candidates(settings, get_json, active_claims) + stale_pvcs = _workspace_pvc_candidates(settings, get_json, active_claims) if dry_run: summary = _dry_run_summary( namespace=namespace, diff --git a/ci/loc_hygiene_waivers.tsv b/ci/loc_hygiene_waivers.tsv index 1ed9857..d690dbc 100644 --- a/ci/loc_hygiene_waivers.tsv +++ b/ci/loc_hygiene_waivers.tsv @@ -3,7 +3,6 @@ ariadne/services/cluster_state.py split planned; service orchestration decomposi ariadne/app.py split planned; Flask app bootstrap/routes currently co-located ariadne/services/comms.py split planned; comms adapters still consolidated ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction -ariadne/services/jenkins_workspace_cleanup.py split planned; job orchestration pending extraction tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile tests/test_services.py test module split planned; broad service contract coverage retained meanwhile tests/test_app.py test module split planned; API coverage retained meanwhile