refactor(ariadne): split jenkins workspace candidates

This commit is contained in:
codex 2026-04-21 01:31:06 -03:00
parent b5d60fb3be
commit 7d9b649a43
3 changed files with 296 additions and 270 deletions

View File

@ -0,0 +1,283 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
@dataclass(frozen=True)
class _CleanupCandidate:
    """A single Kubernetes object slated for Jenkins workspace cleanup."""

    # Object name: PVC name, PV name, or Longhorn volume name.
    name: str
    # Discriminator for the candidate type: "pvc", "pv", or "longhorn_volume".
    kind: str
    # Kubernetes API path used to delete this object.
    path: str
    # Parsed creationTimestamp, or None when absent/unparseable.
    created_at: datetime | None
    # PVC referenced by this PV's claimRef (set only for kind == "pv").
    related_pvc: str | None = None
    # Backing PV name (set only for kind == "longhorn_volume").
    pv_name: str | None = None
@dataclass(frozen=True)
class _LonghornBinding:
    """PVC/PV linkage extracted from a Longhorn volume's labels and status.

    Fields are typed ``Any`` because the values come straight from
    unvalidated Kubernetes API payloads and may be missing or non-string.
    """

    # PVC name from the created-for label, falling back to kubernetesStatus.
    pvc_name: Any
    # PVC namespace from the created-for label, falling back to kubernetesStatus.
    pvc_namespace: Any
    # PV name recorded in the volume's kubernetesStatus.
    referenced_pv_name: Any
def _parse_timestamp(raw: str) -> datetime | None:
"""Parse Kubernetes RFC3339 timestamps into timezone-aware datetimes."""
normalized = raw.replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized)
except ValueError:
return None
def _created_at(metadata: dict[str, Any]) -> datetime | None:
    """Return the object's creationTimestamp as an aware datetime, if any."""
    stamp = metadata.get("creationTimestamp")
    # Guard against missing, empty, or non-string values before parsing.
    if not (isinstance(stamp, str) and stamp):
        return None
    return _parse_timestamp(stamp)
def _is_old_enough(settings_obj: Any, metadata: dict[str, Any]) -> bool:
    """True when the object's age meets the configured minimum cleanup age.

    Objects without a parseable creationTimestamp are treated as too young,
    which errs on the side of not deleting them.
    """
    born = _created_at(metadata)
    if born is None:
        return False
    threshold = timedelta(hours=settings_obj.jenkins_workspace_cleanup_min_age_hours)
    age = datetime.now(timezone.utc) - born
    return age >= threshold
def _is_deleting(metadata: dict[str, Any]) -> bool:
deletion_ts = metadata.get("deletionTimestamp")
return isinstance(deletion_ts, str) and bool(deletion_ts.strip())
def _is_workspace_name(settings_obj: Any, name: Any) -> bool:
return isinstance(name, str) and name.startswith(settings_obj.jenkins_workspace_pvc_prefix)
def _active_workspace_claims(settings_obj: Any, get_json_func: Callable[[str], dict[str, Any]]) -> set[str]:
    """Collect Jenkins workspace PVC names currently referenced by pods.

    A PVC counts as active when it is mounted through a pod volume or named
    in the pod's ``jenkins.io/workspace-pvc`` annotation.
    """
    ns = settings_obj.jenkins_workspace_namespace
    listing = get_json_func(f"/api/v1/namespaces/{ns}/pods")
    pods = listing.get("items")
    if not isinstance(pods, list):
        pods = []
    referenced: set[str] = set()
    for pod in pods:
        if not isinstance(pod, dict):
            continue
        meta = pod.get("metadata")
        if not isinstance(meta, dict):
            meta = {}
        notes = meta.get("annotations")
        if not isinstance(notes, dict):
            notes = {}
        pod_spec = pod.get("spec")
        if not isinstance(pod_spec, dict):
            pod_spec = {}
        vols = pod_spec.get("volumes")
        if not isinstance(vols, list):
            vols = []
        # Mounted workspace PVCs.
        for vol in vols:
            if not isinstance(vol, dict):
                continue
            pvc_ref = vol.get("persistentVolumeClaim")
            if isinstance(pvc_ref, dict):
                mounted = pvc_ref.get("claimName")
                if _is_workspace_name(settings_obj, mounted):
                    referenced.add(mounted)
        # Annotation-declared workspace PVC.
        annotated = notes.get("jenkins.io/workspace-pvc")
        if _is_workspace_name(settings_obj, annotated):
            referenced.add(annotated)
    return referenced
def _workspace_pv_candidates(
    settings_obj: Any,
    get_json_func: Callable[[str], dict[str, Any]],
    active_claims: set[str],
) -> tuple[list[_CleanupCandidate], set[str]]:
    """Find releasable Jenkins workspace PVs and record every PV name seen.

    Returns a ``(candidates, all_pv_names)`` pair; ``all_pv_names`` covers
    every PV in the cluster (not only workspace ones) so later Longhorn
    checks can detect orphaned volumes.
    """
    ns = settings_obj.jenkins_workspace_namespace
    listing = get_json_func("/api/v1/persistentvolumes")
    pvs = listing.get("items")
    if not isinstance(pvs, list):
        pvs = []
    found: list[_CleanupCandidate] = []
    seen_names: set[str] = set()
    for pv in pvs:
        if not isinstance(pv, dict):
            continue
        meta = pv.get("metadata")
        if not isinstance(meta, dict):
            meta = {}
        pv_status = pv.get("status")
        if not isinstance(pv_status, dict):
            pv_status = {}
        pv_spec = pv.get("spec")
        if not isinstance(pv_spec, dict):
            pv_spec = {}
        pv_name = meta.get("name")
        # Record every named PV, even ones we will not delete.
        if isinstance(pv_name, str) and pv_name:
            seen_names.add(pv_name)
        ref = pv_spec.get("claimRef")
        if not isinstance(ref, dict):
            ref = {}
        bound_ns = ref.get("namespace")
        bound_claim = ref.get("name")
        # Only reap PVs whose claim points at a workspace PVC in the
        # workspace namespace, that are not mid-deletion or actively
        # mounted, are already Released/Failed, and are old enough.
        if bound_ns != ns:
            continue
        if not _is_workspace_name(settings_obj, bound_claim):
            continue
        if _is_deleting(meta) or bound_claim in active_claims:
            continue
        if pv_status.get("phase") not in {"Released", "Failed"}:
            continue
        if not _is_old_enough(settings_obj, meta):
            continue
        if not (isinstance(pv_name, str) and pv_name):
            continue
        found.append(
            _CleanupCandidate(
                name=pv_name,
                kind="pv",
                path=f"/api/v1/persistentvolumes/{pv_name}",
                created_at=_created_at(meta),
                related_pvc=bound_claim if isinstance(bound_claim, str) else None,
            )
        )
    return found, seen_names
def _workspace_pvc_candidates(
    settings_obj: Any,
    get_json_func: Callable[[str], dict[str, Any]],
    active_claims: set[str],
) -> list[_CleanupCandidate]:
    """Find stale Jenkins workspace PVCs that no pod references anymore."""
    ns = settings_obj.jenkins_workspace_namespace
    listing = get_json_func(f"/api/v1/namespaces/{ns}/persistentvolumeclaims")
    claims = listing.get("items")
    if not isinstance(claims, list):
        claims = []
    found: list[_CleanupCandidate] = []
    for claim in claims:
        if not isinstance(claim, dict):
            continue
        meta = claim.get("metadata")
        if not isinstance(meta, dict):
            meta = {}
        claim_status = claim.get("status")
        if not isinstance(claim_status, dict):
            claim_status = {}
        pvc_name = meta.get("name")
        # Keep only workspace-prefixed PVCs that are not mid-deletion,
        # not actively referenced, not Bound, and old enough to reap.
        if not _is_workspace_name(settings_obj, pvc_name):
            continue
        if _is_deleting(meta) or pvc_name in active_claims:
            continue
        if claim_status.get("phase") == "Bound":
            continue
        if not _is_old_enough(settings_obj, meta):
            continue
        if not (isinstance(pvc_name, str) and pvc_name):
            continue
        found.append(
            _CleanupCandidate(
                name=pvc_name,
                kind="pvc",
                path=f"/api/v1/namespaces/{ns}/persistentvolumeclaims/{pvc_name}",
                created_at=_created_at(meta),
            )
        )
    return found
def _workspace_binding_from_longhorn(
    metadata: dict[str, Any],
    status: dict[str, Any],
) -> _LonghornBinding:
    """Derive a Longhorn volume's PVC/PV linkage.

    Prefers the ``kubernetes.io/created-for`` labels and falls back to
    ``status.kubernetesStatus`` when a label is missing or blank.
    """
    vol_labels = metadata.get("labels")
    if not isinstance(vol_labels, dict):
        vol_labels = {}
    k8s_status = status.get("kubernetesStatus")
    if not isinstance(k8s_status, dict):
        k8s_status = {}
    claim = vol_labels.get("kubernetes.io/created-for/pvc/name")
    if not (isinstance(claim, str) and claim):
        claim = k8s_status.get("pvcName")
    claim_ns = vol_labels.get("kubernetes.io/created-for/pvc/namespace")
    if not (isinstance(claim_ns, str) and claim_ns):
        claim_ns = k8s_status.get("namespace")
    return _LonghornBinding(
        pvc_name=claim,
        pvc_namespace=claim_ns,
        referenced_pv_name=k8s_status.get("pvName"),
    )
def _should_delete_longhorn_volume(
    settings_obj: Any,
    name: str,
    binding: _LonghornBinding,
    all_pv_names: set[str],
    removed_pv_names: set[str],
) -> bool:
    """Decide whether a Longhorn volume belongs to a reaped Jenkins workspace.

    Deletes outright when the volume maps onto a PV removed in this run;
    otherwise only orphaned workspace volumes (no surviving PV) whose PVC
    namespace is blank or the workspace namespace qualify.
    """
    # The backing PV was just deleted: the volume is definitely stale.
    if name in removed_pv_names:
        return True
    if binding.referenced_pv_name in removed_pv_names:
        return True
    # Never touch volumes that are not Jenkins workspace volumes.
    if not _is_workspace_name(settings_obj, binding.pvc_name):
        return False
    # A surviving PV still references this volume: keep it.
    pv_alive = name in all_pv_names or (
        isinstance(binding.referenced_pv_name, str)
        and binding.referenced_pv_name in all_pv_names
    )
    if pv_alive:
        return False
    ns = binding.pvc_namespace
    return ns in {None, ""} or ns == settings_obj.jenkins_workspace_namespace
def _workspace_longhorn_candidates(
    settings_obj: Any,
    get_json_func: Callable[[str], dict[str, Any]],
    all_pv_names: set[str],
    removed_pv_names: set[str],
) -> list[_CleanupCandidate]:
    """Find detached, stale Longhorn volumes left behind by workspace PVs."""
    ns = "longhorn-system"
    listing = get_json_func("/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes")
    volumes = listing.get("items")
    if not isinstance(volumes, list):
        volumes = []
    found: list[_CleanupCandidate] = []
    for vol in volumes:
        if not isinstance(vol, dict):
            continue
        meta = vol.get("metadata")
        if not isinstance(meta, dict):
            meta = {}
        vol_status = vol.get("status")
        if not isinstance(vol_status, dict):
            vol_status = {}
        vol_spec = vol.get("spec")
        if not isinstance(vol_spec, dict):
            vol_spec = {}
        vol_name = meta.get("name")
        if not (isinstance(vol_name, str) and vol_name):
            continue
        linkage = _workspace_binding_from_longhorn(meta, vol_status)
        if not _should_delete_longhorn_volume(
            settings_obj,
            vol_name,
            linkage,
            all_pv_names,
            removed_pv_names,
        ):
            continue
        if _is_deleting(meta) or not _is_old_enough(settings_obj, meta):
            continue
        # Only reap volumes that are safely idle: not attached, not in an
        # active state, not healthy/rebuilding, and with a plain frontend.
        if vol_status.get("state") not in {None, "detached", "faulted", "unknown"}:
            continue
        if vol_status.get("isAttached") is True:
            continue
        if vol_status.get("robustness") not in {None, "unknown", "faulted", "degraded"}:
            continue
        if vol_spec.get("frontend") not in {None, "", "blockdev"}:
            continue
        found.append(
            _CleanupCandidate(
                name=vol_name,
                kind="longhorn_volume",
                path=f"/apis/longhorn.io/v1beta2/namespaces/{ns}/volumes/{vol_name}",
                created_at=_created_at(meta),
                pv_name=vol_name,
            )
        )
    return found

View File

@ -1,14 +1,20 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
from datetime import datetime, timezone
from prometheus_client import Counter, Gauge
from ..k8s.client import delete_json, get_json
from ..settings import settings
from ..utils.logging import get_logger
from .jenkins_workspace_candidates import (
_CleanupCandidate,
_active_workspace_claims,
_workspace_longhorn_candidates,
_workspace_pv_candidates,
_workspace_pvc_candidates,
)
logger = get_logger(__name__)
@ -83,268 +89,6 @@ class JenkinsWorkspaceCleanupSummary:
return self.pvs_deleted + self.pvcs_deleted + self.volumes_deleted
@dataclass(frozen=True)
class _CleanupCandidate:
name: str
kind: str
path: str
created_at: datetime | None
related_pvc: str | None = None
pv_name: str | None = None
@dataclass(frozen=True)
class _LonghornBinding:
pvc_name: Any
pvc_namespace: Any
referenced_pv_name: Any
def _parse_timestamp(raw: str) -> datetime | None:
"""Parse Kubernetes RFC3339 timestamps into timezone-aware datetimes."""
normalized = raw.replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized)
except ValueError:
return None
def _created_at(metadata: dict[str, Any]) -> datetime | None:
raw = metadata.get("creationTimestamp")
if not isinstance(raw, str) or not raw:
return None
return _parse_timestamp(raw)
def _is_old_enough(metadata: dict[str, Any]) -> bool:
"""Return true when an object age exceeds the configured cleanup threshold."""
created_at = _created_at(metadata)
if created_at is None:
return False
min_age = timedelta(hours=settings.jenkins_workspace_cleanup_min_age_hours)
return datetime.now(timezone.utc) - created_at >= min_age
def _is_deleting(metadata: dict[str, Any]) -> bool:
deletion_ts = metadata.get("deletionTimestamp")
return isinstance(deletion_ts, str) and bool(deletion_ts.strip())
def _is_workspace_name(name: Any) -> bool:
return isinstance(name, str) and name.startswith(settings.jenkins_workspace_pvc_prefix)
def _active_workspace_claims() -> set[str]:
"""Collect currently referenced Jenkins workspace PVC names from pods."""
namespace = settings.jenkins_workspace_namespace
payload = get_json(f"/api/v1/namespaces/{namespace}/pods")
items = payload.get("items") if isinstance(payload.get("items"), list) else []
active: set[str] = set()
for pod in items:
if not isinstance(pod, dict):
continue
metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
annotations = metadata.get("annotations") if isinstance(metadata.get("annotations"), dict) else {}
spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
volumes = spec.get("volumes") if isinstance(spec.get("volumes"), list) else []
for volume in volumes:
if not isinstance(volume, dict):
continue
claim = volume.get("persistentVolumeClaim")
if not isinstance(claim, dict):
continue
claim_name = claim.get("claimName")
if _is_workspace_name(claim_name):
active.add(claim_name)
claim_name = annotations.get("jenkins.io/workspace-pvc")
if _is_workspace_name(claim_name):
active.add(claim_name)
return active
def _workspace_pv_candidates(active_claims: set[str]) -> tuple[list[_CleanupCandidate], set[str]]:
"""Find releasable Jenkins workspace PVs and keep a set of all PV names."""
namespace = settings.jenkins_workspace_namespace
payload = get_json("/api/v1/persistentvolumes")
items = payload.get("items") if isinstance(payload.get("items"), list) else []
candidates: list[_CleanupCandidate] = []
all_pv_names: set[str] = set()
for pv in items:
if not isinstance(pv, dict):
continue
metadata = pv.get("metadata") if isinstance(pv.get("metadata"), dict) else {}
status = pv.get("status") if isinstance(pv.get("status"), dict) else {}
spec = pv.get("spec") if isinstance(pv.get("spec"), dict) else {}
name = metadata.get("name")
if isinstance(name, str) and name:
all_pv_names.add(name)
claim_ref = spec.get("claimRef") if isinstance(spec.get("claimRef"), dict) else {}
claim_namespace = claim_ref.get("namespace")
claim_name = claim_ref.get("name")
phase = status.get("phase")
if claim_namespace != namespace:
continue
if not _is_workspace_name(claim_name):
continue
if _is_deleting(metadata):
continue
if claim_name in active_claims:
continue
if phase not in {"Released", "Failed"}:
continue
if not _is_old_enough(metadata):
continue
if not isinstance(name, str) or not name:
continue
candidates.append(
_CleanupCandidate(
name=name,
kind="pv",
path=f"/api/v1/persistentvolumes/{name}",
created_at=_created_at(metadata),
related_pvc=claim_name if isinstance(claim_name, str) else None,
)
)
return candidates, all_pv_names
def _workspace_pvc_candidates(active_claims: set[str]) -> list[_CleanupCandidate]:
"""Find stale Jenkins workspace PVCs that are not actively referenced."""
namespace = settings.jenkins_workspace_namespace
payload = get_json(f"/api/v1/namespaces/{namespace}/persistentvolumeclaims")
items = payload.get("items") if isinstance(payload.get("items"), list) else []
candidates: list[_CleanupCandidate] = []
for pvc in items:
if not isinstance(pvc, dict):
continue
metadata = pvc.get("metadata") if isinstance(pvc.get("metadata"), dict) else {}
status = pvc.get("status") if isinstance(pvc.get("status"), dict) else {}
claim_name = metadata.get("name")
phase = status.get("phase")
if not _is_workspace_name(claim_name):
continue
if _is_deleting(metadata):
continue
if claim_name in active_claims:
continue
if phase == "Bound":
continue
if not _is_old_enough(metadata):
continue
if not isinstance(claim_name, str) or not claim_name:
continue
candidates.append(
_CleanupCandidate(
name=claim_name,
kind="pvc",
path=f"/api/v1/namespaces/{namespace}/persistentvolumeclaims/{claim_name}",
created_at=_created_at(metadata),
)
)
return candidates
def _workspace_binding_from_longhorn(
metadata: dict[str, Any],
status: dict[str, Any],
) -> _LonghornBinding:
labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {}
kubernetes_status = status.get("kubernetesStatus") if isinstance(status.get("kubernetesStatus"), dict) else {}
pvc_name = labels.get("kubernetes.io/created-for/pvc/name")
if not isinstance(pvc_name, str) or not pvc_name:
pvc_name = kubernetes_status.get("pvcName")
pvc_namespace = labels.get("kubernetes.io/created-for/pvc/namespace")
if not isinstance(pvc_namespace, str) or not pvc_namespace:
pvc_namespace = kubernetes_status.get("namespace")
referenced_pv_name = kubernetes_status.get("pvName")
return _LonghornBinding(
pvc_name=pvc_name,
pvc_namespace=pvc_namespace,
referenced_pv_name=referenced_pv_name,
)
def _should_delete_longhorn_volume(
name: str,
binding: _LonghornBinding,
all_pv_names: set[str],
removed_pv_names: set[str],
) -> bool:
if name in removed_pv_names or binding.referenced_pv_name in removed_pv_names:
return True
if not _is_workspace_name(binding.pvc_name):
return False
if (
isinstance(binding.referenced_pv_name, str)
and binding.referenced_pv_name in all_pv_names
) or name in all_pv_names:
return False
return (
binding.pvc_namespace in {None, ""}
or binding.pvc_namespace == settings.jenkins_workspace_namespace
)
def _workspace_longhorn_candidates(all_pv_names: set[str], removed_pv_names: set[str]) -> list[_CleanupCandidate]:
namespace = "longhorn-system"
payload = get_json("/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes")
items = payload.get("items") if isinstance(payload.get("items"), list) else []
candidates: list[_CleanupCandidate] = []
for volume in items:
if not isinstance(volume, dict):
continue
metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {}
status = volume.get("status") if isinstance(volume.get("status"), dict) else {}
spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {}
name = metadata.get("name")
if not isinstance(name, str) or not name:
continue
binding = _workspace_binding_from_longhorn(metadata, status)
robust_state = status.get("robustness")
state = status.get("state")
attached = status.get("isAttached")
frontend = spec.get("frontend")
if not _should_delete_longhorn_volume(
name,
binding,
all_pv_names,
removed_pv_names,
):
continue
if _is_deleting(metadata):
continue
if not _is_old_enough(metadata):
continue
if state not in {None, "detached", "faulted", "unknown"}:
continue
if attached is True:
continue
if robust_state not in {None, "unknown", "faulted", "degraded"}:
continue
if frontend not in {None, "", "blockdev"}:
continue
candidates.append(
_CleanupCandidate(
name=name,
kind="longhorn_volume",
path=f"/apis/longhorn.io/v1beta2/namespaces/{namespace}/volumes/{name}",
created_at=_created_at(metadata),
pv_name=name,
)
)
return candidates
def _validate_cleanup_settings() -> tuple[str, str, bool, int]:
namespace = settings.jenkins_workspace_namespace
prefix = settings.jenkins_workspace_pvc_prefix.strip()
@ -444,7 +188,7 @@ def _dry_run_summary(
all_pv_names: set[str],
) -> JenkinsWorkspaceCleanupSummary:
simulated_removed = _planned_removed_pv_names_dry_run(stale_pvcs, stale_pvs, max_deletions)
stale_volumes = _workspace_longhorn_candidates(all_pv_names, simulated_removed)
stale_volumes = _workspace_longhorn_candidates(settings, get_json, all_pv_names, simulated_removed)
_record_guard_cap(
max_deletions=max_deletions,
stale_pvcs=stale_pvcs,
@ -501,7 +245,7 @@ def _delete_run_summary(
failure_field="pv",
removed_pv_names=removed_pv_names,
)
stale_volumes = _workspace_longhorn_candidates(all_pv_names, removed_pv_names)
stale_volumes = _workspace_longhorn_candidates(settings, get_json, all_pv_names, removed_pv_names)
_record_guard_cap(
max_deletions=max_deletions,
stale_pvcs=stale_pvcs,
@ -584,9 +328,9 @@ def cleanup_jenkins_workspace_storage() -> JenkinsWorkspaceCleanupSummary:
)
try:
namespace, _prefix, dry_run, max_deletions = _validate_cleanup_settings()
active_claims = _active_workspace_claims()
stale_pvs, all_pv_names = _workspace_pv_candidates(active_claims)
stale_pvcs = _workspace_pvc_candidates(active_claims)
active_claims = _active_workspace_claims(settings, get_json)
stale_pvs, all_pv_names = _workspace_pv_candidates(settings, get_json, active_claims)
stale_pvcs = _workspace_pvc_candidates(settings, get_json, active_claims)
if dry_run:
summary = _dry_run_summary(
namespace=namespace,

View File

@ -3,7 +3,6 @@ ariadne/services/cluster_state.py split planned; service orchestration decomposi
ariadne/app.py split planned; Flask app bootstrap/routes currently co-located
ariadne/services/comms.py split planned; comms adapters still consolidated
ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction
ariadne/services/jenkins_workspace_cleanup.py split planned; job orchestration pending extraction
tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
tests/test_services.py test module split planned; broad service contract coverage retained meanwhile
tests/test_app.py test module split planned; API coverage retained meanwhile

1 # path reason
3 ariadne/app.py split planned; Flask app bootstrap/routes currently co-located
4 ariadne/services/comms.py split planned; comms adapters still consolidated
5 ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction
ariadne/services/jenkins_workspace_cleanup.py split planned; job orchestration pending extraction
6 tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
7 tests/test_services.py test module split planned; broad service contract coverage retained meanwhile
8 tests/test_app.py test module split planned; API coverage retained meanwhile